RabbitMQ的消息应答,消息持久化,不公平分发,发布确认(二)

发布于 2021-10-12  2.71k 次阅读


一,RabbitMQ消息应答

无消息应答机制的出现的问题:没有应答机制时,Queue向消费者推送一条消息后,就会将当前的队列的消息就会根据Tag标记进行删除,它不会判断消息是否被消费者处理,而是推送后直接删除,假如消息在被消费者处理时,消费者突然宕机,那么消息会被丢失

消息应答机制解决的问题:

  • 处理完成时:消费者在队列中取出消息处理,当消费者处理成功后,会发送一个ACK的确认应答信息,MQ接收到ACK后,按照Tag将消息进行删除
  • 处理失败时:消费者在队列中取出消费处理,但在处理时,消费者突然挂机,MQ没有接收到ACK确认消息,不对queue中的消息进行删除,保证数据不丢失

消息应答机制的作用:保证消息在发送过程中不丢失

消息应答的分类:

  • 自动应答
  • 手动应答

一,自动应答

自动应答由系统自动应答,它不在乎消息是否已经被处理成功,但消息发送时都会告诉队列删除消息,如果消息处理失败返回异常,实行消息补偿(出现入对或者重新投递)

自动应答的优弊:

  • 弊端:当消费者在接收消息时的某一刻,突然宕机,那么消息就会丢失
  • 优点:处理消息快速,手动应答相对较慢

自动应答的场景:

  • 适应于在消费者可以高效并以某种速率能够处理这些消息的情况下使用
  • 需要在高吞吐量和传输的安全性和可靠性之间做权衡

注:消息重新入队,重新投递,消息补偿等名词不一样,其实含义都是一样的,将消息重新放入队列供消费者重新消费,一般使用不推荐使用自动应答

自动应答的设置:在定义Consumer时将自动应答的设置true进行

/*
* 参数:
*String var1:消费的队列名称
*boolean var2:是否自动消息确认
*Consumer var3:回调对象
*/
channel.basicConsume("FQ",true,consumer);

二,手动应答

手动应答:在消费者接收到消息后,处理完成后手动进行应答

实现手动应答的三个方法:

  • channel.basicAck(long deliveryTag, boolean multiple):由于ACK肯定确认
  • channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):用于否定确定(拒绝消息)
  • channel.basicReject(long deliveryTag, boolean requeue):用于否定确定(比basicNack少一个参数)

注:basicNack一次可以拒绝多个消息(否定确定),basicReject一次只能拒绝一条消息

参数:

  • deliveryTag:当前消息的Tag标识,通过envelope获取
  • multiple:是否开启批量应答
  • requeue:消息被拒绝后是否重新入列

一,Multiple批量应答

multiple是手动应答的一大优势,自动应答无法进行批量应答,批量应答可以减少高并发情况下的网络拥堵问题,但也有可能造成消息丢失

注:一般应用场景为了数据的可靠性不使用批量应答,如果场景特殊可以使用

二,消息自动重新入队

消息的重新入队又叫消息的补偿,消息的重新投递,这是机制由系统自动执行的,当没有返回ACK确认时,就会进行该操作

重新入队:如果消费者由某些原因失去了连接(通道异常关闭,连接已关闭或TCP连接丢失),导致未能发送ACK确认,RabbitMQ将知道消息未完全被处理,将该消息进行重新入队,如果此时其他消费者能够处理,将分发给其他消费者

重新入队的好处:即使某一个消费者宕机,也能让其他消费者处理

消息应答和重新入队:两种是相辅相成结合在一起被使用

  • 消息应答保证了消息在发送给消费者时消息不丢失
  • 重新入队保证了消息能再次被处理

手动应答测试:

util连接工具类:

public class message_response_util {
    public Connection message_response_getConnection() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();


        //设置参数
        factory.setHost("192.168.68.133");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("love");
        factory.setPassword("123");

        //创建连接
        Connection connection = factory.newConnection();

        return  connection;


    }
}

publish生产者代码:

public class message_respoonse_publish {
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = new message_response_util().message_response_getConnection();

        //获取channel逻辑逻辑
        Channel channel = connection.createChannel();

        //声明交换机
        channel.exchangeDeclare("message_response_exchange",BuiltinExchangeType.FANOUT,true,false,false,null);

        //创建队列
        channel.queueDeclare("message_wql",true,false,false,null);

