Netty源码解读(7)-断开连接

This entry is part 7 of 9 in the series Netty源码解读

Netty连接的断开本质上是channel的断开,更具体的(nio)ServerSocketChannel和(nio)SocketChannel的断开,这在jdk的api中定义了方法AbstractInterruptibleChannel#close()。这是netty对一个连接的处理的结束。在执行关闭之前,还需要保证对资源的有序释放,这些资源包括:

主动断开

主动的断开连接是一个出站事件,close()方法定义在ChannelOutboundInvoker中,因此跟write()connect()方法类似,它也有三种实现,而且实现也类似:

  • channel#close() 调用pipeline#close()
  • pipeline#close()调用pipeline中的tailContext#close()
  • context#close()是实际执行关闭的方法,下面着重讨论
# AbstractChannelHandlerContext.java
public ChannelFuture close(final ChannelPromise promise) {
    if (!validatePromise(promise, false)) {
        // cancelled
        return promise;
    }
    // 以当前context为锚点, 找prev节点
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeClose(promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeClose(promise);
            }
        }, promise, null);
    }

    return promise;
}

# AbstractChannelHandlerContext.java
private void invokeClose(ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            /** {@link #handler()}方法返回当前context绑定的handler */
            ((ChannelOutboundHandler) handler()).close(this, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        close(promise);
    }
}

看过前文的你应该非常熟悉这些代码的套路了,当前context调用close()方法之后通过findContextOutbound()方法寻找Pipeline链表上的下一个出站Context,并调用其绑定的Handler的close方法。直到出站方向的最后一个context,也就是HeadContext,它的方法是这样写的:

# DefaultChannelPipeline$HeadContext.java
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.close(promise);
}

# AbstractChannel.AbstractUnsafe.java
public final void close(final ChannelPromise promise) {
    assertEventLoop();
    // 两个EXCEPTION参数是为专门为关闭方法提供的特殊异常
    close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}

# AbstractChannel.AbstractUnsafe.java
private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    if (!promise.setUncancellable()) {
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    if (closeFuture.isDone()) {
        // Closed already.
        safeSetSuccess(promise);
        return;
    }

    final boolean wasActive = isActive();
    // 先设置为null,防止更多的写入
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
    // 正式关闭前的预备方法,若执行了预备方法,那么这里返回一个Executor,剩下的关闭操作就必须在此Executor中执行
    // 这样可以保证关闭方法不被阻塞,同时又保证了prepare方法正确执行
    // 目前主要的实现由NioSocketChannel提供,对SO_LINGER优雅关闭导致无法关闭的问题,提供先register再关闭的策略
    //
    // SO_LINGER优雅关闭的问题:
    // 如果SO_LINGER配置了,close()方法会阻塞直到1.没有新的数据需要读写或者2.超时,无论是哪种情况都会导致当前EventLoop的阻塞
    // 这会导致EventLoop无法处理其他连接事务。所以这里我们判断如果开启了SO_LINGER则把关闭操作放到一个独立的线程中去处理
    Executor closeExecutor = prepareToClose();
    if (closeExecutor != null) {
        closeExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Execute the close.
                    doClose0(promise);
                } finally {
                    // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            // Fail all the queued messages
                            outboundBuffer.failFlushed(cause, notify);
                            outboundBuffer.close(closeCause);
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                }
            }
        });
    } else {
        try {
            // Close the channel and fail the queued messages in all cases.
            doClose0(promise);
        } finally {
            // Fail all the queued messages.
            outboundBuffer.failFlushed(cause, notify); // failed掉outboundBuffer中未发送(flushed)的数据
            outboundBuffer.close(closeCause); // 关闭outboundBuffer, 并移除unflushed数据(不同于flushed,unflused还需要回收资源)
        }
        if (inFlush0) {
            // 正在处理flush操作,需要在当前eventLoop中排队等待
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    // deregister即cancel掉SelectionKey
                    // channel先后触发的状态是: inactive->unregistered
                    fireChannelInactiveAndDeregister(wasActive);
                }
            });
        } else {
            // deregister即cancel掉SelectionKey
            // channel先后触发的状态是: inactive->unregistered
            fireChannelInactiveAndDeregister(wasActive);
        }
    }
}

