rocketmq源码分析-2-客户端设计-多样化的生产消费方式

Thursday, June 16, 2022

TOC

本文是RocketMQ源码分析系列的第二篇,主要从客户端的角度,介绍RocketMQ丰富的消息生产和消费方式,以及这些方式如何满足常见的业务场景需求。

不同消息模式的适用场景

RocketMQ作为一个分布式消息中间件,提供了多种消息发送和消费模式,以满足不同业务场景的需求。参考下方的方法列表,可以看到:

Producer端支持的发送模式

Producer发送消息

  • 发送端支持单条和批量的发送方式;
  • 可以在发送时指定或计算消息的存储队列;
  • 支持异步发送,在高吞吐场景下尽可能利用资源(异步编程可参考这篇文章);
  • 还支持事务消息、延迟消息、单向消息等高级特性,后续会专门介绍。

下面以最简单的单条消息发送流程为例进行分析。

DefaultMQProducer.send(msg, selector, arg)
  └─ DefaultMQProducerImpl.send(msg, selector, arg)
    ├─ 校验producer状态和消息合法性
    ├─ 查找Topic路由信息(本地缓存/NameServer)
    ├─ 选择MessageQueue(通过selector)
    └─ sendKernelImpl(msg, mq, ...)
      ├─ 构造SendMessageRequestHeader
      ├─ MQClientInstance.getMQClientAPIImpl().sendMessage()
      │  └─ NettyRemotingClient.invokeSync()
      │    └─ 通过Netty网络发送到Broker
      └─ 等待Broker响应
    └─ 处理 SendResult,返回给应用

核心流程为:通过NameServer获取Topic的路由信息,然后选择一个MessageQueue,最后通过Netty发送消息到Broker。发送完成后,Broker会返回一个SendResult,包含消息ID、状态等信息。流程并不复杂,此处不再展开。

Consumer端支持的消费模式

Push模式 vs. Pull模式

在消费端,RocketMQ提供了两种消费模式:Push和Pull。Push模式通过DefaultMQPushConsumer启动,由客户端内部线程自动、持续地从Broker拉取消息,拉取到后投递到用户的MessageListener进行消费。对用户来说像是“被推送”过来,实际底层还是拉取(pull),但用户无需关心拉取细节。Pull模式通过DefaultMQPullConsumer启动,由用户主动调用 pull 方法从Broker拉取消息,何时拉、拉多少、拉哪个队列都由用户控制。拉取到消息后,用户自行处理消费和offset提交。

Push模式更加易用,适合大多数业务场景。只需注册监听器,RocketMQ会自动管理拉取、消费、offset提交、重试等。当然,简单易用的背后是SDK封装了大量细节,配置不当可能导致消费异常。下面将详细分析Push模式的实现原理。

先看一下Push模式的核心消费流程:

DefaultMQPushConsumer.start()
  └─ DefaultMQPushConsumerImpl.start()
    ├─ 校验配置、初始化订阅信息
    └─ MQClientInstance.start()
      ├─ 启动网络连接、定时任务
      └─ PullMessageService.run()(线程常驻)
        └─ (定时循环)从pullRequestQueue中取出PullRequest
          ├─ DefaultMQPushConsumerImpl.pullMessage(PullRequest)
          ├─ 构造PullCallback回调函数处理Pull结果
          └─ PullAPIWrapper.pullKernelImpl(messageQueue, ..., nextOffset, ..., pullCallback)
            └─ 从NameServer获取Topic路由信息,构造RemotingCommand
              └─ MQClientAPIImpl.pullMessageAsync()
                └─ NettyRemotingClient.invokeAsync()
                  └─ 通过Netty网络发送到Broker
      └─ RebalanceService.run()(线程常驻)
          └─ (定时循环)基于Group/Topic的消费状态+路由重新生成/移除PullRequest触发/停止Pull操作

