rocketmq源码分析-4-极简设计-NameServer与数据一致性

Monday, June 27, 2022

TOC

本文是RocketMQ源码分析系列的第四篇。前面章节已经简单提到,Topic在Broker中的路由信息是保存在Nameserver上的。准确来说,这些信息不应被称为路由信息,称为名字信息更为准确。例如在消费侧,消费者实例应从哪个队列拉取消息,是各消费者实例基于全局信息,在本地根据一致性hash计算出来的,而不是由Nameserver中心化地提供路由表。本章将深入分析Nameserver的设计与实现,探讨其如何存储和提供这些信息,以及这种设计对数据一致性的影响。

Nameserver保存的信息主要包括三类:1)集群 2)Broker 3)Topic。这些信息被组织成内存中的Map,并通过相应接口供客户端(包括SDK和Broker)使用。与常见的配置中心/名字服务不同,RocketMQ的Nameserver之间不存在数据同步——即不是强一致,甚至不保证最终一致,物理上是相互独立的节点。实际上,每个Nameserver内部的信息来自各个Broker的主动上报(Broker上持久化保存了Topic的信息),而Nameserver上看到的路由信息都是动态的。

你一定会好奇,没有数据同步,不会导致数据不一致吗?我们先来看一下Nameserver的启动流程,之后再分析不同场景下Nameserver的数据一致性问题。

启动流程

先来看一下启动流程

├─ static main0()
  ├─ NamesrvStartup#createNamesrvController(String[])
    | new NamesrvConfig();
    | new NettyServerConfig()
    | new NamesrvController(namesrvConfig, nettyServerConfig)
        | new KVConfigManager()
        | new RouteInfoManager()
        | new BrokerHousekeepingService()
  ├─ NamesrvStartup#start(NamesrvContrtoller)
    ├─ NamesrvController#initialize()
	    | new NettyRemotingServer(NettyServerConfig, ChannelEventListener)
	      | remotingServer.registerDefaultProcessor(Processor,Executor)
	    | scheduleAtFixedRate()->routeInfoManager.scanNotActiveBroker()
	├─ NamesrvController#start()
	    | remotingServer.start()

启动过程主要做了以下几件事:

  1. 加载配置文件,初始化NamesrvConfigNettyServerConfig
  2. 使用NamesrvConfigNettyServerConfig创建NamesrvController实例。该Controller主要包含三个内部组件:
    1. KVConfigManager,逻辑较为简单,维护一个Map,直接将配置写在内存中,未做持久化。
    2. RouteInfoManager,管理集群核心的一些拓扑信息。
    3. BrokerHousekeepingService,负责Broker相关的管理。
  3. 初始化NamesrvController,将主Processor注册到RemotingServer中,并启动定时任务扫描不活跃的broker。
  4. 启动网络服务:remotingServer.start(),开始处理请求。

这里值得关注的是RouteInfoManager,其内部属性如下:

// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.java

// 读写锁,控制元数据的安全操作
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// brokerName->broker元数据,一个BrokerName关联一主多从N个Broker实例
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// topicName->queue关系
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// clusterName->brokerName列表,一个clusterName关联多个brokerName
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// brokerAddr(ip:port)->Broker存活信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;  
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

可以看出,RouteInfoManager管理的集群拓扑信息是相对静态的,如“集群内有哪些Broker”、“Topic在集群内的队列分布”,这些信息的特点是除非显式发起更新操作,否则在正常运行中不会改变(除非broker上下线,否则broker的存活信息也是相对静态的)。仅保存这类静态数据,使Nameserver更加轻量。

相比之下,group消费信息变化更频繁。消费者数量变化,或topic队列信息变化时,都会导致group消费路由发生改变。在大规模topic+queue+group场景下,频繁更新路由信息(即rebalance)将是非常重量级的操作。如果中心化地进行计算和配置,对controller是极大的负担(Kafka就是这样实现的)。而RocketMQ则将路由更新操作下放到消费者和broker分布式实现,这是一个很有意思的设计。

Topic创建/删除

从Nameserver的角度看,Topic本质是HashMap<String/* topic */, List<QueueData>> topicQueueTable容器内的一个Entry,因此创建/删除Topic本质上是对该Map的操作。不过RocketMQ在实现上,对创建Topic(UpdateTopicSubCommand)和删除Topic(DeleteTopicSubCommand)操作做了不同设计:

创建Topic的流程为:

  1. Admin发送创建Topic请求RequestCode=UPDATE_AND_CREATE_TOPIC到所有相关Broker
  2. Broker内部完成Topic创建
  3. Broker通过心跳请求RequestCode=REGISTER_BROKER向Nameserver上报本地的Topic列表,其中包含新建的Topic

    4.5.0版本后在第2步完成后会立即执行第3步,以加速Topic创建。但本质上都是由Broker而非Admin向Nameserver发起请求。

而删除Topic的流程则为:

  1. Admin发送删除Topic请求RequestCode=DELETE_TOPIC_IN_BROKER到所有相关Broker
  2. Admin发送删除Topic请求RequestCode=DELETE_TOPIC_IN_NAMESRV到Nameserver,直接删除NameServer中的Topic信息。

虽然细节上有差异,但本质上创建和删除Topic的逻辑都是先更新Broker,再同步更新到Nameserver。而真正的持久化操作其实在Broker,Nameserver通过心跳更新与broker保持同步。

Broker上/下线

由于持久化的Topic元数据实际只保存在各Broker上,Nameserver通过各Broker定时上报(默认30s)的心跳信息来拼凑整个集群的Topic信息。因此可能会出现一些“神奇”的现象。

创建Topic时,可以指定目标Broker;如果这些存储该Topic的Broker全部下线,对应Topic便会从集群内消失——即便你没有显式删除它,而且Nameserver也正常运行。换句话说,如果集群内各Broker维护的Topic不一致(这是RocketMQ设计上允许的),同集群内不同Broker的数据就不对等。这种不对等的Broker关系,给集群维护带来很大灵活性,但如果运维不当,也可能带来灾难。

同理,当上线新Broker时,RocketMQ并不会自动为其复制现有的Topic信息,需要通过一些手段处理。例如从现有Broker上Clone Topic信息(原生Admin未提供CloneTopic工具),或通过Admin手动创建到新Broker上。

数据一致性

从上面的介绍可以看出,Nameserver之间不会有任何数据交互,甚至彼此都不知道对方的存在。唯一将Nameserver组成集群的,是Broker的启动配置里,将一组Nameserver实例的地址填到namesrvAddr配置项上;以及使用Admin工具时指定namesrvAddr参数。这种极轻量的设计使Nameserver几乎可以视作无状态服务。在设计理念上,RocketMQ的Nameserver追求“最终一致性”,在CAP中倾向于AP(可用性和分区容忍性)。相比之下,Kafka的Zookeeper作为中心化配置中心,追求强一致性(CP),但在分布式环境下会牺牲可用性。

那么在RocketMQ中,如何保证数据一致性?答案是通过客户端的重试和幂等来实现。具体来说,由于Nameserver的设计,可能出现以下几种路由数据不一致的情况:

Broker的Topic路由与客户端不一致: 首先看单一生产者&消费者的情况:

  • 生产者角度:
    • 若生产者缺少Topic在某Broker的路由信息,可能导致消息发送失败。此时生产者可通过重试机制重新获取路由信息,可能导致消息发送延迟,或消息不均衡地分布到其他Broker。
    • 若生产者发送的消息目标Topic在Broker上不存在,Broker会返回错误码(如TopicNotExist)。
  • 消费者角度:
    • 若消费者缺少Topic在某Broker的路由信息,会导致该Broker上的消息消费延迟。
    • 若消费者尝试消费一个不存在的Topic,Broker会返回错误码(如TopicNotExist)。 多生产者的情况与单生产者一致:
  • 任何一个生产者缺少Topic在某Broker的路由信息,只会导致该生产者发送失败,不影响其他生产者。通过路由更新,最终所有生产者会获取到最新路由信息,实现数据均衡。 多消费者的情况则更复杂:
  • 假设有两个消费者C1和C2,订阅的Topic共有4个队列Q1-Q4。若出现路由不一致,可能出现以下场景:
    • C1获取到Q1-Q4的完整路由信息,C2只获取到Q1-Q2。基于消费端最简单的平均分配(AllocateMessageQueueAveragely)策略,C1会消费Q1和Q3,C2会消费Q2。这样会导致Q4上的消息消费延迟,直到C2重新获取到Q4的路由信息。
    • C1获取到Q1-Q4的完整路由信息,C2只获取到Q2-Q3。同样策略下,C1会消费Q1和Q3,C2会消费Q3。这样导致Q3上的消息:1)重复消费,依赖消费端幂等性处理;2)消费乱序,在大多数只需At Least Once的场景下可以接受。如果需要严格顺序,则需更复杂的策略保证。

上面假设Broker与Namesvr之间的路由信息一致,客户端与Broker/Namesvr之间路由信息不一致。还有一种情况是Broker和Namesvr之间路由信息不一致,以及客户端-Namesvr-Broker三者路由均不一致。这时应从单一生产者&消费者角度看:

  • 生产者发送消息依赖可靠的消息投递策略,只要能保证已发送成功的消息被成功持久化到Broker,生产端就是可靠的。
  • 消费端,只要能保证至少一个消费者实例最终能从目标Broker拉取消息,基于可靠的消息拉取策略,也能保证消息可靠消费。

因此,Namesvr提供的最终一致性路由信息是否会导致数据不一致(准确说是消息丢失,因为重复和顺序问题是可能发生的),则依赖于客户端和Broker之间消息发送和消费的可靠性。这个问题将在后续章节详细讨论。


简单总结本章内容:

  • RocketMQ的NameServer设计非常简单,节点之间互不通信,数据一致性主要靠Broker主动上报和客户端重试保证。
  • NameServer只保存静态元数据,路由和消费分配都交给客户端和Broker自己处理,系统扩展非常轻松。
  • NameServer追求“最终一致性”,不强求每个节点数据都一样,但这样做让服务非常高可用。
  • 路由信息短时间内可能不一致,但只要客户端和Broker有重试和幂等机制,消息最终都能被可靠投递和消费。