前言
RocketMQ使用FIFO(先进先出)顺序提供有序消息,有顺序的消息在实际业务中是比较常见的,比如下单,下面让我们来看看代码如何实现。
rocketmq支持全局顺序和分区顺序,全局顺序只是分区顺序的一种特例,但是对性能会有很大的影响,建议使用分区顺序,以下示例也是使用了分区顺序。
生产者
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//初始化生产者group名称
DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
producer.setNamesrvAddr("127.0.0.1:9876");
//启动实例
producer.start();
for (int i = 0; i < 10; i++) {
//创建消息实例,指定主题、标签、消息体
Message msg = new Message("TopicOrderTest", "tagA", "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
System.out.println("producer:queueId:"+id+";content:"+new String(msg.getBody()));
return mqs.get(index);
}, 3);
}
//生产者关闭
producer.shutdown();
}
}
执行结果
消费者
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicOrderTest", "*");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
System.out.println("consumer:queueId:"+msgs.get(0).getQueueId()+";content:"+new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
执行结果
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载