HeadContext关闭连接的过程是:

  1. 将当前channel绑定的ChannelOutboundBuffer置为null,阻止新的write和flush数据
  2. 关闭nioChannel
  3. failed掉ChannelOutboundBuffer中的所有flushed/unflused数据(会触发对应的ChannelPromise的listener回调)
  4. 关闭channel绑定的SelectionKey
  5. 调用Pipeline#fireChannelInactive()方法,inactive是入站事件。从head开始链式调用Pipeline上的InboundHandler的channelInactive方法。
  6. 调用Pipeline#fireChannelUnregistered()方法,跟inactive一样也是入站事件。不再赘述。

以上是主动关闭连接的调用过程。下面讨论被动断开连接的情况。

被动断开

对端显式关闭

先讲本质。连接被动断开跟主动断开不同,是通过OP_READ事件触发消费的字节数是否为-1进行判断的,直接看javadoc原文:

#ReadableByteChannel.java

/**
* @Return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream
*/
public int read(ByteBuffer dst) throws IOException;

那么回到Channel对OP_READ事件的数据读取逻辑代码,

# AbstractNioByteChannel.NioByteUnsafe#read()

do {
    /** {@link io.netty.channel.AdaptiveRecvByteBufAllocator} */
    byteBuf = allocHandle.allocate(allocator);
    // 执行消费
    allocHandle.lastBytesRead(doReadBytes(byteBuf));
    // 无新消息, 释放buffer
    if (allocHandle.lastBytesRead() <= 0) {
        // nothing was read. release the buffer.
        byteBuf.release();
        byteBuf = null;
        close = allocHandle.lastBytesRead() < 0; // 此次读事件消费的字节数为负, 即连接断开
        break;
    }

    // 增加读入消息数量
    allocHandle.incMessagesRead(1);
    readPending = false;
    // 触发pipeline 消费消息 事件
    pipeline.fireChannelRead(byteBuf);
    byteBuf = null;
} while (allocHandle.continueReading());

// 读取结束后动作, 对于自适应的Allocator会根据此次读取的字节数调整
allocHandle.readComplete();
// 触发pipeline 读取消息完成 事件
pipeline.fireChannelReadComplete();

if (close) {
    // 关闭pipeline
    closeOnRead(pipeline);
}

然后关键在closeOnRead(pipeline),正如方法名,这是在read方法中发起的close:

        private void closeOnRead(ChannelPipeline pipeline) {
            if (isOpen()) {
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
                    // 半关闭,这里不做展开解释. 大意就是不再接收读事件,但是还可以给对端写数据
                    shutdownInput();
                    SelectionKey key = selectionKey();
                    key.interestOps(key.interestOps() & ~readInterestOp);
                    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    // 调用close方法关闭channel, 此方法开始跟主动关闭连接一致
                    close(voidPromise());
                }
            }
        }

如果不考虑半关闭的条件分支,那么从此方法之后就进入了跟主动关闭相同的方法。

总结

本文我给你介绍了Netty断开连接相关的一些信息。可以分为主动断开被动断开两种场景,前者是通过close()方法主动发起;后者则依靠读取数据的字节数是否为负以判断连接状态。另外我还给你详细分析了两种关闭方式的代码逻辑,大部分是相同的,都涉及到SelectionKeyjava.nio.Channel的关闭,OutboundHandlerBuffer的资源回收和关闭,以及inactiveunregistered事件的触发。

Series Navigation<< Netty源码解读(6)-数据发送Netty源码解读(8)-关闭服务 >>

发表评论

电子邮件地址不会被公开。

+ 82 = 89