        //交换机和队列绑定
        channel.queueBind("message_wql","message_response_exchange","",null);


        //创建生产者发送消息
        String mess;
        Scanner scanner =new Scanner(System.in);
        while (scanner.hasNext()) {
             mess = "WQL Love FQ:"+scanner.next();
            channel.basicPublish("message_response_exchange", "", null, mess.getBytes());
        }
        //关闭连接
//        channel.close();
//        connection.close();
    }
}

Consumer消费者1代码:

public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = new message_response_util().message_response_getConnection();

        //获取channel逻辑逻辑
        final Channel channel = connection.createChannel();

        //回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("消息(consumer1):" + new String(body));
                //手动消息应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //创建消费者
        channel.basicConsume("message_wql",consumer);

        //关闭连接
//        channel.close();
//        connection.close();

    }

Consumer消费者2代码:

public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接对象
        Connection connection = new message_response_util().message_response_getConnection();

        //获取channel逻辑逻辑
        final Channel channel = connection.createChannel();

        //回调对象
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("消息(consumer2):" + new String(body));
                //手动消息应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        //创建消费者
        channel.basicConsume("message_wql",consumer);

        //关闭连接
//        channel.close();
//        connection.close();


    }

二,RabbitMQ消息持久化

RabbitMQ消息持久化也是保证消息可靠性的机制,之前消息应答机制保证了消息在发送给消费者处理时的可靠性,而消息持久化保证了RabbitMQ服务宕机时消息的可靠性

消息持久化:将queue或者消息保存到磁盘,RabbitMQ在宕机重启后,依然可以读取数据

消息持久化的目的:保证RabbitMQ运行时突然宕机,消息不丢失

持久化的两种实现:

  • queue队列持久化
  • 消息的持久化

一,队列实现持久化

队列持久化只需在创建队列时,把durable是否持久化设为true

/*参数:
* String queue:队列名称
* boolean durable:是否持久化,持久化之后mq重启它依然存在
* boolean exclusive:1,是否独占占(只有一个消费者能监听) 2,Connection连接断开后是否删除该队列
* boolean autoDelete:是否在没有consumer监听时,自动删除队列
* Map<String, Object> arguments:其他参数信息
* 注:队列名不能重复
* */
channel.queueDeclare("FQ",true,false,false,null);

注:当队列已经被创建了,没有持久化,就不能再设置为持久化,需要重新创建队列指定为持久化

二,消息实现持久化

实现消息持久化需要再消费生产者修改代码,在BasicPropertiesBasicProperties配置中加入MessageProperties.PERSISTENT_TEXT_PLAIN属性即可实现持久化

