rocketmq源码分析-3-RPC基础-基于Netty的通信框架

Monday, June 20, 2022

TOC

本文是RocketMQ源码分析系列的第三篇。在上一篇中,我们介绍了客户端的启动流程。无论是Producer还是Consumer,都是通过NameServer获取Broker的路由信息,然后连接到Broker上进行消息的发送和消费。本文将介绍RocketMQ的通信框架——基于Netty实现的RPC模块。阅读本文需要对Netty有一定的了解,我之前也写过Netty相关的系列文章,有兴趣的可以先去看看。而RocketMQ的RPC通信本质上是对Netty的封装,提供了更高层次的抽象。

通信框架核心类

在RocketMQ中,对网络通信提供的核心抽象是下图所示的几个接口和类。从命名上,读者可以先猜一下各自的职责。

image-20250611202857196

嗒嗒!谜底揭晓。

架构的核心是RemotingService/RemotingServer/RemotingClient三个接口和类,它们分别定义了抽象的服务、通信服务端和通信客户端的核心方法。具体来看,它们各自包含下面这些方法:

RemotingService(I)void start()void shutdown()
 │
 ├─ RemotingServer(I)
 │  │ void registerProcessor(reqCode, NettyRequestProcessor, ExecutorService)
 │  │ void registerDefaultProcessor(NettyRequestProcessor, ExecutorService)
 │  │ RemotingCommand invokeSync(Channel, RemotingCommand, timeoutMs)
 │  │ void invokeAsync(Channel, RemotingCommand, timeoutMs, callback)
 │  │ void invokeOneway(Channel, RemotingCommand, timeoutMs)
 │
 ├─RemotingClient(I)void registerProcessor(reqCode, NettyRequestProcessor, ExecutorService)void updateNameServerAddressList(List<String>)
    │ RemotingCommand invokeSync(String, RemotingCommand, timeoutMs)void invokeAsync(String, RemotingCommand, timeoutMs, callback)void invokeOneway(String, RemotingCommand, timeoutMs)void setCallbackExecutor(ExecutorService)

核心类的核心逻辑

这里有几个问题:

  • Q1registerProcessor方法自然是注册Netty回调处理器的。基于给定的请求码(requestCode)和处理器(NettyRequestProcessor),将其注册到服务端或客户端中。但是为什么RemotingServer比RemotingClient多了一个registerDefaultProcessor方法?这个方法的作用是什么?

  • A1:注册服务时需要指定处理线程池(ExecutorService),将部分核心命令(如发送消息、接收消息、维持心跳等)注册到特定的线程池中进行处理,避免互相影响。而registerDefaultProcessor是将一些通用的处理器注册到一个大杂烩线程池中,通常是一些优先级比较低的请求(如创建Topic、创建Group等)。这些偏管理向的操作主要还是在Server端,Client则没有这样的需求,所以没有registerDefaultProcessor方法。

  • Q2:注意invokeSync/invokeAsync/invokeOneway三个方法在RemotingServerRemotingClient中都有定义,但是它们的第一个参数有差异:分别是Channel和String,这意味着什么?

  • A2:在RemotingServer中,invokeSync/invokeAsync/invokeOneway的第一个参数是Channel,表示已经建立好的网络连接(通常是客户端到服务端的连接)。而在RemotingClient中,第一个参数是String addr,表示目标服务器的地址。客户端需要根据地址建立连接后再发送请求,而服务端已经有了与客户端的连接通道。两者的参数差异反映了各自的角色和调用场景。

接下来是NettyRemotingAbstract这个抽象类,提供了一些通用的Netty/网络相关方法。它包含以下几个重要的成员变量和方法:

NettyRemotingAbstract(A)
 │ responseTable: ConcurrentMap<Integer/*opaque*/, ResponseFuture>
 │ processorTable: HashMap<Integer/*reqCode*/, Pair<NettyRequestProcessor, ExecutorService>>void processMessageReceived(ChannelHandlerContext, RemotingCommand)void processRequestCommand(ChannelHandlerContext, RemotingCommand)void processResponseCommand(ChannelHandlerContext, RemotingCommand)void invokeSyncImpl(Channel, RemotingCommand, timeoutMs)void invokeAsyncImpl(Channel, RemotingCommand, timeoutMs, InvokeCallback)void invokeOnewayImpl(Channel, RemotingCommand, timeoutMs)

