环境搭建

创建一个SpringBoot项目rabbitmq_springboot

image-20200910192043363

可以选择导入依赖

image-20200910192128413也可以使用坐标导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件配置RabbitMQ

image-20200910193155625

1
2
3
4
5
6
7
8
9
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: localhost
port: 5672
username: ems
password: 123
virtual-host: /ems

Hello World 模型

生产者

在测试类中注入RabbitMQTemplate,使用convertAndSend()方法

image-20200910192727359

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
}

此时点击运行的话并不会生成hello队列产生消息,必须要有对应的消费者,才可以生效。

消费者

使用@Componet注解将该类注册到Spring中,通过@RabbitListener监听队列,@RabbitHandler处理。

image-20200910193942917

1
2
3
4
5
6
7
8
9
10
@Component
//默认 持久化 非独占 非自动删除 true false false
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {

@RabbitHandler
public void receive(String message) {
System.out.println("message=" + message);
}
}

@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用。

使用 @RabbitListener 注解标记方法,当监听到所指队列 中有消息时则会进行接收并处理

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@RabbitListener(queuesToDeclare = @Queue("hello"))给queuesToDeclare赋值声明队列 @Queue创建队列

测试

此时再次启动生产者

image-20200910194854253

image-20200910194906975

消费成功!

细节

如果我们声明队列是想要配置队列的durable持久化,exclusive独占性,autoDelete自动删除怎么配置呢?

很简单依然通过上文中使用的@Queues注解

image-20200910195304227

通过查看源码我们可以得知默认队列属性是durable=true,exclusive=false,autoDelete=false

image-20200910195627320

Work Queue 模型

生产者

image-20200910195849195

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}

//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}

}

消费者

类上标注@Componet注解,方法上使用@RabbitMQ注解监听work队列。相当于两个创建了两个消费者

image-20200910200029746

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class WorkCustomer {

//一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message1) {
System.out.println("message1=" + message1);
}

//一个消费者
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message2) {
System.out.println("message2=" + message2);
}

}

测试

点击运行testWork()

image-20200910200447797

image-20200910200515227

测试成功!

Fanout 模型

生产者

image-20200910201012594

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}

//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}

//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}

}

消费者

image-20200910201130361

@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class FanoutCustomer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}


@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}

测试

点击运行

image-20200910201716972

image-20200910201747502

Direct 模型

生产者

image-20200910201943941

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}

//route 路由模式
@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}

}

消费者

@Compoent注册到Spring中,方法使用@RabbitListener,使用其中的bindings属性。使用@QueueBinding注解绑定value属性值为@Queue创建临时队列,exchange属性值为@Exchange从而绑定交换机。该注解的value值为交换机的名称logs,type为类型fanout。key属性值为routingKey值

image-20200910202439808

消费者1rotingKey值为info,error,warn。消费者2routingKey值为error

测试

生产者的routingKey为error,所以消费者1,消费者2都能消费该信息。

image-20200910202748884

image-20200910202815706

Topic 模型

生产者

image-20200910203901798

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@SpringBootTest
class RabbitmqSpringbootApplicationTests {

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

//hello world
@Test
public void testHello() {
//向hello队列中发送 hello world 消息
rabbitTemplate.convertAndSend("hello", "hello world");
}
//work
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}
//fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型发送的消息");
}

//route 路由模式
@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}

//topic 动态路由 订阅模式
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","product.save.add","produce.save.add 路由消息");
}

}

消费者

image-20200910204002611

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class TopicCustomer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name="topics"),
key={"user.save","user.*"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}



@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(type = "topic",name="topics"),
key={"order.#","product.#","user.*"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}

测试

生产者的routingKey为product.save.add,只有消费者2匹配。

image-20200910204149401

image-20200910204219076