netty发送数据使用的接口方法是ChannelOutboundInvoker#write()
,它共有三个主要的实现方案:
// 1) AbstractChannel#write(Object) // 2) AbstractChannelHandlerContext#write(Object) // 3) DefaultChannelPipeline#write(Object)
即可以在Channel, HandlerContext, Pipeline三个级别实行发送。但发送逻辑只有两种类型:
- 从
Pipeline
的TailContext
开始发送,即不会触发链上的下游的出站Handler的write()
方法 (方案1和方案3) - 从当前
HandlerContext
开始调用write方法,会依次调用Pipeline链上的下游出站Handler的write()
方法 (方案2)
大多数业务还以方案2为主,下面我将基于此方案来展开。
write
write方法支持ChannelPromise
来实现异步回调:
# ChannelFuture.java public ChannelFuture write(final Object msg, final ChannelPromise promise) { // (省略) msg和promise合法性校验 write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { // 获取下游Context AbstractChannelHandlerContext next = findContextOutbound(); // 引用计数, 用于内存泄露检测 final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // 在当前线程同步调用 if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 封装成Task在EventLoop线程异步调用 AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; // 从当前context开始遍历 do { ctx = ctx.prev; } while (!ctx.outbound); // 跳过所有非Outbound的handler return ctx; }
关于Flush后文再做介绍,这里先介绍单纯的write操作。可以看到调用HandlerContext
的write()方法,会通过findContextOutbound()
寻找Pipeline链上的前一个(prev)出站Handler,即跟read()是正好相反的方向。而非出站Handler会被忽略跳过。
找到Handler之后,下一步就是调用其write()
方法。对于在Pipeline中间层级的业务Handler,在对输入的Object进行修改和封装后,需要重新通过HandlerContext#write(Object)
方法交给下级Handler让数据在Pipeline上继续流动。直到到达Pipeline尾部,即HeadContext(因为出站数据是从tail到head流动的)通过unsafe
写出:
# DefaultChannelPipeline$HeadContext.java public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } # AbstractChannel$AbstractUnsafe.java public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); // 必须在EventLoop线程中执行 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // (省略) 异常处理 } int size; try { // 将msg进行过滤和修饰, 默认无处理, 由子类Channel实现 msg = filterOutboundMessage(msg); // 获取当前待发送的消息的大小, 对于直接发送文件的返回0(支持零拷贝) size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { // (省略) 异常处理 return; } outboundBuffer.addMessage(msg, size, promise); }
Channel$Unsafe#write()
方法只有AbstractChannel
实现了,奇怪么?不是说Unsafe是跟具体的实现有关么?原来这里的write方法还未真正触发IO写出,仅仅将msg:Object
放入outbundBuffer
,这在所有的Channel实现中都是统一的,所以只在AbstractChannel做了实现。
接下来是msg = filterOutboundMessage(msg)
方法,留给子类实现,在我们讨论的NioSocketChannel中,将堆内存转化为堆外内存(准确地说是转化为池化的堆外内存,非池化的堆外内存代价太高,得不偿失),这样可以利用零拷贝提高性能。
# AbstractNioByteChannel.java protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } // 堆内Buffer转堆外Buffer return newDirectBuffer(buf); } if (msg instanceof FileRegion) { // return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
接下来就进入发送数据的重头戏,缓存。
ChannelOutboundBuffer
此Buffer是netty专门提供用于出站数据缓存的容器,内部的数据结构是三个链表。

而填充这些链表的Entry对象,基于可重复使用的对象池来创建,进一步减少了性能消耗。这是netty提供的很有意思的一个设计。
# ChannelOutboundBuffer public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // 累计写入buffer的数据大小, 超过阈值修改buffer状态为不可写入 incrementPendingOutboundBytes(size, false); } # ChannelOutboundBuffer$Entry.class static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { // 从线程对象池中获取entry Entry entry = RECYCLER.get(); // (省略) entry赋值初始化 return entry; }
在increamentPendingOutboundBytes()
方法中会更新缓存中总的数据量大小,如果超过高水位(通过WriteBufferWaterMark#high
配置, 默认64k),会修改ChannelOutboundBuffer
的状态为不可写。同时还会触发一个Pipeline事件fireChannelWritabilityChanged
。
flush
数据通过write方法只是写入到ChannelOutboundBuffer
做了缓存,真正的数据发送需要通过flush。跟write()
的调用逻辑类似,经过HandlerContext或者Channel实现的wtite()
,所有flush方法实现最终会调用AbstractUnsafe的实现:
# AbstractChannel$AbstractUnsafe.java protected void flush0() { if (inFlush0) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { // (省略) 如果当前channel已关闭, 所有flushed内的Entry作失败处理 return; } try { doWrite(outboundBuffer); } catch (Throwable t) { // (省略) 异常处理 } finally { inFlush0 = false; } } # NioSocketChannel.java protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { // flushed链表剩余元素的数量 int size = in.size(); if (size == 0) { clearOpWrite(); // flushed链表发送结束,清空SelectionKey#OP_WRITE break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // buffer.flushedEntry链表转为nioByteBuffer数组, 仅仅转换in中类型为ByteBuf的元素 ByteBuffer[] nioBuffers = in.nioBuffers(); // 待发送的buffer数, 即nioBuffers数组元素数量 int nioBufferCnt = in.nioBufferCount(); // 待发送数据的字节数, 即nioBuffers所有元素总字节数 long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); switch (nioBufferCnt) { case 0: // 由于in中可能包括非ByteBuf类型的元素(比如直接发送文件的FileRegion类型),nioBuffers()方法 // 不会将其放入数组, 以致cnt为0. 本方法仅处理nioByteBuffer类型的发送, 交给父类处理 super.doWrite(in); return; case 1:// 单个buffer使用 // 只做有限次(默认16)循环, 防止单个连接占用太多资源 ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { // nio SocketChannel发送 nio ByteBuffer, 返回实际发送的字节数(大小取决于tcp协议). Socket非阻塞默认(一般都是)下返回0表示无数据发送 final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { // nioBuffer有数据, 但socket发送出去的字节数为0, 设置setOpWrite=true, 退出发送循环, 且后续不会马上新增task继续发送, 而是注册OP_WRITE等待os可以发送更多数据的时候继续发送 setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { // 发送完成 done = true; break; } } break; default:// 对于多个Buffer使用 // 只做有限次(默认16)循环, 防止单个连接占用太多资源 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { // 使用 gathering writes 发送多个NIO ByteBuffer, 返回实际发送的字节数(大小取决于tcp协议) final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // 更新处理进度 in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. // 未完成flushed队列中所有的buffer的发送会进入此方法 // setOpWrite=true,因为socket的问题本次未写入任何数据,是系统原因故注册SelectionKey#OP_WRITE事件,等待系统可写入再写 // setOpWrite=false,表示此次socket发送了部分数据,认为还可以马上发送更多的数据,这里直接添加task来异步执行新的写入 incompleteWrite(setOpWrite); break; } } }
NioSocketChannel#doWrite()
方法值得进行详细分析。
首先是netty有一个writeSpinCount
配置,控制的是单个连接连续发送数据的最大次数,默认为16。这个配置有效地平衡了单个连接批量发送 和 连接之间的发送平衡。
其次,在16次发送都未完成整个OutboundBuffer的发送时if(!done)
,方法最后会调用incompleteWrite
来触发下一次发送,发送的方式有两种,取决于入参setOpWrite
。若为true,那么本次发送不完全的原因是socket
发送速率太慢导致,那么先停止发送,注册监听OP_WRITE
方法等待os通知。若为false,则未完成发送是单纯因为16次的发送限制,此时向当前EventLoop增加一个发送Task排队继续执行即可。
# AbstractNioByteChannel.java protected final void incompleteWrite(boolean setOpWrite) { // Did not write completely. if (setOpWrite) { // 外层调用认为当前socket不能写入, 停止写入, 注册OP_WRITE事件等待os通知 setOpWrite(); } else { // Schedule flush again later so other tasks can be picked up in the meantime // 外层调用认为当前socket还可以写入更多数据, 新增一个task异步继续发送 Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
总结
这次我们介绍了netty发送数据的流程。分为两个步骤:write和flush。
write的本质是将数据写入各种IO模式Channel统一的缓存结构ChannelOutboundBuffer
,由三个链表构成的缓存接口一方面尽可能把待发送的数据批量发出,另一方面也在必要的时候将非堆外内存实现的ByteBuf转化为堆外的ByteBuf以利用零拷贝的优势。
write之后最终的发送动作需要等到flush触发才执行。flush发送数据的过程会兼顾单连接尽可能一次发送更多的数据
和 所有连接相对平衡地拥有发送的机会
。