Stream消息驱动

发布于 2022-04-19  2.76k 次阅读


一,Stream消息驱动的概述

一,Stream消息驱动产生的背景

市面上常用的消息驱动有:

  • RabbitMQ
  • Kafka
  • RocketMQ
  • ActiveMQ

假如现在有一个微服务系统使用到了RabbitMQ作为消息中间件,并且集成了大数据分析使用Kafka进行数据推送,你们在消息中间件的使用切换维护都需要单独进行,假如一个系统需要四种消息中间件那么开发人员就必须全部掌握,这学习成本高,且开发和维护麻烦

那么有没有一种新的技术,让开发者不需要关注MQ的具体细节,有一种适配绑定的方式,自动的在各种MQ内切换

最Stream就诞生了,它屏蔽了底层消息中间件的差异,提供了统一的消息编程模型

通过使用Stream的API可以操作MQ,而不需要具体某个MQ的API操作,极大的提供了开发效率

降低了学习成本

Stream类似于JDBC,它们都提供了统一的编程模型,JDBC切换数据(如:Mysql,Oracle)只需要切换具体的数据库Drive驱动即可,编程方式并没有区别,一个模型可以适配很多数据库

二,Stream的简介

 SpringCloud Stream是一个构建消息驱动微服务架构

应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互,通过配置来Binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,Spring Cloud Stream

为一些供应的消息中间件产品提供个性化的自动化配置,引用了发布-订阅,消费组,分区的三个核心概念

目前Stream仅支持RabbitMQ,Kafka

二,Stream的设计思想

标准的MQ:

  • 生产者
  • Exchange交换机
  • Queue队列
  • Message:消息媒介传递消息
  • Channel通道
  • 消费者

Stream的传递过程:比如现在用到RabbitMQ和Kafka,由于两个消息中间件的架构不同,如:RabbitMQ有Exchange和Queue,Kafka使用Topic和Partitions分区,中间就需要Stream进行适配和解耦

假如没有Stream,这两中间件的差异会导致项目实际开发困难,如果用两个消息队列的其中一种,后面的业务需求想往另一个消息队列进行迁移,会非常麻烦,一堆的配置和逻辑需要出现改造,因为跟系统的耦合太严重,这个时候Springcloud Stream给我们提供了一种解决途径

通过定义绑定器Bindder作为中间层,实现应用程序与消息中间件的细节之间的隔离

三,Stream的注解

  1. Binder:连接中间件屏蔽差异
  2. Channel:通道,类似于MQ种的队列+Channel,在消息通信系统种实现存储和转发
组成 说明
Middeware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder通过Binder可以很方便的连接中间件,可以动态的改变消息类型等
@Input 标识输入通道,通过,通过该输入通道接收消息进入应用程序
@Output 标识输出通道,发布消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列消息接收
@EnableBinding 指信道Channel和Exchange绑定在一起

四,Stream的配置使用

一,生产者端的配置使用

① maven依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-devtools</artifactId>
</dependency>

<dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <scope>provided</scope>
</dependency>

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
     <groupId>org.springframework.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

② application配置

server:
  port: 8090
spring:
  application:
    name: cloud-stream-rabbitmq-provider
  cloud:
    stream:
      binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
        defaultRabbit: #表示组件名称
          type: rabbit #mq的类型,这里为rabbit
          environment: #环境配置
            spring:
              rabbitmq: #配置mq的连接信息
                host: 192.168.68.133
                port: 5672
                username: admin
                password: 123
      bindings: #服务的整合处理
        output: #通道名称,这里是生产者使用输出
          destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
          content-type: application/json #设置信息的类型这里是json
          binder: defaultRabbit #设置要绑定消息服务的具体设置
eureka:
  client:
    fetch-registry: true
    register-with-eureka: true #是否将自己注册进Eureka Server
    service-url:
      defaultZone: http://eureka1.com:9090/eureka/,http://eureka2.com:9091/eureka/

management:
  endpoints:
    web:
      exposure:
        include: "*"

③ main启动类

@SpringBootApplication
@EnableEurekaClient
public class CloudStreamMain {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamMain.class,args);
    }
}

④ service

@Slf4j
@EnableBinding(Source.class) //定义消息的推送管道,生产端使用Source(固定写法),注意引入的包是Stream提供的
public class MessageProviderServiceImpl implements MessageProviderService {//接口是自定义的

    @Resource
    MessageChannel output; //消息发送的管道

    @Override
    public String send() {
        String uuid = "WQL-Cloud-Stream:"+UUID.randomUUID().toString();
        //发送消息
        boolean send = output.send(MessageBuilder.withPayload(uuid).build());
        if (send){
            log.info("发送成功:"+uuid);
            return "发送成功";
        }
        return "发送失败";
    }
}

⑤ controller

@RestController
public class SendMessageController {

    @Autowired
    MessageProviderServiceImpl messageProviderService;

    @GetMapping(value = "/sentmessage")
    public String sent(){
        String send = messageProviderService.send();
        return send;
    }

测试:

查看Rabbitmq:

二,消费端的配置和使用

配置两个消费者端,修改一下端口和服务名称即可,其他配置一样

① maven依赖(和生产者是一样的)

② application配置

server:
  port: 8092
spring:
  application:
    name: cloud-stream-rabbitmq-consumer2
  cloud:
    stream:
      binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
        defaultRabbit: #表示组件名称
          type: rabbit #mq的类型,这里为rabbit
          environment: #环境配置
            spring:
              rabbitmq: #配置mq的连接信息
                host: 192.168.68.133
                port: 5672
                username: admin
                password: 123
      bindings: #服务的整合处理
        input: #通道名称,这里是消费者使用输入 生产者和消费者的区别
          destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
          content-type: application/json #设置信息的类型这里是json
          binder: defaultRabbit #设置要绑定消息服务的具体设置
eureka:
  client:
    fetch-registry: true
    register-with-eureka: true #是否将自己注册进Eureka Server
    service-url:
      defaultZone: http://eureka1.com:9090/eureka/,http://eureka2.com:9091/eureka/


management:
  endpoints:
    web:
      exposure:
        include: "*"

③ main启动类(和生产者一样)

④ service

@Service
@Slf4j
@EnableBinding(Sink.class)//服务消费者,消费者使用Sink.class
public class StreamConsumerService {
    @Value("${server.port}")
    String port;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){


        String payload = "服务消费者1:"+port+" ----> "+message.getPayload();
        log.info(payload);
    }
}

测试:生产者发送消息

消息被重复消费

五,Stream重复消费

Stream使用分组来解决重复消费和持久化问题

   在RabbitMQ中有五种工作模式,通过消费者绑定同一个Queue来解决消息重复消费的问题,Stream的Group也是一样的,通过使用同一个Group来解决重复消费的,在Stream-Rabbitmq中Group底层也是通过Queue来实现的

重复消费和不重复消费解决的是应用场景的问题,假如是订单系统肯定是不能重复消费的,假如是系统通知就是需要重复通知消费的

group配置:

cloud:
  stream:
    binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
      defaultRabbit: #表示组件名称
        type: rabbit #mq的类型,这里为rabbit
        environment: #环境配置
          spring:
            rabbitmq: #配置mq的连接信息
              host: 192.168.68.133
              port: 5672
              username: admin
              password: 123
    bindings: #服务的整合处理
      input: #通道名称,这里是消费者使用输入
        destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
        content-type: application/json #设置信息的类型这里是json
        binder: defaultRabbit #设置要绑定消息服务的具体设置
        group: streamgroup #设置分组

注:只要配置了group那么就会开启持久化