07-RabbitMQ之SpringCloudStream集成
SpringCloudStream是一个构建高扩展和事件驱动的微服务系统的框架,用于连接共有消息系统,整体上是把各种MQ产品抽象成了一套非常简单的统一的编程框架,以实现事件驱动的编程模型。社区官方实现了RabbitMQ,Apache Kafka,Kafka Stream和Amazon Kinesis等产品,RocketMQ需要产品方自行提供扩展实现。...
一、SpringCloudStream简介
官网地址:https://spring.io/projects/spring-cloud-stream
SpringCloudStream是一个构建高扩展和事件驱动的微服务系统的框架,用于连接共有消息系统,整体上是把各种MQ产品抽象成了一套非常简单的统一的编程框架,以实现事件驱动的编程模型。社区官方实现了RabbitMQ,Apache Kafka,Kafka Stream和Amazon Kinesis等产品,RocketMQ需要产品方自行提供扩展实现。
SpringCloudStream框架封装出了三个最基础的概念来对各种消息中间件提供统一的抽象:
- Destination Binders:负责集成外部消息系统的组件
- Destination Binding:由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁
- Message:消息发送者与消息消费者沟通的简单数据结构
注意:使用非常简单,但是理解SpringCloudStream的各种概念模型比较困难。
二、引入依赖
RabbitMQ的SpringCloudStream支持是由Spring社区官网提供的,所以这也是相当成熟的一种集成方案。但是要注意,SpringCloudStream框架集成的版本通常是比RabbitMQ产品本身落后几个版本的,使用时需要注意。以下两个依赖没有太大区别
<dependency>
<groupId>org.springframework.cloud</groupId>
<!-- artifactId>spring-cloud-starter-stream-rabbit</artifactId -->
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
三、基础使用方法
1、声明Sink消息消费者
@Component
@EnableBinding(Sink.class)
public class MessageReceiver {
private Logger logger = LoggerFactory.getLogger(MessageReceiver.class);
@EventListener
@StreamListener(Sink.INPUT)
public void process(Object message) {
System.out.println("received message : " + message);
logger.info("received message : {}", message);
}
}
2、使用Source消息生产者发送消息
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private Source source;
public void sendMessage(Object message) {
MessageBuilder<Object> builder = MessageBuilder.withPayload(message);
source.output().send(builder.build());
}
}
3、在application.properties文件中,增加配置
server.port=8080
spring.cloud.stream.bindings.output.destination=streamExchange
spring.cloud.stream.bindings.input.destination=streamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
4、添加测试Controller
@RestController
public class SendMessageController {
@Autowired
private Source source;
@GetMapping("/send")
public Object send(String message) {
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
source.output().send(messageBuilder.build());
return "message sended : " + message;
}
}
5、访问http://localhost:8080/send?message=123控制台即可收到消息
更多推荐
所有评论(0)