微服务应用(二十一):延时队列之MQ实现方案及RabbitMQTTL+DLX或插件实现

star2017 1年前 ⋅ 1091 阅读

MQ 消息中间件基本都支持延时消息消息过期处理相关功能,可以使用此功能来实现消息队列。

使用 MQ 实现延时队列的基本逻辑,是将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。

MQ实现延时队列

RabbitMQ实现延时队列

RabbitMQ 支持消息过期和死信列队,可以借助此功能实现延时队列。

基本逻辑是,将消息发送到普通队列,让消息自动过期被转发路由到死信队列,消费者订阅死信队列。

RabbitMQ 实现延时队列有两种方式:

  • 在 RabbitMQ 可以对 Queue 设置 x-expires 过期时间或者对 message设置超时时间x-message-ttl

    注意:延时相同的消息要扔到同一个队列中,对于每一个延时要建立一个与之对应的队列,这是因为 RabbitMQ 的过期检测是惰性检测的,同样可以保证消息处理的顺序性。

  • 使用 RabbitMQ 的 rabbitmq-delayed-message-exchange 插件来实现延时队列。

RocketMQ实现延时队列

RocketMQ 在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息过期能被及时顺序地处理,可以让同一个队列中消息延时时间是相同的,整个 RocketMQ 中延时消息时按照递增顺序排序,保证信息处理的先后顺序性)。

然后通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行业务处理。

注意 :目前 RocketMQ 只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时设置。

Kafka实现延时队

Kafka 基于时间轮算法自定义了一个用于实现延迟功能的定时器(SystemTimer),Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,可以进行相关的延时队列设置。

Kafka 这种消息队列存储来实现延时功能,每个队列的时间都需要创建一个单独的 topic(如: Q1-1s, Q1-2s..)。这种设计在延时时间比较固定的场景下问题不太大,但如果是延时时间变化比较大会导致 topic 数目过多,会把磁盘从顺序读写会变成随机读写从导致性能衰减,同时也会带来其他类似重启或者恢复时间过长的问题。

TTL+DLX实现

RabbitMQ 的过期时间死信队列,可参考 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化

基于 RabbitMQ 的 TTL + DLX 实现延迟队列的基本逻辑是:

  1. 生产者发送消息到普通队列(这里称为缓冲队列),消息设置了过期时间,给缓存冲队列绑定死信交换机。
  2. 让缓冲队列中的消息自动过期,死信消息被转发到死信交换机。
  3. 死信交换机再将死信消息路由到死信队列。
  4. 消费都监听死信队列进行处理。

延时队列逻辑

TTL

TTL(Time To Live):消息的存活时间,即消息在队列的保留时间,等同于过期时间,单位为毫秒

当消息在存活期间内没被消费,就变为 死信(Dead Letter)。如果一个队列消息都设置了过期时间,则取小的。

设置消息的 TTL 有两种方式:

  • 在声明队列时设置 x-message-ttl 属性参数,对队列中所有的消息起效(有相同的过期时间)。

    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 6000);
    channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
    
  • 在发送消息时设置expiration属性参数,只对当前发送的消息起效。

    byte[] messageBodyBytes = "Hello, world!".getBytes();
    AMQP.BasicProperties properties = new AMQP.BasicProperties();
    properties.setExpiration("60000");
    channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
    

设置了消息过期时间,还需要设置 死信交换机,过期的消息要被路由到 死信交换

消息转为死信的三种形式:

  • 消息被拒绝。通过调用 basic.rejectbasic.nack,并且设置的 requeue参数为 false
  • 消息设置了 TTL 而过期。
  • 消息进入了一条已达最大长度的队列。

DLX

Dead Letter Exchange 实际就是一种普通的 exchange,可使用任何交换器类型,使用常规声明即可。

为一个队列设置死信交换器,在声明队列时指定 x-dead-letter-exchange 参数选项。

声明死信交换器

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
// 将声明的交换器指定为死信交换器
args.put("x-dead-letter-exchange", "some.exchange.name");
// 声明队列时传入参数
channel.queueDeclare("myqueue", false, false, false, args);

上面代码,如果 myqueue 队列存在死信,将被重新转发给死信交换器 some.exchange.name再被路由到另一个队列(死信队列),开发者可以监听这个队列进行处理。

注意:在消息需要被死信时,死信交换器应该存在;如果不存在,则消息将被静默删除。

注意:相同过期时间的消息应发送到相同的队列,这样可以保证消息过期能被及时处理;队列消费是先进先出(FIFO),如果不同过期时间的消息存在同一个队列里,因为消息过期是隋性处理(即是在投递到消费者前判断的),如果当前队列消息积压严重,或队列头的消息过期时间远大于后面消息的过期时间,则已过期的消息可能在队列中存活很长时间而没有被及时处理。

RabbitMQ 的延时队列插件不存在此问题,插件是把消息缓存在交换器并做延时检测,再把过期的消息投递到队列。

实现示例

基于 Spring Boot + spring-boot-starter-amqp 实现

添加依赖

Maven pom.xml 添加 Spring Amqp 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接

properties 文件配置 RabbitMQ 连接:

#===========Rabbit MQ=====================
spring.rabbitmq.host=192.168.0.120
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

注意,Spring AMQP 默认配置的 RabbitMQ 的监听器类型是 simple,消费者确认模式默认是自动(AUTO),确认会删除消息,关闭 channel。如果这个时候再调用手动确认(`channel.basicAck),因为消费已被删除会报错的:

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这时需要设置监听器的确认模式为手动(manual):

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.type=direct
#spring.rabbitmq.listener.direct.acknowledge-mode=manual

声明交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @desc: 延时队列 TTL+DLX
 */
@Configuration
public class RabbitMQConfig {

    // 延时
    public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
    public static final String DELAY_QUEUE_NAME = "delay.queue";
    public static final String DELAY_MESSAGE_ROUTE_KEY = "delay.route.key";
    // 死信
    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
    public static final String DEAD_LETTER_ROUTE_KEY = "dead.letter.route.key";

    /**
     * 声明延时交换器
     *
     * @return
     */
    @Bean(name = "delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /**
     * 声明延时队列
     *
     * @return
     */
    @Bean(name = "delayQueue")
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
        // x-dead-letter-routing-key  声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTE_KEY);
        // x-message-ttl  声明队列的TTL, 单位 毫秒
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
    }

    /**
     * 声明死信交换器
     *
     * @return
     */
    @Bean(name = "deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean(name = "deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.nonDurable(DEAD_LETTER_QUEUE_NAME).build();
    }

    /**
     * 延时队列绑定延时交换器
     *
     * @param delayExchange
     * @param delayExchange
     * @return
     */
    @Bean
    public Binding delayQueueBind(@Qualifier(value = "delayQueue") Queue delayQueue,
                                  @Qualifier(value = "delayExchange") DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_MESSAGE_ROUTE_KEY);
    }

    /**
     * 死信队列绑定死信交换器
     *
     * @param deadLetterQueue
     * @param deadLetterExchange
     * @return
     */
    @Bean
    public Binding deadLetterQueueBind(@Qualifier(value = "deadLetterQueue") Queue deadLetterQueue,
                                       @Qualifier(value = "deadLetterExchange") DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTE_KEY);
    }
}

创建生产者

@Component
public class MessageProduce {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayMessage(String msg) throws IOException {
        ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
        Connection connection = connectionFactory.createConnection();
        Channel channel = connection.createChannel(false);
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("6000") //设置消息过期时间 6 秒,单位毫秒
                .build();
        channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, properties, msg.getBytes());
//        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_MESSAGE_ROUTE_KEY, msg);
    }
}

创建消费者

import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @desc 消费者
 */
@Component
public class MessageConsumer {
    private static final Logger logger = LogManager.getLogger(MessageConsumer.class);

    /**
     * 监听消息
     *
     * @param message
     * @param channel
     * @param msgBody
     */
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME)
    public void receiveDelayMsg(Message message, Channel channel, String msgBody) {
        String msg = new String(message.getBody());
        logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
        try {
            //.....处理业务......
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
            e.printStackTrace();
        }
    }
}

发送消息接口

@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQMsgController {

    @Autowired
    private MessageProduce messageProduce;

    @RequestMapping("/send")
    public void sendDelayMsg(String msg) {
        messageProduce.sendDelayMessage(msg + ":" + System.currentTimeMillis() / 1000);
    }
}

收到的消息:

receive message:hello1:1616512604, currentTime:1616512610, 
receive message:hello2:1616512606, currentTime:1616512612, 
receive message:hello3:1616512607, currentTime:1616512613, 
receive message:hello4:1616512609, currentTime:1616512615, 
receive message:hello5:1616512610, currentTime:1616512616,

延时队列插件

RabbitMQ Delayed Message Plugin:rabbitmq_delayed_message_exchange,该插件将延迟消息(或计划消息)添加到 RabbitMQ。

用户可以声明类型为x-delayed-message的交换,然后发布带有自定义头为x-delay的消息,该消息以毫秒为单位表示消息的延迟时间。该消息将在 x 延迟毫秒后被投递到相应的队列。

安装插件

  1. 查看插件列表:rabbitmq-plugins list

    默认是没有安装 rabbitmq_delayed_message_exchange 插件的。

  2. 进入 RabbitMQ 的插件存放目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins

  3. 下载 rabbitmq_delayed_message_exchange 插件,注意版本号的匹配

    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
    
  4. 启用插件

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  5. 具体使用可参考官方文档,或 RabbitMQ(四):过期时间,死信队列,延迟队列,优先队列,持久化 > 延时队列,及下面实现示例。

实现示例

声明交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @desc: 延时队列 TTL+DLX
 */
@Configuration
public class RabbitMQConfig {

    /*-------RabbitMQ Plugin:rabbitmq_delayed_message_exchange ----------------*/
    public static final String X_DELAYED_EXCHANGE_NAME = "x-delayed-exchange";
    public static final String X_DELAYED_EXCHANGE_TYPE = "x-delayed-message";
    public static final String X_DELAYED_QUEUE_NAME = "x-delayed-queue";
    public static final String X_DELAYED_ROUTE_KEY = "x-delayed-route-key";


    /**
     * 声明延时队列
     *
     * @return
     */
    @Bean(name = "xDelayedQueue")
    public Queue xDelayedQueue() {
        return QueueBuilder.nonDurable(X_DELAYED_QUEUE_NAME).build();
    }

    /**
     * 声明延时交换器
     *
     * @return
     */
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(X_DELAYED_EXCHANGE_NAME, X_DELAYED_EXCHANGE_TYPE, true, false, args);
    }

    /**
     * 队列与交换器绑定
     *
     * @param queue
     * @param customExchange
     * @return
     */
    @Bean
    public Binding bindingNotify(@Qualifier("xDelayedQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(X_DELAYED_ROUTE_KEY).noargs();
    }

}

创建生产者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;

/**
 * @desc
 */
@Component
public class MessageProduce {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void xDelayedMessage(String msg) throws IOException {

        // channel.basicPublish
/*
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);;
        HashMap<String, Object> headers = new HashMap<>();
        headers.put("x-delay", 5000);
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
        channel.basicPublish(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, props, msg.getBytes(Charset.defaultCharset()));
*/

        // rabbitTemplate.convertAndSend
        rabbitTemplate.convertAndSend(RabbitMQConfig.X_DELAYED_EXCHANGE_NAME, RabbitMQConfig.X_DELAYED_ROUTE_KEY, msg, message -> {
            // 下面两都效果一样
            message.getMessageProperties().setDelay(6000);
//            message.getMessageProperties().setHeader("x-delay", 5000);
            return message;
        });
    }
}

创建消费者

import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @desc 消费者
 */
@Component
public class MessageConsumer {
    private static final Logger logger = LogManager.getLogger(MessageConsumer.class);

    /**
     * 监听消息
     *
     * @param message
     * @param channel
     * @param msgBody
     */
    @RabbitListener(queues = RabbitMQConfig.X_DELAYED_QUEUE_NAME)
    public void xDelayedListener(Message message, Channel channel, String msgBody) {
        String msg = new String(message.getBody());
        logger.info("receive message:{}, currentTime:{}, ", msgBody, System.currentTimeMillis() / 1000);
//        try {
//            //.....处理业务......
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        } catch (IOException e) {
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
//            e.printStackTrace();
//        }
    }

}

启动服务

启动服务,查看 RabbitMQ 的 Web 管理平台的交换器,存在一个类型为 x-delayed-message 的交换器。

rabbitmq-plugin-delayed-exchange

TTL+DLX与插件区别

  • TTL+DLX:先进先出(FIFO),消息可能因为头消息不会被立即处理

相关资料

相关参考

  1. 一文带你搞定RabbitMQ延迟队列:详细
  2. 实现一个延时队列:模拟 DelayQueue 实现自定义的延时对列,对理解 DelayQueue 实现原理非常有帮助。
  3. 有赞延迟队列设计:基于 Redis 实现,把定时任务和消费进行了拆分。
  4. 延时队列实现思路:Redis,RabbitMQ,Kafka,Netty,DelayQueue,没有示例代码。
  5. 定时任务实现几种方式:@schedule 注解,Timer & TimerTask,Quartz,ScheduleExecutorService。
  6. 美图延时队列实现-LMSTFY:基于 Redis 实现,LMSTFY Github地址
  7. Redis实现消息队列:借助了 Redis 的 List 的 BLPOP 或 BRPOP 阻塞消费消息。
  8. Lua Guava-EventBus 实现延时队列,这个实现思路值得参考。
  9. 10种延迟任务实现方式:做了汇总,有示例代码,可参考。
  10. Redus 过期 Key 监听与发布订阅功能:有详情的代码示例参考。
  11. Spring Messaging with Redis:Spring 官方手册,基于 Redis 的 发布/订阅 来发送消息。
  12. Spring Messaging with RabbitMQ:Spring 官方手册,基于 RabbitMQ 的 发布/订阅 来发送消息。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: