前言
RocketMQ的介绍可以看这里:传送门
本文主要介绍 Spring Boot 如何集成 RocketMQ,并给出几个简单的代码示例:字符串消息发送、对象消息发送、事务消息发送。
代码示例
Spring Boot 启动类,其中包含了消息发送的代码:
/**
 * Spring Boot entry point that doubles as a demo producer.
 *
 * <p>On startup {@link #run(String...)} sends, in order: a plain string message, a string
 * wrapped in a Spring {@code Message}, a {@link User} object, and finally a batch of
 * transactional messages. The nested {@link TransactionListenerImpl} simulates the outcome
 * of the local transaction for each transactional message.
 */
@SpringBootApplication
public class RocketMqApplication implements CommandLineRunner {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /** Topic for string messages, injected from {@code demo.rocketmq.topic}. */
    @Value("${demo.rocketmq.topic}")
    private String springTopic;

    /** Topic for {@link User} messages, injected from {@code demo.rocketmq.topic.user}. */
    @Value("${demo.rocketmq.topic.user}")
    private String userTopic;

    /** Topic for transactional messages, injected from {@code demo.rocketmq.transTopic}. */
    @Value("${demo.rocketmq.transTopic}")
    private String springTransTopic;

    public static void main(String[] args) {
        SpringApplication.run(RocketMqApplication.class, args);
    }

    /**
     * Sends the demo messages once the application context is up.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if any synchronous send fails
     */
    @Override
    public void run(String... args) throws Exception {
        // Send a plain string; the destination is given as "topic:tag".
        SendResult sendResult = rocketMQTemplate.syncSend(springTopic + ":tagA", "Hello, Msg!");
        System.out.printf("发送消息 topic %s sendResult=%s %n", springTopic, sendResult);

        // Send a string wrapped in a Spring Message.
        sendResult = rocketMQTemplate.syncSend(springTopic,
                MessageBuilder.withPayload("Hello, World! I'm from spring message ").build());
        System.out.printf("发送消息 to topic %s sendResult=%s %n", springTopic, sendResult);

        // Send an object payload.
        User user = new User();
        user.setAge(16);
        user.setName("zs");
        sendResult = rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(user).build());
        System.out.printf("发送消息 to topic %s sendResult=%s %n", userTopic, sendResult);

