Netty源码解读(6)-数据发送

This entry is part 6 of 9 in the series Netty源码解读

netty发送数据使用的接口方法是ChannelOutboundInvoker#write(),它共有三个主要的实现方案:

// 1) 
AbstractChannel#write(Object)
// 2) 
AbstractChannelHandlerContext#write(Object)
// 3) 
DefaultChannelPipeline#write(Object)

即可以在Channel, HandlerContext, Pipeline三个级别实行发送。但发送逻辑只有两种类型:

  1. PipelineTailContext开始发送,即不会触发链上的下游的出站Handler的write()方法 (方案1和方案3)
  2. 从当前HandlerContext开始调用write方法,会依次调用Pipeline链上的下游出站Handler的write()方法 (方案2)

大多数业务还以方案2为主,下面我将基于此方案来展开。

write

write方法支持ChannelPromise来实现异步回调:

# ChannelFuture.java
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    // (省略) msg和promise合法性校验
    write(msg, false, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // 获取下游Context
    AbstractChannelHandlerContext next = findContextOutbound();
    // 引用计数, 用于内存泄露检测
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 在当前线程同步调用
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 封装成Task在EventLoop线程异步调用
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this; // 从当前context开始遍历
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound); // 跳过所有非Outbound的handler
    return ctx;
}

关于Flush后文再做介绍,这里先介绍单纯的write操作。可以看到调用HandlerContext的write()方法,会通过findContextOutbound()寻找Pipeline链上的前一个(prev)出站Handler,即跟read()是正好相反的方向。而非出站Handler会被忽略跳过。

找到Handler之后,下一步就是调用其write()方法。对于在Pipeline中间层级的业务Handler,在对输入的Object进行修改和封装后,需要重新通过HandlerContext#write(Object)方法交给下级Handler让数据在Pipeline上继续流动。直到到达Pipeline尾部,即HeadContext(因为出站数据是从tail到head流动的)通过unsafe写出:

# DefaultChannelPipeline$HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

# AbstractChannel$AbstractUnsafe.java
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop(); // 必须在EventLoop线程中执行

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // (省略) 异常处理
    }

    int size;
    try {
        // 将msg进行过滤和修饰, 默认无处理, 由子类Channel实现
        msg = filterOutboundMessage(msg);
        // 获取当前待发送的消息的大小, 对于直接发送文件的返回0(支持零拷贝)
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        // (省略) 异常处理
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

Channel$Unsafe#write()方法只有AbstractChannel实现了,奇怪么?不是说Unsafe是跟具体的实现有关么?原来这里的write方法还未真正触发IO写出,仅仅将msg:Object放入outbundBuffer,这在所有的Channel实现中都是统一的,所以只在AbstractChannel做了实现。

接下来是msg = filterOutboundMessage(msg)方法,留给子类实现,在我们讨论的NioSocketChannel中,将堆内存转化为堆外内存(准确地说是转化为池化的堆外内存,非池化的堆外内存代价太高,得不偿失),这样可以利用零拷贝提高性能。

# AbstractNioByteChannel.java
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isDirect()) {
            return msg;
        }
        // 堆内Buffer转堆外Buffer
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        // 
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

接下来就进入发送数据的重头戏,缓存。

ChannelOutboundBuffer

此Buffer是netty专门提供用于出站数据缓存的容器,内部的数据结构是三个链表。

而填充这些链表的Entry对象,基于可重复使用的对象池来创建,进一步减少了性能消耗。这是netty提供的很有意思的一个设计。

# ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // 累计写入buffer的数据大小, 超过阈值修改buffer状态为不可写入
    incrementPendingOutboundBytes(size, false);
}

# ChannelOutboundBuffer$Entry.class
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
    // 从线程对象池中获取entry
    Entry entry = RECYCLER.get();
    // (省略) entry赋值初始化
    return entry;
}

increamentPendingOutboundBytes()方法中会更新缓存中总的数据量大小,如果超过高水位(通过WriteBufferWaterMark#high配置, 默认64k),会修改ChannelOutboundBuffer的状态为不可写。同时还会触发一个Pipeline事件fireChannelWritabilityChanged

flush

数据通过write方法只是写入到ChannelOutboundBuffer做了缓存,真正的数据发送需要通过flush。跟write()的调用逻辑类似,经过HandlerContext或者Channel实现的wtite(),所有flush方法实现最终会调用AbstractUnsafe的实现:

# AbstractChannel$AbstractUnsafe.java
protected void flush0() {
    if (inFlush0) { return; }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; }

    inFlush0 = true;

    if (!isActive()) {
        // (省略) 如果当前channel已关闭, 所有flushed内的Entry作失败处理
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        // (省略) 异常处理
    } finally {
        inFlush0 = false;
    }
}

