【RocketMQ】过滤消息

star2017 1年前 ⋅ 713 阅读

前言

在大多数情况下,tag是一种简单而有用的选择消息的设计。
如下示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或者TAGB或者TAGC标签的消息,如果限制一个消息只能有一个tag,这可能不适用于复杂的场景,在这种情况下,可以使用SQL表达式来过滤消息。

原理

SQL功能可以通过发送消息时放入的属性进行计算,在RocketMQ定义的语法下,可以实现一些有趣的逻辑,看以下的例子:
image.png

语法

RocketMQ只定义一些基本语法来支持此功能,你也可以很容易地扩展它。
1、数字比较,如>>=<<=BETWEEN=
2、字符比较,如=<>IN
3、IS NULL或者IS NOT NULL
4、逻辑词,如ANDORNOT
常量类型有:
1、数字,如123,456
2、字符,如'abc',必须由单引号组成
3、NULL,特殊常量
4、布尔值,如TRUEFALSE

生产者

发送消息时,可以通过putUserProperty方法将属性放入消息中。

public class FilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_producer_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for(int i=0;i<100;i++){
            Message msg = new Message("TopicFilter2",
                    "tagA",
                    ("Hello RocketMQ "+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // Set some properties.
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
        }

        producer.shutdown();
    }
}

消费者

使用MessageSelector.bySql来处理消息,如

public class FilterConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumer_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("TopicFilter2",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

出现问题

org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92

在broker.conf文件中添加enablePropertyFilter=true;并启动broker的时候来指定broker.conf文件,如:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true -c E:\worksoft\rocketmq-4.6.0\conf\broker.conf

源代码

传送门

本文为博主原创文章,未经博主允许不得转载。
更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: