【RocketMQ】事务消息

star2017 1年前 ⋅ 615 阅读

什么是分布式事务

他是2阶段提交的实现,在分布式系统中来保证数据最终一致性,事务性消息确保本地事务的执行和消息的发送可以自动执行,
注意:RocketMq4.3.0及以上才支持。

注意点

1、事务的消息不支持调度和批处理。
2、为了避免一个消息被检查太多次,导致消息堆积,默认检查配置是15次,如果超过这个transactionCheckMax配置的值,那么MQ会丢弃这个消息同时记录错误日志,可以在broker中配置,修改transactionCheckMax值,可以重写AbstractTransactionCheckListener类来改变丢弃消息的行为。
3、事务消息在transactionTimeout配置的时间之后被检查,也可以通过设置CHECK_IMMUNITY_TIME_IN_SECONDS来修改该值,他的优先级比较高。
4、事务性消息可能被多次检查或使用。
5、提交的消息再次提交可能会失败,当前他通过日志记录来判断消息是否提交,高可用性是由RocketMQ本身的高可用性机制确保的,如果想要确保事务消息没有丢失,并且事务完整性得到保证,建议使用同步双写机制。
6、事务性消息的生产者id不能与其他类型消息的生产者id共享。与其他类型的消息不同,事务性消息允许向后查询,MQ服务器根据其生产者id查询客户端。

事务状态

这里有3中状态:
1、TransactionStatus.CommitTransaction:提交事务,意味着所有的消费者可以消费这条消息。
2、TransactionStatus.RollbackTransaction:回滚事务,意味着这条消息将被删除,并且不能进行消费。
3、TransactionStatus.Unknown:
中间状态,意味着MQ需要去检查状态。

应用

1、首先创建一个事务生产者,使用TransactionMQProducer类来创建生产者,并且指定唯一的producerGroup,创建一个线程池来处理回查请求。在执行完本地事务之后,需要再次发送MQ消息,已确认消息的状态,状态就是上面提到的几个事务状态。

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_name");
        producer.setNamesrvAddr("localhost:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("事务消息回查检查线程");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

2、实现TransactionListener接口
executeLocalTransaction方法在发送半消息成功之后用于执行本地事务,返回值的状态就是上面提到的三种状态。
checkLocalTransaction方法用来检查本地事务状态,响应MQ检查请求,他的返回值也是上面提到的三种状态。

public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3、消费者

public class TransactionConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        //初始化指定消费者的group名称
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_name");

        //指定name server地址
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TransactionTopic", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

源代码

传送门

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: