rocketmq源码分析-7-消息拉取流程与消费进度管理

Sunday, July 10, 2022

TOC

本文是RocketMQ源码分析系列的第七篇。在上一章中,我们分析了RocketMQ的消息存储模型,了解了CommitLog和ConsumerQueue的结构(ConsumerQueue是一种轻量级索引文件,每个条目20字节,包含CommitLog物理偏移量、消息大小和Tag哈希值)。本章将继续深入,探讨Broker如何处理消费者的拉取请求,以及消费进度是如何管理的。

背景与问题

消费者拉取消息是RocketMQ消息消费的核心环节。与发送消息不同,拉取是一个"被动"的过程——由消费者主动发起请求,Broker根据请求参数从存储系统中查找消息并返回。这个过程涉及多个组件的协作,理解其中的细节对排查消费问题至关重要。

PullMessageProcessor 核心处理逻辑

请求入口

Broker端处理消息拉取请求的入口是PullMessageProcessoro.a.r.broker.processor.PullMessageProcessor:71)。作为AsyncNettyRequestProcessor的子类,它复写了processRequest方法,接收来自Consumer的拉取请求。

// o.a.r.broker.processor.PullMessageProcessor#processRequest:81
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
    return this.processRequest(ctx.channel(), request, true);
}

处理流程概览

整体处理流程大致分为以下几个阶段:

o.a.r.broker.processor.PullMessageProcessor#processRequest:91
  │
  ├─ 权限与配置校验  [103-150]
  │   ├─ Broker权限检查  [103-107]
  │   ├─ 消费者组存在性检查  [109-121]
  │   ├─ Topic存在性与权限检查  [129-141]
  │   └─ 队列ID合法性检查  [143-150]
  │
  ├─ 订阅信息解析  [152-237]
  │   ├─ 构建SubscriptionData  [156-172]
  │   ├─ 构建ConsumerFilterData  [159-165]
  │   └─ 创建MessageFilter实例  [230-237]
  │
  ├─ 消息查询
  │   └─ messageStore.getMessage()  [239-241]
  │
  ├─ 结果处理与Response构建  [242-461]
  │   ├─ 状态码映射  [279-327]
  │   ├─ 消息编解码返回  [371-407]
  │   ├─ 进度更新  [463-470]
  │   └─ 异常处理(长轮询/Offset越界)  [408-457]
  │
  └─ 返回Response  [471]

参数校验

在调用messageStore.getMessage()之前,PullMessageProcessor做了大量的前置校验工作:

// o.a.r.broker.processor.PullMessageProcessor#processRequest:109
// 检查消费者组配置
SubscriptionGroupConfig subscriptionGroupConfig =
    this.brokerController.getSubscriptionGroupManager()
        .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());

// o.a.r.broker.processor.PullMessageProcessor#processRequest:129
// 检查Topic配置
TopicConfig topicConfig =
    this.brokerController.getTopicConfigManager()
        .selectTopicConfig(requestHeader.getTopic());

// o.a.r.broker.processor.PullMessageProcessor#processRequest:143
// 检查队列ID合法性
if (requestHeader.getQueueId() < 0 ||
    requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
    // 返回错误码
}

这些校验确保了只有合法的请求才会进入后续的消息查找阶段。

消息定位与读取

消息的读取涉及多个组件的协作:DefaultMessageStoreConsumeQueueCommitLog,整个过程是一个"二级查找"的过程。

调用存储层

