Spring Cloud Stream:消息驱动

  Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过 input 或者 output 来与 Spring Cloud Stream 中 binder 交互,而 binder 负责与消息中间件交互。

开发环境版本
IDEA2018.2.6
JDK1.8
Spring Boot2.0.6
Spring CloudFinchley.SR2
Docker18.09.0
RabbitMQ3.7.8-management

特别注意:本系列纪要环环相扣,建议从第一节开始阅读 点击跳转

增加provider和consumer项目

添加依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
添加RabbitMQ配置
spring:
  rabbitmq:
    username: guest
    password: guest
    host: localhost
    port: 5672

普通RabbitMQ使用方式

provider生产消息
@RestController
@RequestMapping("/hello")
public class HelloController {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping("/send")
    public void sendMsg() {
        amqpTemplate.convertAndSend("myExchange", "myRoutingKey", "Hello MSG from provider");
    }
}
consumer消费消息
@Component
public class MQReceiver {
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange("myExchange"),
            key = "myRoutingKey",
            value = @Queue("myQueue")
    ))
    public void consume(String msg) {
        System.out.println("rabbit-" + msg);
    }
}

Stream生产和消费消息

provider生产消息

  provider项目增加配置:

spring:
  cloud:
    stream:
      bindings:
        output:
          group: provider
          destination: input
          content-type: application/json

  provider项目生产消息:

@RestController
@RequestMapping("/hello")
@EnableBinding(Source.class)
public class HelloController {
    @Autowired
    private Source source;

    @GetMapping("/stream")
    public void stream() {
        source.output().send(MessageBuilder.withPayload("Stream Hello MSG from provider").build());
    }
}
consumer消费消息

  consumer项目增加配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer
          destination: input
          content-type: application/json

  consumer项目消费消息:

@Component
@EnableBinding(Sink.class)
public class MQReceiver {
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("stream:" + message.getPayload());
    }
}

写在后面

  不难发现,通过Stream可以极大简化消息队列的使用,不管是RabbitMQ还是Kafka,我们只需要关注Stream的使用,如此解耦也可以极大方便我们项目的消息中间件迁移。当然,本系列《Spring Cloud 学习纪要》小博只记录最简洁的入门Demo,更高阶的使用方法和技巧心得相信聪明的你能够自己掌握。

附件

本系列纪要博客源码:跳转到github
本系列纪要博客配置文件:跳转到github

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