环境搭建
创建一个SpringBoot项目rabbitmq_springboot

可以选择导入依赖
也可以使用坐标导入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置文件配置RabbitMQ

1 2 3 4 5 6 7 8 9
| spring: application: name: rabbitmq-springboot rabbitmq: host: localhost port: 5672 username: ems password: 123 virtual-host: /ems
|
Hello World 模型
生产者
在测试类中注入RabbitMQTemplate,使用convertAndSend()
方法

1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @SpringBootTest class RabbitmqSpringbootApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); } }
|
此时点击运行的话并不会生成hello队列产生消息,必须要有对应的消费者,才可以生效。
消费者
使用@Componet注解将该类注册到Spring中,通过@RabbitListener监听队列,@RabbitHandler处理。

1 2 3 4 5 6 7 8 9 10
| @Component
@RabbitListener(queuesToDeclare = @Queue("hello")) public class HelloCustomer {
@RabbitHandler public void receive(String message) { System.out.println("message=" + message); } }
|
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
使用 @RabbitListener 注解标记方法,当监听到所指队列 中有消息时则会进行接收并处理
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@RabbitListener(queuesToDeclare = @Queue("hello"))
给queuesToDeclare赋值声明队列 @Queue
创建队列
测试
此时再次启动生产者


消费成功!
细节
如果我们声明队列是想要配置队列的durable持久化
,exclusive独占性
,autoDelete自动删除
怎么配置呢?
很简单依然通过上文中使用的@Queues
注解

通过查看源码我们可以得知默认队列属性是durable=true,exclusive=false,autoDelete=false

Work Queue 模型
生产者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @SpringBootTest class RabbitmqSpringbootApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); }
@Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } }
}
|
消费者
类上标注@Componet注解,方法上使用@RabbitMQ注解监听work队列。相当于两个创建了两个消费者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message1) { System.out.println("message1=" + message1); }
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message2) { System.out.println("message2=" + message2); }
}
|
测试
点击运行testWork()


测试成功!
Fanout 模型
生产者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @SpringBootTest class RabbitmqSpringbootApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); }
@Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } }
@Test public void testFanout() { rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息"); } }
|
消费者

@Compoent注册到Spring中,方法使用@RabbitListener
,使用其中的bindings
属性。使用@QueueBinding注解绑定value属性值为@Queue
创建临时队列,exchange属性值为@Exchange
从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component public class FanoutCustomer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue,//创建临时队列 exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机 ) }) public void receive1(String message){ System.out.println("message1 = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue,//创建临时队列 exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机 ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
|
测试
点击运行


Direct 模型
生产者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @SpringBootTest class RabbitmqSpringbootApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); } @Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } } @Test public void testFanout() { rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息"); }
@Test public void testRoute(){ rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息"); }
}
|
消费者
@Compoent注册到Spring中,方法使用@RabbitListener
,使用其中的bindings
属性。使用@QueueBinding注解绑定value属性值为@Queue
创建临时队列,exchange属性值为@Exchange
从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout。key属性值为routingKey值

消费者1rotingKey值为info,error,warn。消费者2routingKey值为error
测试
生产者的routingKey为error,所以消费者1,消费者2都能消费该信息。


Topic 模型
生产者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @SpringBootTest class RabbitmqSpringbootApplicationTests {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testHello() { rabbitTemplate.convertAndSend("hello", "hello world"); } @Test public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } } @Test public void testFanout() { rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息"); }
@Test public void testRoute(){ rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息"); }
@Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add 路由消息"); }
}
|
消费者

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class TopicCustomer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic",name="topics"), key={"user.save","user.*"} ) }) public void receive1(String message){ System.out.println("message1 = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(type = "topic",name="topics"), key={"order.#","product.#","user.*"} ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
|
测试
生产者的routingKey为product.save.add
,只有消费者2匹配。

