前言
rocketmq定时消息只支持到几秒,几分钟,几小时之后执行,而且执行的等级是固定的,不能通过指定时间来处理消息,消息的延迟等级如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
生产者
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("scheduled_producer_name");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动实例
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TopicScheduled", ("Hello scheduled message " + i).getBytes());
//这个消息将延迟10秒消费
message.setDelayTimeLevel(3);
//发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
消费者
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 初始化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("scheduled_consumer_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅主题
consumer.subscribe("TopicScheduled", "*");
// 注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// 打印大约的延迟时间
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载