rocketmq源码分析-6-消息存储-CommitLog与ConsumerQueue

Sunday, July 3, 2022

TOC

本文是RocketMQ源码分析系列的第六篇。在前面的章节中,我们分析了RocketMQ的整体架构、客户端设计、网络通信和Broker启动流程。现在我们来深入探讨RocketMQ最核心的部分——消息存储系统。RocketMQ的消息存储设计堪称经典,通过CommitLog + ConsumerQueue的组合,既保证了消息写入的高性能,又支持了灵活的消息消费模式。

存储设计思想

传统消息队列的存储问题

在分析RocketMQ的存储设计之前,我们先来看看比较传统的消息队列存储架构。以kafka为例,通常每个Topic对应一个或多个Partition,每个Partition又对应多个Segment文件。这种设计虽然简单,但在高并发场景下会遇到以下问题:

  1. 随机写入性能差:如果为每个Topic的每个Partition都独立维护一个文件,在高并发场景下会产生大量随机IO,严重影响性能
  2. 文件数量过多:Topic和Partition数量增长时,文件数量会急剧增加,给文件系统带来压力

RocketMQ的化零为整设计

RocketMQ采用了"化零为整"的设计思想,将所有消息统一写入一个CommitLog文件,然后通过ConsumerQueue建立索引。每个ConsumeQueue对应一个Topic-Queue的逻辑概念,记录着该Topic-Queue上的消息在CommitLog中的存储位置。消费的时候,broker先根据Topic-QueueId-Offset定位到对应的ConsumerQueue文件,然后通过索引快速找到CommitLog中的消息位置。

这种设计有以下优势:

  • 顺序写入:所有消息都追加写入CommitLog,充分利用磁盘顺序IO的高性能
  • 统一存储:不管有多少Topic和Queue,都只需要一个(准确地说是一系列滚动的)CommitLog文件,减少文件数量
  • 索引读取:ConsumerQueue提供了Topic-Queue的索引,消费者可以快速定位到CommitLog中的消息位置,避免全盘扫描;更进一步地,RocketMQ还提供了Index文件支持按Key查询消息。

对比实际的存储文件

假设我们在Kafka和RocketMQ上分别创建了两个Topic(Topic-a, Topic-b),各自包含3个Partition,看看他们生成的存储文件是怎样的:

大的设计有了,下面看看RocketMQ是如何实现这个设计的。

CommitLog详解

CommitLog的文件结构

CommitLog是RocketMQ存储消息的核心文件,具有以下特点:

文件组织形式

  • CommitLog由多个固定大小的文件组成(默认1GB)
  • 文件名为该文件第一条消息的物理偏移量(20位数字,不足左补0)
  • 例如:00000000000000000000、00000000001073741824、00000000002147483648

消息存储格式

每条消息在CommitLog中的存储格式包含:

消息长度(4字节) + 魔数(4字节) + CRC校验(4字节) + Flag(4字节) + 
消息体长度(4字节) + 消息体 + Topic长度(1字节) + Topic + 
Queue信息 + 消息属性(properties)长度 + 消息属性 + ...

CommitLog的写入流程

消息写入的核心方法

// 核心写入方法分析
DefaultMessageStore#asyncPutMessage()
  ├─ CommitLog#putMessage()
    ├─ 获取当前写入文件(MappedFile)
    ├─ 序列化消息内容
    ├─ 追加写入到MappedFile
    └─ 触发刷盘策略

MappedFile与内存映射

  • RocketMQ使用Java NIO的MappedByteBuffer技术
  • 将文件映射到内存,提升读写性能
  • 支持同步/异步刷盘策略

刷盘策略

  1. 同步刷盘:消息写入后立即调用fsync,确保数据持久化
  2. 异步刷盘:定时批量刷盘,提升性能但可能丢失部分数据

ConsumerQueue详解

ConsumerQueue的设计目的

