RabbitMQ(DAY2)
数据隔离
对Virtual Hosts进行添加
使用java进行操作
spring AMQP
导入依赖maven
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
写publisherTests
类
1 2 3 4 5 6 7 8 9 10 11 12
| @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue(){ String queueName = "simple.queue";
String message = "hello , rabbitmq!";
rabbitTemplate.convertAndSend(queueName,message); }
|
写ConsumerMqListener
1 2 3 4 5
| @RabbitListener(queues = "simple.queue") void listenSimpleQueue(String msg){
System.out.println("收到" + msg ); }
|
使用RabbitListener
注解 记得要将该类写入springbean环境进行管理 持久化注入@Component
消费者消息推送限制
默认情况,RabbitMq会将消息依次轮询投递给绑定在队列上的每一个消费者,这并没有考虑消费者是否处理完消息,会造成消息堆积。
修改配置,确保同一时刻只能推送给消费者一个消息:
1 2 3 4 5 6 7 8 9 10 11 12
| spring:
rabbitmq: host: 106.53.212.49 port: 5672 virtual-host: /Demo username: guest password: guest listener: simple: prefetch: 1
|
Work模型
- 可以在一个队列上绑定多个消费者
- 同一个消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
Fanout交换机
我们不会直接发到队列中 一般会发到交换机上 交换机有三种类型;
交换机会将收到的消息广播到每一个绑定的queue
根据规则路由到指定的Queue
每一个Queue都与Exchange设置一个BlindingKey
发布者要指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
与DirectExchange类似,区别在于routingKey可以是多个单词列表,并且以.
分割
Queue与Exchange指定BindingKey可以使用通配符:
声明队列交互机
SpringAMQP提供了解决方法
- Queue: 声明队列 可以用工厂类QueueBuilder
- Exchange: ExchangeBuilder构建
- Binding : 用于声明队列和交互机的绑定关系,用BindingBuilder构建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Configuration public class FanoutConfiguration {
@Bean public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout2"); }
@Bean public Queue fanoutQueue3() {
return new Queue("fanout.queue3"); } @Bean public Binding fanoutBinding3(){ return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange()); }
@Bean public Queue fanoutQueue4() {
return new Queue("fanout.queue4"); }
@Bean public Binding fanoutBinding4(){ return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()); }
|
基于注解来来声明队列和交换机
1 2 3 4 5
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1", durable = "ture"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red","blue"} ))
|
消息转换器
当我们发送object时,会以字节方式发送到mq
建议采用JSON序列化替代JDK序列化
引入jackson工具
将Message打入bean