        // Send transactional messages.
        testTransaction();
    }

    /**
     * Sends ten transactional messages, cycling through five tags, with a 10 ms pause
     * between sends. A failure on one message is printed and does not stop the remaining
     * sends; an interrupt stops the loop and restores the thread's interrupt flag.
     */
    private void testTransaction() throws MessagingException {
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                // The TRANSACTION_ID header is what the listener below uses as its map key.
                Message<String> msg = MessageBuilder.withPayload("Hello RocketMQ " + i)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i)
                        .build();
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(null,
                        springTransTopic + ":" + tags[i % tags.length], msg, null);
                System.out.printf("发送事务消息=%s , sendResult=%s %n",
                        msg.getPayload(), sendResult.getSendStatus());
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop sending.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Transaction listener that simulates local-transaction outcomes.
     *
     * <p>{@link #executeLocalTransaction} rotates through three outcomes using a counter
     * (COMMIT, ROLLBACK, UNKNOWN) and records the chosen status per transaction id;
     * {@link #checkLocalTransaction} later resolves a transaction from that record.
     */
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {

        /** Counter used to rotate the simulated outcome. */
        private final AtomicInteger transactionIndex = new AtomicInteger(0);

        /** Simulated status (0, 1 or 2) recorded per transaction id. */
        private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

        /**
         * Simulates executing the local transaction for {@code msg}.
         *
         * @param msg the transactional message; its TRANSACTION_ID header identifies it
         * @param arg the extra argument passed by the sender (unused)
         * @return COMMIT when the counter is 0 mod 3, ROLLBACK when 1 mod 3, otherwise UNKNOWN
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
                    transId);
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(transId, status);
            if (status == 0) {
                System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
                return RocketMQLocalTransactionState.COMMIT;
            }
            if (status == 1) {
                System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            // The format string contains %s, so the payload must be supplied; without it
            // printf throws MissingFormatArgumentException before UNKNOWN is returned.
            System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n", msg.getPayload());
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        /**
         * Resolves the state of a previously executed transaction from the recorded status.
         *
         * @param msg the transactional message being checked
         * @return UNKNOWN for recorded status 0, COMMIT for status 1 or 2, and COMMIT when
         *         no status was recorded for the transaction id
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
            Integer status = localTrans.get(transId);
            if (null != status) {
                // NOTE(review): status 2 (the UNKNOWN outcome above) resolves to COMMIT here;
                // confirm this is the intended demo behaviour rather than ROLLBACK.
                switch (status) {
                    case 0:
                        retState = RocketMQLocalTransactionState.UNKNOWN;
                        break;
                    case 1:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                    case 2:
                        retState = RocketMQLocalTransactionState.COMMIT;
                        break;
                }
            }
            System.out.printf("checkLocalTransaction执行," +
                            " msgTransactionId=%s, TransactionState=%s status=%s %n",
                    transId, retState, status);
            return retState;
        }
    }
}
字符串消息的消费者
/**
 * Listener for string messages on the topic configured by {@code demo.rocketmq.topic},
 * registered under the consumer group {@code string_consumer}.
 */
@Service
@RocketMQMessageListener(
        nameServer = "${rocketmq.name-server}",
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "string_consumer")
public class StringConsumer implements RocketMQListener<String> {

    /**
     * Prints the received string to standard output.
     *
     * @param message the message body
     */
    @Override
    public void onMessage(String message) {
        String line = String.format("字符串消息: %s ; \n", message);
        System.out.print(line);
    }
}
对象消息的消费者
/**
 * Listener for {@link User} messages on the topic configured by
 * {@code demo.rocketmq.topic.user}, registered under the consumer group
 * {@code user_consumer}.
 */
@Service
@RocketMQMessageListener(
        nameServer = "${rocketmq.name-server}",
        topic = "${demo.rocketmq.topic.user}",
        consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {

    /**
     * Prints the received user (via its {@code toString()}) to standard output.
     *
     * @param message the received user object
     */
    @Override
    public void onMessage(User message) {
        String line = String.format("对象消息: %s ; \n", message);
        System.out.print(line);
    }
}
实体类
/**
 * Simple serializable payload with a name and an age, used as the object message body.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 8678741699731601L;

    /** The user's name; may be {@code null} until set. */
    private String name;

    /** The user's age; may be {@code null} until set. */
    private Integer age;

    /** @return the name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age, or {@code null} if none was set */
    public Integer getAge() {
        return this.age;
    }

    /** @param age the age to store */
    public void setAge(Integer age) {
        this.age = age;
    }

    /**
     * Renders the user as {@code User{name='<name>', age=<age>}}; unset fields appear
     * as {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("name='").append(name).append('\'');
        text.append(", age=").append(age);
        text.append('}');
        return text.toString();
    }
}
application.properties
# Name server address; also referenced by the consumers' @RocketMQMessageListener(nameServer = ...)
rocketmq.name-server=127.0.0.1:9876
# Producer group name (presumably used by the auto-configured RocketMQTemplate - confirm)
rocketmq.producer.group=testGroup
# Send timeout (presumably in milliseconds - confirm against the starter's documentation)
rocketmq.producer.sendMessageTimeout=300000
# properties used in the application
# Topic for string messages: injected into RocketMqApplication and consumed by StringConsumer
demo.rocketmq.topic=string-topic
# Not referenced by any code shown in this article
demo.rocketmq.orderTopic=order-paid-topic
# Topic for transactional messages: injected into RocketMqApplication
demo.rocketmq.transTopic=spring-transaction-topic
# Topic for User object messages: injected into RocketMqApplication and consumed by UserConsumer
demo.rocketmq.topic.user=user-topic
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载