RabbitMQ(DAY2)

数据隔离

image-20240904071135687

对Virtual Hosts进行添加

使用java进行操作

spring AMQP

导入依赖maven

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

publisherTests

1
2
3
4
5
6
7
8
9
10
11
12
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue(){
String queueName = "simple.queue";

String message = "hello , rabbitmq!";

rabbitTemplate.convertAndSend(queueName,message);
}

ConsumerMqListener

1
2
3
4
5
@RabbitListener(queues = "simple.queue")
void listenSimpleQueue(String msg){

System.out.println("收到" + msg );
}

使用RabbitListener注解 记得要将该类写入springbean环境进行管理 持久化注入@Component

消费者消息推送限制

默认情况,RabbitMq会将消息依次轮询投递给绑定在队列上的每一个消费者,这并没有考虑消费者是否处理完消息,会造成消息堆积。

修改配置,确保同一时刻只能推送给消费者一个消息:

1
2
3
4
5
6
7
8
9
10
11
12
spring:

rabbitmq:
host: 106.53.212.49
port: 5672
virtual-host: /Demo
username: guest
password: guest
listener:
simple:
prefetch: 1 # 每次只能取一个消息

Work模型

  • 可以在一个队列上绑定多个消费者
  • 同一个消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

我们不会直接发到队列中 一般会发到交换机上 交换机有三种类型;

  • Fanout:广播

交换机会将收到的消息广播到每一个绑定的queue

image-20240904190224480

image-20240904191134835

  • Direct:定向

根据规则路由到指定的Queue

每一个Queue都与Exchange设置一个BlindingKey

发布者要指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

  • Topic: 话题

与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以.分割

Queue与Exchange指定BindingKey可以使用通配符:

  • #:代指0个或多个单词
  • *: 代指一个单词

声明队列交互机

SpringAMQP提供了解决方法

  • Queue: 声明队列 可以用工厂类QueueBuilder
  • Exchange: ExchangeBuilder构建
  • Binding : 用于声明队列和交互机的绑定关系,用BindingBuilder构建

image-20240904201931836

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class FanoutConfiguration {

@Bean
public FanoutExchange fanoutExchange(){

// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}

@Bean
public Queue fanoutQueue3() {

// QueueBuilder.durable("ff").build();

return new Queue("fanout.queue3");
}
//将队列3绑定到交换机
@Bean
public Binding fanoutBinding3(){
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}

@Bean
public Queue fanoutQueue4() {

// QueueBuilder.durable("ff").build();

return new Queue("fanout.queue4");
}

//将队列4绑定到交换机
@Bean
public Binding fanoutBinding4(){
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}

基于注解来来声明队列和交换机

1
2
3
4
5
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "ture"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))

消息转换器

当我们发送object时,会以字节方式发送到mq

image-20240905152813455

建议采用JSON序列化替代JDK序列化

引入jackson工具

将Message打入bean

image-20240905160226812

image-20240905160150609