1、什么是Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。

什么是Spring Integration  ? Integration  集成

企业应用集成(EAI)是集成应用之间数据和服务的一种应用技术。四种集成风格:

    1. 文件传输:两个系统生成文件,文件的有效负载就是由另一个系统处理的消息。该类风格的例子之一是针对文件轮询目录或FTP目录,并处理该文件。
    2. 共享数据库:两个系统查询同一个数据库以获取要传递的数据。一个例子是你部署了两个EAR应用,它们的实体类(JPA、Hibernate等)共用同一个表。
    3. 远程过程调用:两个系统都暴露另一个能调用的服务。该类例子有EJB服务,或SOAP和REST服务。
    4. 消息:两个系统连接到一个公用的消息系统,互相交换数据,并利用消息调用行为。该风格的例子就是众所周知的中心辐射式的(hub-and-spoke)JMS架构。

Spring Integration作为一种企业级集成框架,遵从现代经典书籍《企业集成模式》,为开发者提供了一种便捷的实现模式。Spring Integration构建在Spring控制反转设计模式之上,抽象了消息源和目标,利用消息传送和消息操作来集成应用环境下的各种组件。消息和集成关注点都被框架处理,所以业务组件能更好地与基础设施隔离,从而降低开发者所要面对的复杂的集成职责。

2、核心概念

2.1、应用模型

Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input和output通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。

业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。

Spring Cloud Stream应用

2.2、绑定器

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

目前只提供了RabbitMQ和Kafka的Binder实现

2.3、发布-订阅

应用间通信遵照发布-订阅模型,消息通过共享主题进行广播。下图所示,显示了交互的Spring Cloud Stream 应用的典型布局。

Spring Cloud Stream发布订阅模型

未处理的传感数据发布到raw-sensor-data的Topic进行广播,Averages 和IngestHDFS同时订阅了此消息,收到消息后触发自身的处理逻辑。

Topic可能对应不同的概念,在RabbitMQ表示的是Exchange,Kafka中对应Topic


2.4、消费组

由于发布-订阅模型使得共享主题的应用之间连接更简便,创建给定应用的不同实例来进行弹性扩张的能力也同样重要。如果存在多个应用实例,那么同一应用的额不同实例便会成为相互竞争的消费者,其中应该只有一个实例处理给定消息。 
Spring Cloud Stream通过消费者组的概念给这种情况进行建模。每一个单独的消费者可以使用spring.cloud.stream.bindings.input.group属性来指定一个组名字。下图中展示的消费者们,这一属性被设置为spring.cloud.stream.bindings.input.group=hdfsWrite或者spring.cloud.stream.bindings.input.group=average。  

所有订阅给定目标的组都会收到发布消息的一个拷贝,但是每一个组内只有一个成员会收到该消息。默认情况下,如果没有指定组,Spring Cloud Stream 会将该应用指定给一个匿名的独立的单成员消费者组,后者与所有其他组都处于一个发布-订阅关系中。

Spring Cloud Stream消费者组

2.5、消息分区

Spring Cloud Stream对给定应用的多个实例之间分隔数据予以支持。在分隔方案中,物理交流媒介(如:代理主题)被视为分隔成了多个片(partitions)。一个或者多个生产者应用实例给多个消费者应用实例发送消息并确保相同特征的数据被同一消费者实例处理。 
Spring Cloud Stream对分割的进程实例实现进行了抽象。使得Spring Cloud Stream 为不具备分区功能的消息中间件(RabbitMQ)也增加了分区功能扩展。

Spring Cloud Stream Partitioning

3、使用 Spring Cloud Stream

3.1、添加依赖


< dependency >
     < groupId >org.springframework.cloud</ groupId >
     < artifactId >spring-cloud-starter-stream-rabbit</ artifactId >
</ dependency >


3.2、创建消费者

Sink:input通道接口

Source:output通道接口

Processor:包含input、output

@EnableBinding (value = {Sink. class }) //用来指定一个或多个定义了@Input 或者 @Output注解的接口,实现对消息通道的绑定。Sink接口是默认输入消息通道绑定接口
public  class  SinkReceiver {
 
     private  static  Logger logger = LoggerFactory.getLogger(SinkReceiver. class );
 
     @StreamListener (Sink.INPUT) //将被修饰的方法注册为消息中间件上数据流的事件监听器,
     public  void  receive(Object payload) {
         logger.info( "Received: "  + payload);
     }
}

3.3、启动应用,并在rabbirMQ控制台向绑定的队列发一条消息

控制台会打印收到的消息


4、绑定消息通道

4.2、创建消费者,并绑定通道

@EnableBinding (value = {Sink. class , SinkSender. class })  //value参数指定绑定消息通道的接口 ,比如 Sink.class 有@Input注解的接口, SinkSender.class中有@Output注解的接口,应用启动时会实现对消息通道的绑定
public  class  SinkReceiver {
 
     private  static  Logger logger = LoggerFactory.getLogger(SinkReceiver. class );
 
     @StreamListener (Sink.INPUT)  // "input" 通道监听
     public  void  receive(Object payload) {
         logger.info( "Received: "  + payload);
     }
}

4.2、创建生产者接口

//SinkSender  接口在SinkReceiver  已经进行过绑定了
public  interface  SinkSender {
 
     @Output (Sink.INPUT)  // input通道输出
     MessageChannel output();
 
     @Output ( "input2" // input2通道输出,测试直接注解 @HelloApplicationTest2
     MessageChannel output2();
}

4.3、创建测试类1,测试绑定的通道

@RunWith (SpringJUnit4ClassRunner. class )
@SpringApplicationConfiguration (classes = MetadataServerApplication. class )
@WebAppConfiguration
public  class  HelloApplicationTest {
 
     @Autowired
     private  SinkSender sinkSender;
 
     @Test
     public  void  contextLoads() {
         //output方法注解绑定的是"input"通道
         sinkSender.output().send(MessageBuilder.withPayload( "**************From SinkSender" ).build());
     }
}

4.4、创建测试类2,测试绑定接口的注入

@RunWith (SpringJUnit4ClassRunner. class )
@SpringApplicationConfiguration (classes = MetadataServerApplication. class )
@WebAppConfiguration
public  class  HelloApplicationTest2 {
 
     @Autowired  // 通过直接注入的方式注入通道,input Sink.INPUT
     private  MessageChannel input;
     // 当有多个通道时,可通过名称指定
     @Autowired
     @Qualifier ( "input2" // 通过直接注入的方式注入通道,input Sink.INPUT
     private  MessageChannel inputtest;
 
     @Test
     public  void  contextLoads() {
         input.send(MessageBuilder.withPayload( "**************From SinkSender" ).build()); // 发到input通道,有消费者
         inputtest.send(MessageBuilder.withPayload( "#################From SinkSender" ).build()); // 发到input2通道,无消费者
     }
}

5、Spring integration支持

5.1     @ServiceActivator 和 @InboundChannelAdapter

@ServiceActivator注解 和 @StreamListener 都实现了对消息的监听,ServiceActivator 没有内置消息转换,需要自己实现转换

@StreamListener 不需要自己实现,只需要在配置文件增加spring.cloud.stream.bindings.input.content-type=application/json 属性(默认支持json,json格式的可以不用配置)

详细内容可运行demo:metadata-rabbit-stream-integration-consumer和metadata-rabbit-stream-integration-server

6、消息反馈

@SendTo消息消费之后可以指定输出通道

新建消费者,同时指定反馈通道

/**
  *
  *
  * @author cuiyt
  * @date: 2017年6月14日 上午9:10:46
  */
@EnableBinding (value = {SendToBinder. class })  // 绑定输入和输出
public  class  SendToReceiver {
 
     private  static  Logger logger = LoggerFactory.getLogger(SendToReceiver. class );
 
     @StreamListener (SendToBinder.INPUT)  // 监听input通道
     @SendTo (SendToBinder.OUTPUT)  // 返回处理结果到output通道
     public  Car receiveFromInput(Car car) {
         logger.info( "SendToReceiver Received: "  + car.getStatus());
         Car newCar =  new  Car();
         newCar.setNo(car.getNo());
         newCar.setStatus(car.getStatus() +  1 );
         return  newCar;
     }
 
