Spring Cloud Stream基本使用
Spring cloud stream是为构建微服务消息驱动而产生的一种框架。Spring Cloud Stream基于Spring boot的基础上,可创建独立的、生产级别的Spring应用,并采用Spring Integration来连接消息中间件提供消息事件驱动。Spring Cloud Stream为不同的消息中间件提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概.
·
Spring cloud stream是为构建微服务消息驱动而产生的一种框架。Spring Cloud Stream基于Spring boot的基础上,可创建独立的、生产级别的Spring应用,并采用Spring Integration来连接消息中间件提供消息事件驱动。Spring Cloud Stream为不同的消息中间件提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前支持两种消息队列:
应用模型
Spring Cloud Stream应用程序由一个中间件中立核心组成。应用程序通过Spring Cloud Stream注入的input和output通道与外界进行通信。通道通过中间件特定的绑定器实现连接到外部代理。
使用步骤
1、引用依赖,修改配置文件
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
spring:
rabbitmq:
host: 192.168.121.131
port: 5672
username: guest
password: guest
stream:
# 增加该配置,对队列进行分组。保证一个服务只有一台实例接受到消息。
bindings:
# 监听的消息队列的名称。
testMessage:
# 服务的名称
group: order
# 将发送的对象消息转化为json,方便调试 默认是bast64加密字符串
content-type: application/json
2、自定义INPUT和OUTPUT接口
public interface StreamRabbitMq {
String INPUT1_NAME = "input1";
String INPUT2_NAME = "input2";
@Input(StreamRabbitMq.INPUT1_NAME)
SubscribableChannel input();
@Output(StreamRabbitMq.INPUT2_NAME)
MessageChannel output();
}
3、消息接收端
@Component
@EnableBinding(StreamRabbitMq.class)
public class StreamReceive {
@StreamListener(value = StreamRabbitMq.INPUT2_NAME)
@SendTo(value = StreamRabbitMq.INPUT1_NAME)
public String processByStreamInput1(String message){
System.out.println("processByStreamInput1 " + message);
return "send input2";
}
@StreamListener(value = StreamRabbitMq.INPUT1_NAME)
public void processByStreamInput2(String message){
System.out.println("processByStreamInput2 " + message);
}
}
其中@EnableBinding注解指定一个或多个定义了 @Input或 @Output注解的接口,以此实现对消息通道(Channel)的绑定。
@SendTo用来指接收到消息后确认发送给另一个接收端
4、消息发送端
@Autowired
private StreamRabbitMq streamRabbitMq;
@RequestMapping("/send")
public void process(){
streamRabbitMq.output().send(MessageBuilder.withPayload("now"+ new Date()).build());
}
启动日志输出:
相关参考
更多推荐
已为社区贡献1条内容
所有评论(0)