spring cloud stream

Author Avatar
丁起男 08月 23,2021
  • 在其它设备中阅读本文章

spring cloud stream

spring cloud stream是用于构建消息驱动微服务应用程序的框架。该框架提供了一个灵活的编程模型,该模型建立在已经成熟的spring习惯用法的基础上,它提供了来自多家供应商的中间件的合理配置,包括publish-subscribe,消息分组和消息分区处理的支持

spring cloud stream解决了开发人员无感知的使用消息中间件的问题,因为stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

核心概念

组成说明
Middleware中间件,支持rabbitmq和kafka
Binder目标绑定器,目标指的是mq。绑定器就是封装了目标中间件的包。如果操作的是kafka就使用spring-cloud-stream-binder-kafka,如果操作的是rabbitmq就使用spring-cloud-stream-binder-rabbit
@Input注解标识输入通道,接收的消息将通过该通道进入应用程序
@Output注解标识输出通道,发布的消息通过该通道离开应用程序
@SteamListener监听队列,消费者的队列的消息接收
@EnableBinding注解标识绑定,将信道和交换机绑定在一起

工作原理

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑

工作流程

  • 发送消息:producer->source->binder->mq
  • 消息监听:mq->binder->channel->sink->consumer

核心概念:

  • source:当需要发送消息时,我们就需要通过Source.java,它会把我们所需要发送的消息进行序列化(默认转换成json),然后将这些数据发送到channel中
  • sink:当我们需要监听消息时就需要通过Sink.java,它负责从消息通道中获取消息,并将消息返序列化成消息对象,然后交给具体的消息监听处理
  • channel:通常我们向消息中间件发送消息或监听消息时需要指定主题(topic)和消息队列名称,一旦我们需要变更主题的时候就需要修改消息发送或消息监听的代码。通过channel对象,我们的业务代码只需要对应channel就可以了,具体这个channel对应的是哪个主题,可以再配置文件中来指定,这样当主题变更的时候我们就不用对做代码的任何修改,从而实现了具体消息中间件的解耦
  • binder:通过不同的binder可以实现不同的消息中间件整合,binder提供统一的消息接收接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置

入门应用

  1. 添加依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
  2. 修改配置文件

    spring:
      cloud:
        stream:
          bindings:
            #消息发送通道
            output: #生产者是output,消费者是input 此值是和Source或Sink接口中,@Input或@Output注解的value值相同的
              destination:  #绑定的交换机名称
      # 消息队列配置
      rabbitmq:
        host: 
        port: 
        username: 
        password: 
        virtual-host: 
    
  3. 编写消息生产者

    @Component
    @EnableBinding(Source.class)
    public class MessageProducer {
    
        @Autowired
        private Source source;
    
        /**
         * 发送消息
         * @param message
         */
        public void send(String message){
            source.output()
                    .send(MessageBuilder
                            .withPayload(message)
                            .build());
        }
    }
    
  4. 编写消息消费者

    @Component
    @EnableBinding(Sink.class)
    public class MessageConsumer {
    
        /**
         * 接收消息
         */
        @StreamListener(Sink.INPUT)
        public void receive(String message){
            System.out.println("接收消息:"+message);
        }
    }
    
    

自定义消息通道

消息生产者:

  1. 编写自定义消息发布通道

    public interface MySource {
    
        String OUTPUT = "my_output";
    
        @Output(OUTPUT)
        MessageChannel output();
    }
    
  2. 修改配置文件

    spring:
        stream:
          bindings:
            #消息发送通道
            my_output: #和自定义的MySource的@Output()的value相同
              destination: 
    
  3. 修改消息生产者

    @Component
    @EnableBinding(MySource.class) //改为自己的
    public class MessageProducer {
    
        @Autowired
        private MySource source; //改为自己的
    
        ......
    }
    

消息消费者:

  1. 编写自定义消息接收通道

    public interface MySink {
    
        String INPUT = "my_input";
    
        @Input(INPUT)
        SubscribableChannel input();
    
    }
    
  2. 修改配置文件

    spring:
        stream:
          bindings:
            #消息发送通道
            my_input: #和自定义的MySink的@Input()的value相同
              destination: 
    
  3. 修改消息消费者

    @Component
    @EnableBinding(MySink.class) //改为自己的
    public class MessageConsumer {
    
        @StreamListener(MySink.INPUT) //改为自己的
        ......
    }
    

配置优化

springboot的原则是约定大于配置

spring cloud stream中@Input和@Ouput的value默认就是交换机的名称

public interface MySource {

    String OUTPUT = "default_message"; //自定义交换机名称

    @Output(OUTPUT)
    MessageChannel output();
}
public interface MySink {

    String INPUT = "default_message"; //自定义交换机名称

    @Input(INPUT)
    SubscribableChannel input();
}

配置文件中无需进行交换机名称的设置

消息分组

当有多个消费者时,每个消费者对应一个队列,生产者发送消息时都能收到,当需要让多个消费者争抢一个队列中的消息时需要使用消息分组

修改配置文件

spring:
  cloud:
    stream:
      bindings:
        #消息发送通道
        my_input:
          destination: 
          group: ga #分组名称

这样所以的组名相同的消费者就会监听同一个队列

消息分区

在消息分组的基础上,如果希望同一类型的消息被同一个消费者消费,就需要使用消息分区

消息生产者配置:

spring:
  cloud:
    stream:
      bindings:
        #消息发送通道
        output:
          destination: 
          producer:
            #payload:当有一个消费者消费了消息,后续消息也由它进行消费
            #headers:在生产消息时设置header
            pratition-key-expression: payload #配置分区键的表达式规则,payload或headers
            partitionn-count: 2 #配置消息分区的数量

消息消费者配置:

spring:
  cloud:
    stream:
      bindings:
        #消息发送通道
        input:
          destination: 
          group: 
          consumer:
            partitioned: true #开启分区支持
      instance-index:  #当前消费者的索引
      instance-count:  #当前消费者的总数