消息驱动之生产者

新建cloud-stream-rabbitmq-provider8801模块

image-20200710152201670

pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

<dependencies>

<!--stream-rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

yml

主启动类

创建主启动类StreamMQMain8801

image-20200710153404760

1
2
3
4
5
6
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}

services

创建IMessageProvider接口

image-20200710153828859

1
2
3
public interface IMessageProvider {
String send();
}

发送消息接口实现类MessageProviderImpl

image-20200710155032617

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息发送管道

@Override
public String send()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}

controller

创建SendMessageController

image-20200710160146613

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;

@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}

}

测试

启动7001 8801,启动RabbitMQ

image-20200710161627483

多次访问http://localhost:8801/sendMessage控制台

image-20200710161817183

RabbitMQ界面http://localhost:15672/#/

image-20200710161920362

成功!

消息驱动之生产者

新建模块cloud-stream-rabbitmq-consumer8802

pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<dependencies>

<!--stream-rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

yml

主启动类

创建主启动类StreamMQMain8802

image-20200710162631433

1
2
3
4
5
6
7
8
@SpringBootApplication
public class StreamMQMain8802 {

public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}

}

controller

创建ReceiveMessageListenerController

image-20200710164730216

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号,接受:" + message.getPayload() + "\t port:" + serverPort);
}

}

测试

启动7001 8801 8802 RabbitMQ

image-20200710164818787

多次访问http://localhost:8801/sendMessage

image-20200710164836905

8801

image-20200710164907121

8802

image-20200710164926231

image-20200710164953354