信息发布→ 登录 注册 退出

Springboot整合RabbitMq测试TTL的方法详解

发布时间:2026-01-11

点击量:
目录
  • 什么是TTL?
  • 如何设置TTL?
  • 设定整个队列的过期时间
    • 配置类编写
    • 测试
    • 配置
    • 测试
  • 总结
    • 代码下载

      什么是TTL?

      RabbitMq中,存在一种高级特性 TTL

      TTLTime To Live的缩写,含义为存活时间或者过期时间。即:

      设定消息在队列中存活的时间。
      指定时间内,消息依旧未被消费,则由队列自动将其删除。

      如何设置TTL?

      既然涉及到设定消息的存活时间,在RabbitMq中,存在两种设置方式:

      • 设置整个队列的过期时间。
      • 设置单个消息的过期时间。

      设定整个队列的过期时间

      按照上一篇文章的依赖导入和配置编写方式进行。

      Springboot——整合Rabbitmq之Confirm和Return详解

      配置类编写

      在原有基础之上,新创建几个配置的bean类,申明bean对象,并进行交换机队列的关联,如下所示:、

      package cn.linkpower.config;
      
      import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import java.util.HashMap;
      import java.util.Map;
      @Configuration
      public class MQConfiguration {
      	// ===========================  Direct 直连模式  ==================================
      	//队列名称
      	public static final String QUEUQ_NAME = "xiangjiao.queue";
      	//交换器名称
      	public static final String EXCHANGE = "xiangjiao.exchange";
      	//路由key
      	public static final String ROUTING_KEY = "xiangjiao.routingKey";
      	// =========================== Direct 普通队列申明 和 交换机绑定  ===================
      	//创建队列
      	@Bean(value = "getQueue")
      	public Queue getQueue(){
      		//QueueBuilder.durable(QUEUQ_NAME).build();
      		return new Queue(QUEUQ_NAME);
      	}
      	//实例化交换机
      	@Bean(value = "getDirectExchange")
      	public DirectExchange getDirectExchange(){
      		//DirectExchange(String name, boolean durable, boolean autoDelete)
      		/**
      		 * 参数一:交换机名称;<br>
      		 * 参数二:是否永久;<br>
      		 * 参数三:是否自动删除;<br>
      		 */
      		//ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
      		return new DirectExchange(EXCHANGE, true, false);
      	//绑定消息队列和交换机
      	@Bean
      	public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange")  DirectExchange exchange,
      										@Qualifier(value = "getQueue") Queue queue){
      		return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
      	// ===========================  TTL ================================
      	public static final String ttl_queue_name = "xiangjiao.ttl.queue";
      	public static final String ttl_exchange_name = "xiangjiao.ttl.exchange";
      	public static final String ttl_routing_key = "xiangjiao.ttl.routingKey";
      	@Bean(value = "getTtlQueue")
      	public Queue getTtlQueue(){
      		// 设置 ttl 队列,并设定 x-message-ttl 参数,表示 消息存活最大时间,单位  ms
      		//return QueueBuilder.durable(ttl_queue_name).withArgument("x-message-ttl",10000).build();
      		Map<String, Object> arguments = new HashMap<>();
      		arguments.put("x-message-ttl",10000);
      		return new Queue(ttl_queue_name,true,false,false,arguments);
      	@Bean(value = "getTTlExchange")
      	public DirectExchange getTTlExchange(){
      		// 设置交换机属性,并保证交换机持久化
      		return new DirectExchange(ttl_exchange_name, true, false);
      	public Binding bindExchangeAndQueueTTL(@Qualifier(value = "getTTlExchange")  DirectExchange getTTlExchange,
      										   @Qualifier(value = "getTtlQueue") Queue queue){
      		return BindingBuilder.bind(queue).to(getTTlExchange).with(ttl_routing_key);
      }

      对比原有的配置类,不难发现区别:

      队列设置过期属性,只需要传递一个 x-message-ttl 的属性值即可。(单位:ms)

      Map<String, Object> arguments = new HashMap<>();
      arguments.put("x-message-ttl",10000);
      return new Queue(ttl_queue_name,true,false,false,arguments);
      

      然后定义交换机类型,并将指定的交换机和队列进行绑定。

      为了测试效果,暂未定义任何该队列的消费者信息。

      测试

      为了便于测试,需要定义一个接口,生产新的数据信息,并将数据向对应的Exchange中传递。

      /**
       * 发送消息,指定ttl参数信息(队列)
       * @return
       */
      @RequestMapping("/sendQueueTtl")
      @ResponseBody
      public String sendQueueTtl(){
      	//发送10条消息
      	for (int i = 0; i < 10; i++) {
      		String msg = "msg"+i;
      		System.out.println("发送消息  msg:"+msg);
      		rabbitmqService.sendMessage(MQConfiguration.ttl_exchange_name,MQConfiguration.ttl_routing_key,msg);
      		//每两秒发送一次
      		try {
      			Thread.sleep(8000);
      		} catch (InterruptedException e) {
      			e.printStackTrace();
      		}
      	}
      	return "send ok";
      }
      

      两条消息之间的过期时间为8s

      请求链接进行测试,查看Rabbitmq web视图信息:

      http://localhost/sendQueueTtl

      查看控制台输出日志:

      消息正常发送到了Exchange,同时Exchange 也将消息推送到了指定的队列 !

      设置有ConfirmReturn监听。

      【说明:】

      给队列设定时间后,单位时间内的消息如果未被消费,则队列会将其中的数据进行删除处理。

      对单个消息设定过期时间

      上面的操作和测试,已经验证对队列设定过期时间,会导致所有的消息过期时间都是一样的现象。

      但实际开发中,可能一个队列需要存放不同过期时间的消息信息,如果需要进行实现,就不能再设定队列的过期时间信息了,需要采取下面要说到的针对单个消息,设置不同过期时间

      配置

      既然是针对单个消息设定不同的过期时间操作,则需要去掉队列过期设置。

      为了测试的简单化,此处采取直连 Direct 交换机类型,进行交换机和队列数据的绑定方式。如下所示:

      // ===========================  Direct 直连模式  ==================================
      //队列名称
      public static final String QUEUQ_NAME = "xiangjiao.queue";
      //交换器名称
      public static final String EXCHANGE = "xiangjiao.exchange";
      //路由key
      public static final String ROUTING_KEY = "xiangjiao.routingKey";
      
      // =========================== Direct 普通队列申明 和 交换机绑定  ===================
      //创建队列
      @Bean(value = "getQueue")
      public Queue getQueue(){
      	//QueueBuilder.durable(QUEUQ_NAME).build();
      	return new Queue(QUEUQ_NAME);
      }
      //实例化交换机
      @Bean(value = "getDirectExchange")
      public DirectExchange getDirectExchange(){
      	//DirectExchange(String name, boolean durable, boolean autoDelete)
      	/**
      	 * 参数一:交换机名称;<br>
      	 * 参数二:是否永久;<br>
      	 * 参数三:是否自动删除;<br>
      	 */
      	//ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();
      	return new DirectExchange(EXCHANGE, true, false);
      }
      //绑定消息队列和交换机
      @Bean
      public Binding bindExchangeAndQueue(@Qualifier(value = "getDirectExchange")  DirectExchange exchange,
      									@Qualifier(value = "getQueue") Queue queue){
      
      	return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
      }
      

      对于消息的发送,依旧沿用之前写的发送处理方式

      设定confirmreturn监听,保证消息能够正常到达指定的队列中。

      测试

      编写一个测试的接口,设定单个消息的过期时间属性,保证不同消息具备不同的过期时间

      在之前博客中,针对消息的持久化设置,需要保证消息向队列设定属性时,传递一个deliveryMode参数值信息。

      同理,设定每个消息的过期时间,也需要设定对应的属性信息。如下所示:

      /**
       * 发送消息,指定ttl参数信息(单个消息);
       * 测试需要将消息消费者关闭监听
       * @return
       */
      @RequestMapping("/sendTtl")
      @ResponseBody
      public String sendTtl(){
      	//发送10条消息
      	for (int i = 0; i < 10; i++) {
      		String msg = "msg"+i;
      		System.out.println("发送消息  msg:"+msg);
      		
      		MessageProperties messageProperties = new MessageProperties();
      		messageProperties.setExpiration("5000"); // 针对消息设定时限
      		// 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中
      		Message message = new Message(msg.getBytes(), messageProperties);
      		
      		rabbitmqService.sendMessage(MQConfiguration.EXCHANGE, MQConfiguration.ROUTING_KEY,message);
      		//每两秒发送一次
      		try {
      			Thread.sleep(3000);
      		} catch (InterruptedException e) {
      			e.printStackTrace();
      		}
      	}
      	return "send ok";
      }
      

      上面代码的编写核心为将消息内容体和消息对象属性进行封装

      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setExpiration("5000"); // 针对消息设定时限
      // 将消息数据和设置属性进行封装,采取消息发送模板,将消息数据推送至指定的交换机 exchange 中
      Message message = new Message(msg.getBytes(), messageProperties);
      

      引申一点:消息的持久化
      Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?

      请求连接进行测试:

      http://localhost/sendTtl

      查看控制台打印日志情况:

      总结

      1、设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期

      2、设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期

      3、如果两者都进行了设置,以时间短的为准。

      代码下载

      gitee 代码下载

      在线客服
      服务热线

      服务热线

      4008888355

      微信咨询
      二维码
      返回顶部
      ×二维码

      截屏,微信识别二维码

      打开微信

      微信号已复制,请打开微信添加咨询详情!