消息驱动之生产者
新建cloud-stream-rabbitmq-provider8801模块

pom
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 
 <dependencies>
     <!-- Spring Cloud Stream starter for the RabbitMQ binder. -->
     <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
     </dependency>

     <!-- Eureka client starter. -->
     <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
     </dependency>

     <!-- Web + actuator. Actuator is declared exactly once: Maven expects each
          groupId:artifactId pair to be unique inside <dependencies>. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>

     <!-- Development-time only; optional so it is not passed on to dependents. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-devtools</artifactId>
         <scope>runtime</scope>
         <optional>true</optional>
     </dependency>

     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
     </dependency>

     <!-- Test-scoped starter. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
     <!-- NOTE(review): no <version> elements here; presumably managed by a parent POM/BOM
          that is not shown in this snippet. Confirm. -->
 </dependencies>
 
 | 
yml
主启动类
创建主启动类StreamMQMain8801

| 12
 3
 4
 5
 6
 
 | @SpringBootApplicationpublic class StreamMQMain8801 {
 public static void main(String[] args) {
 SpringApplication.run(StreamMQMain8801.class, args);
 }
 }
 
 | 
services
创建IMessageProvider接口

| 12
 3
 
 /**
  * Contract for a component that sends a message.
  */
 public interface IMessageProvider {

     /**
      * Sends one message.
      *
      * @return a string result for the caller; its exact meaning is defined by the implementation
      */
     String send();
 }
 
 | 
发送消息接口实现类MessageProviderImpl

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider
 {
 @Resource
 private MessageChannel output;
 
 @Override
 public String send()
 {
 String serial = UUID.randomUUID().toString();
 output.send(MessageBuilder.withPayload(serial).build());
 System.out.println("*****serial: "+serial);
 return null;
 }
 }
 
 | 
controller
创建SendMessageController

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | @RestControllerpublic class SendMessageController {
 @Resource
 private IMessageProvider messageProvider;
 
 @GetMapping(value = "/sendMessage")
 public String sendMessage()
 {
 return messageProvider.send();
 }
 
 }
 
 
 | 
测试
启动7001 8801,启动RabbitMQ

多次访问 http://localhost:8801/sendMessage ,然后查看控制台输出

RabbitMQ界面http://localhost:15672/#/

成功!
消息驱动之消费者
新建模块cloud-stream-rabbitmq-consumer8802
pom
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 <dependencies>
     <!-- Spring Cloud Stream starter for the RabbitMQ binder. -->
     <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
     </dependency>

     <!-- Eureka client starter. -->
     <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
     </dependency>

     <!-- Web + actuator. Actuator is declared exactly once: Maven expects each
          groupId:artifactId pair to be unique inside <dependencies>. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>

     <!-- Development-time only; optional so it is not passed on to dependents. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-devtools</artifactId>
         <scope>runtime</scope>
         <optional>true</optional>
     </dependency>

     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
     </dependency>

     <!-- Test-scoped starter. -->
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
     <!-- NOTE(review): no <version> elements here; presumably managed by a parent POM/BOM
          that is not shown in this snippet. Confirm. -->
 </dependencies>
 
 | 
yml
主启动类
创建主启动类StreamMQMain8802

| 12
 3
 4
 5
 6
 7
 8
 
 | @SpringBootApplicationpublic class StreamMQMain8802 {
 
 public static void main(String[] args) {
 SpringApplication.run(StreamMQMain8802.class, args);
 }
 
 }
 
 | 
controller
创建ReceiveMessageListenerController

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 
 | @Component@EnableBinding(Sink.class)
 public class ReceiveMessageListenerController {
 @Value("${server.port}")
 private String serverPort;
 
 @StreamListener(Sink.INPUT)
 public void input(Message<String> message) {
 System.out.println("消费者1号,接受:" + message.getPayload() + "\t port:" + serverPort);
 }
 
 }
 
 | 
测试
启动7001 8801 8802 RabbitMQ

多次访问http://localhost:8801/sendMessage

8801

8802

