Netty连接的断开本质上是channel的断开,更具体的(nio)ServerSocketChannel
和(nio)SocketChannel
的断开,这在jdk的api中定义了方法AbstractInterruptibleChannel#close()
。这是netty对一个连接的处理的结束。在执行关闭之前,还需要保证对资源的有序释放,这些资源包括:
主动断开
主动的断开连接是一个出站事件,close()
方法定义在ChannelOutboundInvoker
中,因此跟write()
,connect()
方法类似,它也有三种实现,而且实现也类似:
channel#close()
调用pipeline#close()
pipeline#close()
调用pipeline中的tailContext#close()
context#close()
是实际执行关闭的方法,下面着重讨论
# AbstractChannelHandlerContext.java public ChannelFuture close(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } // 以当前context为锚点, 找prev节点 final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeClose(promise); } }, promise, null); } return promise; } # AbstractChannelHandlerContext.java private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { try { /** {@link #handler()}方法返回当前context绑定的handler */ ((ChannelOutboundHandler) handler()).close(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { close(promise); } }
看过前文的你应该非常熟悉这些代码的套路了,当前context
调用close()方法之后通过findContextOutbound()
方法寻找Pipeline链表上的下一个出站Context,并调用其绑定的Handler的close方法。直到出站方向的最后一个context,也就是HeadContext
,它的方法是这样写的:
# DefaultChannelPipeline$HeadContext.java @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } # AbstractChannel.AbstractUnsafe.java public final void close(final ChannelPromise promise) { assertEventLoop(); // 两个EXCEPTION参数是为专门为关闭方法提供的特殊异常 close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); } # AbstractChannel.AbstractUnsafe.java private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); return; } final boolean wasActive = isActive(); // 先设置为null,防止更多的写入 this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. // 正式关闭前的预备方法,若执行了预备方法,那么这里返回一个Executor,剩下的关闭操作就必须在此Executor中执行 // 这样可以保证关闭方法不被阻塞,同时又保证了prepare方法正确执行 // 目前主要的实现由NioSocketChannel提供,对SO_LINGER优雅关闭导致无法关闭的问题,提供先register再关闭的策略 // // SO_LINGER优雅关闭的问题: // 如果SO_LINGER配置了,close()方法会阻塞直到1.没有新的数据需要读写或者2.超时,无论是哪种情况都会导致当前EventLoop的阻塞 // 这会导致EventLoop无法处理其他连接事务。所以这里我们判断如果开启了SO_LINGER则把关闭操作放到一个独立的线程中去处理 Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() { @Override public void run() { try { // Execute the close. doClose0(promise); } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! invokeLater(new Runnable() { @Override public void run() { // Fail all the queued messages outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { // Close the channel and fail the queued messages in all cases. doClose0(promise); } finally { // Fail all the queued messages. outboundBuffer.failFlushed(cause, notify); // failed掉outboundBuffer中未发送(flushed)的数据 outboundBuffer.close(closeCause); // 关闭outboundBuffer, 并移除unflushed数据(不同于flushed,unflused还需要回收资源) } if (inFlush0) { // 正在处理flush操作,需要在当前eventLoop中排队等待 invokeLater(new Runnable() { @Override public void run() { // deregister即cancel掉SelectionKey // channel先后触发的状态是: inactive->unregistered fireChannelInactiveAndDeregister(wasActive); } }); } else { // deregister即cancel掉SelectionKey // channel先后触发的状态是: inactive->unregistered fireChannelInactiveAndDeregister(wasActive); } } }
HeadContext
关闭连接的过程是:
- 将当前
channel
绑定的ChannelOutboundBuffer
置为null,阻止新的write和flush数据 - 关闭
nioChannel
- failed掉
ChannelOutboundBuffer
中的所有flushed/unflused数据(会触发对应的ChannelPromise
的listener回调) - 关闭channel绑定的
SelectionKey
- 调用
Pipeline#fireChannelInactive()
方法,inactive
是入站事件。从head
开始链式调用Pipeline上的InboundHandler的channelInactive
方法。 - 调用
Pipeline#fireChannelUnregistered()
方法,跟inactive一样也是入站事件。不再赘述。
以上是主动关闭连接的调用过程。下面讨论被动断开连接的情况。
被动断开
对端显式关闭
先讲本质。连接被动断开跟主动断开不同,是通过OP_READ
事件触发消费的字节数是否为-1进行判断的,直接看javadoc原文:
#ReadableByteChannel.java /** * @Return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream */ public int read(ByteBuffer dst) throws IOException;
那么回到Channel对OP_READ
事件的数据读取逻辑代码,
# AbstractNioByteChannel.NioByteUnsafe#read() do { /** {@link io.netty.channel.AdaptiveRecvByteBufAllocator} */ byteBuf = allocHandle.allocate(allocator); // 执行消费 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 无新消息, 释放buffer if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; // 此次读事件消费的字节数为负, 即连接断开 break; } // 增加读入消息数量 allocHandle.incMessagesRead(1); readPending = false; // 触发pipeline 消费消息 事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 读取结束后动作, 对于自适应的Allocator会根据此次读取的字节数调整 allocHandle.readComplete(); // 触发pipeline 读取消息完成 事件 pipeline.fireChannelReadComplete(); if (close) { // 关闭pipeline closeOnRead(pipeline); }
然后关键在closeOnRead(pipeline)
,正如方法名,这是在read方法中发起的close:
private void closeOnRead(ChannelPipeline pipeline) { if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { // 半关闭,这里不做展开解释. 大意就是不再接收读事件,但是还可以给对端写数据 shutdownInput(); SelectionKey key = selectionKey(); key.interestOps(key.interestOps() & ~readInterestOp); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { // 调用close方法关闭channel, 此方法开始跟主动关闭连接一致 close(voidPromise()); } } }
如果不考虑半关闭的条件分支,那么从此方法之后就进入了跟主动关闭相同的方法。
总结
本文我给你介绍了Netty断开连接相关的一些信息。可以分为主动断开
和被动断开
两种场景,前者是通过close()
方法主动发起;后者则依靠读取数据的字节数是否为负以判断连接状态。另外我还给你详细分析了两种关闭方式的代码逻辑,大部分是相同的,都涉及到SelectionKey
,java.nio.Channel
的关闭,OutboundHandlerBuffer
的资源回收和关闭,以及inactive
和unregistered
事件的触发。