MQ 消息中间件基本都支持延时消息或消息过期处理相关功能,可以使用此功能来实现消息队列。
使用 MQ 实现延时队列的基本逻辑,是将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。
MQ实现延时队列
RabbitMQ实现延时队列
RabbitMQ 支持消息过期和死信列队,可以借助此功能实现延时队列。
基本逻辑是,将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。
RabbitMQ 实现延时队列有两种方式:
在 RabbitMQ 可以对 Queue 设置
x-expires
过期时间或者对message
设置超时时间x-message-ttl
。注意:延时相同的消息要扔到同一个队列中,对于每一个延时要建立一个与之对应的队列,这是因为 RabbitMQ 的过期检测是惰性检测的,同样可以保证消息处理的顺序性。
使用 RabbitMQ 的
rabbitmq-delayed-message-exchange
插件来实现延时队列。
RocketMQ实现延时队列
RocketMQ 在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息过期能被及时顺序地处理,可以让同一个队列中消息延时时间是相同的,整个 RocketMQ 中延时消息时按照递增顺序排序,保证信息处理的先后顺序性)。
然后通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行业务处理。
注意 :目前 RocketMQ 只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时设置。
Kafka实现延时队
Kafka 基于时间轮算法自定义了一个用于实现延迟功能的定时器(SystemTimer
),Kafka中的时间轮(TimingWheel
)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。
Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。
TTL+DLX实现
RabbitMQ 的过期时间和死信队列,可参考 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化。
基于 RabbitMQ 的 TTL + DLX 实现延迟队列的基本逻辑是:
- 生产者发送消息到普通队列(这里称为缓冲队列),消息设置了过期时间,给缓存冲队列绑定死信交换机。
- 让缓冲队列中的消息自动过期,死信消息被转发到死信交换机。
- 死信交换机再将死信消息路由到死信队列。
- 消费都监听死信队列进行处理。
TTL
TTL(Time To Live):消息的存活时间,即消息在队列的保留时间,等同于过期时间,单位为毫秒。
当消息在存活期间内没被消费,就变为 死信(Dead Letter)。如果一个队列和消息都设置了过期时间,则取小的。
设置消息的 TTL 有两种方式:
在声明队列时设置
x-message-ttl
属性参数,对队列中所有的消息起效(有相同的过期时间)。Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
在发送消息时设置
expiration
属性参数,只对当前发送的消息起效。byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
设置了消息过期时间,还需要设置 死信交换机,过期的消息要被路由到 死信交换。
消息转为死信的三种形式:
- 消息被拒绝。通过调用
basic.reject
或basic.nack
,并且设置的requeue
参数为false
。 - 消息设置了 TTL 而过期。
- 消息进入了一条已达最大长度的队列。
DLX
Dead Letter Exchange 实际就是一种普通的 exchange
,可使用任何交换器类型,使用常规声明即可。
为一个队列设置死信交换器,在声明队列时指定 x-dead-letter-exchange
参数选项。
声明死信交换器:
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
// 将声明的交换器指定为死信交换器
args.put("x-dead-letter-exchange", "some.exchange.name");
// 声明队列时传入参数
channel.queueDeclare("myqueue", false, false, false, args);
上面代码,如果 myqueue
队列存在死信,将被重新转发给死信交换器 some.exchange.name
再被路由到另一个队列(死信队列),开发者可以监听这个队列进行处理。
注意:在消息需要被死信时,死信交换器应该存在;如果不存在,则消息将被静默删除。
注意:相同过期时间的消息应发送到相同的队列,这样可以保证消息过期能被及时处理;队列消费是先进先出(FIFO),如果不同过期时间的消息存在同一个队列里,因为消息过期是隋性处理(即是在投递到消费者前判断的),如果当前队列消息积压严重,或队列头的消息过期时间远大于后面消息的过期时间,则已过期的消息可能在队列中存活很长时间而没有被及时处理。
RabbitMQ 的延时队列插件不存在此问题,插件是把消息缓存在交换器并做延时检测,再把过期的消息投递到队列。
实现示例
基于 Spring Boot + spring-boot-starter-amqp 实现
添加依赖
Maven pom.xml 添加 Spring Amqp 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置连接
properties 文件配置 RabbitMQ 连接:
#===========Rabbit MQ=====================
spring.rabbitmq.host=192.168.0.120
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
注意,Spring AMQP 默认配置的 RabbitMQ 的监听器类型是 simple
,消费者确认模式默认是自动(AUTO
),确认会删除消息,关闭 channel
。如果这个时候再调用手动确认(`channel.basicAck),因为消费已被删除会报错的:
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
这时需要设置监听器的确认模式为手动(manual
):
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
声明交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @desc: 延时队列 TTL+DLX
*/
@Configuration
public class RabbitMQConfig {
// 延时
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
public static final String DELAY_QUEUE_NAME = "delay.queue";
public static final String DELAY_MESSAGE_ROUTE_KEY = "delay.route.key";
// 死信
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
public static final String DEAD_LETTER_ROUTE_KEY = "dead.letter.route.key";
/**
* 声明延时交换器
*
* @return
*/
@Bean(name = "delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/**
* 声明延时队列
*
* @return
*/
@Bean(name = "delayQueue")
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
// x-dead-letter-routing-key 声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTE_KEY);
// x-message-ttl 声明队列的TTL, 单位 毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
}
/**
* 声明死信交换器
*
* @return
*/
@Bean(name = "deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
/**
* 声明死信队列
*
* @return
*/
@Bean(name = "deadLetterQueue")
public Queue deadLetterQueue() {
return QueueBuilder.nonDurable(DEAD_LETTER_QUEUE_NAME).build();
}
/**
* 延时队列绑定延时交换器
*
* @param delayExchange
* @param delayExchange
* @return
*/
@Bean
public Binding delayQueueBind(@Qualifier(value = "delayQueue") Queue delayQueue,
@Qualifier(value = "delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_MESSAGE_ROUTE_KEY);
}
/**
* 死信队列绑定死信交换器
*
* @param deadLetterQueue
* @param deadLetterExchange
* @return
*/
@Bean
public Binding deadLetterQueueBind(@Qualifier(value = "deadLetterQueue") Queue deadLetterQueue,
@Qualifier(value = "deadLetterExchange") DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTE_KEY);
}
}
创建生产者
@Component
public class MessageProduce {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String msg) throws IOException {
ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("6000") //设置消息过期时间 6 秒,单位毫秒
.build();
channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, properties, msg.getBytes());
// rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, msg);
}
}
创建消费者
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @desc 消费者
*/
@Component
public class MessageConsumer {
private static final Logger logger = LogManager.getLogger(MessageConsumer.class);
/**
* 监听消息
*
* @param message
* @param channel
* @param msgBody
*/
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
public void receiveDelayMsg(Message message, Channel channel, String msgBody) {
String msg = new String(message.getBody());
logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
try {
//.....处理业务......
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
e.printStackTrace();
}
}
}
发送消息接口
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQMsgController {
@Autowired
private MessageProduce messageProduce;
@RequestMapping("/send")
public void sendDelayMsg(String msg) {
messageProduce.sendDelayMessage(msg + ":" + System.currentTimeMillis() / 1000);
}
}
收到的消息:
receive message:hello1:1616512604, currentTime:1616512610,
receive message:hello2:1616512606, currentTime:1616512612,
receive message:hello3:1616512607, currentTime:1616512613,
receive message:hello4:1616512609, currentTime:1616512615,
receive message:hello5:1616512610, currentTime:1616512616,
延时队列插件
RabbitMQ Delayed Message Plugin:rabbitmq_delayed_message_exchange,该插件将延迟消息(或计划消息)添加到 RabbitMQ。
用户可以声明类型为x-delayed-message
的交换,然后发布带有自定义头为x-delay
的消息,该消息以毫秒为单位表示消息的延迟时间。该消息将在 x
延迟毫秒后被投递到相应的队列。
安装插件
查看插件列表:
rabbitmq-plugins list
默认是没有安装
rabbitmq_delayed_message_exchange
插件的。进入 RabbitMQ 的插件存放目录:
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins
下载
rabbitmq_delayed_message_exchange
插件,注意版本号的匹配wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
具体使用可参考官方文档,或 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化 > 延时队列,及下面实现示例。
实现示例
声明交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @desc: 延时队列 TTL+DLX
*/
@Configuration
public class RabbitMQConfig {
/*-------RabbitMQ Plugin:rabbitmq_delayed_message_exchange ----------------*/
public static final String X_DELAYED_EXCHANGE_NAME = "x-delayed-exchange";
public static final String X_DELAYED_EXCHANGE_TYPE = "x-delayed-message";
public static final String X_DELAYED_QUEUE_NAME = "x-delayed-queue";
public static final String X_DELAYED_ROUTE_KEY = "x-delayed-route-key";
/**
* 声明延时队列
*
* @return
*/
@Bean(name = "xDelayedQueue")
public Queue xDelayedQueue() {
return QueueBuilder.nonDurable(X_DELAYED_QUEUE_NAME).build();
}
/**
* 声明延时交换器
*
* @return
*/
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(X_DELAYED_EXCHANGE_NAME, X_DELAYED_EXCHANGE_TYPE, true, false, args);
}
/**
* 队列与交换器绑定
*
* @param queue
* @param customExchange
* @return
*/
@Bean
public Binding bindingNotify(@Qualifier("xDelayedQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(X_DELAYED_ROUTE_KEY).noargs();
}
}
创建生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
/**
* @desc
*/
@Component
public class MessageProduce {
@Autowired
private RabbitTemplate rabbitTemplate;
public void xDelayedMessage(String msg) throws IOException {
// channel.basicPublish
/*
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);;
HashMap<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, props, msg.getBytes(Charset.defaultCharset()));
*/
// rabbitTemplate.convertAndSend
rabbitTemplate.convertAndSend(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, msg, message -> {
// 下面两都效果一样
message.getMessageProperties().setDelay(6000);
// message.getMessageProperties().setHeader("x-delay", 5000);
return message;
});
}
}
创建消费者
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @desc 消费者
*/
@Component
public class MessageConsumer {
private static final Logger logger = LogManager.getLogger(MessageConsumer.class);
/**
* 监听消息
*
* @param message
* @param channel
* @param msgBody
*/
@RabbitListener(queues = RabbitMQConfig.X_DELAYED_QUEUE_NAME)
public void xDelayedListener(Message message, Channel channel, String msgBody) {
String msg = new String(message.getBody());
logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
// try {
// //.....处理业务......
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// } catch (IOException e) {
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
// e.printStackTrace();
// }
}
}
启动服务
启动服务,查看 RabbitMQ 的 Web 管理平台的交换器,存在一个类型为 x-delayed-message
的交换器。
TTL+DLX与插件区别
- TTL+DLX:先进先出(FIFO),消息可能因为头消息不会被立即处理
相关资料
- RabbitMQ 延时队列插件
- RabbitMQ 延时队列插件 Github > rabbitmq/rabbitmq-delayed-message-exchange
- RabbitMQ 官网 > Plugins
- RabbitMQ 官网 > Scheduling Messages with RabbitMQ
- RabbitMQ 官网 > Delayed Message Exchange Plugin
- Spring 官网 > Spring AMQP > Delayed Message Exchange
相关参考
- 一文带你搞定RabbitMQ延迟队列:详细
- 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
- 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
- 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
- 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
- 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址。
- Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
- Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
- 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
- Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
- Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
- Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
注意:本文归作者所有,未经作者允许,不得转载