更具体地,服务端启动流程为:

  1. 使用NettyServerConfig配置作为参数创建NettyRemotingServer实例
  2. NettyRemotingServer实例中注册请求处理器(NettyRequestProcessor)并指定对应的请求码(requestCode)
  3. 调用.start()方法启动server

客户端发送请求的流程:

  1. 使用NettyClientConfig配置作为参数创建NettyRemotingClient实例
  2. 调用.start()方法启动client
  3. 构建请求实体RemotingCommand并指定请求码(requestCode)
  4. 调用invokeSync(或者其他请求方法)发送请求(RemotingCommand)并获取响应(也是一个RemotingCommand)

下面是源码中的一个单元测试,演示如何基于上述框架完成一个最简单的通信流程:

// @see org.apache.rocketmq.remoting.RemotingServerTest

/* Server Side */
NettyServerConfig config = new NettyServerConfig();  
RemotingServer remotingServer = new NettyRemotingServer(config);  
remotingServer.registerProcessor(0, new AsyncNettyRequestProcessor() {  
 @Override  
 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {  
  request.setRemark("Hi " + ctx.channel().remoteAddress());  
  return request;  
 }  
  
 @Override  
 public boolean rejectRequest() {  
  return false;  
 }  
}, Executors.newCachedThreadPool());  
  
remotingServer.start();

/* Client Side */
RemotingClient client = new NettyRemotingClient(nettyClientConfig);  
client.start();
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);  
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);

深入RocketMQ+Netty定制逻辑

这里带来几个疑问:

  1. RocketMQ具体对Netty进行了怎样的封装(自定义)?更细化的处理流程是怎样的?
  2. 请求体RemotingCommand的完整结构是怎样的?正式的请求(比如发送消息)的结构是如何基于此结构来定义的?

很显然,服务端是在NettyRemotingServer中引入了对Netty的依赖,通过Netty的启动工具类ServerBootstrap()来创建一个服务端实例。比较重要的实例化步骤包括:

  1. 创建Boss线程池,负责接收和处理Netty的连接请求
  2. 创建Worker线程池,负责接收和处理Netty业务请求
  3. 创建若干Handler实例,负责实现业务逻辑
  4. 构造Netty的处理流程pipeline

其中,构造pipeline的代码如下:

// @see org.apache.rocketmq.remoting.netty.NettyRemotingServer.start()

ServerBootstrap childHandler =
	this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // 传入 boss 和 worker 线程池
		.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // 指定使用 epoll 或 nio 方式创建 channel
		// TCP 相关的配置项
		.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()) 
		.option(ChannelOption.SO_REUSEADDR, true)
		.option(ChannelOption.SO_KEEPALIVE, false)
		.childOption(ChannelOption.TCP_NODELAY, true)
		.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 绑定 TCP 端口
		// 定义 pipeline
		.childHandler(new ChannelInitializer<SocketChannel>() { 
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
			    // 请求进来之后,依次执行下面的 handler 回调方法做业务逻辑
				ch.pipeline()
				    // SSL 开启时需要在此 handler 处理握手,本次分析忽略 SSL
					.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
					// 将业务 handler 注册到 pipeline,执行的顺序是先 addLast 的先执行
					.addLast(defaultEventExecutorGroup,
						encoder,
						new NettyDecoder(),
						new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
						connectionManageHandler,
						serverHandler
					);
			}
		});

更多Netty的细节可以参看Netty介绍

注意到注册到pipeline的业务handler一共有5个:

  1. NettyEncoder:继承自OutboundHandler,请求出栈时将RemotingCommand编码为ByteBuf
  2. NettyDecoder:继承自InboundHandler,请求入栈时将ByteBuf解码为RemotingCommand对象
  3. IdleStateHandler:继承自ChannelDuplexHandler,当Channel超时无请求时发送一个IdleStateEvent事件,由下面的NettyConnectManageHandler捕捉并断开该Channel
  4. NettyConnectManageHandler:监听Channel状态事件,并回调channelEventListener(如果有注册的话)
  5. NettyServerHandler:继承自ChannelInboundHandler,对接收到的RemotingCommand进行业务处理

下面看看核心业务的NettyServerHandler在接收到请求RemotingCommand之后的处理逻辑:

NettyServerHandler#channelRead0(ChannelHandlerContext, RemotingCommand)
  ├─ NettyRemotingAbstract#processMessageReceived(ChannelHandlerContext, RemotingCommand)
    | if(cmd.getType()==REQUEST) processRequestCommand(ctx, cmd)
    | if(cmd.getType()==RESPONSE) processResponseCommand(ctx, cmd)

以type=REQUEST的请求为例,会找到该请求的reqCode对应的处理器(Processor)和线程池,将逻辑封装成Runnable提交到该线程池进行处理。具体代码如下:

// @see o.a.r.r.netty.NettyRemotingAbstract#processRequestCommand

// processorTable注册了每个reqCode对应的处理器, 注意每个处理器还对应了负责执行它的线程池
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 若没有匹配的处理器,则使用default
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
// opaque用于记录on-going请求,一般是自增的唯一数字标示
final int opaque = cmd.getOpaque();

if (pair != null) {
  Runnable run = new Runnable() {
    @Override
    public void run() {
      try {
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        doBeforeRpcHooks(remoteAddr, cmd);
        // 回调函数, 用于将返回体通过netty返回请求方
        final RemotingResponseCallback callback = new RemotingResponseCallback() {
          @Override
          public void callback(RemotingCommand response) {
            doAfterRpcHooks(remoteAddr, cmd, response);
            if (!cmd.isOnewayRPC()) {
              if (response != null) {
                response.setOpaque(opaque);
                response.markResponseType();
                try {
                  // 刷回netty
                  ctx.writeAndFlush(response);
                } catch (Throwable e) {
                  // 异常处理
                }
              } else {// oneway请求, 不需要响应
              }
            }
          }
        };
        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
          // 异步执行, 在回调方法中调用callback
          AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
          processor.asyncProcessRequest(ctx, cmd, callback);
        } else {
          // 同步执行, 在当前线程(Processor注册的线程池)中执行请求处理, 并回调callback
          NettyRequestProcessor processor = pair.getObject1();
          RemotingCommand response = processor.processRequest(ctx, cmd);
          callback.callback(response);
        }
      } catch (Throwable e) {
        // 异常处理
      }
    }
  };

  if (pair.getObject1().rejectRequest()) {
    // 对应processor处于繁忙状态, 快速返回SYSTEM_BUSY, 跳过后续逻辑
    return;
  }

  try {
    // 将处理逻辑提交到processor的线程池中执行
    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
    pair.getObject2().submit(requestTask);
  } catch (RejectedExecutionException e) {
    // 异常处理
  }
} else {
  // 无对应processor的异常处理
}

至此,通信框架的介绍完毕。不同的请求有不同的Processor实现,后续文章会继续分析其内部实现。

RemotingCommand的数据包结构

再来看看RemotingCommand的结构,以及如何对其进行序列化。

public class RemotingCommand {
  // 请求码,依赖此标识请求类型
  private int code;
  // 这些参数在发送时会根据序列化方法整合到header中, 具体见RemotingCommand#headerEncode
  private LanguageCode language = LanguageCode.JAVA;
  private int version = 0;
  private int opaque = requestId.getAndIncrement();
  private int flag = 0;
  private String remark;
  private transient CommandCustomHeader customHeader;
  private HashMap<String, String> extFields;
  private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
  // 请求体, 可以为空
  private transient byte[] body;
}

下面是一个示例请求包的结构:

值得注意的是,请求的自定义参数在发送时会封装到一个extFields的Map容器中,发送时通过JSON或RocketMQ自定义的序列化方式转化成encode(header)字节流发送出去。

最后补充一个发送消息的抓包快照:

  • 第一部分: 0x00000153对应十进制是339,表示整个请求包长度为339
  • 第二部分:0x00对应十进制是0,表示header采用JSON的方式进行序列化
  • 第三部分:0x000142对应十进制是322,表示header部分的长度是322
  • 第四部分:是对header进行JSON序列化的内容
  • 第五部分:是请求body,这里发送的是一个时间戳文本

image-20241205204637958


以上完成了对RocketMQ通信框架的介绍,简单总结一下:

  • RocketMQ基于Netty封装了自己的一套通信协议,数据包处理的流程是流式的(pipeline),解包得到特定的请求后由对应的注册回调方法进行处理
  • 无论是客户端还是服务端,都支持sync/async/oneway三种请求方式
  • 数据包结构比较简单,只有五部分:总长度+编码标识+header长度+header内容+body内容