什么是分布式事务
他是2阶段提交的实现,在分布式系统中来保证数据最终一致性,事务性消息确保本地事务的执行和消息的发送可以自动执行,
注意:RocketMq4.3.0及以上才支持。
注意点
1、事务的消息不支持调度和批处理。
2、为了避免一个消息被检查太多次,导致消息堆积,默认检查配置是15次,如果超过这个transactionCheckMax配置的值,那么MQ会丢弃这个消息同时记录错误日志,可以在broker中配置,修改transactionCheckMax值,可以重写AbstractTransactionCheckListener类来改变丢弃消息的行为。
3、事务消息在transactionTimeout配置的时间之后被检查,也可以通过设置CHECK_IMMUNITY_TIME_IN_SECONDS来修改该值,他的优先级比较高。
4、事务性消息可能被多次检查或使用。
5、提交的消息再次提交可能会失败,当前他通过日志记录来判断消息是否提交,高可用性是由RocketMQ本身的高可用性机制确保的,如果想要确保事务消息没有丢失,并且事务完整性得到保证,建议使用同步双写机制。
6、事务性消息的生产者id不能与其他类型消息的生产者id共享。与其他类型的消息不同,事务性消息允许向后查询,MQ服务器根据其生产者id查询客户端。
事务状态
这里有3中状态:
1、TransactionStatus.CommitTransaction:提交事务,意味着所有的消费者可以消费这条消息。
2、TransactionStatus.RollbackTransaction:回滚事务,意味着这条消息将被删除,并且不能进行消费。
3、TransactionStatus.Unknown:
中间状态,意味着MQ需要去检查状态。
应用
1、首先创建一个事务生产者,使用TransactionMQProducer类来创建生产者,并且指定唯一的producerGroup,创建一个线程池来处理回查请求。在执行完本地事务之后,需要再次发送MQ消息,已确认消息的状态,状态就是上面提到的几个事务状态。
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_name");
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("事务消息回查检查线程");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
2、实现TransactionListener接口
executeLocalTransaction方法在发送半消息成功之后用于执行本地事务,返回值的状态就是上面提到的三种状态。
checkLocalTransaction方法用来检查本地事务状态,响应MQ检查请求,他的返回值也是上面提到的三种状态。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3、消费者
public class TransactionConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//初始化指定消费者的group名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_name");
//指定name server地址
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TransactionTopic", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
源代码
注意:本文归作者所有,未经作者允许,不得转载