引入依赖
项目中用的是maven,代码如下
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>
因为我的RocketMQ版本是4.6.0,jar包也用4.6.0,尽量保持一致。
同步消息
public class SyncProducer {
public static void main(String[] args) throws Exception {
//初始化生产者group名称
DefaultMQProducer producer = new
DefaultMQProducer("simple_producer_name");
//指定nameserver地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
for (int i = 0; i < 100; i++) {
//创建消息实例,指定主题、标签、消息体
Message msg = new Message("TopicTest" /* 主题 */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//调用发送消息方法进行消息发送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//生产者发送完之后需要给他关闭掉
producer.shutdown();
}
}
异步消息
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//初始化生产者group名称
DefaultMQProducer producer = new DefaultMQProducer("simple_producer_name");
//指定nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//创建消息实例,指定主题、标签、消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
//发送完之后会有回调
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
//出现异常可以做一些事情
}
});
}
//生产者发送完之后需要给他关闭掉,需要把这个代码注释掉或者sleep几秒再关闭
// producer.shutdown();
}
}
单向消息
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//初始化生产者group名称
DefaultMQProducer producer = new DefaultMQProducer("simple_producer_name");
//指定nameserver地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
for (int i = 0; i < 100; i++) {
//创建消息实例,指定主题、标签、消息体
Message msg = new Message("TopicTest" /* 主题 */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//调用发送消息方法进行消息发送
producer.sendOneway(msg);
}
//生产者发送完之后需要给他关闭掉
producer.shutdown();
}
}
消费消息
public class SimpleConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//初始化指定消费者的group名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_consumer_name");
//指定name server地址
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可以通过控制台俩看发送消息、消费消息的日志。
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载