消息中间件—RocketMQ 消息消费(二)(push 模式实现)

star2017 1年前 ⋅ 5923 阅读

摘要:在 RocketMQ 中,消息消费都是基于 Pull 消息方式,那么 Push 模式中又是如何实现 Consumer 端准实时消费的呢?

在上一篇—“消息中间件—RocketMQ 消息消费(一)”中,已经简要地介绍了下 RocketMQ 中“Pull 和 Push 两种消费方式的简要流程”以及“Push 消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍 RocketMQ 中 Push 消费方式下的“Pull 消息的长轮询机制”和“Consumer 端的负载均衡机制”这两块关键核心内容。

由于 RocketMQ 系列的技术分享存在一定的连续性,因此希望读者能回顾下往期 RocketMQ 分享的篇幅:

http://www.6aiq.com/article/1563128272857
http://www.6aiq.com/article/1563128435731
http://www.6aiq.com/article/1563129642050
http://www.6aiq.com/article/1563129820252

一、RocketMQ 中长轮询的 Pull 消息机制

在上一篇中,已经简略地介绍过 RocketMQ 中消息消费时 Pull 消息的长轮询机制了,其主要的思路是:Consumer 如果第一次尝试 Pull 消息失败(比如:Broker 端没有可以消费的消息),并不立即给消费者客户端返回 Response 的响应,而是先 hold 住并且挂起请求。然后在 Broker 端,通过后台独立线程—PullRequestHoldService 重复尝试执行 Pull 消息请求来取消息。同时,另外一个 ReputMessageService 线程不断地构建 ConsumeQueue/IndexFile 数据,并取出 hold 住的 Pull 请求进行二次处理。通过这种长轮询机制,即可解决 Consumer 端需要通过不断地发送无效的轮询 Pull 请求,而导致整个 RocketMQ 集群中 Broker 端负载很高的问题。

1.1 Consumer 向 Broker 端发送 Pull 消息请求的主要过程

在 RocketMQ 的 Consumer 端,后台独立线程服务—pullMessageService 是 Pull 消息请求的发起者,它不断地尝试从阻塞队列—LinkedBlockingQueue 中获取元素 PullRequest,并根据 pullRequest 中的参数以及订阅关系信息调用 pullAPIWrapper 的 pullKernelImpl()方法发送封装后的 Pull 消息请求—PullMessageRequestHeader 至 Broker 端来拉取消息(具体完成发送一次 Pull 消息的 PRC 通信请求的是 MQClientAPIImpl 中的 pullMessage()方法)。这里涉及细节的时序图(ps:时序图中没有涉及 PRC 异步通信中的 callback 过程)如下:
9b04da3b90da414d9bcd46cbd4317c4b.png

Consumer 向 Broker 端发送长轮询请求的时序图.jpg

其中, DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法是发送 Pull 消息请求的关键:

(1)校验 ProcessQueue 是否“drop”, 如果为“drop”为 true 则直接返回(这个“drop”的设置在下面一节—“Consumer 端的负载均衡机制”中会提到);

(2)给 ProcessQueue 设置 Pull 消息的时间戳;

(3)做流量控制,对于满足下面条件的任何一种情况,稍后再发起 Pull 消息的请求;

条件 1:正在消费的队列中,未被消费的消息数和消息大小超过阀值(默认每个队列消息数为 1000 个/消息存储容量 100MB);

条件 2:如果是顺序消费,正在消费的队列中,消息的跨度超过阀值(默认 2000);

(4)根据 topic 获取订阅关系—SubscriptionData;

(5)构建 Pull 消息的回调对象—PullBack,这里从 Broker 端 Pull 消息的返回结果处理是通过异步回调(发送异步通信 RPC 请求),其中如果 Broker 端返回 Pull 消息成功,在回调方法中先填充至处理队列—processQueue 中(将 Pull 下来的消息,设置到 ProcessQueue 的 msgTreeMap 容器中),然后通过消费消息的服务线程—consumeMessageService,将封装好的 ConsumeRequest 提交至消费端消费线程池—consumeExecutor 异步执行处理(具体处理逻辑:通过业务应用系统在 DefaultMQPushConsumer 实例中注册的消息监听器完成业务端的消息消费);

(6)从 Consumer 端内存中获取 commitOffsetValue;

(7)通过 RocketMQ 的 Remoting 通信层向 Broker 端发送 Pull 消息的 RPC 请求;

1.2 Broker 端处理 Pull 消息请求的一般过程

这里先来说下对于一般情况下(即为所要 Pull 的消息在 RocketMQ 的 Broker 端已经是存在,一般可以 Pull 到的情况),Broker 端处理这个 Pull 消息请求的主要过程。其时序图(ps:图中只是画了大部分的流程,详细细节还需要对照源码看下)如下:
8da8886373a34a55a54a5761088de6c6.png