虽然CommitLog解决了写入性能问题,但带来了新的挑战:

  • 消费者需要按Topic-Queue维度拉取消息,如果直接扫描CommitLog,效率很低

ConsumerQueue就是为了解决这个问题而设计的轻量级索引文件。

ConsumerQueue的文件结构

目录组织

ConsumerQueue文件按Topic和QueueId组织目录结构:

store/consumequeue/{topic}/{queueId}/{fileName}

例如:store/consumequeue/TestTopic/0/00000000000000000000

索引条目格式

每个索引条目固定20字节(o.a.r.store.ConsumeQueue:35):

// o.a.r.store.ConsumeQueue:35
public static final int CQ_STORE_UNIT_SIZE = 20;

具体结构如下:

| CommitLog物理偏移量(8字节) | 消息大小(4字节) | Tag哈希值(8字节) |
  • CommitLog物理偏移量:消息在CommitLog文件中的起始位置
  • 消息大小:消息体的字节长度
  • Tag哈希值:消息Tag的哈希值,用于快速过滤

文件文件默认特点

  • 每个包含30万个条目(mappedFileSizeConsumeQueue默认约6MB)
  • 文件名为该文件第一个条目的逻辑偏移量(消息在队列中的序号)
  • 支持扩展字段(ConsumeQueueExt),用于存储更丰富的过滤信息

核心代码结构

ConsumeQueue类(o.a.r.store.ConsumeQueue:32)的核心字段:

// o.a.r.store.ConsumeQueue:32
public class ConsumeQueue {
    private final DefaultMessageStore defaultMessageStore;
    private final MappedFileQueue mappedFileQueue;  // 文件管理
    private final String topic;
    private final int queueId;
    private final ByteBuffer byteBufferIndex;       // 索引缓冲区
    private final String storePath;                 // 存储路径
    private final int mappedFileSize;               // 文件大小
    private long maxPhysicOffset = -1;              // 最大物理偏移量
    private volatile long minLogicOffset = 0;       // 最小逻辑偏移量
    private ConsumeQueueExt consumeQueueExt = null;  // 扩展字段
}

getIndexBuffer方法

根据逻辑Offset获取对应的索引缓冲区(o.a.r.store.ConsumeQueue#getIndexBuffer:544):

// o.a.r.store.ConsumeQueue#getIndexBuffer:544
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    // 将逻辑Offset转换为物理字节偏移量
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // 找到对应的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // 返回该位置的缓冲区
            return mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
        }
    }
    return null;
}

rollNextFile方法

计算切换到下一个文件的逻辑Offset(o.a.r.store.ConsumeQueue#rollNextFile:578):

// o.a.r.store.ConsumeQueue#rollNextFile:578
public long rollNextFile(final long index) {
    int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
    // 返回下一个文件起始的逻辑Offset
    return index + totalUnitsInFile - index % totalUnitsInFile;
}

ConsumerQueue的构建流程

ReputMessageService异步构建

ConsumerQueue的构建是由ReputMessageServiceo.a.r.store.DefaultMessageStore.ReputMessageService:1972)异步完成的。该服务会实时扫描CommitLog,将新到达的消息索引分发到对应的ConsumerQueue中。

核心流程(o.a.r.store.DefaultMessageStore.ReputMessageService#doReput:2009)如下:

ReputMessageService.doReput()
  │
  ├─ 从CommitLog读取新消息  [2022]
  │   └─ commitLog.getData(reputFromOffset)
  │
  ├─ 解析消息元数据  [2028-2029]
  │   └─ commitLog.checkMessageAndReturnSize()
  │
  ├─ 触发分发  [2034]
  │   └─ doDispatch(dispatchRequest)
  │
  ├─ 通知消息到达(长轮询)  [2036-2044]
  │   └─ messageArrivingListener.arriving()
  │
  └─ 更新reputFromOffset  [2046]

构建时机

ConsumerQueue的构建发生在两个时机:

  1. Broker启动时:扫描CommitLog文件,重建历史消息的索引
  2. 运行时:通过ReputMessageService实时构建新消息的索引

doDispatch方法

doDispatch方法(o.a.r.store.DefaultMessageStore#doDispatch:1519)负责将消息索引写入对应的ConsumerQueue:

// o.a.r.store.DefaultMessageStore#doDispatch:1519
public void doDispatch(DispatchRequest req) {
    // 写入ConsumerQueue索引
    final int queueId = req.getQueueId();
    if (req.getTopic().equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
        // 延迟消息特殊处理
    } else {
        // 正常消息,写入ConsumeQueue
        ConsumeQueue consumeQueue = findConsumeQueue(req.getTopic(), queueId);
        if (consumeQueue != null) {
            consumeQueue.putMessagePositionInfo(
                req.getCommitLogOffset(),
                req.getMsgSize(),
                req.getConsumeQueueOffset(),
                req.getTagsCode());
        }
    }
    // 写入IndexFile(如果启用了)
    if (this.indexService != null) {
        // ...
    }
}

putMessagePositionInfo方法(o.a.r.store.ConsumeQueue#putMessagePositionInfo:478)负责将索引信息写入MappedFile:

// o.a.r.store.ConsumeQueue#putMessagePositionInfo:478
private boolean putMessagePositionInfo(final long offset, final int size, 
    final long tagsCode, final long cqOffset) {
    // 写入索引数据到ByteBuffer
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);    // CommitLog偏移量
    this.byteBufferIndex.putInt(size);       // 消息大小
    this.byteBufferIndex.putLong(tagsCode);  // Tag哈希值
    
    // 追加到MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    // ...
    return mappedFile.appendMessage(this.byteBufferIndex.array());
}

这个过程是异步的,Broker会批量将索引写入磁盘,保证消息写入的高性能。

消息读取流程

消费者拉取消息的完整流程

  1. 定位ConsumerQueue:根据Topic、QueueId、ConsumeOffset确定读取位置
  2. 读取索引条目:从ConsumerQueue获取CommitLog偏移量和消息大小
  3. 读取消息内容:根据偏移量从CommitLog读取完整消息
  4. 消息过滤:根据Tag等条件过滤消息
  5. 返回消息:将符合条件的消息返回给消费者

性能优化技巧

预读机制

  • 消费者可以指定批量拉取消息数量
  • ConsumerQueue支持批量读取多个索引条目

缓存机制

  • 热点ConsumerQueue会被缓存在内存中
  • 使用LRU策略管理缓存

Index文件

Index文件的作用

除了ConsumerQueue,RocketMQ还提供了Index文件用于支持按Key查询消息:

使用场景

  • 根据Message Key查找特定消息
  • 支持时间范围查询
  • 用于消息轨迹追踪

文件结构

Index文件采用哈希索引结构:

  • Header:存储文件元信息
  • Hash槽:哈希表的桶
  • Index条目:具体的索引记录

存储相关配置优化

重要配置参数

CommitLog相关

  • commitLogFileSize:单个CommitLog文件大小(默认1GB)
  • flushCommitLogTimed:是否定时刷盘
  • flushIntervalCommitLog:刷盘间隔

ConsumerQueue相关

  • mapedFileSizeConsumeQueue:单个ConsumerQueue文件大小
  • flushIntervalConsumeQueue:ConsumerQueue刷盘间隔

性能调优建议

  • 根据消息大小调整CommitLog文件大小
  • 在可靠性和性能之间选择合适的刷盘策略
  • 合理配置预分配文件数量

通过本章的分析,我们深入了解了RocketMQ消息存储的精妙设计。CommitLog + ConsumerQueue的组合既保证了写入性能,又支持了灵活的消息消费。这种设计思想值得我们在其他存储系统设计中借鉴。

下一章我们将分析消息读取的具体实现,包括正常流程和各种异常情况的处理机制。