消息驱动之生产者
新建cloud-stream-rabbitmq-provider8801
模块

pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
yml
主启动类
创建主启动类StreamMQMain8801

1 2 3 4 5 6
| @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }
|
services
创建IMessageProvider
接口

1 2 3
| public interface IMessageProvider { String send(); }
|
发送消息接口实现类MessageProviderImpl

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @EnableBinding(Source.class) public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output;
@Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: "+serial); return null; } }
|
controller
创建SendMessageController

1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController public class SendMessageController { @Resource private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); }
}
|
测试
启动7001 8801,启动RabbitMQ

多次访问http://localhost:8801/sendMessage
控制台

RabbitMQ界面http://localhost:15672/#/

成功!
消息驱动之生产者
新建模块cloud-stream-rabbitmq-consumer8802
pom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| <dependencies>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
|
yml
主启动类
创建主启动类StreamMQMain8802

1 2 3 4 5 6 7 8
| @SpringBootApplication public class StreamMQMain8802 {
public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); }
}
|
controller
创建ReceiveMessageListenerController

1 2 3 4 5 6 7 8 9 10 11 12
| @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort;
@StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者1号,接受:" + message.getPayload() + "\t port:" + serverPort); }
}
|
测试
启动7001 8801 8802 RabbitMQ

多次访问http://localhost:8801/sendMessage

8801

8802