Broker 端接受长轮询请求的处理时序图.jpg

从上面的简易时序图中可以看到 Broker 端 Pull 消息的主要关键点如下:

(1)Pull 消息的业务处理器—PullMessageProcessor 的 processRequest 为处理拉取消息请求的入口,在设置 reponse 返回结果中的 opaque 值后,就完成一些前置的校验(Broker 是否可读、Topic/ConsumerGroup 是否存在、读取队列 Id 是否在 Topic 配置的队列范围数内);

(2)根据“ConsumerGroup”、“Topic”、“queueId”和“offset”这些参数来调用 MessageStore 实例的 getMessage()方法来产尝试读取 Broker 端的消息;

(3)其中,通过 findConsumeQueue()方法,获取逻辑消费队列—ConsumeQueue;

(4)根据 offset 与逻辑消费队列中的 maxOffset、minOffset 的比较,来设置状态值 status,同时计算出下次 Pull 消息的开始偏移量值—nextBeginOffset,然后通过 MappedFile 的方式获取 ConsumeQueue 的 Buffer 映射结果值;

(5)根据算出来的 offsetPy(物理偏移量值)和 sizePy(消息的物理大小),从 commitLog 获取对应消息的 Buffer 映射结果值,并填充至 GetMessageResult 返回对象,并设置返回结果(状态/下次其实偏移量/maxOffset/minOffset)后 return;

(6)根据 isTransferMsgByHeap 的设置情况(默认为 true),选择下面两种方式之一来真正读取 GetMessageResult 的消息内容并返回至 Consumer 端;



方式 1:使用 JDK NIO 的 ByteBuffer,循环地读取存有消息内容的 messageBufferList 至堆内内存中,返回 byte[]字节数组,并设置到响应的 body 中;然后,通过 RPC 通信组件—NettyRemotingServer 发送响应至 Consumer 端;

方式 2:采用基于 Zero-Copy 的 Netty 组件的 FileRegion,其包装的“FileChannel.tranferTo”实现文件传输,可以直接将文件缓冲区的数据发送至通信目标通道 Channel 中,避免了通过循环 write 方式导致的内存拷贝开销,这种方式性能上更优;

(7)在 PullMessageProcessor 业务处理器的最后,提交并持久化消息消费的 offset 偏移量进度;

1.3 Broker 端对于 Pull 请求挂起处理的流程

说完了 Pull 消息请求的一般流程,下面主要看下 Broker 端的 PullMessageProcessor 业务处理器在 RocketMQ 中还没有消息可以拉取情况下(即为:PULL_NOT_FOUND)的处理流程,本节内容也是 RocketMQ 中长轮询机制的关键。

长轮询机制是对普通轮询的一种优化方案,它平衡了传统 Push/Pull 模型的各自缺点,Server 端如果当前没有 Client 端请求拉取的相关数据会 hold 住这个请求,直到 Server 端存在相关的数据,或者等待超时时间后返回。在响应返回后,Client 端又会再次发起下一次的长轮询请求。RocketMQ 的 push 模式正是采用了这种长轮询机制的设计思路,如果在上面所述的第一次尝试 Pull 消息失败后(比如 Broker 端暂时没有可以消费的消息),先 hold 住并且挂起该请求(这里,设置返回响应 response 为 null,此时不会向 Consumer 端发送任何响应的内容,即不会对响应结果进行处理),然后通过 Broker 端的后台线程 PullRequestHoldService 重新尝试和后台线程 ReputMessageService 的二次处理。在 Broker 端,两个后台线程服务 PullRequestHoldService 和 ReputMessageService 是实现长轮询机制的关键点。下面就来分别介绍这两个服务线程:

(1)PullRequestHoldService:该服务线程会从 pullRequestTable 本地缓存变量中取 PullRequest 请求,检查轮询条件—“待拉取消息的偏移量是否小于消费队列最大偏移量”是否成立,如果条件成立则说明有新消息达到 Broker 端,则通过 PullMessageProcessor 的 executeRequestWhenWakeup()方法重新尝试发起 Pull 消息的 RPC 请求(此处,每隔 5S 重试一次,默认长轮询整体的时间设置为 30s);

(2)ReputMessageService:该服务线程会在 Broker 端不断地从数据存储对象—commitLog 中解析数据并分发请求,随后构建出 ConsumeQueue(逻辑消费队列)和 IndexFile(消息索引文件)两种类型的数据。同时从本地缓存变量—pullRequestTable 中,取出 hold 住的 PullRequest 请求并执行二次处理(具体的做法是,在 PullMessageProcessor 的 executeRequestWhenWakeup()方法中,通过业务线程池 pullMessageExecutor,异步提交重新 Pull 消息的请求任务,即为重新调了一次 PullMessageProcessor 业务处理器的 processRequest()方法,来实现 Pull 消息请求的二次处理)。这里,ReputMessageService 服务线程,每处理一次,Thread.sleep(1),继续下一次处理。

二、Consumer 端的负载均衡机制

看了上面一节—**“RocketMQ 中长轮询的 Pull 消息机制”**后,大家可能会有这样子一个疑问:在 Consumer 端 pullMessageService 线程作为消息的主动拉取者不断地从阻塞队列中获取元素 PullRequest,那么这里的 PullRequest 是在哪儿由哪个线程放入至阻塞队列中的呢?本节内容将介绍“Consumer 端的负载均衡机制”,同时解答上面的疑问。

2.1 RocketMQ 为何需要在 Consumer 端做负载均衡?

在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来 Pull 消息的,而在 Push 模式中只是采用了长轮询的方式而实现了准实时的自动消息拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—MessageQueue 中去 Pull 消息。因此,消息队列的负载均衡处理(即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费),由 Consumer 端来主动完成更为合理。

2.2 Consumer 端负载均衡的主要流程

1. Consumer 端的心跳包发送

在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端 id 的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

2. Consumer 端实现负载均衡的核心类—RebalanceImpl

在上一篇文章的"Consumer 启动流程"中已经介绍了在启动 MQClientInstance 实例时候,会完成负载均衡服务线程—RebalanceService 的启动(每隔 20s 执行一次)。通过查看源码可以发现,RebalanceService 线程的 run()方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic()方法,该方法是实现 Consumer 端负载均衡的核心关键。

这里,rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:

(1)从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 Topic 主题下的消息消费队列集合(mqSet);

(2)根据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()方法向 Broker 端发送获取该消费组下消费者 Id 列表的 RPC 通信请求(Broker 端基于前面 Consumer 端上报的心跳包数据而构建的 consumerTable 做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);

(3)先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。

c8769d85405f4ea0923e960e7aa2b75e.png

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端 Consumer 排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range,最后遍历整个 range 而计算出当前 Consumer 端应该分配到的记录(这里即为:MessageQueue)。具体的算法代码如下:
@Override
public List allocate(String consumerGroup, String currentCID, List mqAll,
List cidAll) {
//省略代码......
List result = new ArrayList();
//省略代码......
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;

(4)然后,调用 updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与 processQueueTable 做一个过滤比对,具体的过滤比对方式如下图:
bc2d3365085942fc9e6d76846231aaac.png

RebalancePushImpl 负载均衡(分发 pullRequest 到 pullRequestQueue).jpg

这里可以分如下两种情况来筛选过滤:

**a.图中 processQueueTable 标注的红色部分,表示与分配到的消息队列集合 mqSet 互不包含。**将这些队列设置 Dropped 属性为 true,然后查看这些队列是否可以移除出 processQueueTable 缓存变量,这里具体执行 removeUnnecessaryMessageQueue()方法,即每隔 1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回 true。如果等待 1s 后,仍然拿不到当前消费处理队列的锁则返回 false。如果返回 true,则从 processQueueTable 缓存变量中移除对应的 Entry;

**b.图中 processQueueTable 的绿色部分,表示与分配到的消息队列集合 mqSet 的交集。**判断该 ProcessQueue 是否已经过期了,在 Pull 模式的不用管,如果是 Push 模式的,设置 Dropped 属性为 true,并且调用 removeUnnecessaryMessageQueue()方法,像上面一样尝试移除 Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个 MessageQueue 创建一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中(其中调用 RebalanceImpl 实例的 computePullFromWhere(MessageQueue mq)方法获取该 MessageQueue 对象的下一个进度消费值 offset,随后填充至接下来要创建的 pullRequest 对象属性中),并创建拉取请求对象—pullRequest 添加到拉取列表—pullRequestList 中,最后执行 dispatchPullRequest()方法,将 Pull 消息的请求对象 PullRequest 依次放入 PullMessageService 服务线程的阻塞队列 pullRequestQueue 中,待该服务线程取出后向 Broker 端发起 Pull 消息的请求。其中,可以重点对比下,RebalancePushImpl 和 RebalancePullImpl 两个实现类的 dispatchPullRequest()方法不同,RebalancePullImpl 类里面的该方法为空,这样子也就回答了上一篇中最后的那道思考题了。

三、总结

RocketMQ 的消息消费(二)(push 模式实现)篇幅就先分析到这里了。关于 RocketMQ 消息消费的内容比较多也比较复杂,需要读者结合源码并多次 debug 才能对其有一个较为深刻的理解。另外,对于消息消费部分的““消息 ACK 机制”、“消费重试机制”等剩余内容将在后续的篇幅进行介绍和分析。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,如有阐述不合理之处还望留言一起探讨。


本文地址:https://www.6aiq.com/article/1563130068940
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: