环境搭建
创建一个SpringBoot项目rabbitmq_springboot

可以选择导入依赖
 也可以使用坐标导入依赖
也可以使用坐标导入依赖
| 12
 3
 4
 
 | <dependency><groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 
 | 
配置文件配置RabbitMQ

| 12
 3
 4
 5
 6
 7
 8
 9
 
 | spring:application:
 name: rabbitmq-springboot
 rabbitmq:
 host: localhost
 port: 5672
 username: ems
 password: 123
 virtual-host: /ems
 
 | 
Hello World 模型
生产者
在测试类中注入RabbitMQTemplate,使用convertAndSend()方法

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | @SpringBootTestclass RabbitmqSpringbootApplicationTests {
 
 
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 
 @Test
 public void testHello() {
 
 rabbitTemplate.convertAndSend("hello", "hello world");
 }
 }
 
 | 
此时点击运行的话并不会生成hello队列产生消息,必须要有对应的消费者,才可以生效。
消费者
使用@Componet注解将该类注册到Spring中,通过@RabbitListener监听队列,@RabbitHandler处理。

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | @Component
 @RabbitListener(queuesToDeclare = @Queue("hello"))
 public class HelloCustomer {
 
 @RabbitHandler
 public void receive(String message) {
 System.out.println("message=" + message);
 }
 }
 
 | 
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。
使用 @RabbitListener 注解标记方法,当监听到所指队列 中有消息时则会进行接收并处理
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@RabbitListener(queuesToDeclare = @Queue("hello"))给queuesToDeclare赋值声明队列 @Queue创建队列
测试
此时再次启动生产者


消费成功!
细节
如果我们声明队列是想要配置队列的durable持久化,exclusive独占性,autoDelete自动删除怎么配置呢?
很简单依然通过上文中使用的@Queues注解

通过查看源码我们可以得知默认队列属性是durable=true,exclusive=false,autoDelete=false

Work Queue 模型
生产者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | @SpringBootTestclass RabbitmqSpringbootApplicationTests {
 
 
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 
 @Test
 public void testHello() {
 
 rabbitTemplate.convertAndSend("hello", "hello world");
 }
 
 
 @Test
 public void testWork() {
 for (int i = 0; i < 10; i++) {
 rabbitTemplate.convertAndSend("work", "work模型" + i);
 }
 }
 
 }
 
 | 
消费者
类上标注@Componet注解,方法上使用@RabbitMQ注解监听work队列。相当于两个创建了两个消费者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | @Componentpublic class WorkCustomer {
 
 
 @RabbitListener(queuesToDeclare = @Queue("work"))
 public void receive1(String message1) {
 System.out.println("message1=" + message1);
 }
 
 
 @RabbitListener(queuesToDeclare = @Queue("work"))
 public void receive2(String message2) {
 System.out.println("message2=" + message2);
 }
 
 }
 
 | 
测试
点击运行testWork()


测试成功!
Fanout 模型
生产者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 
 | @SpringBootTestclass RabbitmqSpringbootApplicationTests {
 
 
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 
 @Test
 public void testHello() {
 
 rabbitTemplate.convertAndSend("hello", "hello world");
 }
 
 
 @Test
 public void testWork() {
 for (int i = 0; i < 10; i++) {
 rabbitTemplate.convertAndSend("work", "work模型" + i);
 }
 }
 
 
 @Test
 public void testFanout() {
 rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
 }
 
 }
 
 | 
消费者

@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 
 | @Componentpublic class FanoutCustomer {
 
 @RabbitListener(bindings = {
 @QueueBinding(
 value = @Queue,//创建临时队列
 exchange =@Exchange(value = "logs",type = "fanout")  //绑定的交换机
 )
 })
 public void receive1(String  message){
 System.out.println("message1 = " + message);
 }
 
 
 @RabbitListener(bindings = {
 @QueueBinding(
 value = @Queue,//创建临时队列
 exchange =@Exchange(value = "logs",type = "fanout")  //绑定的交换机
 )
 })
 public void receive2(String  message){
 System.out.println("message2 = " + message);
 }
 }
 
 | 
测试
点击运行


Direct 模型
生产者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 
 | @SpringBootTestclass RabbitmqSpringbootApplicationTests {
 
 
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 
 @Test
 public void testHello() {
 
 rabbitTemplate.convertAndSend("hello", "hello world");
 }
 
 @Test
 public void testWork() {
 for (int i = 0; i < 10; i++) {
 rabbitTemplate.convertAndSend("work", "work模型" + i);
 }
 }
 
 @Test
 public void testFanout() {
 rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
 }
 
 
 @Test
 public void testRoute(){
 rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
 }
 
 }
 
 | 
消费者
@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout。key属性值为routingKey值

消费者1rotingKey值为info,error,warn。消费者2routingKey值为error
测试
生产者的routingKey为error,所以消费者1,消费者2都能消费该信息。


Topic 模型
生产者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 
 | @SpringBootTestclass RabbitmqSpringbootApplicationTests {
 
 
 @Autowired
 private RabbitTemplate rabbitTemplate;
 
 
 @Test
 public void testHello() {
 
 rabbitTemplate.convertAndSend("hello", "hello world");
 }
 
 @Test
 public void testWork() {
 for (int i = 0; i < 10; i++) {
 rabbitTemplate.convertAndSend("work", "work模型" + i);
 }
 }
 
 @Test
 public void testFanout() {
 rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
 }
 
 
 @Test
 public void testRoute(){
 rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
 }
 
 
 @Test
 public void testTopic(){
 rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add 路由消息");
 }
 
 }
 
 | 
消费者

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 
 | @Componentpublic class TopicCustomer {
 
 @RabbitListener(bindings = {
 @QueueBinding(
 value = @Queue,
 exchange = @Exchange(type = "topic",name="topics"),
 key={"user.save","user.*"}
 )
 })
 public void receive1(String message){
 System.out.println("message1 = " + message);
 }
 
 
 
 @RabbitListener(bindings = {
 @QueueBinding(
 value = @Queue,
 exchange = @Exchange(type = "topic",name="topics"),
 key={"order.#","product.#","user.*"}
 )
 })
 public void receive2(String message){
 System.out.println("message2 = " + message);
 }
 }
 
 | 
测试
生产者的routingKey为product.save.add,只有消费者2匹配。

