前言
在大多数情况下,tag是一种简单而有用的选择消息的设计。
如下示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或者TAGB或者TAGC标签的消息,如果限制一个消息只能有一个tag,这可能不适用于复杂的场景,在这种情况下,可以使用SQL表达式来过滤消息。
原理
SQL功能可以通过发送消息时放入的属性进行计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑,看以下的例子:
语法
RocketMQ只定义一些基本语法来支持此功能,你也可以很容易地扩展它。
1、数字比较,如>
, >=
, <
, <=
, BETWEEN
, =
2、字符比较,如=
, <>
, IN
3、IS NULL
或者IS NOT NULL
4、逻辑词,如AND
, OR
, NOT
常量类型有:
1、数字,如123,456
2、字符,如'abc',必须由单引号组成
3、NULL,特殊常量
4、布尔值,如TRUE
或FALSE
生产者
发送消息时,可以通过putUserProperty方法将属性放入消息中。
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_producer_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for(int i=0;i<100;i++){
Message msg = new Message("TopicFilter2",
"tagA",
("Hello RocketMQ "+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
}
producer.shutdown();
}
}
消费者
使用MessageSelector.bySql来处理消息,如
public class FilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_name");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicFilter2",
MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
出现问题
org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
在broker.conf文件中添加enablePropertyFilter=true;并启动broker的时候来指定broker.conf文件,如:
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true -c E:\worksoft\rocketmq-4.6.0\conf\broker.conf
源代码
本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载