【RocketMQ】定时消息

star2017 1年前 ⋅ 648 阅读

前言

rocketmq定时消息只支持到几秒,几分钟,几小时之后执行,而且执行的等级是固定的,不能通过指定时间来处理消息,消息的延迟等级如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

生产者

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 初始化生产者
        DefaultMQProducer producer = new DefaultMQProducer("scheduled_producer_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动实例
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TopicScheduled", ("Hello scheduled message " + i).getBytes());
            //这个消息将延迟10秒消费
            message.setDelayTimeLevel(3);
            //发送消息
            producer.send(message);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

消费者

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 初始化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("scheduled_consumer_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题
        consumer.subscribe("TopicScheduled", "*");
        // 注册消息监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 打印大约的延迟时间
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

源代码

传送门

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: