什么是广播
广播是向某个主题的所有订阅者发送消息。如果你希望所有订阅者都收到关于某个主题的消息,那么广播是一个不错的选择。
生产者
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("broadcast_producer_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++){
Message msg = new Message("TopicBroadcast",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置消息模式为广播
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicBroadcast", "TagA || TagC || TagD");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载