信息发布→ 登录 注册 退出

Springboot死信队列 DLX 配置和使用思路分析

发布时间:2026-01-11

点击量:
目录
  • 前言
  • 什么是死信
  • 配置和测试死信
    • 思路分析
    • 配置类编写
    • 编写消息发送服务
    • 测试
  • 消息什么时候会成为死信消息?
    • 总结
      • 参考资料
        • 代码下载

          前言

          上一篇博客 Springboot——整合RabbitMq测试TTL中,针对设置单个消息期限或者整个队列消息期限,进行了一些配置和说明。同时也都列举了一些区别关系。

          但考虑过一个问题了没有?

          不管是设置哪种方式,如果消息期限到了,队列都会将该消息进行丢弃处理。
          这么做合适么?

          假设是某个设备的重要信息,或者某个重要的订单信息,因为规定时间内未被及时消费就将其舍弃,是否会造成很严重的后果?

          有人会说,设置消息永不过期!等着消费者能够成功监听到该队列,将消息消费不就可以了嘛!

          但这里需要考虑另外一个问题:
          每个服务器的容量是有上限的!如果消息一直存在队列,如果一直不会被消费,岂不是很占用服务器资源?

          如何解决这个问题,就是今天这篇文章需要说到的死信队列

          什么是死信

          说道死信,可能大部分观众大姥爷会有懵逼的想法,什么是死信?

          死信队列,俗称DLX,翻译过来的名称为Dead Letter Exchange 死信交换机

          当消息限定时间内未被消费,成为 Dead Message后,可以被重新发送另一个交换机中,发挥其应有的价值!

          配置和测试死信

          思路分析

          需要测试死信队列,则需要先梳理整体的思路,如可以采取如下方式进行配置:

          从上面的逻辑图中,可以发现大致的思路:

          1、消息队列分为正常交换机正常消息队列;以及死信交换机死信队列

          2、正常队列针对死信信息,需要将数据 重新 发送至死信交换机中。

          配置类编写

          结合上面的思路,编写具体的配置类。如下所示:

          package cn.linkpower.config;
          
          import org.springframework.amqp.core.*;
          import org.springframework.beans.factory.annotation.Qualifier;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          /**
           * 死信队列配置
           */
          @Configuration
          public class DeadMsgMqConfig {
              // 定义正常交换机和正常队列信息(交换机名、队列名、路由key)
              public static final String queue_name = "xj_natural_queue";
              public static final String exchange_name = "xj_natural_exchange";
              public static final String routing_key = "xj_natural_routingKey";
              // 定义死信交换机名、死信队列名、路由key
              public static final String queue_name_dead = "xj_dead_queue";
              public static final String exchange_name_dead = "xj_dead_exchange";
              public static final String routing_key_dead = "xj_dead_routingKey";
              /**
               * 设置正常的消息队列;
               * 正常的消息队列具备以下几种功能:
               * 1、消息正常消费,需要绑定对应的消费者(这里为了测试死信,不创建消费者)
               * 2、当消息失效后,需要将指定的消息发送至 死信交换机 中
               * @return
               */
              @Bean(value = "getNaturalQueue")
              public Queue getNaturalQueue(){
                  return QueueBuilder.durable(queue_name)
                          // 正常的队列,在消息失效后,需要将消息丢入 死信 交换机中
                          // 这里只需要针对名称进行绑定
                          .withArgument("x-dead-letter-exchange",exchange_name_dead)
                          // 丢入 死信交换机,需要设定指定的 routingkey
                          .withArgument("x-dead-letter-routing-key",routing_key_dead)
                          // 设置正常队列中消息的存活时间为 10s,当然也可以针对单个消息进行设定不同的过期时间
                          .withArgument("x-message-ttl",10000)
                          // 设定当前队列中,允许存放的最大消息数目
                          .withArgument("x-max-length",10)
                          .build();
              }
               * 设定正常的消息交换机
              @Bean(value = "getNaturalExchange")
              public Exchange getNaturalExchange(){
                  // 这里为了测试,采取 direct exchange
                  return ExchangeBuilder.directExchange(exchange_name)
                          .durable(true) // 设定持久化
               * 将正常的消息交换机和正常的消息队列进行绑定
               * @param queue
               * @param directExchange
              @Bean
              public Binding bindNaturalExchangeAndQueue(
                      @Qualifier(value = "getNaturalQueue") Queue queue,
                      @Qualifier(value = "getNaturalExchange") Exchange directExchange
              ){
                  return BindingBuilder
                          // 绑定消息队列
                          .bind(queue)
                          // 至指定的消息交换机
                          .to(directExchange)
                          // 匹配 routingkey
                          .with(routing_key)
                          // 无参数,不加会报错提示
                          .noargs();
               * 定义死信队列
              @Bean(value = "getDealQueue")
              public Queue getDealQueue(){
                  return QueueBuilder.durable(queue_name_dead).build();
               * 定义死信交换机
              @Bean(value = "getDeadExchange")
              public Exchange getDeadExchange(){
                  return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build();
               * 将死信交换机和死信队列进行绑定
               * @param deadQueue
               * @param directDeadExchange
              public Binding bindDeadExchangeAndQueue(
                      @Qualifier(value = "getDealQueue") Queue deadQueue,
                      @Qualifier(value = "getDeadExchange") Exchange directDeadExchange
                  return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs();
          }

          编写消息发送服务

          默认采取rabbitTemplate.convertAndSend方法,进行消息的发送处理。但为了保证消息生产者能够成功将数据发送至正常交换机,同时为了保证正常交换机能够将数据信息,推送至正常消息队列。需要对其增加监听。

          package cn.linkpower.service;
          
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.connection.CorrelationData;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Component;
          @Slf4j
          @Component
          public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
              @Autowired
              private RabbitTemplate rabbitTemplate;
              /**
               * 直接发送消息
               * @param exchange
               * @param routingKey
               * @param msg
               */
              public void sendMessage(String exchange,String routingKey,Object msg) {
                  // 设置交换机处理失败消息的模式     true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者
                  // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback
                  rabbitTemplate.setMandatory(true);
                  //消息消费者确认收到消息后,手动ack回执
                  rabbitTemplate.setConfirmCallback(this);
                  // return 配置
                  rabbitTemplate.setReturnCallback(this);
                  //发送消息
                  rabbitTemplate.convertAndSend(exchange,routingKey,msg);
              }
               * 交换机并未将数据丢入指定的队列中时,触发
               *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
               *  参数三:true  表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃
               * @param message   消息对象
               * @param replyCode 错误码
               * @param replyText 错误信息
               * @param exchange 交换机
               * @param routingKey 路由键
              @Override
              public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                  log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
               * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息
               * @param correlationData  相关配置信息
               * @param ack exchange 交换机,判断交换机是否成功收到消息    true 表示交换机收到
               * @param cause  失败原因
              public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                  log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
                  log.info("correlationData -->"+correlationData.toString());
                  if(ack){
                      // 交换机接收到
                      log.info("---- confirm ----ack==true  cause="+cause);
                  }else{
                      // 没有接收到
                      log.info("---- confirm ----ack==false  cause="+cause);
                  }
          }

          测试

          既然说到测试,那么需要编写一个测试类,能够将产生的消息,推送至指定的正常消息交换机中去。

          package cn.linkpower.controller;
          
          import cn.linkpower.config.DeadMsgMqConfig;
          import cn.linkpower.service.RabbitmqService;
          import lombok.extern.slf4j.Slf4j;
          import org.apache.tomcat.jni.Time;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.web.bind.annotation.RequestMapping;
          import org.springframework.web.bind.annotation.RestController;
          import java.util.concurrent.TimeUnit;
          @Slf4j
          @RestController
          public class DeadMsgController {
              @Autowired
              private RabbitmqService rabbitmqService;
              @RequestMapping("/deadMsgTest")
              public String deadMsgTest() throws InterruptedException {
                  // 向正常的消息队列中丢数据,测试限定时间未消费后,死信队列的情况
                  // 配置文件中,针对于正常队列而言,设置有10条上限大小
                  for (int i = 0; i < 20; i++) {
                      String msg = "dead msg test "+i;
                      log.info("发送消息,消息信息为:{}",msg);
                      // 向正常的消息交换机中传递数据
                      rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg);
                      TimeUnit.SECONDS.sleep(2);
                  }
                  return "ok";
              }
          }

          启动项目,访问指定的链接,进行数据产生和将消息发送交换机操作:

          http://localhost/deadMsgTest

           

          控制台部分日志展示:

          消息什么时候会成为死信消息?

          1、队列消息长度到达限制;

          2、消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

          channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

          3、原队列存在消息过期设置,消息到达超时时间未被消费;

          总结

          此处只是为了进行配置和测试需要,暂未定义任何正常消息队列消费者死信消息队列消费者信息。

          1、死信交换机和死信队列和普通的没有区别

          2、当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

          参考资料

          RabbitMQ死信队列在SpringBoot中的使用

          代码下载

          gitee 代码下载

          在线客服
          服务热线

          服务热线

          4008888355

          微信咨询
          二维码
          返回顶部
          ×二维码

          截屏,微信识别二维码

          打开微信

          微信号已复制,请打开微信添加咨询详情!