# NioSocketChannel.java
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        // flushed链表剩余元素的数量
        int size = in.size();
        if (size == 0) {
            clearOpWrite(); // flushed链表发送结束,清空SelectionKey#OP_WRITE
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // buffer.flushedEntry链表转为nioByteBuffer数组, 仅仅转换in中类型为ByteBuf的元素
        ByteBuffer[] nioBuffers = in.nioBuffers();
        // 待发送的buffer数, 即nioBuffers数组元素数量
        int nioBufferCnt = in.nioBufferCount();
        // 待发送数据的字节数, 即nioBuffers所有元素总字节数
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        switch (nioBufferCnt) {
            case 0:
                // 由于in中可能包括非ByteBuf类型的元素(比如直接发送文件的FileRegion类型),nioBuffers()方法
                // 不会将其放入数组, 以致cnt为0. 本方法仅处理nioByteBuffer类型的发送, 交给父类处理
                super.doWrite(in);
                return;
            case 1:// 单个buffer使用
                // 只做有限次(默认16)循环, 防止单个连接占用太多资源
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    // nio SocketChannel发送 nio ByteBuffer, 返回实际发送的字节数(大小取决于tcp协议). Socket非阻塞默认(一般都是)下返回0表示无数据发送
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        // nioBuffer有数据, 但socket发送出去的字节数为0, 设置setOpWrite=true, 退出发送循环, 且后续不会马上新增task继续发送, 而是注册OP_WRITE等待os可以发送更多数据的时候继续发送
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) { // 发送完成
                        done = true;
                        break;
                    }
                }
                break;
            default:// 对于多个Buffer使用
                // 只做有限次(默认16)循环, 防止单个连接占用太多资源
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    // 使用 gathering writes 发送多个NIO ByteBuffer, 返回实际发送的字节数(大小取决于tcp协议)
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // 更新处理进度
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            // 未完成flushed队列中所有的buffer的发送会进入此方法
            // setOpWrite=true,因为socket的问题本次未写入任何数据,是系统原因故注册SelectionKey#OP_WRITE事件,等待系统可写入再写
            // setOpWrite=false,表示此次socket发送了部分数据,认为还可以马上发送更多的数据,这里直接添加task来异步执行新的写入
            incompleteWrite(setOpWrite);
            break;
        }
    }
}

NioSocketChannel#doWrite()方法值得进行详细分析。

首先是netty有一个writeSpinCount配置,控制的是单个连接连续发送数据的最大次数,默认为16。这个配置有效地平衡了单个连接批量发送 和 连接之间的发送平衡。

其次,在16次发送都未完成整个OutboundBuffer的发送时if(!done),方法最后会调用incompleteWrite来触发下一次发送,发送的方式有两种,取决于入参setOpWrite。若为true,那么本次发送不完全的原因是socket发送速率太慢导致,那么先停止发送,注册监听OP_WRITE方法等待os通知。若为false,则未完成发送是单纯因为16次的发送限制,此时向当前EventLoop增加一个发送Task排队继续执行即可。

# AbstractNioByteChannel.java
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        // 外层调用认为当前socket不能写入, 停止写入, 注册OP_WRITE事件等待os通知
        setOpWrite();
    } else {
        // Schedule flush again later so other tasks can be picked up in the meantime
        // 外层调用认为当前socket还可以写入更多数据, 新增一个task异步继续发送
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}

总结

这次我们介绍了netty发送数据的流程。分为两个步骤:write和flush。

write的本质是将数据写入各种IO模式Channel统一的缓存结构ChannelOutboundBuffer,由三个链表构成的缓存接口一方面尽可能把待发送的数据批量发出,另一方面也在必要的时候将非堆外内存实现的ByteBuf转化为堆外的ByteBuf以利用零拷贝的优势。

write之后最终的发送动作需要等到flush触发才执行。flush发送数据的过程会兼顾单连接尽可能一次发送更多的数据所有连接相对平衡地拥有发送的机会

Series Navigation<< Netty源码解读(5)-数据处理(Handler调用链)Netty源码解读(7)-断开连接 >>

发表评论

电子邮件地址不会被公开。

92 ÷ 23 =