一,RabbitMQ死信队列
一,死信和死信队列
死信的概念:死信就是无法被消费的消息,一般problish将消息投递到brock或直接到queue,consumer从queue取出消息进行消费,但某些时候由特定的原因导致queue中的消息无法被消息,这样的消息如果没有后续的处理,就变成了死信
死信的处理方式:一般死信通过死信队列专门处理,而死信队列需要死信交换机绑定,进行死信消息的转发
死信的来源:
- 消息TTL过期(TTL:存活时间)
- 队列达到最大长度(队列以满,无法再添加的消息也为死信)
- 消息被消费者拒绝(basic.reject或basic.nack)
注:死信队列本质也保证消息可靠性的一种机制,根据业务而产生的,死信队列有多种应用场景
死信的作用:当消费者消费消息发生异常时,将消息保存到死信队列中防止消息被丢失
死信队列功能和消息应答机制的异同:
同:都是保证从queue到consumer消息的发送的可靠信
异:
- 消息应答机制保证了消费者在消费消息时,由于消费者宕机或者其他原因无法返回ack,消息重新入队
- 死信保证了消费者消息消息时,出现异常,导致消息无法被消费,出现的消息丢失
- 死信的更多是消费或队列本身存在问题入TTL过期,队列已满,消息应答更多的是消费者的问题
- 消息应答通过ACK,死信通过死信队列和死信交换机
二,死信的处理方案
处理流程:
- 定义死信交换机和死信队列
- 普通队列绑定死信交换机(x-dead-letter-exchange)
- 设置消息的TTL存活时间(可以队列设置也可以生产者端设置):方便测试(也可以不设置)
注:死信队列和死信交换机本质来说就是普通的队列和交换机,只是它保存着死信所以是死信队列和死信交换机
死信的案例代码:
设置:
- 设置TTL时间:队列参数设置:x-message-ttl,生产端设置:new AMQP.BasicProperties().builder().expiration("1000").build();
- 设置队列长度:队列参数设置:x-max-length
- 拒绝消息:消费者端设置:channel.basicNack()或者channel.basicReject();
①工具类:
public class dead_queue_util {
public Connection dead_queue_getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHandshakeTimeout(9999999);
//设置参数
factory.setHost("192.168.68.133");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("fq_wql");
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
return connection;
}
}
②生产者端口:
public class dead_publish {
//定义普通交换机和普通队列的名称
static String common_exchange = "com_exchange";
static String common_queue = "com_queue";
//定义死信交换机和死信队列的名称
static String dead_exchange = "dead_exchange";
static String dead_queue = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new dead_queue_util().dead_queue_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//创建死信队列和死信交换机()
channel.exchangeDeclare(dead_exchange,BuiltinExchangeType.DIRECT,true,false,false,null);
channel.queueDeclare(dead_queue,true,false,false,null);
//创建普通交换机和普通队列
channel.exchangeDeclare(common_exchange, BuiltinExchangeType.DIRECT,true,false,false,null);
//通过Map加入参数
ConcurrentHashMap<String,Object> arguments = new ConcurrentHashMap();
//设置死信交换机
arguments.put("x-dead-letter-exchange",dead_exchange);
//设置死信Routing Key(使死信交换机直接将消息通过routingkey路由死信队列)
arguments.put("x-dead-letter-routing-key","deadqueue");
//方便测试设置消息的TTL(在发送时设置更好)
//arguments.put("x-message-ttl",10000);
//创建普通队列,并通过arguments参数绑定死信交换机
channel.queueDeclare(common_queue,true,false,false,arguments);
//绑定死信队列和死信交换机
channel.queueBind(dead_queue,dead_exchange,"deadqueue");
//绑定普通队列和普通交换机
channel.queueBind(common_queue,common_exchange,"comqueue");
//循环发送20条消息并设置TTL时间为10000
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("1000").build();
for (int a=1;a<=20;a++){
String message = "mes = "+a;
channel.basicPublish(common_exchange,"comqueue",basicProperties,message.getBytes());
}
//关闭连接
// channel.close();
// connection.close();
}
}
③普通消费者:
//消费普通队列
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new dead_queue_util().dead_queue_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//成功回调
DeliverCallback deliverCallback = (ConsumerTag,Message)->{
//打印消息
System.out.println("消费普通队列:"+new String(Message.getBody(),"UTF-8"));
};
//失败回调
CancelCallback cancelCallback = (ConsumerTag)->{
};
//消费普通队列
channel.basicConsume("com_queue",false,deliverCallback,cancelCallback);
}
④死信消费者:
//消费死信队列队列
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = new dead_queue_util().dead_queue_getConnection();
//获取channel逻辑逻辑
Channel channel = connection.createChannel();
//成功回调
DeliverCallback deliverCallback = (ConsumerTag, Message)->{
//打印消息
System.out.println("消费死信队列:"+new String(Message.getBody(),"UTF-8"));
};
//失败回调
CancelCallback cancelCallback = (ConsumerTag)->{
};
//消费普通队列
channel.basicConsume("dead_queue",false,deliverCallback,cancelCallback);
}
二,RabbitMQ的TTL延迟队列
一,TTL延迟队列的概述
延迟队列的概念:延迟队列用来存放需要指定时间被处理的消息队列,队列中的消息在指定时间以后或之前进行取出消费
延迟的队列的使用场景:在不同系统中应用的都非常广泛
- 订单系统:订单在15分钟之内没有支付则自动取消
- 登录注册模块:如果注册在60秒内未进行验证码效验,进行需要重新发送
- 支付退款模块:用户在取消订单3天未处理,进行发送给后台人工处理
- 会议提醒模块:会议人员在10分钟内未到场,就发信息通知
- 消息推送模块:活动宣传,每5个小时推送给消费者,维持两天
- ……
这些场景都一个共同的特点,就是需要在某个事件发生之后指定一个时间进行处理,那么定时器也是可以实现的,那么为什么推荐用延迟队列
延迟队列和普通定时器的区别:
- 在低并发,时效性低的程序开发中,两者都可以使用
- 在高并发和高时效性场景中,定时器要跑很多个定时监听任务,性能低,而延迟队列确有很强抗高并发特质
RabbitMQ中TTL延迟队列的两种实现方式:
- TTL+死信队列实现
- 外部的延迟队列插件实现
注:RabbitMQ并没有直接提供延迟队列
二,TTL+死信队列实现延迟队列
设置TTL有两种方式:
- 在队列进行设置,对队列进行TTL过期,当队列TTL过期,队列的消息会集体过期
- 对单条消息本身进行设置,在生产端指定
一,基于queue的TTL过期实现延迟队列
@Configuration
public class TTL_Dead_Config {
//普通交换机和普通队列的名称
String com_exchange = "com_delay_exchange";
String com_queue = "com_delay_queue";
//死信交换机和死信队列(也叫延迟交换机和延迟队列)
String delay_exchange = "delay_exchange";
String delay_queue = "delay_queue";
//声明普通交换机
@Bean(value = "get_com_exchange")
public Exchange get_com_exchange(){
return ExchangeBuilder.directExchange(com_exchange).durable(true).build();
}
//声明死信交换机
@Bean(value = "get_delay_exchange")
public Exchange get_delay_exchange(){
return ExchangeBuilder.directExchange(delay_exchange).durable(true).build();
}
//声明死信队列
@Bean(value = "get_delay_queue")
public Queue get_delay_queue(){
return QueueBuilder.durable(delay_queue).build();
}
//声明普通队列
@Bean(value = "get_com_queue")
public Queue get_com_queue(){
//参数map
Map<String,Object> argument = new HashMap<>();
//绑定死信交换机
argument.put("x-dead-letter-exchange",delay_exchange);
//设置绑定Routing Key
argument.put("x-dead-letter-routing-key","delay");
//设置message在队列的TTL过期时间
argument.put("x-message-ttl",100000);
return QueueBuilder.durable(com_queue).withArguments(argument).build();
}
//普通队列绑定
@Bean
public Binding com_binding(@Qualifier("get_com_queue") Queue queue,@Qualifier("get_com_exchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("com_queue").noargs();
}
//死信队列绑定
@Bean
public Binding dead_binding(@Qualifier("get_delay_queue") Queue queue,@Qualifier("get_delay_exchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();
}
}
② 生产者发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void delay_queue_test(){
for (int a=0;a<=20;a++) {
rabbitTemplate.convertAndSend("com_delay_exchange", "com_queue", "TTL+死信队列实现延迟队列!!");
}
}
③ 消费者消费消息
//消费死信队列
@RabbitListener(queues = "delay_queue")
public void listenr_delay_queue(Message message){
System.out.println(new String(message.getBody()));
}
二,基于单条消息的TTL过期实现延迟队列
对单条消息本身进行设置,在生产端指定
① 配置类:参照上面的配置类(删除队列TTL过期即可)
删除:
//设置message在队列的TTL过期时间
argument.put("x-message-ttl",100000);
② 生产者发送消息
@Test
public void delay_queue_test(){
for (int a=0;a<=20;a++) {
rabbitTemplate.convertAndSend("com_delay_exchange", "com_queue", "TTL+死信队列实现延迟队列!!",msg ->{
msg.getMessageProperties().setExpiration("100000"); //通过MessageProperties对象设置
return msg;
});
}
}
}
③ 消费者消费消息:和上面一样
这种方式有一个致命的弊端:因为RabbitMQ一次之后检查一个消费是否过期(和queue TTL不一样,它是集体过期,这是单个过期),如果过期则丢到死信队列,如果前一个消息的延迟时间非常长,而后一个延时的时间非常短,则后一个不会被优先执行,底层延迟消息是排队执行的
三,基于插件实现延迟队列
一,安装延迟队列的插键
官网插件下载地址:https://www.rabbitmq.com/community-plugins.html
下载插件:rabbitmq_delayed_message_exchange-3.9.0.ez
① 上传后将插件移动到RabbitMQ的插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.7/plugins
② 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
③ 重启RabbitMQ
systemctl restart rabbitmq-server
延迟插件安装完成之后,就会多出一种路由类型x-delayed-message
二,插件实现延迟队列的架构
插件实现延迟队列和TTL+死信队列实现延迟队列有很大的不同:
- 插件实现延迟队列:消息在Exchange进行延迟,且不需要借助死信队列
- TTL+死信队列:消息在Queue进行延迟
三,代码实现
@Configuration
public class Delay_Plugin_Config {
//交换机名称
String exchange = "Delay_Plugin_Exchange";
//队列名称
String queue = "Delay_Plugin_Queue";
//Routing Key
String routingkey = "Delay_Plugin";
//创建交换机
@Bean(value = "get_delay_plugin_exchange")
public Exchange get_delay_plugin_exchange(){
Map<String,Object> arguments = new HashMap<>();
//需要指定延迟队列的类型
arguments.put("x-delayed-type","direct");
//因为我们要声明的类型是x-delayed-message延迟队列类型,使用创建者模式创建没有这个类型选项,所以需要自定义交换机,它也提高CustomExchange让我们自定义
return new CustomExchange(exchange,"x-delayed-message",true,false,null);
}
//创建队列
@Bean(value = "get_delay_plugin_queue")
public Queue get_delay_plugin_queue(){
return QueueBuilder.durable(queue).build();
}
//队列绑定
@Bean
public Binding delayd_plugin_binding(@Qualifier("get_delay_plugin_queue")Queue queue,@Qualifier("get_delay_plugin_exchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(routingkey).noargs();
}
}
三,优先级队列
优先级队列顾名思义就是判断有优先级的队列,优先级队列会根据消费的优先级(生产者发送时需要指定优先级)在队列中进行优先级排序,排序完成之后在按顺序发送给消费者
使用场景:订单催付,用户反馈,售后解决……, 以售后解决为例假如有100个需要售后问题解决,当售后团队只有那么几个,但公司需要优先考虑大客户的需求,这样可以对100个售后服务设置优先级,优先级高优先处理
优先级队列的使用事项:
- 队列创建时想要指定为优先级队列并给定最多优先级(0-255)
- 生产者发送消费时需要给消息指定优先级
注:优先级大小是0-255,值越大越优先执行,一般设置队列优先级时最大优先级设置为10即可,假如设置为255会影响一部分性能
一,优先级队列和消息的设置
1,页面设置队列优先级
2,代码设置队列优先级
@Bean("get_priority_queue")
public Queue get_priority_queue(){
Map<String,Object> argument = new HashMap();
//设置优先级,最高优先级为10
argument.put("x-max-priority","10");
//构建queue,名称在durable中指定
return QueueBuilder.durable(priority_queue).withArguments(argument).build();
}
3,设置消息优先级
rabbitTemplate.convertAndSend("priority_exchange","","优先级队列!8",
message -> {//这个表达式设置
message.getMessageProperties().setPriority(8);
return message;
});
二,代码演示
① 配置类:
@Configuration
public class PriorityConfig {
//交换机名称
String priority_exchange = "priority_exchange";
//优先级队列名称
String priority_queue = "priority_queue";
//交换机
@Bean("get_priority_exchange")
public Exchange get_priority_exchange(){
return ExchangeBuilder.fanoutExchange(priority_exchange).durable(true).durable(true).build();
}
//队列
@Bean("get_priority_queue")
public Queue get_priority_queue(){
Map<String,Object> argument = new HashMap();
//设置优先级,最高优先级为10
argument.put("x-max-priority","10");
//构建queue,名称在durable中指定
return QueueBuilder.durable(priority_queue).withArguments(argument).build();
}
//绑定
@Bean
public Binding dead_binding(@Qualifier("get_priority_queue") Queue queue, @Qualifier("get_priority_exchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
② 生产者:
@Test
public void priorityqueue(){
//发送消息
for(int a=0;a<10;a++){
rabbitTemplate.convertAndSend("priority_exchange","","优先级队列!6",message -> {
message.getMessageProperties().setPriority(7);
return message;
});
if(a>5) {
rabbitTemplate.convertAndSend("priority_exchange","","优先级队列!8",
message -> {//设置优先级的类
message.getMessageProperties().setPriority(8);
return message;
});
}
}
}
③ 消费者:
//消费优先级队列
@RabbitListener(queues = "priority_queue")
public void priority_consumer(Message message){
System.out.println(new String(message.getBody()));
}
}
四,RabbitMQ惰性队列
是能够支持更长的队列,即支持更多的消息存储
惰性队列在正常的生产环境中是不使用,把消息存入磁盘后再消费太浪费时间了,它的出现有专门的使用场景
使用场景:当队列有大量的消息积压,而消费者宕机,没有消费者进行消费,这个时候就可以将消息保存到磁盘中,节省空间
队列的两种模式:
- default:默认模式,就是普通队列
- lazy:惰性模式,惰性队列
设置惰性队列:在队列声明的参数中加这个key-value即可
x-queue-mode = lazy
五,幂等性问题
幂等性指一次或多次请求某一个资源,对于资源本身应该具有相同的结果,通俗的讲就是一个操作无论它执行多少次对于资源本身只产生一次效果
例如:你在某宝上买东西,你购买后支付,支付扣款成功,但由于网络异常,你点击了很多次付款,但最终只扣款了一次,假如没有幂等性保障,你点击了多少次就会扣多少钱
幂等性的目的:解决消息重复消费的问题
消息重复消费:消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到应答消息,该消息会重新发给其他消费者消费,但实际上那个网络中断的消费者已经消费了消息,这样造成了消息的重复消费
解决思路:MQ消费者的幂等性解决一般使用全局ID或者全局标识比如UUID,订单消费者消费MQ中的消息也可以利用MQ的该ID判断,或者可案自己的规则生成一个全局唯一ID,每次消费者消费消息时用id先判断是否已被消费过,如果不利MQ解决幂等性,也可以利用数据库的乐观锁加版本号来解决
消息幂等性的几种解决方案:
1,唯一ID+指纹锁机制
指纹锁:指的是唯一确认信息,它可以是时间戳,UUID,业务服务信息,并不一定要系统生成,也可以根据业务拼接而来,但它一定要是唯一的
在业务操作时,查询判断数据库中是否已有该ID,如果有则不再执行该操作,利用数据库主键去重
优点:编写简单,通过数据查询是否存在就行
缺点:在高并发情况下,需要频繁的查询写入,有性能瓶颈
2,数据库乐观锁
在数据库表上加一个version版本号,当第一次操作时,将verion置为1,其后的多次操作都判断verion是否为1,假如大于或者等于1,均不执行操作
优点:编写简单,通过数据查询version版本号判断即可
缺点:在高并发情况下,需要频繁的查询写入,有性能瓶颈
3,利用Redis的原子性(最优解)
Redis中的set是有原生的去重功能的,将消息ID放入redis,操作时判断redis是否有该ID,如果没有就执行操作,这样不会消耗数据库的性能,又能提高操作的效率(redis查询写入的效率是mysql的几个量级)











Comments | 2 条评论
????
老铁 我滴666啊
("▔□▔)/