/* 参数:
* String exchange:交换机(不是使用的话默认为"")
* String routingKey:路由key(通过它让队列和交换机绑定),如果使用默认交换机它和队列名一样即可绑定
* BasicProperties props:配置信息,添加MessageProperties.PERSISTENT_TEXT_PLAIN
* byte[] body:具体消息
* */
channel.basicPublish("","FQ", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

将消息标记为持久化也并不能保证消息不会完全丢失,尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在一个缓存间隔点,此时RabbitMQ服务宕机消息并没有真正写入磁盘,持久性保证也有丢失的可能性,但对于简单任务队列而言,已经绰绰有余了

三,RabbitMQ不公平分发和预取值

一,不公平分发

RabbitMQ默认采用轮询方式分发消费

轮询分发方式的弊端:现在有两个消费者处理任务,其中一个消费者处理速度非常快,而另一个消费者处理速度非常慢,这个时候采用轮询分发,处理速度快的消费者就会处于空闲状态,处理速度慢的消费者一直在处理消息

为了避免轮询分发方式的弊端RabbitMQ提供了不公平的分发方式,处理快的消费者就会分发多的消费,处理慢的就较少分发

不公平分发的设置:不公平分发设置在消费者端口,使用 basicQos(int prefetchCount)

/*
*参数:
*      prefetchCount::处理消息最大的数量,默认为0为轮询分发,设置为1就行,设置为其他数就为预取值
*/
channel.basicQos(1);

二,预选值分发

本身消息的发送就是异步发送的,所以在任何时候, channel上肯定不止只有一个消息另外来自消费"者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题

预取值就是设置每一个消费者消费消息的多少,例:有10条数据,consumer1预取值为4,consumer2预取值为4,那么consumer1就消费4条数据,consumer2就消费5条数据

预取值的设置:

/*
*参数:
*      prefetchCount::处理消息最大的数量,默认为0为轮询分发,设置为1就为不公布分发,设置为1以上其他数就为预取值设置
*/
channel.basicQos(5);//预期值为5

四,RabbitMQ发布确认

一,RabbitMQ Queue发布确认

一,Queue发布确认的概念

无持久化发布确认:生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都将被指定一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确定信息给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达队列

持久化发布确认:消息和队列指定持久化,那么确定信息就会在将消息或者队列写入磁盘之后发出,broker回传给生产者的确定信息中delivery-tag域包含确定消息的序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前的所有消息都已经被处理

发布确定解决的问题:当声明队列和消息持久化后,消息刚准备持久化时,那个时间节点,RabbitMQ突然宕机,那么消息既没有被消费也没有得到持久化,造成消息丢失,发布确认就可以解决这一问题,消息投递到队列并持久化磁盘后发布消息确定

发布确定的主要目的:保证数据的持久化可靠

消息应答机制和发布确定机制的异同:

同:都是保证MQ消息可靠性的机制

异:

  • 消息应答机制保证的是消息在从队列发送给消费者时消息的可靠性,在消费者端进行声明
  • 发布确认机制保证的消息从生产者投递给队列时消息的可靠性,依赖于消息的持久化,在生产端进行声明

发布确认机制的三种策略:

  1. 单个发布确认
  2. 批量发布确认
  3. 异步发布确认

无论是使用哪一个发布确认策略都需要先开启发布确认机制:使用channel的confirmSelect方法开启

//开启发布确认机制
Channel channel = connection.createChannel();
channel.confirmSelect();

二,单个发布确认

这是最简单的一种发布确认方式,它是同步确认的方式,发送一条消息之后只有它被确认发布,后续的消息才能继续发布

单个确认发布有很大的弊端:发布速度特别慢,一条消费没有被确认,那个其他消费的发布都会被堵塞,这种方式每秒最多提供数百条的吞吐量

使用方法:waitForConfirms(long)这个方法只有消息被确认了才会返回true

public String single_upback() throws IOException, InterruptedException {


     //声明队列,没有自定义声明和绑定交换机,使用的就是默认交换机"",默认绑定Routing Key为队列的名称
    channel.queueDeclare("puback_queue",true,false,false,null);


    //开启消息确认
    channel.confirmSelect();


    //开始时间
     long begin = System.currentTimeMillis();


     //循环发送1000条消息并单个确认
    for (int a=1;a<=1000;a++){


        //生产者发送消息
        channel.basicPublish("","puback_queue",null,String.valueOf(a).getBytes());


        //单条消息发布确认,确认成功返回true
       boolean b = channel.waitForConfirms();
       if (b){
           System.out.println("消息已发布确认!");
       } }
     //结束时间
     long end = System.currentTimeMillis();


     return "单个确认发布耗时(ms):"+(end-begin);
}

三,批量发布确认

批量确认指不单条确认,而是投递多少条消息之后统一确认,它与单条确认相比极大的提高了吞吐量

弊端:当发生故障时,不知道是那条消息出现的问题,我们必须将整个批量消息保证在内存中,以记录重要信息后重新发布消息,本质上它也是同步的

批量确认发布并没有单独的API支持,只是在编程逻辑实现批量

//批量发布
public String batch_upback() throws IOException, InterruptedException {


     //声明队列,没有自定义声明和绑定交换机,使用的就是默认交换机"",默认绑定Routing Key为队列的名称
     channel.queueDeclare("puback_queue",true,false,false,null);


     //开启消息确认
     channel.confirmSelect();


     //开始时间
     long begin = System.currentTimeMillis();


     //是否确认变量
     boolean b=false;


     //循环发送1000条消息,没得发送了100条时就确认一次
     for (int a=1;a<=1000;a++){


         //生产者发送消息
         channel.basicPublish("","puback_queue",null,String.valueOf(a).getBytes());


         //批量消息确认
         if(a%1000==0){
             b = channel.waitForConfirms();
         }
         if (b){
             System.out.println("消息已发布确认!");
         } }
     //结束时间
     long end = System.currentTimeMillis();

     return "批量个确认发布耗时(ms):"+(end-begin);
}

四,异步发布确认(最优解)

一,异步发布确认

异步确认发布是相比单个确认发布和批量确认发布,执行过程中的最优解,无论是消息可靠性还是效率都很好,唯一的缺点或许就是它的编程复杂度相对较高

异步发布确认采用监听的方式实现异步,利用回调来达到消息的可靠性传递,对于异步为处理消息也有一套处理方案

异步使用的监听方法:

/*
* 参数:两个参数类型是一样的
*     1,监听哪些消息成功了
*     2,监听那些消息失败了
* */
channel.addConfirmListener(ackCallback,neckCallback);

代码:

//异步发布
public String asynchronous_upback() throws IOException {


    //声明队列,没有自定义声明和绑定交换机,使用的就是默认交换机"",默认绑定Routing Key为队列的名称
    channel.queueDeclare("puback_queue",true,false,false,null);


    //开启消息确认
    channel.confirmSelect();


    //开始时间
    long begin = System.currentTimeMillis();




    //创建消息监听,异步监听那些消息是发布成功的,那些是发布失败的
    //消息成功发布的回调
    /*
    * 参数:
    *   1,deliveryTag:消息的唯一标识
    *   2,multiple:是否批量确认
    * */
    ConfirmCallback ackCallback =(deliveryTag,multiple) -> {
    System.out.println("消息发布确认成功!");


    };
    //消息发布失败的回调
    ConfirmCallback neckCallback =(deliveryTag,multiple) -> {




    };
    /*
    * 参数:两个参数类型是一样的
    *     1,监听哪些消息成功了
    *     2,监听那些消息失败了
    * */
    channel.addConfirmListener(ackCallback,neckCallback);


    //循环发送1000条消息,没得发送了100条时就确认一次
    for (int a=1;a<=1000;a++){


        //生产者发送消息
        channel.basicPublish("","puback_queue",null,String.valueOf(a).getBytes());


       }
    //结束时间
    long end = System.currentTimeMillis();


    return "异步确认发布耗时(ms):"+(end-begin);
}

二,处理异步未确认消息

处理发布未确认的消息的解决方法就是把为确认的消息放到一个基于内存的能被多个线程访问的并发Map中(比如:ConcurrentSkipListMap或者ConcurrentHashMap),这样就可以在发布确认线程之间进行访问

为什么用Map存储:因为消息在开启确认发布时会有一个唯一ID标识,还有消息实体,所以适合用Map存储,如果只有实体可以用list,set,queue等进行存储,Map可以提高ID进行删除

步骤:

  1. 将要发送的消息保存在Map中
  2. 发送成功回调时,通过ID删除已成功发送的消息,剩下的就是未成功发送的消息
  3. 发送失败的回调中,可以将失败的消息重新发送

//异步处理为确认发布
public String asynchronous_un_upback() throws IOException {


    //声明队列,没有自定义声明和绑定交换机,使用的就是默认交换机"",默认绑定Routing Key为队列的名称
    channel.queueDeclare("puback_queue",true,false,false,null);


    //开启消息确认
    channel.confirmSelect();


    //开始时间
    long begin = System.currentTimeMillis();


    //创建一个并发ConcurrentHashMap,,key为消息的为标识ID,值为消息的实体
    ConcurrentSkipListMap<Long, String> concurrentSkipListMap =new ConcurrentSkipListMap();


    ConfirmCallback ackCallback =(deliveryTag,multiple) -> {
        System.out.println("消息发布确认成功!");


        //2,删除concurrentmap已经确认的消息,剩下的就是未确认的消息
        ConcurrentNavigableMap map = concurrentSkipListMap.headMap(deliveryTag);


    };
    ConfirmCallback neckCallback =(deliveryTag,multiple) -> {
        //3,重新发送未确认的消息
        channel.basicPublish("","puback_queue",null,concurrentSkipListMap.get(deliveryTag).getBytes());
    };


    channel.addConfirmListener(ackCallback,neckCallback);


    //循环发送1000条消息,没得发送了100条时就确认一次
    for (int a=1;a<=1000;a++){
        //生产者发送消息
        channel.basicPublish("","puback_queue",null,String.valueOf(a).getBytes());
        /*1,将发送的使用消息记录在map中
        * key:为消息ID唯一标识,通过channel获取
        * value:消息实体
        * */
        concurrentSkipListMap.put(channel.getNextPublishSeqNo(),String.valueOf(a));




    }
    //结束时间
    long end = System.currentTimeMillis();


    return "异步确认发布耗时(ms):"+(end-begin);
}

二,RabbitMQ Exchange发布确认

RabbitMQ Echange发布确认是在Queue的发布确认的基础上,进行再次靠性保障

Exchange端的发布确认的弊端:Queue发布确认是在queue将消息进行持久化后Broker进行回调确认的,确认回调是在queue上,假如队列失效或者消息持久化失败才会触发回调,这时假如是消息在发送给Exchage时,Exchange失效或者不存在时,消息还没有进行转发就丢失了,那么普通发布确认根本检查不出来消息在Exhcage时失败了

Exchange发布确认:在Exchange上进行消息确认,假如exchange失效或者不存在,会进行回调处理

一,开启Exchange发布确认

在使用Exchange发布确认时需要先进行开启:配置文件开启发布确认和消息回退(这个是补充内容)

publisher-confirm-type: correlated
#消息回退,当发送失败时,把消息返回给生产者
publisher-returns: true

参数:

  • None:禁用发布确认
  • Correlated:发布消息到exchange成功后触发回调方法(推荐)
  • Simple:效果和Correlated一样,当返回为false时,会把channel进行关闭,之后无法发送消息到Broeck

在使用过程中主要依赖于两个类:

  • RabbitTemplate.ConfirmCallback:exchange消息回调类
  • RabbitTemplate.ReturnsCallback:消息回退类,回退给生产者,让生产者重新发信息

二,代码演示

① 配置类:声明一个exchange和一个queue

@Configuration
public class ConfirmConfig {

//交换机名称
String exchange = "confirm_exchange";
//队列名称
String queue = "confirm_queue";
//RoutingKey
String routingkey = "routingkey";

@Bean("get_confirm_exchange")
public Exchange get_confirm_exchange(){

return ExchangeBuilder.directExchange(exchange).durable(true).build();
}

@Bean("get_confirm_queue")
public Queue get_confirm_queue(){

return QueueBuilder.durable(queue).build();
}

@Bean
public Binding exchange_binding_queue(@Qualifier("get_confirm_queue") Queue queue,@Qualifier("get_confirm_exchange") Exchange exchange ){

return BindingBuilder.bind(queue).to(exchange).with(routingkey).noargs();
}
}

② 发布确认回调类:关键

@Component
public class mycallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {


//因为我们实现是内部类,实现了方法,也并没有加载进ConfirmCallback,需要将当前的类注入进去
@Autowired
RabbitTemplate rabbitTemplate;

@PostConstruct
//将当前实现类加入到RabbitTemplate.ConfirmCallback
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/*
* confirm方法:交换机确认回调方法
* 参数:
*
* 1,correlationData:保存消息的ID及相关信息(这个消息可以在发送方进行指定,)
* 2,b:是否发送成功(true:成功,flase:失败)
* 3,s:错误信息(发送成功就为null)
* */
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {

if(b){
System.out.println("消息发送成功!"+correlationData.getId());
}else{
System.out.println("消息发送失败!"+correlationData.getId());
}
}

//该方法可以在消息不可达目的地时,对消息进行回退(只有不可达目的地时才会回退)
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//重新发回退回来的消息
rabbitTemplate.convertAndSend("confirm_exchange","routingkey",returnedMessage.getMessage());
System.out.println("消息回退重新发送:"+returnedMessage.getExchange()+returnedMessage.getMessage());
}
}

③ 生产者

@Test
public void configfrim(){

CorrelationData correlationData = new CorrelationData();
correlationData.setId("WQL_ID");
for (int a=0;a<=20;a++) {
//把队列改为不存在的队列
rabbitTemplate.convertAndSend("confirm_exchanges", "routingkey", "高级发布确认!",correlationData);
}
}

④ 消费者

//发布确认高级
@RabbitListener(queues = "confirm_queue")
public void confirm(Message message){
System.out.println(new String(message.getBody()));
}

⑤ 打印

2021-10-09 21:02:27.222 ERROR 14908 --- [168.68.133:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchanges' in vhost '/', class-id=60, method-id=40)
消息发送失败!WQL_ID
2021-10-09 21:02:27.226 ERROR 14908 --- [168.68.133:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchanges' in vhost '/', class-id=60, method-id=40)
2021-10-09 21:02:27.229 ERROR 14908 --- [168.68.133:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchanges' in vhost '/', class-id=60, method-id=40)
消息发送失败!WQL_ID

路漫漫其修远兮,吾将上下而求索