【RocketMQ】批量消息

star2017 1年前 ⋅ 636 阅读

前言

批量发送消息可以提升性能,减少IO开销。

使用约束

同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,不支持定时,此外,每批消息的总大小不应超过1M。

小于1M

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者group名称
        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_name");
        //指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        String topic = "TopicBatch";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //生产者发送完之后需要给他关闭掉
        producer.shutdown();
    }
}

大于1M

如果要发送的数据量很多,那么最好的方式就是分批发送。

消息分批处理类:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;

    private final List<Message> messages;

    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            //对日志的开销
            tmpSize = tmpSize + 20;
            if (tmpSize > SIZE_LIMIT) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

消费类:

public class BatchProducerTwo {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //大批消息
        String topic = "TopicBatch";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 3 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        //将大批消息分成小批:
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }

        producer.shutdown();
    }
}

源代码

传送门

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: