前言
先来看一张图片,
上图显示了消息系统的模型设计。
Producer
生产者把业务系统生成的消息发送给broker,RocketMQ提供多种发送方式:同步、异步、单向。
发送状态
当发送一个消息之后,producer会收到发送结果,其中包括发送状态,首页我们假设消息的isWaitStoreMsgOK=true,如果没有抛出异常,那么producer的发送状态为SEND_OK,这里列出了一些状态:
FLUSH_DISK_TIMEOUT:如果把FlushDiskType=SYNC_FLUSH,默认是ASYNC_FLUSH,broker在刷新数据到磁盘会有一个超时时间(默认是5秒),如果在5秒内数据没有落到磁盘,那么会返回该状态值。
FLUSH_SLAVE_TIMEOUT:如果broker的角色的SYNC_MASTER,默认为ASYNC_MASTER,broker在刷新数据到磁盘会有一个超时时间(默认是5秒),如果从broker在超时前没有完成同步,那么会返回该状态值。
SLAVE_NOT_AVAILABLE:如果broker的角色的SYNC_MASTER,但是你没有配置从broker,那么会返回该状态值。
SEND_OK:返回该状态不一定是可靠的,如果要保证数据不丢失,那么需要开启SYNC_MASTER或SYNC_FLUSH。
重复或丢失
如果producer刚发消息到broker,此时broker刚好挂了,那么这个时候producer会收到FLUSH_DISK_TIMEOUT或 FLUSH_SLAVE_TIMEOUT两个状态,这个时候你发的消息就丢失了。一般情况下你有两个选择,第一个就是不处理,消息丢了就丢了,还有一种是在重新发一次,但有时候因为网络原因你收到没有发送成功的状态,但是实际上发成功了,这个时候消费端会重复收到消息,此时消费端的幂等性一定要做好。但是当返回SLAVE_NOT_AVAILABLE时,重复发送是没用的,这个时候需要发短信告诉管理员,消息中间件出问题了。
超时
客户端发送消息到broker后,需要等他们的响应,有时候因为网络原因,迟迟没有收到响应,当超过最大等待时间(默认是3秒)后会抛出RemotingTimeoutException异常。也可以通过send(msg, timeout)方法来设置等待响应超时时间,建议把超时时间可以设置的大一点,因为broker需要把数据刷到磁盘或者同步到从broker。
消息大小
官方建议消息的大小不要超过512K
异步发送
默认发送完消息会阻塞住,等待broker的响应。如果对性能有要求,那么可以使用异步的方式进行发送。
线程安全
producer是线程安全的
Producer Group
相同功能的生产者被分组在一起,如果原始的生产者崩溃了,那么broker会找到相同生产组的生产者实例来提交或者回滚事务。
首先需要知道的是不同的消费组可以消费相同的主题,并且他们各自维护着自己的消费offsets,需要确认同一个消费组内的消费者订阅相同的主题。
Consumer
消费者从broker中获取消息并交给业务系统,在用户使用视角看,提供了2类消费者:
PullConsumer
主动从broker中获取消息,一旦获取了成批的消息,用户应用程序就会启动消费进程,这里本地需要维护offset信息。
PushConsumer
Push consumer封装了消息提取、消耗进度和维护内部的其他工作,将一个回调接口留给最终用户来实现,该接口将在消息到达时执行。
Consumer Group
与Producer Group一样,把相同的消费者放在一个组里,消费者组是一个很好的概念,在消息消费方面,实现负载平衡和容错的目标非常容易,他们必须订阅相同的topic。
###ConsumeFromWhere
当新增了一个消费者的时候,他需要考虑该从哪里开始处理消息,是否需要处理历史的消息,如果设置为CONSUME_FROM_LAST_OFFSET,那么不会处理历史的消息,如果是CONSUME_FROM_FIRST_OFFSET,那么会消费已经存在于broker里面的所有消息。如果设置为CONSUME_FROM_TIMESTAMP,那么表示会消费指定时间戳之后的消息。
Topic
topic是生产者投递消息、消费者获取消息的主题,生产者、消费者、主题之间的关系是比较松散的,意思就是一个topic可以有0个、1个或多个生产者对它发送消息,相反一个生产者可以对多个topic发送消息。从消费者的角度来看,一个topic可以由0个、1个或多个消费者组订阅。消费者组也可以订阅一个或多个主题,只要该组的实例保持订阅一致。
Message
消息是要传递的信息,一个消息必须要有一个topic。消息还可以有一个可选的tag和额外的键-值对。
Message队列
topic被划分为一个或多个子主题,即“消息队列”。
Tag
标签,也就是子主题,为用户提供了额外的灵活性。对于标记,来自相同业务模块的具有不同用途的消息可能具有相同的主题和不同的标记,这样有助于查询消息。
Broker
Broker是RocketMQ系统的主要组件,它接收来自生产者的消息,存储它们并准备处理来自消费者的拉请求,它还存储与消息相关的元数据,包括消费者组、消费进度偏移量和主题/队列信息。broker的角色有ASYNC_MASTER, SYNC_MASTER 和 SLAVE 三种,如果你不能容忍消息丢失,那么可以配置为SYNC_MASTER和一个slave。如果对消息的丢失可以接受,那么可以设置为ASYNC_MASTER和一个slave。如果你想要你的broker变得简单,那么可以不配置slave。
刷新磁盘类型:建议使用ASYNC_FLUSH,因为SYNC_FLUSH开销很大,而且会导致太多的性能损失。
Name Server
NameServer提供路由选择,生产者、消费者通过NameServer获取broker列表。
Message模型
1、集群
2、广播
Message顺序
当使用DefaultMQPushConsumer是,你希望消息是有顺序的消费。
有序性
有序性表示消息的消费顺序和生产者发送的顺序是一样的,一般的有序是在消息放到某一个分区里面,如果你想要保住消息的全局有序性,那么你需要保证topic只有一个消息队列。如果指定了消费顺序,则消息消费的最大并发性是消费组订阅的消息队列的数量。
并发性
消费者的最高并发性是根据线程池的大小决定的。在此模式下不再保证消息顺序。
注意:本文归作者所有,未经作者允许,不得转载