流程中有几个核心概念:

  • MessageQueue:消息队列,是对broker上物理队列的抽象。由于每个Topic有多个队列,消费端需要首先处理如何选择和管理这些队列。
  • ProcessQueue:处理队列,客户端本地维护的消息缓存队列。每个分配到的MessageQueue对应一个ProcessQueue,存放从Broker拉取但尚未消费完成的消息,实现消息的有序消费、重试等功能。
  • PullRequest:拉取请求,描述一次从Broker拉取消息的请求,包括要拉取的MessageQueue、拉取的起始offset、拉取的消息数量等。Push模式下由PullMessageService定时生成和调度。
  • PullCallback:拉取回调,异步拉取消息时的回调接口。拉取完成后会回调该接口,处理拉取结果(判断是否拉取成功、投递到ProcessQueue等)。

常用的通用客户端配置项及其作用

这里面还有细节配置,大多数可以通过DefaultMQPushConsumer的配置项来控制,下图给出了主要的配置项:

Consumer消费模式

其中一些比较有趣的项目可以展开说说。

setAllocateMessageQueueStrategy()

该方法用于设置消息队列分配策略。RocketMQ在集群消费模式下,需要将一个Topic下的多个MessageQueue分配给同一消费组内的不同消费者。通过setAllocateMessageQueueStrategy()可以自定义分配算法,常见的有:

  • 平均分配(AllocateMessageQueueAveragely):默认选项。例如有10个队列Q1-Q10,3个消费者C1-C3,则分配结果可能为C1:Q1,Q2,Q3;C2:Q4,Q5,Q6;C3:Q7,Q8,Q9,Q10。单个消费者分配的队列基本相同,且尽可能分布在相同的Broker上。
  • 环形分配(AllocateMessageQueueAveragelyByCircle):同样以10个队列、3个消费者为例,分配结果为C1:Q1,Q4,Q7;C2:Q2,Q5,Q8;C3:Q3,Q6,Q9,Q10。每个消费者轮流分配队列。在队列数量不均等时,环形分配可以更好地平衡负载。
  • 一致性哈希分配(AllocateMessageQueueConsistentHash):适用于需要队列与消费者绑定关系尽量稳定的场景,例如某些消费者需要处理特定队列的消息。但严格来说,基于Hash计算的结果在极端情况下依然会发生变动,因此在分配结果变更导致Rebalance时,仍可能出现消息重复消费及顺序错乱。

setConsumeConcurrentlyMaxSpan()

如前所述,PullRequest控制每次从Broker拉取的消息队列和数量。消息拉取频率由PullMessageService定时循环执行。该定时任务的间隔时间可通过setPullInterval()配置,默认0毫秒,意味着启动后会尽可能快地拉取所有消息,并缓存在ProcessQueue中等待处理。而setConsumeConcurrentlyMaxSpan()则会在ProcessQueue中缓存的消息数超过该值时,降低拉取频率,避免拉取过多消息导致无法及时消费(可能引发内存占用问题,更主要的是增加消息处理延迟)。

setConsumeTimeout()

该参数用于配置业务回调方法的最大执行时间(单位为分钟,默认15)。需要注意,这里的timeout仅用于判断业务处理消息是否超时,并不会触发超时重试。RocketMQ中通过ConsumeConcurrentlyStatus.RECONSUME_LATER标记消息需要重试,而不是通过超时来触发重试。

setPersistConsumerOffsetInterval()

persistConsumerOffsetInterval参数本质上是pullInterval的逆配置项。后者控制拉取消息的频率,前者则控制消费端将消费进度(offset)持久化到Broker的频率。默认5秒。极端情况下,如果实例在commit消息前宕机,可能会丢失5秒内的消费进度,导致重复消费。


这篇文章我们主要聊了RocketMQ客户端在消息生产和消费上的各种“花样”,并以最简单的Push模式介绍了RocketMQ的生产消费完整流程。希望你能了解到RocketMQ作为业务消息中间件,最基础的应用场景是如何使用的。后续我们还会介绍RocketMQ适应不同的业务需求,提供的许多丰富的配置和灵活的API。

不过你有没有想过,这一切的底层通信到底是怎么实现的?客户端和Broker之间的消息到底是怎么传递的?是通过什么样的网络协议和数据结构来实现的?这些问题就留给下一篇文章来解答。