一,Stream消息驱动的概述
一,Stream消息驱动产生的背景
市面上常用的消息驱动有:
RabbitMQ
Kafka
RocketMQ
ActiveMQ
假如现在有一个微服务系统使用到了RabbitMQ作为消息中间件,并且集成了大数据分析使用Kafka进行数据推送,你们在消息中间件的使用切换维护都需要单独进行,假如一个系统需要四种消息中间件那么开发人员就必须全部掌握,这学习成本高,且开发和维护麻烦
那么有没有一种新的技术,让开发者不需要关注MQ的具体细节,有一种适配绑定的方式,自动的在各种MQ内切换
最Stream就诞生了,它屏蔽了底层消息中间件的差异,提供了统一的消息编程模型
通过使用Stream的API可以操作MQ,而不需要具体某个MQ的API操作,极大的提供了开发效率
降低了学习成本
Stream类似于JDBC,它们都提供了统一的编程模型,JDBC切换数据(如:Mysql,Oracle)只需要切换具体的数据库Drive驱动即可,编程方式并没有区别,一个模型可以适配很多数据库
二,Stream的简介
SpringCloud Stream是一个构建消息驱动微服务架构
应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互,通过配置来Binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动,Spring Cloud Stream
为一些供应的消息中间件产品提供个性化的自动化配置,引用了发布-订阅,消费组,分区的三个核心概念
目前Stream仅支持RabbitMQ,Kafka
二,Stream的设计思想
标准的MQ:
生产者
Exchange交换机
Queue队列
Message:消息媒介传递消息
Channel通道
消费者
Stream的传递过程:比如现在用到RabbitMQ和Kafka,由于两个消息中间件的架构不同,如:RabbitMQ有Exchange和Queue,Kafka使用Topic和Partitions分区,中间就需要Stream进行适配和解耦
假如没有Stream,这两中间件的差异会导致项目实际开发困难,如果用两个消息队列的其中一种,后面的业务需求想往另一个消息队列进行迁移,会非常麻烦,一堆的配置和逻辑需要出现改造,因为跟系统的耦合太严重,这个时候Springcloud Stream给我们提供了一种解决途径
通过定义绑定器Bindder作为中间层,实现应用程序与消息中间件的细节之间的隔离
三,Stream的注解
Binder:连接中间件屏蔽差异
Channel:通道,类似于MQ种的队列+Channel,在消息通信系统种实现存储和转发
组成
说明
Middeware
中间件,目前只支持RabbitMQ和Kafka
Binder
Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder通过Binder可以很方便的连接中间件,可以动态的改变消息类型等
@Input
标识输入通道,通过,通过该输入通道接收消息进入应用程序
@Output
标识输出通道,发布消息将通过该通道离开应用程序
@StreamListener
监听队列,用于消费者的队列消息接收
@EnableBinding
指信道Channel和Exchange绑定在一起
四,Stream的配置使用
一,生产者端的配置使用
① maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
② application配置
server:
port: 8090
spring:
application:
name: cloud-stream-rabbitmq-provider
cloud:
stream:
binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
defaultRabbit: #表示组件名称
type: rabbit #mq的类型,这里为rabbit
environment: #环境配置
spring:
rabbitmq: #配置mq的连接信息
host: 192.168.68.133
port: 5672
username: admin
password: 123
bindings: #服务的整合处理
output: #通道名称,这里是生产者使用输出
destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
content-type: application/json #设置信息的类型这里是json
binder: defaultRabbit #设置要绑定消息服务的具体设置
eureka:
client:
fetch-registry: true
register-with-eureka: true #是否将自己注册进Eureka Server
service-url:
defaultZone: http://eureka1.com:9090/eureka/,http://eureka2.com:9091/eureka/
management:
endpoints:
web:
exposure:
include: "*"
③ main启动类
@SpringBootApplication
@EnableEurekaClient
public class CloudStreamMain {
public static void main(String[] args) {
SpringApplication.run(CloudStreamMain.class,args);
}
}
④ service
@Slf4j
@EnableBinding(Source.class) //定义消息的推送管道,生产端使用Source(固定写法),注意引入的包是Stream提供的
public class MessageProviderServiceImpl implements MessageProviderService {//接口是自定义的
@Resource
MessageChannel output; //消息发送的管道
@Override
public String send() {
String uuid = "WQL-Cloud-Stream:"+UUID.randomUUID().toString();
//发送消息
boolean send = output.send(MessageBuilder.withPayload(uuid).build());
if (send){
log.info("发送成功:"+uuid);
return "发送成功";
}
return "发送失败";
}
}
⑤ controller
@RestController
public class SendMessageController {
@Autowired
MessageProviderServiceImpl messageProviderService;
@GetMapping(value = "/sentmessage")
public String sent(){
String send = messageProviderService.send();
return send;
}
测试:
查看Rabbitmq:
二,消费端的配置和使用
配置两个消费者端,修改一下端口和服务名称即可,其他配置一样
① maven依赖(和生产者是一样的)
② application配置
server:
port: 8092
spring:
application:
name: cloud-stream-rabbitmq-consumer2
cloud:
stream:
binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
defaultRabbit: #表示组件名称
type: rabbit #mq的类型,这里为rabbit
environment: #环境配置
spring:
rabbitmq: #配置mq的连接信息
host: 192.168.68.133
port: 5672
username: admin
password: 123
bindings: #服务的整合处理
input: #通道名称,这里是消费者使用输入 生产者和消费者的区别
destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
content-type: application/json #设置信息的类型这里是json
binder: defaultRabbit #设置要绑定消息服务的具体设置
eureka:
client:
fetch-registry: true
register-with-eureka: true #是否将自己注册进Eureka Server
service-url:
defaultZone: http://eureka1.com:9090/eureka/,http://eureka2.com:9091/eureka/
management:
endpoints:
web:
exposure:
include: "*"
③ main启动类(和生产者一样)
④ service
@Service
@Slf4j
@EnableBinding(Sink.class)//服务消费者,消费者使用Sink.class
public class StreamConsumerService {
@Value("${server.port}")
String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
String payload = "服务消费者1:"+port+" ----> "+message.getPayload();
log.info(payload);
}
}
测试:生产者发送消息
消息被重复消费
五,Stream重复消费
Stream使用分组来解决重复消费和持久化问题
在RabbitMQ中有五种工作模式,通过消费者绑定同一个Queue来解决消息重复消费的问题,Stream的Group也是一样的,通过使用同一个Group来解决重复消费的,在Stream-Rabbitmq中Group底层也是通过Queue来实现的
重复消费和不重复消费解决的是应用场景的问题,假如是订单系统肯定是不能重复消费的,假如是系统通知就是需要重复通知消费的
group配置:
cloud:
stream:
binders: #配置要绑定的具体MQ的服务信息(这里使用RabbitMQ)
defaultRabbit: #表示组件名称
type: rabbit #mq的类型,这里为rabbit
environment: #环境配置
spring:
rabbitmq: #配置mq的连接信息
host: 192.168.68.133
port: 5672
username: admin
password: 123
bindings: #服务的整合处理
input: #通道名称,这里是消费者使用输入
destination: streamExchange #表示约定好的Exchange名称,没有就默认创建
content-type: application/json #设置信息的类型这里是json
binder: defaultRabbit #设置要绑定消息服务的具体设置
group: streamgroup #设置分组
注:只要配置了group那么就会开启持久化
Comments | NOTHING
Warning: Undefined variable $return_smiles in /www/wwwroot/wql_luoqin_ltd/wp-content/themes/Sakura/functions.php on line 1109