RabbitMq TTL+死信队列 延迟消息问题记录 天天快讯

来源:腾讯云

2023-02-28 15:24:18

延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

利用RabbitMqTTL和死信队列 来实现延时消费。


(资料图)

如果设置的是队列统一过期时间放到死信队列,没有什么问题。

如果是延时时间设置到每条消息上的。而不是给队列的。

实现方式为消息存活时间为动态用户页面可配置的。

这就导致了一个问题:

先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。

结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。

它不会检测每一条消息是否过期。而是顺序检测。

如果first in的消息过期时间很长,会导致它阻塞后进的消息。

不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。

问题解决

这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange

一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件安装

需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0,对应的插件版本就是:3.11.1

基于Linux

--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server

基于Docker

--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID

安装成功

web界面新建交换机选择类型出现红框标注即表示成功

image.png

代码实现

1:springBoot配置

@Configurationpublic class DelayRabbitmqConfig {     /**     * 声明延迟队列     * @return     */    @Bean    public Queue delayQueue(){        return new Queue(QueueConstant.DelayQueue,                true,false,false);    }     /**     * 声明延迟自定义交换机类型     * @return     */    @Bean    public CustomExchange delayCustomExchange(){        HashMap args = new HashMap<>();//        设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递        args.put("x-delayed-type","direct");        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,                "x-delayed-message",true,false,args);    }     /**     * 绑定延迟交换机和队列     * @return     */    @Bean    public Binding delayQueueAndCustomExchange(){        return BindingBuilder.bind(delayQueue())                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();    }}

springMvc配置

引入依赖:    xmlns:util="http://www.springframework.org/schema/util"    http://www.springframework.org/schema/util    http://www.springframework.org/schema/util/spring-util-4.0.xsd                                                                                                                

代码实现

//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor {    /**     * 消息延迟时间,单位:毫秒     */    private final Integer TTL;    public MyMessagePostProcessor(final Integer ttl) {        this.TTL = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setDelay(TTL);        return message;    }}

关键词: RabbitMQ

RabbitMq TTL+死信队列 延迟消息问题记录 天天快讯

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。[详细]
2023-02-28

新民艺评丨是少年就该奔放——评儿童音乐剧《放飞的天空》

中福会儿童艺术剧院近日在文化广场上演了一出以小见大、深受孩子欢迎的儿童音乐剧《放飞的天空》。[详细]
2023-02-28

并列关系的关联词是什么意思_并列关系的关联词

1、学习要又不耻下问,又要孜孜不倦,这样才能事半功倍 。本文到此分享完毕,希望对大家有所帮助。[详细]
2023-02-28

2023宜昌居民医保缴费什么时候截止?|天天最资讯

2023宜昌居民医保缴费什么时候截止?2023年度居民医保参保延期缴费2月28日截止,2023年度城乡居民医保个人缴费标准为每人350元。拓展——缴费方式1 手[详细]
2023-02-28

当前视点!下午18点!中国足协正式官宣,24人国足名单出炉,李铁嫡系全开除

根据足协公布的行程安排,他们将在海南集训基地集合,随后前往新西兰进行海外集训。在近期公布的国足名单中,上海海港队和武汉三镇队成为国足[详细]
2023-02-28

广西万山茶业有限责任公司

1、广西万山茶业有限责任公司是一家集生产、加工、贸易、自营进出口业务为一体的综合性企业·是广西第一家通过ECOCER[详细]
2023-02-28

天天最新:X射线胶片洗片机

1、X射线胶片洗片机是自动胶片洗片机。2、该洗片机用于医疗单位冲洗X射线胶片。文章到此就分享结束,希望对大家有所帮助[详细]
2023-02-28

疯狂暗示?FIFA官方晒贴有梅西名字的座位照片

FIFA年度颁奖典礼将在北京时间今天凌晨进行,FIFA在社交媒体晒出了贴有梅西名字的座位照片。在2022年,梅西俱乐部、国家队的总数据为51场35球3[详细]
2023-02-28

如何快速通便的方法_快速通便的办法-今日讯

解答:1、水果通便法,多吃水果少吃主食,尤其是油腻的食物,一定要杜绝。坚持3天就能看到明显的效果,但是需要一定的毅力坚持[详细]
2023-02-27

一道中考题的解析

1、《一道中考题的解析》是官店民族初中提供的微课课程,主讲教师为朱海舟。2、。文章到此就分享结束,希望对大家有所帮助[详细]
2023-02-27
版权所有: 南方制冷网 All Rights Reserved
粤ICP备18023326号-21
联系邮箱:855 729 8@qq.com