前言
批量发送消息可以提升性能,减少IO开销。
使用约束
同一批的消息应该有:相同的主题,相同的waitStoreMsgOK,不支持定时,此外,每批消息的总大小不应超过1M。
小于1M
public class BatchProducer {
public static void main(String[] args) throws Exception {
//初始化生产者group名称
DefaultMQProducer producer = new DefaultMQProducer("batch_producer_name");
//指定nameserver地址
producer.setNamesrvAddr("localhost:9876");
//启动实例
producer.start();
String topic = "TopicBatch";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
}
//生产者发送完之后需要给他关闭掉
producer.shutdown();
}
}
大于1M
如果要发送的数据量很多,那么最好的方式就是分批发送。
消息分批处理类:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
//对日志的开销
tmpSize = tmpSize + 20;
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
消费类:
public class BatchProducerTwo {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("batch_producer_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//大批消息
String topic = "TopicBatch";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 3 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//将大批消息分成小批:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
}
}
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载