消息查找的核心入口是MessageStore.getMessage()方法(o.a.r.store.DefaultMessageStore#getMessage:560):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:239
final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(
        requestHeader.getConsumerGroup(),  // 消费者组
        requestHeader.getTopic(),          // Topic
        requestHeader.getQueueId(),         // 队列ID
        requestHeader.getQueueOffset(),     // 拉取起始Offset
        requestHeader.getMaxMsgNums(),      // 最大消息数
        messageFilter);                     // 消息过滤器

核心流程

DefaultMessageStore.getMessage()方法(o.a.r.store.DefaultMessageStore#getMessage:560)的执行流程如下:

o.a.r.store.DefaultMessageStore#getMessage:560
  │
  ├─ 前置检查  [563-576]
  │   ├─ shutdown状态检查
  │   └─ runningFlags可读性检查
  │
  ├─ 获取ConsumeQueue  [590-593]
  │   └─ findConsumeQueue(topic, queueId)
  │
  ├─ Offset合法性检查  [595-607]
  │   ├─ maxOffset == 0 → 无消息
  │   ├─ offset < minOffset → OFFSET_TOO_SMALL
  │   ├─ offset == maxOffset → OFFSET_OVERFLOW_ONE
  │   └─ offset > maxOffset → OFFSET_OVERFLOW_BADLY
  │
  ├─ 读取ConsumeQueue索引  [608]
  │   └─ consumeQueue.getIndexBuffer(offset)
  │
  ├─ 遍历索引条目  [616-688]
  │   ├─ 解析索引条目(offsetPy, sizePy, tagsCode)
  │   ├─ 消息过滤(isMatchedByConsumeQueue)
  │   ├─ 从CommitLog读取消息(commitLog.getMessage)
  │   └─ 二次过滤(isMatchedByCommitLog)
  │
  └─ 返回结果  [730-734]

第一步:查找ConsumeQueue

首先根据Topic和QueueId找到对应的ConsumeQueue(o.a.r.store.DefaultMessageStore#getMessage:590):

// o.a.r.store.DefaultMessageStore#getMessage:590
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
    minOffset = consumeQueue.getMinOffsetInQueue();
    maxOffset = consumeQueue.getMaxOffsetInQueue();
    // ...
}

findConsumeQueue方法(o.a.r.store.DefaultMessageStore#findConsumeQueue:1221)从内部的consumeQueueTable中获取:

// o.a.r.store.DefaultMessageStore#findConsumeQueue:1221
private ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConsumeQueue consumeQueue = this.consumeQueueTable.get(
        KeyBuilder.buildKey(topic, queueId));
    if (null == consumeQueue) {
        consumeQueue = new ConsumeQueue(topic, queueId,
            this.storeConfig.getStorePathConsumeQueue(),
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue());
        this.consumeQueueTable.put(KeyBuilder.buildKey(topic, queueId), consumeQueue);
    }
    return consumeQueue;
}

第二步:Offset合法性检查

在读取索引之前,需要检查请求的Offset是否合法(o.a.r.store.DefaultMessageStore#getMessage:595):

// o.a.r.store.DefaultMessageStore#getMessage:595
if (maxOffset == 0) {
    status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
    status = GetMessageStatus.OFFSET_TOO_SMALL;
    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
    status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
    nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
    status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}

nextOffsetCorrection方法用于修正Offset:当消费者请求的Offset与Broker实际存储的Offset不一致时,返回一个合理的下次拉取起始位置。

第三步:读取ConsumeQueue索引

当Offset合法时,调用ConsumeQueue.getIndexBuffer()方法(o.a.r.store.ConsumeQueue#getIndexBuffer:544)读取索引:

// o.a.r.store.DefaultMessageStore#getMessage:608
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);

ConsumeQueue的索引条目格式为固定20字节:

| offsetPy (8字节) | sizePy (4字节) | tagsCode (8字节) |

getIndexBuffer方法(o.a.r.store.ConsumeQueue#getIndexBuffer:544)会根据Offset计算出要读取的文件和位置:

// o.a.r.store.ConsumeQueue#getIndexBuffer:544
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    // 计算逻辑Offset对应的物理字节偏移量
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // 找到对应的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // 返回该位置的缓冲区
            return mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
        }
    }
    return null;
}

第四步:遍历索引,从CommitLog读取消息

遍历获取到的索引条目,对每一条进行处理(o.a.r.store.DefaultMessageStore#getMessage:616):

// o.a.r.store.DefaultMessageStore#getMessage:623
// 遍历索引条目
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount;
     i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // 解析索引条目
    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();   // CommitLog物理偏移量
    int sizePy = bufferConsumeQueue.getByteBuffer().getInt();       // 消息大小
    long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();   // Tag哈希值

    // ...
}

对于每一条索引,流程如下:

4.1 消息过滤(第一层)

首先在ConsumeQueue层面进行过滤(o.a.r.store.DefaultMessageStore#getMessage:655):

// o.a.r.store.DefaultMessageStore#getMessage:655
if (messageFilter != null
    && !messageFilter.isMatchedByConsumeQueue(
        isTagsCodeLegal ? tagsCode : null,
        extRet ? cqExtUnit : null)) {
    // 过滤不匹配,继续下一条
    continue;
}

这里使用的是索引中保存的tagsCode(Tag的哈希值)进行过滤,避免读取完整的消息体。

4.2 从CommitLog读取消息

通过索引中保存的物理偏移量从CommitLog读取完整消息(o.a.r.store.DefaultMessageStore#getMessage:664):

// o.a.r.store.DefaultMessageStore#getMessage:664
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

CommitLog.getMessage()方法(o.a.r.store.CommitLog#getMessage:941)根据物理偏移量和大小读取消息:

// o.a.r.store.CommitLog#getMessage:941
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    // 检查offset所在的MappedFile
    // 读取对应的消息内容
    // 返回SelectMappedBufferResult
}

4.3 消息过滤(第二层)

从CommitLog读取到消息后,进行第二轮更精确的过滤(o.a.r.store.DefaultMessageStore#getMessage:674):

// o.a.r.store.DefaultMessageStore#getMessage:674
if (messageFilter != null
    && !messageFilter.isMatchedByCommitLog(
        selectResult.getByteBuffer().slice(), null)) {
    selectResult.release();
    continue;
}

这里可以解析消息的完整内容进行SQL92等复杂过滤。

4.4 添加到结果集

通过过滤的消息被添加到结果集中(o.a.r.store.DefaultMessageStore#getMessage:685):

// o.a.r.store.DefaultMessageStore#getMessage:685
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;

第五步:处理返回结果

最后设置返回结果的各种属性(o.a.r.store.DefaultMessageStore#getMessage:730):

// o.a.r.store.DefaultMessageStore#getMessage:730
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;

其中nextBeginOffset的计算方式为:

// o.a.r.store.DefaultMessageStore#getMessage:695
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

这个值表示下一次拉取应该从哪个Offset开始。

GetMessageResult 结构

GetMessageResulto.a.r.store.GetMessageResult)是消息查找结果的封装,包含以下核心字段:

// o.a.r.store.GetMessageResult
public class GetMessageResult {
    private GetMessageStatus status;           // 查询状态
    private long nextBeginOffset;              // 下次拉取的起始Offset
    private long minOffset;                    // 队列最小Offset
    private long maxOffset;                    // 队列最大Offset
    private List<ByteBuffer> messageBufferList; // 消息内容列表
    private boolean suggestPullingFromSlave;   // 是否建议从Slave拉取
    private int bufferTotalSize;               // 消息总大小
    private int msgCount4Commercial;           // 商业统计用消息数
}

状态码处理

根据不同的查找状态,Broker会返回不同的响应码(o.a.r.broker.processor.PullMessageProcessor#processRequest:279):

状态 响应码 含义
FOUND SUCCESS 成功找到消息
NO_MESSAGE_IN_QUEUE PULL_NOT_FOUND 队列中无消息
NO_MATCHED_MESSAGE PULL_RETRY_IMMEDIATELY 无匹配消息
OFFSET_OVERFLOW_BADLY PULL_OFFSET_MOVED Offset越界严重
OFFSET_TOO_SMALL PULL_OFFSET_MOVED Offset过小
OFFSET_OVERFLOW_ONE PULL_NOT_FOUND Offset刚好到末尾
MESSAGE_WAS_REMOVING PULL_RETRY_IMMEDIATELY 消息正在被删除
// o.a.r.broker.processor.PullMessageProcessor#processRequest:279
switch (getMessageResult.getStatus()) {
    case FOUND:
        response.setCode(ResponseCode.SUCCESS);
        break;
    case NO_MESSAGE_IN_QUEUE:
    case NO_MATCHED_LOGIC_QUEUE:
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_OVERFLOW_BADLY:
    case OFFSET_TOO_SMALL:
        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
        break;
    case NO_MATCHED_MESSAGE:
    case MESSAGE_WAS_REMOVING:
        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
        break;
    // ...
}

消息过滤

过滤机制

RocketMQ支持两种消息过滤方式:

  1. Tag过滤:根据消息的Tag标签进行过滤
  2. SQL92过滤:根据消息属性进行SQL表达式过滤

过滤发生在PullMessageProcessor中创建过滤器实例(o.a.r.broker.processor.PullMessageProcessor#processRequest:230):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:230
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    // 支持重试消息的过滤
    messageFilter = new ExpressionForRetryMessageFilter(
        subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(
        subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}

过滤发生在存储层,在从CommitLog读取消息后、返回给消费者前进行。这样做的好处是可以避免传输不需要的消息,减少网络开销。

两层过滤设计

从代码中可以看到,RocketMQ采用了两层过滤设计:

  1. 第一层(ConsumeQueue层):使用tagsCode(Tag的哈希值)进行快速过滤

    • 位置:o.a.r.store.DefaultMessageStore#getMessage:655
    • messageFilter.isMatchedByConsumeQueue()
    • 优点:不需要读取消息Body,效率高
  2. 第二层(CommitLog层):解析消息Body进行精确过滤

    • 位置:o.a.r.store.DefaultMessageStore#getMessage:674
    • messageFilter.isMatchedByCommitLog()
    • 优点:支持SQL92等复杂表达式

SQL92表达式过滤

SQL92过滤支持在消息属性上执行类似SQL的表达式判断。过滤逻辑在ExpressionMessageFiltero.a.r.broker.filter.ExpressionMessageFilter:34)中实现:

// o.a.r.broker.filter.ExpressionMessageFilter#isMatchedByCommitLog:118
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // ... 省略部分代码

    // 解析消息属性
    if (tempProperties == null && msgBuffer != null) {
        tempProperties = MessageDecoder.decodeProperties(msgBuffer);
    }

    // 使用编译后的表达式进行评估
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }

    return ret != null && ret instanceof Boolean && (Boolean) ret;
}

消费者可以设置类似a > 1 AND b = 'hello'的SQL表达式,Broker会解析消息属性并执行表达式判断。例如:

  • a > 100:过滤属性a大于100的消息
  • b = 'vip' AND c <= 10:复合条件过滤

消费进度管理

Offset存储结构

ConsumerOffsetManager(o.a.r.broker.offset.ConsumerOffsetManager:36)负责管理消费进度,其内部使用一个嵌套的ConcurrentMap:

// o.a.r.broker.offset.ConsumerOffsetManager:40
// topic@group -> (queueId -> offset)
ConcurrentMap<String/* topic@group */,
              ConcurrentMap<Integer, Long>> offsetTable;

Key的格式为topic@group,例如:TestTopic@my-consumer-group

进度更新时机

消费进度的更新发生在消息拉取成功后(o.a.r.broker.processor.PullMessageProcessor#processRequest:463):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:463
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;

if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(
        channel.remoteAddress(),
        requestHeader.getConsumerGroup(),
        requestHeader.getTopic(),
        requestHeader.getQueueId(),
        requestHeader.getCommitOffset());
}

注意,只有当请求中携带了commitOffsetFlag时才会更新进度。这与消费者的消费模式有关:

  • Push模式:消费者拉取消息后并不立即更新进度,而是在消息消费成功后更新
  • Pull模式:消费者可以自行控制进度提交的时机

进度持久化

ConsumerOffsetManager继承自ConfigManager,提供了持久化能力。进度会定时写入consumerOffset.json文件(o.a.r.broker.offset.ConsumerOffsetManager:36):

// o.a.r.broker.offset.ConsumerOffsetManager (继承自ConfigManager)
// 定时将offsetTable写入consumerOffset.json
public String configFilePath() {
    return BrokerPathConfigHelper.getConsumerOffsetPath();
}

默认情况下,消费进度每5秒持久化一次(可配置,通过persistConsumerOffsetInterval参数)。

进度查询

消费者在拉取消息时,需要传入当前希望拉取的Offset(PullMessageRequestHeader.getQueueOffset())。Broker在处理拉取请求时,会返回nextBeginOffset,告诉消费者下次应该从哪里开始拉取。

// o.a.r.broker.processor.PullMessageProcessor#processRequest:244
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

异常处理与返回

长轮询机制

当队列中没有新消息时,Broker支持长轮询(Long Polling)机制,避免消费者频繁请求(o.a.r.broker.processor.PullMessageProcessor#processRequest:408):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:408
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
    }

    PullRequest pullRequest = new PullRequest(
        request, channel, pollingTimeMills,
        this.brokerController.getMessageStore().now(), 
        offset, subscriptionData, messageFilter);
    // 挂起请求,待新消息到达后唤醒
    this.brokerController.getPullRequestHoldService()
        .suspendPullRequest(topic, queueId, pullRequest);
    response = null;
}

PullRequestHoldService 唤醒逻辑

长轮询的核心实现是PullRequestHoldServiceo.a.r.broker.longpolling.PullRequestHoldService:32)。该服务会定期检查被挂起的请求,当新消息到达时唤醒对应的消费者:

// o.a.r.broker.longpolling.PullRequestHoldService#run:67
@Override
public void run() {
    while (!this.isStopped()) {
        try {
            // 长轮询默认5秒检查一次
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            
            // 检查所有挂起的请求
            this.checkHoldRequest();
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

checkHoldRequest方法遍历所有被挂起的请求,调用notifyMessageArriving检查是否有新消息到达:

// o.a.r.broker.longpolling.PullRequestHoldService#notifyMessageArriving:116
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, ...) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        
        for (PullRequest request : requestList) {
            long newestOffset = maxOffset;
            if (newestOffset <= request.getPullFromThisOffset()) {
                // 重新获取最新Offset
                newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            }
            
            // 检查是否有新消息
            if (newestOffset > request.getPullFromThisOffset()) {
                // 检查消息过滤条件
                boolean match = request.getMessageFilter().isMatchedByConsumeQueue(...);
                
                if (match) {
                    // 唤醒消费者,重新拉取消息
                    this.brokerController.getPullMessageProcessor()
                        .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
                    continue;
                }
            }
            
            // 检查超时
            if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                // 超时后唤醒,返回PULL_NOT_FOUND
                this.brokerController.getPullMessageProcessor()
                    .executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand());
                continue;
            }
            
            // 仍无可消息,继续挂起
            replayList.add(request);
        }
        
        // 重新放入等待队列
        if (!replayList.isEmpty()) {
            mpr.addPullRequest(replayList);
        }
    }
}

整个长轮询流程如下:

消费者请求无消息
    ↓
PullMessageProcessor#suspendPullRequest 挂起请求
    ↓
PullRequestHoldService 每5秒检查一次
    ↓
有新消息到达时 → notifyMessageArriving 唤醒
    ↓
executeRequestWhenWakeup 重新执行拉取逻辑
    ↓
返回消息给消费者

默认情况下,长轮询超时时间为30秒(可通过suspendTimeoutMillis配置)。

Offset越界处理

当消费者请求的Offset超过当前队列范围时,Broker会返回PULL_OFFSET_MOVED并告知正确的Offset(o.a.r.broker.processor.PullMessageProcessor#processRequest:428):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:428
case PULL_OFFSET_MOVED:
    // ...
    OffsetMovedEvent event = new OffsetMovedEvent();
    event.setConsumerGroup(requestHeader.getConsumerGroup());
    event.setMessageQueue(mq);
    event.setOffsetRequest(requestHeader.getQueueOffset());
    event.setOffsetNew(getMessageResult.getNextBeginOffset());
    this.generateOffsetMovedEvent(event);
    // ...

同时,Broker会产生一个OffsetMovedEvent消息(Topic为OFFSET_MOVED_EVENT),供消费者感知Offset变化并调整消费策略。

消息返回方式

找到消息后,Broker有两种返回方式(o.a.r.broker.processor.PullMessageProcessor#processRequest:381):

// o.a.r.broker.processor.PullMessageProcessor#processRequest:381
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
    // 方式1: 拷贝到堆内存返回
    final byte[] r = this.readGetMessageResult(...);
    response.setBody(r);
} else {
    // 方式2: 零拷贝,通过FileRegion直接传输
    FileRegion fileRegion = new ManyMessageTransfer(
        response.encodeHeader(getMessageResult.getBufferTotalSize()), 
        getMessageResult);
    channel.writeAndFlush(fileRegion);
}

零拷贝方式可以显著提升大消息传输的性能,因为它避免了数据在用户态和内核态之间的拷贝。


通过本章的分析,我们详细了解了消息拉取请求的完整处理流程:从请求校验、消息定位(ConsumeQueue索引查找)、消息读取(CommitLog读取)、两层消息过滤、到进度管理,每个环节都有明确的职责和代码位置。理解这些细节对于排查消费问题(如消息重复消费、消费延迟、Offset跳动等)非常有帮助。

在后续的章节中,我们将继续探讨RocketMQ的其他高级特性,如事务消息、延迟消息等。