     public  interface  SendToBinder {
 
         String INPUT =  "sendtoInput" ;
         String OUTPUT =  "sendtoOutput" ;
 
         @Output (OUTPUT)
         SubscribableChannel output();
 
         @Input (INPUT)
         SubscribableChannel input();
     }
}


新建生产者,同时监听消费反馈的通道

/**
  *
  * @author cuiyt
  * @date 2017年6月14日
  *
  */
@EnableBinding (value = {SendToBinderReverse. class })
public  class  SendToSender {
     private  static  Logger logger = LoggerFactory.getLogger(SendToSender. class );
 
     @Bean
     @InboundChannelAdapter (value = SendToBinderReverse.OUTPUT, poller =  @Poller (fixedDelay =  "3000" ))
     public  MessageSource<String> sendToTest() {
         return  () ->  new  GenericMessage<>( "{\"no\":\"abc\",\"status\":1}" );
     }
 
     @StreamListener (SendToBinderReverse.INPUT)  // 监听input通道
     public  void  receiveFromInput(Car car) {
         logger.info( "SendToSender Received: "  + car.getStatus());
     }
 
     public  interface  SendToBinderReverse {
         String OUTPUT =  "sendtoInput" ; // 消费者绑定的监听通道为sendtoInput,所以生产者的output通道指定为sendtoInput
         String INPUT =  "sendtoOutput" ; // 消费者反馈信息的通道为sendtoOutput,所以生产者的input通道指定为sendtoOutput
 
         @Output (OUTPUT)
         SubscribableChannel output();
 
         @Input (INPUT)
         SubscribableChannel input();
     }
}


启动消费者、生产者,消费者收到消息之后将Car对象中的status加1,并将消息发送到sendTo通道

生产者收到反馈信息后,打印status为2

7、消费组和分区

7.1 消费组

当一个服务启动多个实例的时候,这些实例会绑定到同一个消息通道的主题上,默认情况下,生产者发送一条消息之后,这条消息会产生多个副本被每个消费者实例接收。

有些业务场景下,我们希望生产者产生的消息只被其中一个实例消费,可以通过设置消费组来实现这样的功能。在消费端配置spring.cloud.stream.binddings.input.group属性即可

以上面代码为基础

在消费者设置消费组,并指定主题

#设置@Input的sendtoInput通道消费组为sendToGroup
spring.cloud.stream.bindings.sendtoInput.group=sendToGroup
#设置@Input的sendtoInput通道主题为sendToDes
spring.cloud.stream.bindings.sendtoInput.destination=sendToDes
#设置@Output的sendtoOutput通道主题为sendtoOutput
spring.cloud.stream.bindings.sendtoOutput.destination=sendToDesOut

在生产者指定输出通道和输入通道的主题

#生产者@Output绑定的是sendtoInput
spring.cloud.stream.bindings.sendtoInput.destination=sendToDes
#生产者@Input绑定的是sendtoOutput
spring.cloud.stream.bindings.sendtoOutput.destination=sendToDesOut

分别指定不同的端口号启动两个消费者,一个生产者,可以发现每个消费者变为6秒接收一次

7.2 消息分区

特定的场景下,还希望具备相同特征的消息只被同一个实例进行消费,可以通过分区来实现。

消费者开启分区,指定实例数量及本实例索引

#开启消费分区
spring.cloud.stream.bindings.sendtoInput.consumer.partitioned=true
#实例数量
spring.cloud.stream.instanceCount=2
#实例索引
spring.cloud.stream.instanceIndex=1

生产者指定分区键

#分区键
spring.cloud.stream.bindings.sendtoInput.producer.partitionKeyExpression=payload.no
#分区数量
spring.cloud.stream.bindings.sendtoInput.producer.partitionCount=2


启动两个消费者,及一个生产者,因为我们指定了payload.no作为分区键,每次发送的car.no为相同值abc,则消息只会被一个消费者消费

消费者A每隔3秒收到消息

消费者B不再收到SendToReceiver Received:1 的消息

8、demo源码

配置详解:https://www.springcloud.cc/spring-cloud-dalston.html#_spring_cloud_stream

stream.rar

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