Netty 源码解析 —— ChannelPipeline(一)之初始化
Netty 是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,
ChannelPipeline
是一个非常核心的概念,它定义了处理入站和出站事件的顺序。初始化 ChannelPipeline
1. ChannelInitializer
ChannelInitializer
是一个抽象类,通常用于在连接建立之后初始化Channel
。它有一个默认的方法initChannel(Channel ch)
,在这个方法中你可以添加不同的处理器到ChannelPipeline
上。示例代码如下:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加编码器 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加解码器 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加业务逻辑处理器 pipeline.addLast(new MyServerHandler()); } }
2. ChannelPipeline 的创建
当你创建一个新的
Channel
时,ChannelPipeline
会被自动创建并关联到这个Channel
。你可以通过调用channel.pipeline()
获取到这个ChannelPipeline
实例。3. 添加处理器
可以通过
ChannelPipeline
的addLast
方法来添加处理器。例如:ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("handler1", new MyHandler1()); pipeline.addLast("handler2", new MyHandler2());
这里
"handler1"
和"handler2"
分别是处理器的名字,名字需要唯一。MyHandler1
和MyHandler2
需要实现ChannelInboundHandler
或ChannelOutboundHandler
接口。4. 处理器的执行顺序
入站:对于入站消息,
ChannelPipeline
会从头开始按顺序调用每一个处理器,直到没有更多的处理器为止。出站:对于出站消息,
ChannelPipeline
会从最后一个处理器开始逆序调用每一个处理器,直到没有更多的处理器为止。具体流程
当客户端与服务端建立连接后,
ChannelInitializer
的initChannel
方法被调用。在
initChannel
方法中,向ChannelPipeline
添加处理器。这些处理器会根据它们被添加的顺序来决定它们处理事件的顺序。
以上就是对 Netty 中
ChannelPipeline
初始化的基本介绍。如果你想要深入了解ChannelPipeline
的内部实现和工作原理,可以进一步阅读 Netty 的源代码,特别是ChannelPipeline
、ChannelHandlerContext
以及相关的处理器类。
Netty 是一个基于 Java 的异步事件驱动的网络应用框架,广泛应用于高性能网络服务器和客户端开发。在 Netty 中,
ChannelPipeline
是一个核心组件,负责处理和管理与通道(Channel
)相关的所有处理器(ChannelHandler
)。下面是对 Netty 源码中
ChannelPipeline
的初始化过程的解析。一、
ChannelPipeline
简介
ChannelPipeline
是一个处理器链(pipeline),它内部维护了一组ChannelHandler
,这些处理器负责处理入站和出站的数据流。每个Channel
都会关联一个ChannelPipeline
,而ChannelPipeline
则会包含多个ChannelHandlerContext
,每个ChannelHandlerContext
都与一个ChannelHandler
关联。二、
ChannelPipeline
的初始化1. 初始化方式
在 Netty 中,
ChannelPipeline
的初始化通常发生在Channel
初始化时。具体来说,ChannelPipeline
是在Channel
的构造函数中被创建的。以下是
AbstractChannel
类中的构造函数部分源码:protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); // 初始化 ChannelPipeline }
在上面的代码中,可以看到
pipeline
是通过newChannelPipeline()
方法来创建的。2.
newChannelPipeline()
方法
newChannelPipeline()
方法是一个抽象方法,不同的Channel
实现类会有自己的实现。例如,在NioSocketChannel
中的实现如下:@Override protected ChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
DefaultChannelPipeline
是ChannelPipeline
的默认实现。3.
DefaultChannelPipeline
的构造函数接下来,我们来看一下
DefaultChannelPipeline
的构造函数:public DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
在这个构造函数中,
DefaultChannelPipeline
做了以下几件事情:
Channel 引用:首先,通过
ObjectUtil.checkNotNull()
方法确保channel
非空,并将其保存。SucceededFuture 和 VoidPromise:为
ChannelPipeline
创建了两个特殊的ChannelFuture
实例:succeededFuture
和voidPromise
,它们在后续的操作中会被频繁使用。头尾节点初始化:接下来,
DefaultChannelPipeline
初始化了两个关键的处理器上下文(ChannelHandlerContext
)实例:HeadContext
和TailContext
,分别对应处理链的头部和尾部。它们是ChannelPipeline
内部的链表结构的两端节点。链表连接:最后,构造函数将
HeadContext
与TailContext
连接起来,形成一个双向链表结构。三、处理器链的结构
在
DefaultChannelPipeline
中,处理器链(ChannelHandler
的链表)是一个双向链表结构。HeadContext
和TailContext
作为链表的两端,它们之间可以插入其他的ChannelHandlerContext
,这些ChannelHandlerContext
会包装实际的ChannelHandler
。
HeadContext
是默认处理链的头部,它处理出站事件,并且会将入站事件传递给下一个处理器。
TailContext
是默认处理链的尾部,它处理入站事件的结束,并且会将出站事件传递给前一个处理器。四、总结
在 Netty 中,
ChannelPipeline
的初始化过程主要围绕DefaultChannelPipeline
展开。DefaultChannelPipeline
通过构造函数将头尾节点(HeadContext
和TailContext
)进行初始化,并建立起处理器链的基础结构。理解ChannelPipeline
的初始化过程,对于后续深入理解 Netty 的数据流处理机制至关重要。在后续的解析中,我们会进一步探讨如何向
ChannelPipeline
中添加、删除ChannelHandler
,以及数据在ChannelPipeline
中的流转过程。
ChannelInitializer
是 Netty 提供的一个抽象类,主要用于在新的Channel
被注册到EventLoop
之后,对其进行初始化。ChannelInitializer
的主要职责是配置ChannelPipeline
,即将各种ChannelHandler
添加到ChannelPipeline
中。它通常用于服务器端和客户端的初始化场景。
ChannelInitializer
的基本使用
ChannelInitializer
的核心方法是initChannel(Channel ch)
。这个方法是一个抽象方法,必须在子类中实现。在这个方法中,你可以向ChannelPipeline
中添加各种ChannelHandler
,这些处理器将负责处理该Channel
上的入站和出站数据。以下是一个简单的示例,展示了如何使用
ChannelInitializer
:public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加各种处理器 pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new MyInboundHandler()); pipeline.addLast(new MyOutboundHandler()); } }
在上面的代码中:
MyChannelInitializer
继承了ChannelInitializer
,并实现了initChannel
方法。在
initChannel
方法中,通过ch.pipeline()
获取当前Channel
的ChannelPipeline
。然后,使用
pipeline.addLast()
方法,将各种处理器(ChannelHandler
)添加到ChannelPipeline
中。
ChannelInitializer
的工作流程
ChannelInitializer
的工作流程如下:
Channel 注册:当
Channel
注册到EventLoop
时,Netty 会调用ChannelInitializer
的initChannel(Channel ch)
方法。初始化 Pipeline:在
initChannel(Channel ch)
方法中,开发者可以根据需要,将各种ChannelHandler
添加到ChannelPipeline
中。自动移除:在
initChannel(Channel ch)
方法完成执行后,Netty 会自动将ChannelInitializer
从ChannelPipeline
中移除。这是因为ChannelInitializer
仅用于初始化ChannelPipeline
,在初始化完成后就不再需要了。
ChannelInitializer
的应用场景
ChannelInitializer
在实际开发中有广泛的应用,特别是在以下场景中:
服务器端初始化:通常在服务器端的
ServerBootstrap
中使用ChannelInitializer
来初始化每个新连接的Channel
。例如:ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyChannelInitializer());
在这个例子中,每当有新的客户端连接时,
MyChannelInitializer
会被调用来初始化客户端的Channel
。客户端初始化:在客户端使用
Bootstrap
时,也可以通过ChannelInitializer
来初始化Channel
。例如:Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new MyChannelInitializer());
这里,
MyChannelInitializer
会初始化客户端的Channel
,通常用于添加出站请求的编码器和入站响应的解码器。总结
ChannelInitializer
是 Netty 提供的一个便捷工具,用于在Channel
注册后初始化其ChannelPipeline
。通过实现initChannel(Channel ch)
方法,开发者可以轻松地将所需的ChannelHandler
添加到ChannelPipeline
中。ChannelInitializer
的自动移除机制确保了它在初始化完成后不会对ChannelPipeline
造成多余的负担。
本文主要介绍 Netty 中 ChannelPipeline 的初始化过程,包括 ChannelPipeline 接口定义、Inbound 和 Outbound 事件的特性以及 DefaultChannelPipeline、ChannelHandlerContext、HeadContext、TailContext 和 DefaultChannelHandlerContext 类的实现细节。
Key Takeaways
ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API。
DefaultChannelPipeline 是 ChannelPipeline 接口的默认实现类,也是唯一实现类。
ChannelHandlerContext 是 ChannelPipeline 中的节点,每个节点包含一个 ChannelHandler、它的上下节点以及其他上下文。
HeadContext 是 DefaultChannelPipeline 的头部节点,实现 ChannelOutboundHandler 接口,负责处理出站事件。
TailContext 是 DefaultChannelPipeline 的尾部节点,实现 ChannelInboundHandler 接口,负责处理入站事件。
DefaultChannelHandlerContext 内嵌一个 ChannelHandler 对象,负责处理事件的具体逻辑。
Outbound 事件是请求事件,由 Channel 发起,由 Unsafe 处理;Inbound 事件是通知事件,由 Unsafe 发起,由 TailContext 处理。
1. 概述
在 《Netty 源码分析 —— Netty 简介(二)之核心组件》 中,对 EventLoopGroup 和 EventLoop 做了定义,我们再来回顾下:
ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline ,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。
因为 ChannelPipeline 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 ChannelPipeline 的初始化。也因此,本文更多是体现 ChannelPipeline 的整体性,所以不会过多介绍每个类的具体的每个方法的实现。
2. ChannelPipeline
io.netty.channel.ChannelPipeline
,继承 ChannelInboundInvoker、ChannelOutboundInvoker、Iterable 接口,Channel Pipeline 接口。代码如下:
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
// ========== 添加 ChannelHandler 相关 ==========
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
// ========== 移除 ChannelHandler 相关 ==========
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
// ========== 替换 ChannelHandler 相关 ==========
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
// ========== 查询 ChannelHandler 相关 ==========
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
<T extends ChannelHandler> T get(Class<T> handlerType);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
List<String> names();
// ========== Channel 相关 ==========
Channel channel();
// ========== ChannelInboundInvoker 相关 ==========
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
// ========== ChannelOutboundInvoker 相关 ==========
@Override
ChannelPipeline flush();
}
虽然接口的方法比较多,笔者做了归类如下:
ChannelHandler 的增删改查的相关方法。
Channel 的相关方法,目前只有一个。
继承自 ChannelInboundInvoker 的相关方法。
继承自 ChannelOutboundInvoker 的相关方法。
有可能会疑惑为什么继承 Iterable 接口?因为 ChannelPipeline 是 ChannelHandler 的链。
ChannelPipeline 的类图如下:
2.1 ChannelInboundInvoker
io.netty.channel.ChannelInboundInvoker
,Channel Inbound Invoker( 调用者 ) 接口。代码如下:
ChannelPipeline fireChannelRegistered();
ChannelPipeline fireChannelUnregistered();
ChannelPipeline fireChannelActive();
ChannelPipeline fireChannelInactive();
ChannelPipeline fireExceptionCaught(Throwable cause);
ChannelPipeline fireUserEventTriggered(Object event);
ChannelPipeline fireChannelRead(Object msg);
ChannelPipeline fireChannelReadComplete();
ChannelPipeline fireChannelWritabilityChanged();
通知 Channel 事件的接口方法。
2.2 ChannelOutboundInvoker
io.netty.channel.ChannelOutboundInvoker
,Channel Outbound Invoker( 调用者 ) 接口。代码如下:
// ========== Channel 操作相关 ==========
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
// ========== Promise 相关 ==========
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
发起 Channel 操作的接口方法。
创建 Promise 对象的接口方法。
2.3 Outbound v.s Inbound 事件
在 《Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (二)》 中,看到一个比较不错的总结:
因为要加一些注释,所以暂时不使用引用。
对于 Outbound 事件:
Outbound 事件是【请求】事件(由 Connect 发起一个请求, 并最终由 Unsafe 处理这个请求)
Outbound 事件的发起者是 Channel
Outbound 事件的处理者是 Unsafe
Outbound 事件在 Pipeline 中的传输方向是
tail
->head
旁白:Outbound 翻译为“出站”,所以从
tail
( 尾 )到head
( 头 )也合理。至于什么是
head
和tail
,等看了具体的 ChannelPipeline 实现类 DefaultChannelPipeline 再说。在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用
ctx.xxx
(例如ctx.connect
) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.Outbound 事件流:
Context.OUT_EVT
->Connect.findContextOutbound
->nextContext.invokeOUT_EVT
->nextHandler.OUT_EVT
->nextContext.OUT_EVT
对于 Inbound 事件:
Inbound 事件是【通知】事件, 当某件事情已经就绪后, 通知上层.
Inbound 事件发起者是 Unsafe
Inbound 事件的处理者是 TailContext, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.
Inbound 事件在 Pipeline 中传输方向是
head
( 头 ) ->tail
( 尾 )
旁白:Inbound 翻译为“入站”,所以从
head
( 头 )到tail
( 尾 )也合理。在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用
ctx.fireIN_EVT
(例如ctx.fireChannelActive
) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.Inbound 事件流:
Context.fireIN_EVT
->Connect.findContextInbound
->nextContext.invokeIN_EVT
->nextHandler.IN_EVT
->nextContext.fireIN_EVT
Outbound 和 Inbound 事件十分的镜像, 并且 Context 与 Handler 直接的调用关系是否容易混淆, 因此读者在阅读这里的源码时, 需要特别的注意。
3. DefaultChannelPipeline
io.netty.channel.DefaultChannelPipeline
,实现 ChannelPipeline 接口,默认 ChannelPipeline 实现类。😈 实际上,也只有这个实现类。
3.1 静态属性
/**
* {@link #head} 的名字
*/
private static final String HEAD_NAME = generateName0(HeadContext.class);
/**
* {@link #tail} 的名字
*/
private static final String TAIL_NAME = generateName0(TailContext.class);
/**
* 名字({@link AbstractChannelHandlerContext#name})缓存 ,基于 ThreadLocal ,用于生成在线程中唯一的名字。
*/
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
/**
* {@link #estimatorHandle} 的原子更新器
*/
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
HEAD_NAME
和 TAIL_NAME
静态属性,通过调用 #generateName0(Class<?> handlerType)
方法,生成对应的名字。代码如下:
private static String generateName0(Class<?> handlerType) {
return StringUtil.simpleClassName(handlerType) + "#0";
}
即
HEAD_NAME = "HeadContext#0"
,TAIL_NAME= "TailContext#0"
。
nameCaches
静态属性,名字(AbstractChannelHandlerContext.name
)缓存 ,基于 ThreadLocal ,用于生成在线程中唯一的名字。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。ESTIMATOR
静态属性,estimatorHandle
属性的原子更新器。
3.2 构造方法
/**
* Head 节点
*/
final AbstractChannelHandlerContext head;
/**
* Tail 节点
*/
final AbstractChannelHandlerContext tail;
/**
* 所属 Channel 对象
*/
private final Channel channel;
/**
* 成功的 Promise 对象
*/
private final ChannelFuture succeededFuture;
/**
* 不进行通知的 Promise 对象
*
* 用于一些方法执行,需要传入 Promise 类型的方法参数,但是不需要进行通知,就传入该值
*
* @see io.netty.channel.AbstractChannel.AbstractUnsafe#safeSetSuccess(ChannelPromise)
*/
private final VoidChannelPromise voidPromise;
/**
* TODO 1008 DefaultChannelPipeline 字段用途
*/
private final boolean touch = ResourceLeakDetector.isEnabled();
/**
* 子执行器集合。
*
* 默认情况下,ChannelHandler 使用 Channel 所在的 EventLoop 作为执行器。
* 但是如果有需要,也可以自定义执行器。详细解析,见 {@link #childExecutor(EventExecutorGroup)} 。
*/
private Map<EventExecutorGroup, EventExecutor> childExecutors;
/**
* TODO 1008 DefaultChannelPipeline 字段用途
*/
private volatile MessageSizeEstimator.Handle estimatorHandle;
/**
* 是否首次注册
*/
private boolean firstRegistration = true;
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
*
* We only keep the head because it is expected that the list is used infrequently and its size is small.
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
* complexity.
*
* 准备添加 ChannelHandler 的回调
*/
private PendingHandlerCallback pendingHandlerCallbackHead;
/**
* Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
* change.
* Channel 是否已注册
*/
private boolean registered;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// succeededFuture 的创建
succeededFuture = new SucceededChannelFuture(channel, null);
// voidPromise 的创建
voidPromise = new VoidChannelPromise(channel, true);
// 创建 Tail 及诶点
tail = new TailContext(this); // <1>
// 创建 Head 节点
head = new HeadContext(this); // <2>
// 相互指向 <3>
head.next = tail;
tail.prev = head;
}
head
属性,Head 节点,在构造方法的<1>
处初始化。详细解析,见 「4.2 HeadContext」 。tail
节点,Tail 节点,在构造方法的<2>
处初始化。详细解析,见 「4.3 TailContext」 。在构造方法的
<3>
处,head
节点向下指向tail
节点,tail
节点向上指向head
节点,从而形成相互的指向。即如下图所示:
FROM 《netty 源码分析之 pipeline(一)》
pipeline 节点链(默认)
pipeline 中的节点的数据结构是 ChannelHandlerContext 类。每个 ChannelHandlerContext 包含一个 ChannelHandler、它的上下节点( 从而形成 ChannelHandler 链 )、以及其他上下文。详细解析,见 「4. ChannelHandlerContext」 。
默认情况下,pipeline 有
head
和tail
节点,形成默认的 ChannelHandler 链。而我们可以在它们之间,加入自定义的 ChannelHandler 节点。如下图所示:
FROM 《netty 源码分析之 pipeline(一)》
pipeline 节点链(自定义)
childExecutors 属性,子执行器集合。默认情况下,ChannelHandler 使用 Channel 所在的 EventLoop 作为执行器。
但是如果有需要,也可以自定义执行器。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。
pendingHandlerCallbackHead 属性,准备添加 ChannelHandler 的回调。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。
registered 属性,Channel 是否已注册。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。
firstRegistration 属性,是否首次注册。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。
3.3 其他方法
DefaultChannelPipeline 中的其他方法,详细解析,见后续的文章。
4. ChannelHandlerContext
ChannelHandlerContext
是 Netty 中的一个非常重要的概念,它是ChannelPipeline
中处理器的上下文环境。每个处理器都有一个对应的ChannelHandlerContext
实例,这个实例提供了访问Channel
、ChannelPipeline
以及其他相关操作的能力。ChannelHandlerContext 的作用
访问 ChannelPipeline:
ChannelHandlerContext
可以用来访问所属的ChannelPipeline
,进而访问其他处理器或者改变处理器链。访问 Channel:通过
ChannelHandlerContext
可以访问到当前处理器所绑定的Channel
。发送事件:可以使用
ChannelHandlerContext
发送各种事件,包括入站事件和出站事件。调用上下游处理器:可以调用上下游的处理器来处理特定的事件或消息。
属性管理:
ChannelHandlerContext
实现了AttributeMap
接口,允许存储和检索键值对,这对于传递状态信息特别有用。生命周期管理:
ChannelHandlerContext
还支持处理器的生命周期管理,例如ChannelHandlerContext
的fireUserEventTriggered
方法可以用来触发用户自定义事件。ChannelHandlerContext 的使用
下面是一些
ChannelHandlerContext
的常见使用方法:
发送数据:可以使用
writeAndFlush
方法发送数据。ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, World!", CharsetUtil.UTF_8));
触发事件:可以使用
fireUserEventTriggered
方法触发用户自定义事件。ctx.fireUserEventTriggered(new UserEvent());
访问上下游处理器:
上游处理器:可以通过
invokeFireUserEventTriggered
方法来调用上游处理器。ctx.invokeFireUserEventTriggered(new UserEvent());
下游处理器:可以通过
fireUserEventTriggered
方法来调用下游处理器。ctx.fireUserEventTriggered(new UserEvent());
访问 ChannelPipeline:
获取 ChannelPipeline:通过
ctx.pipeline()
方法。获取特定处理器:通过
ctx.pipeline().get(String name)
方法。属性管理:
获取属性:通过
ctx.attr(AttributeKey<T>)
方法。设置属性:通过
ctx.attr(AttributeKey<T>).set(T value)
方法。示例代码
假设我们有一个简单的
ChannelHandlerContext
的使用示例:import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.AttributeKey; public class MyServerHandler extends SimpleChannelInboundHandler<String> { private static final AttributeKey<Integer> VISIT_COUNT_KEY = AttributeKey.valueOf("visitCount"); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("Server received: " + msg); // 设置属性 ctx.attr(VISIT_COUNT_KEY).setIfAbsent(0); int count = ctx.attr(VISIT_COUNT_KEY).getAndIncrement(); // 发送响应 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, this is message " + count, CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
在这个示例中,我们使用
ChannelHandlerContext
来发送响应,并且使用AttributeKey
来记录客户端访问次数。总结来说,
ChannelHandlerContext
是 Netty 中处理器的上下文环境,提供了访问Channel
和ChannelPipeline
的能力,同时还支持发送事件、管理属性等功能。
ChannelHandlerContext
是 Netty 中的一个关键接口,表示ChannelPipeline
中每个ChannelHandler
所在的上下文(节点)。它继承了以下三个接口:
ChannelInboundInvoker
ChannelOutboundInvoker
AttributeMap
ChannelHandlerContext
作为ChannelPipeline
中的一个节点,负责管理ChannelHandler
,并在ChannelPipeline
中提供入站和出站事件的传递能力。
ChannelHandlerContext
的作用
ChannelHandlerContext
的主要作用包括:
与
ChannelHandler
的关联:每个ChannelHandler
在ChannelPipeline
中都有一个对应的ChannelHandlerContext
,这个上下文不仅关联了处理器,还维护了其在ChannelPipeline
中的位置。事件传播:
ChannelHandlerContext
提供了方法来将事件传播到ChannelPipeline
中的下一个处理器。它支持两类事件传播:
入站事件(Inbound Events):例如读操作、解码、异常处理等。这些事件从头部
HeadContext
开始依次向下传播。出站事件(Outbound Events):例如写操作、编码等。这些事件从尾部
TailContext
开始依次向上传播。属性管理:通过
AttributeMap
接口,ChannelHandlerContext
可以为ChannelHandler
提供一个存储自定义属性的容器,这些属性可以在处理不同事件时共享。
ChannelHandlerContext
的方法
ChannelHandlerContext
提供了一些重要的方法用于事件的传播和处理,以下是一些关键方法:1. 入站事件传播方法
fireChannelRead(Object msg)
:将数据传递给ChannelPipeline
中的下一个ChannelInboundHandler
的channelRead
方法。
fireChannelActive()
:将Channel
激活事件(如连接建立)传递给下一个ChannelInboundHandler
的channelActive
方法。
fireExceptionCaught(Throwable cause)
:将异常传递给ChannelPipeline
中的下一个ChannelInboundHandler
的exceptionCaught
方法。2. 出站事件传播方法
write(Object msg)
:请求将数据写出到远程Channel
,会触发ChannelPipeline
中的下一个ChannelOutboundHandler
的write
方法。
flush()
:请求刷新之前write
的数据到远程Channel
。
close()
:请求关闭Channel
,会触发ChannelPipeline
中的下一个ChannelOutboundHandler
的close
方法。3. 属性管理方法
attr(AttributeKey<T> key)
:获取或创建与ChannelHandlerContext
相关联的属性。
hasAttr(AttributeKey<T> key)
:检查ChannelHandlerContext
是否具有指定的属性。
ChannelHandlerContext
的使用示例以下是一个简单的
ChannelHandler
示例,展示了如何在处理入站和出站事件时使用ChannelHandlerContext
传播事件:public class MyHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Received message: " + msg); // 继续传播事件给下一个处理器 ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 关闭通道 ctx.close(); } } public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Writing message: " + msg); // 继续传播事件给下一个处理器 ctx.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("Flushing channel"); // 继续传播事件给下一个处理器 ctx.flush(); } }
在这个例子中:
MyHandler
是一个入站处理器,接收消息后打印并继续传递。
MyOutboundHandler
是一个出站处理器,写入消息时打印并继续传递。总结
ChannelHandlerContext
是 Netty 中ChannelPipeline
的关键节点,负责管理ChannelHandler
并在处理器链中传播事件。它不仅提供了丰富的 API 用于事件传播,还支持属性的存储和访问,使得开发者可以灵活地处理网络数据流。理解ChannelHandlerContext
的工作机制,对于编写高效的 Netty 应用程序至关重要。
io.netty.channel.ChannelHandlerContext
,继承 ChannelInboundInvoker、ChannelOutboundInvoker、AttributeMap 接口,ChannelHandler Context( 上下文 )接口,作为 ChannelPipeline 中的节点。代码如下:
// ========== Context 相关 ==========
String name();
Channel channel();
EventExecutor executor();
ChannelHandler handler();
ChannelPipeline pipeline();
boolean isRemoved(); // 是否已经移除
// ========== ByteBuf 相关 ==========
ByteBufAllocator alloc();
// ========== ChannelInboundInvoker 相关 ==========
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
// ========== ChannelOutboundInvoker 相关 ==========
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
// ========== AttributeMap 相关 ==========
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
虽然接口的方法比较多,笔者做了归类如下:
Context 相关的接口方法。
继承自 ChannelInboundInvoker 的相关方法,和 ChannelPipeline 一样。
继承自 ChannelOutboundInvoker 的相关方法,和 ChannelPipeline 一样。
继承自 AttributeMap 的相关方法,实际上已经废弃(
@Deprecated
)了,不再从 ChannelHandlerContext 中获取,而是从 Channel 中获取。
ChannelHandlerContext 的类图如下:
😈 类图中的 AttributeMap 和 DefaultAttributeMap 可以无视。
4.1 AbstractChannelHandlerContext
AbstractChannelHandlerContext
是 Netty 中ChannelHandlerContext
的抽象实现类,是所有ChannelHandlerContext
实现的基础类。它在 Netty 的ChannelPipeline
中扮演了核心角色,负责管理ChannelHandler
的执行顺序,并提供了事件在ChannelPipeline
中传播的基础设施。
AbstractChannelHandlerContext
的作用
AbstractChannelHandlerContext
主要负责以下几个方面:
双向链表节点:
AbstractChannelHandlerContext
在ChannelPipeline
中充当一个双向链表节点,它通过prev
和next
引用相邻的上下文,使得ChannelHandler
可以按照顺序进行执行。事件传播机制:
AbstractChannelHandlerContext
实现了事件传播的核心逻辑,包括入站事件(Inbound Events)和出站事件(Outbound Events)的传播。状态管理:它管理
ChannelHandler
的状态(如是否被添加到ChannelPipeline
中、是否是入站/出站处理器等),并在适当的时候触发相应的事件。异步执行支持:
AbstractChannelHandlerContext
支持在不同的线程或事件循环中执行ChannelHandler
的方法调用。这通过检查ChannelHandler
是否需要在不同的EventLoop
上执行来实现。
AbstractChannelHandlerContext
的关键字段
AbstractChannelHandlerContext
包含了一些关键字段,这些字段在事件传播和处理器管理中起着重要作用:
pipeline
:表示当前上下文所属的ChannelPipeline
,通过它可以访问整个处理器链。
prev
和next
:分别表示当前上下文的前一个和后一个节点,用于维护处理器链的顺序。
handler
:与当前上下文关联的ChannelHandler
,用于实际处理事件。
executor
:用于在特定的EventExecutor
中执行ChannelHandler
的方法。如果executor
为null
,则表示该处理器将在默认的EventLoop
中执行。
inbound
和outbound
:布尔值,用于指示当前上下文是否处理入站或出站事件。
AbstractChannelHandlerContext
的关键方法1. 事件传播方法
AbstractChannelHandlerContext
提供了多个方法用于在ChannelPipeline
中传播事件:
fireChannelRead(Object msg)
:将读事件传递给下一个ChannelInboundHandlerContext
。
fireChannelActive()
:将通道激活事件传递给下一个ChannelInboundHandlerContext
。
fireExceptionCaught(Throwable cause)
:将异常事件传递给下一个ChannelInboundHandlerContext
。
write(Object msg, ChannelPromise promise)
:将写请求传递给下一个ChannelOutboundHandlerContext
。
flush()
:将刷新请求传递给下一个ChannelOutboundHandlerContext
。2. 执行管理方法
invokeChannelRead()
:调用当前ChannelHandler
的channelRead
方法。如果处理器需要在不同的线程中执行,该方法会安排异步执行。
invokeWrite()
:调用当前ChannelHandler
的write
方法,同样支持异步执行。
invokeFlush()
:调用当前ChannelHandler
的flush
方法。
AbstractChannelHandlerContext
的工作流程
事件接收:当
ChannelPipeline
接收到某个事件时,会从HeadContext
或TailContext
开始,逐步调用链表中的每个ChannelHandlerContext
。事件传播:
AbstractChannelHandlerContext
根据当前事件类型(入站或出站),调用对应的传播方法(如fireChannelRead
或write
),并将事件传递给下一个处理器。处理器执行:在事件传播过程中,
AbstractChannelHandlerContext
会判断当前处理器是否需要在不同的EventExecutor
中执行。如果需要,则安排异步执行,否则直接在当前线程中执行处理器的方法。状态管理:在事件传播和处理器执行过程中,
AbstractChannelHandlerContext
会更新处理器的状态,并在适当的时候触发相应的生命周期事件(如handlerAdded
和handlerRemoved
)。
AbstractChannelHandlerContext
在 Netty 中的地位
AbstractChannelHandlerContext
是 Netty 处理链机制的核心,它为ChannelPipeline
中的事件传播提供了基础设施,并确保每个ChannelHandler
都能够按照预期的顺序和线程模型执行。通过管理链表节点、执行上下文和事件传播逻辑,AbstractChannelHandlerContext
实现了 Netty 的高效且灵活的处理器链架构。理解
AbstractChannelHandlerContext
对于掌握 Netty 的内部运作原理至关重要,尤其是在处理复杂的异步事件流时。
AbstractChannelHandlerContext
是 Netty 中的一个抽象类,它实现了ChannelHandlerContext
接口,并提供了ChannelHandlerContext
的基本实现。AbstractChannelHandlerContext
的主要目的是为了简化ChannelHandlerContext
的实现,并提供了一些默认的行为和方法实现。AbstractChannelHandlerContext 的作用
简化实现:
AbstractChannelHandlerContext
提供了一个基础实现,使得开发者在编写自己的处理器时不必实现所有方法,只需要重写必要的部分即可。通用行为:它包含了
ChannelHandlerContext
的一些通用行为,如事件的转发、属性管理等。生命周期管理:提供了处理器生命周期管理的钩子方法,如
handlerAdded
,handlerRemoved
等。简化调用:提供了对上下游处理器的调用方法,如
fireChannelRead
,fireExceptionCaught
等。AbstractChannelHandlerContext 的使用
通常情况下,你不需要直接使用
AbstractChannelHandlerContext
,而是通过实现ChannelInboundHandler
或ChannelOutboundHandler
接口来创建自己的处理器。这些处理器会在ChannelPipeline
中被创建,并由 Netty 自动提供一个ChannelHandlerContext
的实例。示例代码
下面是一个简单的例子,展示如何使用
AbstractChannelHandlerContext
的实现类SimpleChannelInboundHandler
:import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.AttributeKey; public class MyServerHandler extends SimpleChannelInboundHandler<String> { private static final AttributeKey<Integer> VISIT_COUNT_KEY = AttributeKey.valueOf("visitCount"); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("Server received: " + msg); // 设置属性 ctx.attr(VISIT_COUNT_KEY).setIfAbsent(0); int count = ctx.attr(VISIT_COUNT_KEY).getAndIncrement(); // 发送响应 ctx.writeAndFlush("Hello, this is message " + count); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
在这个例子中,
MyServerHandler
继承了SimpleChannelInboundHandler
,而SimpleChannelInboundHandler
实际上是AbstractChannelInboundHandler
的一个子类,它简化了ChannelInboundHandler
的实现。AbstractChannelHandlerContext 的内部实现
AbstractChannelHandlerContext
内部包含了一些关键的成员变量和方法:
成员变量:
final ChannelPipeline pipeline
: 当前处理器所在的ChannelPipeline
。
final Channel channel
: 当前处理器所绑定的Channel
。
final ChannelHandler handler
: 当前处理器实例。
final String name
: 处理器的名称。
final boolean addBefore
: 标记处理器添加的位置信息(在内部实现中用于优化)。构造方法:
AbstractChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler, String name, boolean addBefore)
:
pipeline
: 当前处理器所在的ChannelPipeline
。
handler
: 当前处理器实例。
name
: 处理器的名称。
addBefore
: 是否将处理器添加到ChannelPipeline
的前面。常用方法:
Channel channel()
: 返回当前处理器所绑定的Channel
。
ChannelPipeline pipeline()
: 返回当前处理器所在的ChannelPipeline
。
ChannelHandler handler()
: 返回当前处理器实例。
String name()
: 返回处理器的名称。
boolean isRemoved()
: 判断处理器是否已经被移除。
void fireChannelRegistered()
: 触发channelRegistered
事件。
void fireChannelUnregistered()
: 触发channelUnregistered
事件。
void fireChannelActive()
: 触发channelActive
事件。
void fireChannelInactive()
: 触发channelInactive
事件。
void fireExceptionCaught(Throwable cause)
: 触发exceptionCaught
事件。
void fireUserEventTriggered(Object event)
: 触发userEventTriggered
事件。总结
AbstractChannelHandlerContext
是 Netty 中ChannelHandlerContext
的一个抽象实现,主要用于简化处理器的实现。在实际开发中,我们通常不直接使用AbstractChannelHandlerContext
,而是通过继承SimpleChannelInboundHandler
或SimpleChannelOutboundHandler
来创建具体的处理器。这些处理器会自动获得ChannelHandlerContext
的实例,用于与ChannelPipeline
和Channel
交互。
io.netty.channel.AbstractChannelHandlerContext
,实现 ChannelHandlerContext、ResourceLeakHint 接口,继承 DefaultAttributeMap 类,ChannelHandlerContext 抽象基类。
4.1.1 静态属性
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0; // 初始化
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1; // 添加准备中
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2; // 已添加
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3; // 已移除
/**
* {@link #handlerState} 的原子更新器
*/
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
// ========== 非静态属性 ==========
/**
* 处理器状态
*/
private volatile int handlerState = INIT;
handlerState
属性( 非静态属性,放这里主要是为了统一讲 ),处理器状态。共有 4 种状态。状态变迁如下图:
详细解析,见 「4.1.3 setAddComplete」、「4.1.4 setRemoved」、「4.1.5 setAddPending」 中。
HANDLER_STATE_UPDATER 静态属性,handlerState 的原子更新器。
4.1.2 构造方法
/**
* 上一个节点
*/
volatile AbstractChannelHandlerContext next;
/**
* 下一个节点
*/
volatile AbstractChannelHandlerContext prev;
/**
* 是否为 inbound
*/
private final boolean inbound;
/**
* 是否为 outbound
*/
private final boolean outbound;
/**
* 所属 pipeline
*/
private final DefaultChannelPipeline pipeline;
/**
* 名字
*/
private final String name;
/**
* 是否使用有序的 EventExecutor ( {@link #executor} ),即 OrderedEventExecutor
*/
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
/**
* EventExecutor 对象
*/
final EventExecutor executor;
/**
* 成功的 Promise 对象
*/
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor. 懒加载
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
/**
* 执行 Channel ReadComplete 事件的任务
*/
private Runnable invokeChannelReadCompleteTask;
/**
* 执行 Channel Read 事件的任务
*/
private Runnable invokeReadTask;
/**
* 执行 Channel WritableStateChanged 事件的任务
*/
private Runnable invokeChannelWritableStateChangedTask;
/**
* 执行 flush 事件的任务
*/
private Runnable invokeFlushTask;
/**
* 处理器状态
*/
private volatile int handlerState = INIT;
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor; // <1>
}
next
、prev
属性,分别记录上、下一个节点。Handler 相关属性:
在 AbstractChannelHandlerContext 抽象类中,按照我们上文的分享,应该会看到一个类型为 ChannelHandler 的处理器,但是实际并不是这样。而是,😈 我们下文 DefaultChannelHandlerContext、TailContext、HeadContext 见。
inbound
、outbound
属性,分别是否为 Inbound、Outbound 处理器。name
属性,处理器名字。handlerState
属性,处理器状态,初始为INIT
。
executor
属性,EventExecutor 对象ordered
属性,是否使用有序的executor
,即 OrderedEventExecutor ,在构造方法的<1>
处理的初始化。
pipeline
属性,所属 DefaultChannelPipeline 对象。
4.1.3 setAddComplete
#setAddComplete()
方法,设置 ChannelHandler 添加完成。完成后,状态有两种结果:
REMOVE_COMPLETE
ADD_COMPLETE
代码如下:
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
循环 + CAS 保证多线程下的安全变更
handlerState
属性。
4.1.4 setRemoved
#setRemoved()
方法,设置 ChannelHandler 已移除。代码如下:
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
4.1.5 setAddPending
#setAddPending()
方法,设置 ChannelHandler 准备添加中。代码如下:
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
}
当且仅当
INIT
可修改为ADD_PENDING
。理论来说,这是一个绝对会成功的操作,原因见英文注释。
4.1.6 其他方法
AbstractChannelHandlerContext 中的其他方法,详细解析,见后续的文章。
4.2 HeadContext
HeadContext ,实现 ChannelOutboundHandler、ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 头节点 Context 实现类。
HeadContext 是 DefaultChannelPipeline 的内部类。
4.2.1 构造方法
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true); // <1>
unsafe = pipeline.channel().unsafe(); // <2>
setAddComplete(); // <3>
}
<1>
处,调用父 AbstractChannelHandlerContext 的构造方法,设置inbound = false
、outbound = true
。<2>
处,使用 Channel 的 Unsafe 作为unsafe
属性。HeadContext 实现 ChannelOutboundHandler 接口的方法,都会调用 Unsafe 对应的方法。代码如下:
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
这也就是为什么设置
outbound = true
的原因。
<3>
处,调用#setAddComplete()
方法,设置 ChannelHandler 添加完成。此时,handlerStatus
会变成ADD_COMPLETE
状态。
4.2.2 handler
#handler()
方法,返回自己作为 Context 的 ChannelHandler 。代码如下:
@Override
public ChannelHandler handler() {
return this;
}
因为 HeadContext ,实现 ChannelOutboundHandler、ChannelInboundHandler 接口,而它们本身就是 ChannelHandler 。
4.2.3 其他方法
HeadContext 中的其他方法,详细解析,见后续的文章。
4.3 TailContext
TailContext ,实现 ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 尾节点 Context 实现类。
TailContext 是 DefaultChannelPipeline 的内部类。
4.3.1 构造方法
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false); // <1>
setAddComplete(); // <2>
}
<1>
处,调用父 AbstractChannelHandlerContext 的构造方法,设置inbound = true
、outbound = false
,和 HeadContext 相反。<2>
处,调用#setAddComplete()
方法,设置 ChannelHandler 添加完成。此时,handlerStatus
会变成ADD_COMPLETE
状态。
4.3.2 handler
#handler()
方法,返回自己作为 Context 的 ChannelHandler 。代码如下:
@Override
public ChannelHandler handler() {
return this;
}
因为 HeadContext ,实现 ChannelInboundHandler 接口,而它们本身就是 ChannelHandler 。
4.3.3 其他方法
TailContext 中的其他方法,详细解析,见后续的文章。
4.4 DefaultChannelHandlerContext
io.netty.channel.DefaultChannelHandlerContext
,实现 AbstractChannelHandlerContext 抽象类。代码如下:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); // <1>
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler; // <2>
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
不同于 HeadContext、TailContext,它们自身就是一个 Context 的同时,也是一个 ChannelHandler 。而 DefaultChannelHandlerContext 是内嵌 一个 ChannelHandler 对象,即
handler
。这个属性通过构造方法传入,在<2>
处进行赋值。<1>
处,调用父 AbstractChannelHandlerContext 的构造方法,通过判断传入的handler
是否为 ChannelInboundHandler 和 ChannelOutboundHandler 来分别判断是否为inbound
和outbound
。
推荐阅读如下文章:
Netty 源码解析 —— ChannelPipeline(二)之添加 ChannelHandler
在 Netty 中,
ChannelPipeline
是处理网络事件的核心组件。它是一个ChannelHandler
的链,每个ChannelHandler
负责处理或拦截传入的 I/O 事件或操作。当你添加一个ChannelHandler
到ChannelPipeline
时,它会插入到这条处理链中,从而在指定的位置处理特定类型的事件。
ChannelPipeline
的结构
ChannelPipeline
本质上是一个双向链表,链表中的每一个节点都是一个ChannelHandlerContext
对象,它包装了ChannelHandler
。这些节点可以通过名字或类型来相互引用和操作。添加
ChannelHandler
的方法Netty 提供了多种方法将
ChannelHandler
添加到ChannelPipeline
中:
addFirst
方法:将ChannelHandler
添加到ChannelPipeline
的头部。pipeline.addFirst("handler1", new MyChannelHandler());
addLast
方法:将ChannelHandler
添加到ChannelPipeline
的尾部。pipeline.addLast("handler2", new MyChannelHandler());
addBefore
方法:将ChannelHandler
添加到指定的ChannelHandler
之前。pipeline.addBefore("handler2", "handler1.5", new MyChannelHandler());
addAfter
方法:将ChannelHandler
添加到指定的ChannelHandler
之后。pipeline.addAfter("handler1", "handler1.5", new MyChannelHandler());
ChannelHandler
添加的实现原理当调用
addFirst
、addLast
、addBefore
或addAfter
方法时,ChannelPipeline
内部会创建一个新的ChannelHandlerContext
对象,并将其插入到双向链表中。这些方法都是通过AbstractChannelHandlerContext
及其子类实现的。以下是
addLast
方法的简化流程:
创建
ChannelHandlerContext
: Netty 会根据你提供的ChannelHandler
创建一个新的ChannelHandlerContext
,这个对象将包含指向前一个和后一个ChannelHandlerContext
的引用。链表的插入操作: 新创建的
ChannelHandlerContext
会被插入到链表的尾部,前一个节点的next
指针指向这个新的节点,而新节点的prev
指针指向前一个节点。事件传播: 一旦
ChannelHandler
被添加到ChannelPipeline
,所有传入的事件将从这个ChannelHandler
开始被处理,如果没有被当前的ChannelHandler
处理,它将传递给链中的下一个ChannelHandler
。实际代码解析
public final class DefaultChannelPipeline implements ChannelPipeline { // 省略其他代码 @Override public final ChannelPipeline addLast(String name, ChannelHandler handler) { // 获取 Channel 的 Unsafe 对象,用于执行底层的操作 final AbstractChannelHandlerContext newCtx; synchronized (this) { // 将 handler 包装为一个 ChannelHandlerContext newCtx = newContext(null, name, handler); // 将该 context 添加到链表尾部 addLast0(newCtx); } // 如果 Channel 已经注册,则立即调用 handlerAdded 方法 callHandlerAdded0(newCtx); return this; } private void addLast0(AbstractChannelHandlerContext newCtx) { // 获取尾部节点的引用 AbstractChannelHandlerContext prev = tail.prev; // 进行双向链表的插入操作 newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } private static AbstractChannelHandlerContext newContext( DefaultChannelPipeline pipeline, String name, ChannelHandler handler) { // 省略部分代码:检查 name 的合法性 return new DefaultChannelHandlerContext( pipeline, null, name, handler); } // 省略其他代码 }
从代码可以看出,
DefaultChannelPipeline
中的addLast
方法通过同步块来确保线程安全,然后创建一个ChannelHandlerContext
,并将它插入到链表的尾部。添加完成后,还会调用callHandlerAdded0
方法来触发handlerAdded
回调。小结
Netty 的
ChannelPipeline
通过一种灵活而高效的方式管理ChannelHandler
,它采用双向链表的结构来组织这些处理器,并提供了多种方法在不同位置插入ChannelHandler
。通过这种机制,Netty 可以高效地处理网络事件,提供高度可扩展和可定制的网络应用开发框架。
ChannelPipeline
是 Netty 中一个非常重要的组件,它负责管理一系列的ChannelHandler
并定义了这些处理器的执行顺序。当一个事件(例如数据读取或写入、异常发生等)触发时,这个事件会沿着ChannelPipeline
传递给每一个注册的ChannelHandler
。添加 ChannelHandler
添加方式
ChannelPipeline
提供了多种方法来添加ChannelHandler
,以下是一些常用的方法:
addLast: 将指定的
ChannelHandler
添加到ChannelPipeline
的尾部。addFirst: 将指定的
ChannelHandler
添加到ChannelPipeline
的头部。addBefore: 在指定的
ChannelHandler
前面插入一个新的ChannelHandler
。addAfter: 在指定的
ChannelHandler
后面插入一个新的ChannelHandler
。addLast/First: 这些方法可以接受多个
ChannelHandler
,允许你一次添加多个处理器。每个
ChannelHandler
在ChannelPipeline
中都有一个唯一的名称,这样可以通过名字来引用和操作处理器。示例代码
以下是一个简单的示例,展示如何使用
ChannelPipeline
添加不同的ChannelHandler
:public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server received: " + msg); ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加编码器和解码器 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); // 添加业务逻辑处理器 pipeline.addLast("handler1", new MyServerHandler()); pipeline.addLast("handler2", new MyServerHandler()); } }
在这个例子中,我们创建了一个名为
MyServerInitializer
的类,该类继承了ChannelInitializer
。在initChannel
方法中,我们通过addLast
方法向ChannelPipeline
添加了三个ChannelHandler
:StringDecoder
、StringEncoder
和两个MyServerHandler
实例。注意事项
执行顺序:
ChannelHandler
的执行顺序取决于它们被添加到ChannelPipeline
的顺序。双向处理:
ChannelPipeline
支持两种类型的处理器:ChannelInboundHandler
和ChannelOutboundHandler
。前者处理入站事件(如接收到的数据),后者处理出站事件(如要发送的数据)。你可以使用同一个处理器实现这两种接口来处理双向通信。共享实例:多个
Channel
可以共享相同的ChannelHandler
实例,但每个Channel
都有自己的ChannelPipeline
。了解这些基础知识后,你可以更深入地研究
ChannelPipeline
和ChannelHandler
的具体工作原理以及如何有效地利用它们来构建高性能的应用程序。
1. 概述
本文我们来分享,添加 ChannelHandler 到 pipeline 中的代码具体实现。
在 《Netty 源码解析 —— ChannelPipeline(一)之初始化》 中,我们看到 ChannelPipeline 定义了一大堆添加 ChannelHandler 的接口方法:
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
考虑到实际当中,我们使用
#addLast(ChannelHandler... handlers)
方法较多,所以本文只分享这个方法的具体实现。
2. addLast
#addLast(ChannelHandler... handlers)
方法,添加任意数量的 ChannelHandler 对象。代码如下:
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h); // <1>
}
return this;
}
<1>
处,调用#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法,添加一个 ChannelHandler 对象到 pipeline 中。
#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法,代码如下:
1: @Override
2: @SuppressWarnings("Duplicates")
3: public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
4: final AbstractChannelHandlerContext newCtx;
5: synchronized (this) { // 同步,为了防止多线程并发操作 pipeline 底层的双向链表
6: // 检查是否有重复 handler
7: checkMultiplicity(handler);
8:
9: // 创建节点名
10: // 创建节点
11: newCtx = newContext(group, filterName(name, handler), handler);
12:
13: // 添加节点
14: addLast0(newCtx);
15:
16: // <1> pipeline 暂未注册,添加回调。再注册完成后,执行回调。详细解析,见 {@link #invokeHandlerAddedIfNeeded} 方法。
17: // If the registered is false it means that the channel was not registered on an eventloop yet.
18: // In this case we add the context to the pipeline and add a task that will call
19: // ChannelHandler.handlerAdded(...) once the channel is registered.
20: if (!registered) {
21: // 设置 AbstractChannelHandlerContext 准备添加中
22: newCtx.setAddPending();
23: // 添加 PendingHandlerCallback 回调
24: callHandlerCallbackLater(newCtx, true);
25: return this;
26: }
27:
28: // <2> 不在 EventLoop 的线程中,提交 EventLoop 中,执行回调用户方法
29: EventExecutor executor = newCtx.executor();
30: if (!executor.inEventLoop()) {
31: // 设置 AbstractChannelHandlerContext 准备添加中
32: newCtx.setAddPending();
33: // 提交 EventLoop 中,执行回调 ChannelHandler added 事件
34: executor.execute(new Runnable() {
35: @Override
36: public void run() {
37: callHandlerAdded0(newCtx);
38: }
39: });
40: return this;
41: }
42: }
43:
44: // <3> 回调 ChannelHandler added 事件
45: callHandlerAdded0(newCtx);
46: return this;
47: }
第 5 行:
synchronized
同步,为了防止多线程并发操作 pipeline 底层的双向链表。第 7 行:调用
#checkMultiplicity(ChannelHandler)
方法,校验是否重复的 ChannelHandler 。详细解析,见 「3. checkMultiplicity」 。第 11 行:调用
#filterName(String name, ChannelHandler handler)
方法,获得 ChannelHandler 的名字。详细解析,见 「4. filterName」 。第 11 行:调用
#newContext(EventExecutorGroup group, String name, ChannelHandler handler)
方法,创建 DefaultChannelHandlerContext 节点。详细解析,见 「5. newContext」 。第 14 行:
#addLast0(AbstractChannelHandlerContext newCtx)
方法,添加到最后一个节点。详细解析,见 「6. addLast0」 。========== 后续分成 3 种情况 ==========
<1>
第 20 行:Channel 并未注册。这种情况,发生于 ServerBootstrap 启动的过程中。在
ServerBootstrap#init(Channel channel)
方法中,会添加 ChannelInitializer 对象到 pipeline 中,恰好此时 Channel 并未注册。第 22 行:调用
AbstractChannelHandlerContext#setAddPending()
方法,设置 AbstractChannelHandlerContext 准备添加中。第 24 行:调用
#callHandlerCallbackLater(AbstractChannelHandlerContext, added)
方法,添加 PendingHandlerAddedTask 回调。在 Channel 注册完成后,执行该回调。详细解析,见 「8. PendingHandlerCallback」 。<2>
第 30 行:不在 EventLoop 的线程中。
第 32 行:调用
AbstractChannelHandlerContext#setAddPending()
方法,设置 AbstractChannelHandlerContext 准备添加中。第 34 至 39 行:提交 EventLoop 中,调用
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。详细解析,见 「7. callHandlerAdded0」 。<3>
这种情况,是
<2>
在 EventLoop 的线程中的版本。也因为此,已经确认在 EventLoop 的线程中,所以不需要在synchronized
中。第 45 行:和【第 37 行】的代码一样,调用
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。
3. checkMultiplicity
在 Netty 中,
ChannelPipeline
的addLast
和其他添加方法通常会检查是否已经存在具有相同名称的ChannelHandler
。如果存在,则会抛出一个IllegalArgumentException
,这是因为每个ChannelHandler
在ChannelPipeline
中必须有唯一的名称。这个检查可以通过调用
checkMultiplicity
方法来实现。虽然checkMultiplicity
不是公开的 API,但在内部实现中它用于确保没有重复的处理器名称。下面是一个简化的示例,展示了ChannelPipeline
如何通过类似的方式来检查处理器名称的唯一性:public class SimpleChannelPipeline extends AbstractChannelPipeline { private final Map<String, ChannelHandler> nameToHandlerMap; public SimpleChannelPipeline() { this.nameToHandlerMap = new LinkedHashMap<>(); } @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { checkNotNull(name, "name"); checkNotNull(handler, "handler"); // Check if the handler with the same name already exists. checkMultiplicity(name); // Add the handler to the pipeline. // This is a simplified example and does not reflect the full logic of Netty's implementation. nameToHandlerMap.put(name, handler); return this; } private void checkMultiplicity(String name) { if (nameToHandlerMap.containsKey(name)) { throw new IllegalArgumentException("Duplicate handler name: " + name); } } }
在这个简化的示例中,
checkMultiplicity
方法检查提供的名称是否已经存在于nameToHandlerMap
中。如果存在,则抛出一个IllegalArgumentException
。内部实现
实际上,在 Netty 中,
ChannelPipeline
的实现更为复杂,并且涉及多个层次的检查和优化。例如,ChannelPipeline
会维护一个处理器链表,同时也会有一个映射表来快速查找处理器。此外,Netty 的ChannelPipeline
还支持动态地添加、移除和替换处理器,而这些操作都需要保证线程安全。在实际的 Netty 实现中,
checkMultiplicity
方法通常会在添加处理器之前被调用,以避免重复添加相同名称的处理器。如果需要深入了解具体的实现细节,建议查看 Netty 的源代码,因为这会提供最准确的信息。如果你有关于 Netty 的
ChannelPipeline
或ChannelHandler
的具体问题,请随时提问!
checkMultiplicity
是 Netty 内部用于验证ChannelHandler
能否被多次添加到ChannelPipeline
的方法。这个方法在添加ChannelHandler
到ChannelPipeline
时会被调用,以确保ChannelHandler
的实例只能在满足其预期的情况下被多次添加。
ChannelHandler
的类型在 Netty 中,
ChannelHandler
有两种主要类型:
@Sharable
注解的ChannelHandler
:标识该ChannelHandler
可以在多个ChannelPipeline
中共享,即一个实例可以被多个ChannelPipeline
或者同一个ChannelPipeline
多次添加使用。非
@Sharable
的ChannelHandler
:一个实例只能在一个ChannelPipeline
中使用,不能被重复添加到同一个ChannelPipeline
中。
checkMultiplicity
方法的作用
checkMultiplicity
方法的主要作用就是检查是否可以将ChannelHandler
的同一实例多次添加到同一个ChannelPipeline
中。如果尝试重复添加一个非@Sharable
的ChannelHandler
,会抛出异常。实现代码解析
以下是
DefaultChannelPipeline
类中checkMultiplicity
方法的实现:private static void checkMultiplicity(ChannelHandler handler) { // 如果 handler 没有 @Sharable 注解,并且已经在别处被使用,则抛出异常 if (handler instanceof ChannelInboundHandlerAdapter || handler instanceof ChannelOutboundHandlerAdapter) { // 如果是 Inbound 或 Outbound Handler Adapter 且不是 Sharable 的 if (!handler.getClass().isAnnotationPresent(Sharable.class)) { if (DefaultChannelHandlerContext.isHandlerAdded(handler)) { throw new ChannelPipelineException( handler.getClass().getName() + " is not a @Sharable handler, " + "so can't be added or removed multiple times."); } } } }
解析
检查
ChannelHandler
类型:
方法首先检查
ChannelHandler
是否是ChannelInboundHandlerAdapter
或ChannelOutboundHandlerAdapter
类型的实例。判断是否有
@Sharable
注解:
如果
ChannelHandler
的类上没有@Sharable
注解,则表示该ChannelHandler
不是可共享的,不能在多个ChannelPipeline
中使用。验证是否已添加:
使用
DefaultChannelHandlerContext.isHandlerAdded
方法来检查该ChannelHandler
是否已经被添加到某个ChannelPipeline
中。如果已经添加,则抛出
ChannelPipelineException
异常,表示该ChannelHandler
不能重复添加。结论
checkMultiplicity
方法是 Netty 保证ChannelHandler
正确使用的关键机制,尤其是在涉及到非共享ChannelHandler
的情况下。它确保每个ChannelHandler
的实例按照预期的方式使用,防止由于不正确的多次添加导致的潜在问题。通过这种机制,Netty 能够提供更健壮和可靠的网络处理管道。什么时候能多次添加,为什么
在 Netty 中,通常情况下不允许在同一个
ChannelPipeline
中多次添加具有相同名称的ChannelHandler
。这是因为在ChannelPipeline
中,每个ChannelHandler
必须有一个唯一的名称,以确保能够正确地引用和操作这些处理器。然而,有一些特殊情况允许你多次添加
ChannelHandler
,这些情况主要与处理器的类型和用途有关:
使用不同的名称:
即使是相同的
ChannelHandler
类型,只要给它们分配不同的名称,就可以多次添加到同一个ChannelPipeline
中。例如,可以在ChannelPipeline
中添加多个MyServerHandler
实例,只要它们的名字不同即可。共享处理器实例:
当你需要在多个
ChannelPipeline
之间共享一个ChannelHandler
实例时,可以将这个实例添加到多个ChannelPipeline
中,但是每次添加时都需要使用不同的名称。使用复合处理器:
有时,你可能希望在同一个
ChannelPipeline
中使用多个功能相似的处理器。例如,可以创建一个复合处理器,该处理器包含多个子处理器,然后将这个复合处理器添加到ChannelPipeline
中。这样,即使这些子处理器是相同的类型,也可以通过这种方式进行管理。示例
假设你有一个简单的
MyServerHandler
类,并且你想在一个ChannelPipeline
中添加两个这样的处理器实例:public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server received: " + msg); ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加多个 MyServerHandler 实例,但使用不同的名称 pipeline.addLast("handler1", new MyServerHandler()); pipeline.addLast("handler2", new MyServerHandler()); } }
在这个例子中,尽管添加了两个
MyServerHandler
实例,但由于它们的名字不同,因此这种做法是可行的。每个处理器都会按照它们在ChannelPipeline
中的顺序依次执行。总结
推荐做法:一般情况下,你应该为每个
ChannelHandler
使用不同的名称,以便于管理和调试。避免重复:尽量避免在同一个
ChannelPipeline
中使用相同名称的ChannelHandler
,除非你有明确的理由这样做,并且理解其潜在的影响。性能考虑:重复添加处理器可能会导致不必要的开销,尤其是在处理器数量较多的情况下。因此,合理设计处理器的结构和逻辑是非常重要的。
在 Netty 中,ChannelHandler
是否能多次添加到ChannelPipeline
取决于它是否具备线程安全的属性,以及它的设计目标。能够多次添加的ChannelHandler
通常具有以下特征:1. 使用
@Sharable
注解
当
ChannelHandler
类使用@Sharable
注解时,表示这个ChannelHandler
可以在多个ChannelPipeline
中共享,或者在同一个ChannelPipeline
中多次添加。@Sharable
注解表明该ChannelHandler
实例是线程安全的,可以在多个线程同时处理 I/O 事件,而不会产生并发问题。@Sharable public class MySharableHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 处理逻辑 } }
多次添加的场景:
如果你希望在多个
Channel
中使用同一个ChannelHandler
实例,可以通过@Sharable
让这个ChannelHandler
在多个ChannelPipeline
中共享。同样,如果在一个
ChannelPipeline
中需要同一逻辑处理多次(如在不同阶段处理相同的数据),也可以多次添加同一个@Sharable
的ChannelHandler
。2. 无状态的
ChannelHandler
无状态的
ChannelHandler
通常可以被多次添加。无状态意味着该ChannelHandler
没有保存任何与特定连接或数据包相关的上下文数据,因此它可以被多个连接共享。例如,一个简单的日志记录
ChannelHandler
可以是无状态的,它只会记录传入或传出的数据,并不依赖于任何特定的状态。多次添加的场景:
无状态的
ChannelHandler
通常可以在不同的ChannelPipeline
中重复使用,因为它们不会因并发访问或不同的连接上下文而出错。为什么能多次添加?
线程安全:
@Sharable
和无状态的ChannelHandler
由于其设计使得它们在并发环境下是线程安全的,因此可以在多个ChannelPipeline
中重复使用,而不必担心数据竞争或并发问题。功能复用:多次添加相同的
ChannelHandler
可以减少代码重复。例如,如果你有一个通用的日志记录功能,可能希望在多个位置使用相同的ChannelHandler
来记录不同阶段的数据。不能多次添加的情况
状态相关的
ChannelHandler
:如果ChannelHandler
维护了某些状态(如统计信息、连接上下文),并且这些状态与特定的连接或数据流相关联,那么它就不能被多次添加或共享。因为不同连接或数据流的状态不应混淆在一起。无
@Sharable
注解:如果ChannelHandler
没有@Sharable
注解,那么它假定为非线程安全的,且设计上只能在一个ChannelPipeline
中使用。这种设计是为了防止在多个ChannelPipeline
中使用同一实例时,出现线程安全问题或数据不一致的问题。总结
ChannelHandler
只有在标注为@Sharable
或者是无状态的情况下才能在多个ChannelPipeline
中多次添加使用。这是因为这样的ChannelHandler
通常是线程安全的,并且设计上支持在多个上下文中共享使用。如果一个ChannelHandler
维护了与特定连接或数据流相关的状态,并且没有@Sharable
注解,那么它就不能被多次添加,以避免并发问题和数据混乱。
#checkMultiplicity(ChannelHandler handler)
方法,校验是否重复的 ChannelHandler 。代码如下:
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 若已经添加,并且未使用 @Sharable 注解,则抛出异常
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// 标记已经添加
h.added = true;
}
}
在 pipeline 中,一个创建的 ChannelHandler 对象,如果不使用 Netty @Sharable
注解,则只能添加到一个 Channel 的 pipeline 中。所以,如果我们想要重用一个 ChannelHandler 对象( 例如在 Spring 环境中 ),则必须给这个 ChannelHandler 添加 @Sharable
注解。
例如,在 Dubbo 的 com.alibaba.dubbo.remoting.transport.netty.NettyHandler
处理器,它就使用了 @Sharable
注解。
4. filterName
#filterName(String name, ChannelHandler handler)
方法,获得 ChannelHandler 的名字。代码如下:
private String filterName(String name, ChannelHandler handler) {
if (name == null) { // <1>
return generateName(handler);
}
checkDuplicateName(name); // <2>
return name;
}
<1>
处,若未传入默认的名字name
,则调用#generateName(ChannelHandler)
方法,根据 ChannelHandler 生成一个唯一的名字。详细解析,见 「4.1 generateName」 。<2>
处,若已传入默认的名字name
,则调用#checkDuplicateName(String name)
方法,校验名字唯一。详细解析,见 「4.2 checkDuplicateName」 。
4.1 generateName
#generateName(ChannelHandler)
方法,根据 ChannelHandler 生成一个唯一名字。代码如下:
1: private String generateName(ChannelHandler handler) {
2: // 从缓存中查询,是否已经生成默认名字
3: Map<Class<?>, String> cache = nameCaches.get();
4: Class<?> handlerType = handler.getClass();
5: String name = cache.get(handlerType);
6: // 若未生成过,进行生成
7: if (name == null) {
8: name = generateName0(handlerType);
9: cache.put(handlerType, name);
10: }
11:
12: // 判断是否存在相同名字的节点
13: // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
14: // any name conflicts. Note that we don't cache the names generated here.
15: if (context0(name) != null) {
16: // 若存在,则使用基础名字 + 编号,循环生成,直到一个是唯一的
17: String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
18: for (int i = 1;; i ++) {
19: String newName = baseName + i;
20: if (context0(newName) == null) { // // 判断是否存在相同名字的节点
21: name = newName;
22: break;
23: }
24: }
25: }
26: return name;
27: }
第 2 至 5 行:从缓存
nameCaches
中,查询是否已经生成默认名字。若未生成过,调用
#generateName0(ChannelHandler)
方法,进行生成。而后,添加到缓存nameCaches
中。
第 15 行:调用
#context0(String name)
方法,判断是否存在相同名字的节点。代码如下:
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
// 顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。
第 15 至 25 行:若存在相同名字的节点,则使用基础名字 + 编号,循环生成,直到一个名字是唯一的,然后结束循环。
4.2 checkDuplicateName
#checkDuplicateName(String name)
方法,校验名字唯一。代码如下:
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
通过调用
#context0(String name)
方法,获得指定名字的节点。若存在节点,意味着不唯一,抛出 IllegalArgumentException 异常。
5. newContext
#newContext(EventExecutorGroup group, String name, ChannelHandler handler)
方法,创建 DefaultChannelHandlerContext 节点。而这个节点,内嵌传入的 ChannelHandler 参数。代码如下:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group) /** <1> **/, name, handler);
}
方法的作用是创建一个
newContextDefaultChannelHandlerContext
节点。这个节点是ChannelPipeline
中的重要组成部分,它封装了一个ChannelHandler
,并在ChannelPipeline
中扮演处理器节点的角色。
newContext
方法签名private DefaultChannelHandlerContext newContext( EventExecutorGroup group, String name, ChannelHandler handler) { // Implementation here }
参数说明
group
: 该参数是EventExecutorGroup
类型,代表了一个线程池,用来执行与该ChannelHandler
相关的事件。如果传入null
,则会使用Channel
的EventLoop
作为执行环境。
name
: 这是ChannelHandlerContext
的名称,通常用于标识每个ChannelHandler
。如果传入null
,则会自动生成一个默认名称。
handler
: 这是ChannelHandler
类型的参数,表示需要被封装的实际处理器。它是DefaultChannelHandlerContext
内部的核心组件,处理管道中的事件和数据。方法的核心逻辑
创建
DefaultChannelHandlerContext
实例:
通过调用
new DefaultChannelHandlerContext(...)
来创建一个新的DefaultChannelHandlerContext
对象。该对象会关联上面的三个参数,包括执行器(
group
)、名字(name
)和处理器(handler
)。返回新创建的上下文:
方法最后返回这个新创建的
DefaultChannelHandlerContext
对象。
DefaultChannelHandlerContext
的作用
DefaultChannelHandlerContext
是ChannelPipeline
中的一个节点,它连接了ChannelHandler
,并负责在管道中传播事件。每个
ChannelHandlerContext
都持有一个ChannelHandler
,并且包含指向下一个和上一个节点的指针,这使得事件能够沿着管道流动。代码示例
以下是一个简化的
newContext
方法的实现示例:private DefaultChannelHandlerContext newContext( EventExecutorGroup group, String name, ChannelHandler handler) { // 创建并返回 DefaultChannelHandlerContext 实例 return new DefaultChannelHandlerContext(this, group, name, handler); }
在 Netty 中,
ChannelPipeline
是一个包含了ChannelHandler
的双向链表,每个ChannelHandlerContext
代表链表中的一个节点。newContext
方法就是用来创建这些节点的。通过
newContext
方法,Netty 能够将多个ChannelHandler
串联起来,形成一个可以处理 IO 操作的管道。
<1>
处,调用 #childExecutor(EventExecutorGroup group)
方法,创建子执行器。代码如下:
private EventExecutor childExecutor(EventExecutorGroup group) {
// <1> 不创建子执行器
if (group == null) {
return null;
}
// <2> 根据配置项 SINGLE_EVENTEXECUTOR_PER_GROUP ,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
// <3> 通过 childExecutors 缓存实现,一个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
// 缓存不存在,进行 从 EventExecutorGroup 获得 EventExecutor 执行器
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor); // 进行缓存
}
return childExecutor;
}
一共有三种情况:
<1>
,当不传入 EventExecutorGroup 时,不创建子执行器。即,使用 Channel 所注册的 EventLoop 作为执行器。对于我们日常使用,基本完全都是这种情况。所以,下面两种情况,胖友不理解也是没关系的。<2>
,根据配置项ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP
,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器。<3>
,通过childExecutors
缓存实现,每个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器。是否获得相同的 EventExecutor 执行器,这就是<2>
、<3>
的不同。
注意,创建的是 DefaultChannelHandlerContext 对象。
在
newContext
方法的实现中,调用childExecutor(EventExecutorGroup group)
方法是为了确定要为当前的ChannelHandler
选择哪一个EventExecutor
来执行相关的任务。这个步骤对于保证ChannelHandler
在合适的线程上执行至关重要。方法调用背景
假设在
newContext
方法中,有这样的一段代码:private DefaultChannelHandlerContext newContext( EventExecutorGroup group, String name, ChannelHandler handler) { // <1> 调用 childExecutor 方法 EventExecutor executor = childExecutor(group); // 创建并返回 DefaultChannelHandlerContext 实例 return new DefaultChannelHandlerContext(this, executor, name, handler); }
childExecutor(EventExecutorGroup group)
方法
childExecutor(EventExecutorGroup group)
方法的作用是从EventExecutorGroup
中选择一个合适的EventExecutor
来执行ChannelHandler
的任务。主要逻辑
检查传入的
group
:
如果
group
为null
,则使用默认的EventLoop
,这通常是与Channel
相关联的EventLoop
。如果
group
不为null
,那么从group
中选择一个具体的EventExecutor
来执行任务。选择合适的执行器:
如果
group
是一个非空的EventExecutorGroup
,则调用其next()
方法来选择一个EventExecutor
。这个
EventExecutor
将会与ChannelHandler
绑定,在事件发生时处理它们。示例实现
private EventExecutor childExecutor(EventExecutorGroup group) { if (group == null) { // 如果 group 为空,则使用当前 Channel 的 EventLoop 作为执行器 return null; } else { // 否则从 group 中选择一个具体的 EventExecutor return group.next(); } }
整体流程
newContext
方法被调用:
传入的
EventExecutorGroup
、name
和handler
作为参数。调用
childExecutor
:
判断
group
是否为空,决定使用哪个EventExecutor
。创建
DefaultChannelHandlerContext
:
使用选定的
EventExecutor
创建新的DefaultChannelHandlerContext
。返回新创建的上下文:
新创建的上下文会包含对
ChannelHandler
的引用,并负责在管道中传播事件。作用与意义
通过
childExecutor
的调用,Netty 可以灵活地为不同的ChannelHandler
分配执行线程。这样可以确保每个处理器在合适的执行环境中运行,避免线程安全问题,并提升系统的并发处理能力。
6. addLast0
#addLast0(AbstractChannelHandlerContext newCtx)
方法,添加到最后一个节点。注意,实际上,是添加到 tail
节点之前。代码如下:
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 获得 tail 节点的前一个节点
AbstractChannelHandlerContext prev = tail.prev;
// 新节点,指向 prev 和 tail 节点
newCtx.prev = prev; // <1>
newCtx.next = tail; // <2>
// 在 prev 和 tail ,指向新节点
prev.next = newCtx; // <3>
tail.prev = newCtx; // <4>
}
FROM 闪电侠 《Netty 源码分析之pipeline(一)》
在 Netty 中,
ChannelPipeline
内部维护了一个双向链表结构来存储所有的ChannelHandler
。当您向ChannelPipeline
添加一个新的ChannelHandler
时,实际上是通过双向链表来组织这些处理器的顺序。您提到的
addLast0
方法是一个内部方法,用于将一个新的ChannelHandlerContext
添加到链表的末尾,但在此之前的位置。下面是对您提供的代码的详细解释:private void addLast0(AbstractChannelHandlerContext newCtx) { // 获得 tail 节点的前一个节点 AbstractChannelHandlerContext prev = tail.prev; // 新节点,指向 prev 和 tail 节点 newCtx.prev = prev; // <1> newCtx.next = tail; // <2> // 在 prev 和 tail ,指向新节点 prev.next = newCtx; // <3> tail.prev = newCtx; // <4> }
代码解析
获取前一个节点:
AbstractChannelHandlerContext prev = tail.prev;
获取链表中最后一个有效节点(即tail
节点的前一个节点)。更新新节点的指针:
newCtx.prev = prev;
设置新节点的prev
指针指向当前链表的最后一个有效节点。
newCtx.next = tail;
设置新节点的next
指针指向tail
节点。更新前一个节点的指针:
prev.next = newCtx;
更新前一个节点的next
指针,使其指向新节点。更新 tail 节点的指针:
tail.prev = newCtx;
更新tail
节点的prev
指针,使其指向新节点。通过这样的方式,
newCtx
被插入到了链表的末尾,但在tail
节点之前。这样做的目的是为了保持tail
节点作为一个特殊的占位符,始终位于链表的末尾,以便于管理和遍历链表。示例说明
假设链表初始状态如下:
head <-> ctx1 <-> ctx2 <-> ... <-> ctxN <-> tail
当我们调用
addLast0
方法添加一个新的上下文newCtx
时,链表的状态将变为:head <-> ctx1 <-> ctx2 <-> ... <-> ctxN <-> newCtx <-> tail
其中
ctxN
是原先链表的最后一个有效节点。如果您有任何疑问或者需要进一步的解释,请随时提问。
在
ChannelPipeline
中,addLast0(AbstractChannelHandlerContext newCtx)
方法用于将新的ChannelHandlerContext
添加到ChannelPipeline
的末尾,但并非真正的最后一个节点,而是添加在tail
节点之前。tail
节点是一个哨兵节点,用于表示管道的末端,并且不处理任何事件或数据。方法分析
private void addLast0(AbstractChannelHandlerContext newCtx) { // 获得 tail 节点的前一个节点 AbstractChannelHandlerContext prev = tail.prev; // 新节点,指向 prev 和 tail 节点 newCtx.prev = prev; // <1> newCtx.next = tail; // <2> // 在 prev 和 tail ,指向新节点 prev.next = newCtx; // <3> tail.prev = newCtx; // <4> }
详细解读
AbstractChannelHandlerContext prev = tail.prev;
:
先获取
tail
节点之前的节点prev
,也就是当前管道中真正的最后一个处理器节点。
newCtx.prev = prev;
(<1>
):
将新节点
newCtx
的prev
指针指向之前的prev
节点。
newCtx.next = tail;
(<2>
):
将新节点
newCtx
的next
指针指向tail
节点。
prev.next = newCtx;
(<3>
):
将之前的最后一个节点
prev
的next
指针指向新节点newCtx
,将其连接到新节点上。
tail.prev = newCtx;
(<4>
):
最后,将
tail
节点的prev
指针指向新节点newCtx
,更新链表末端的连接。作用与意义
链表操作:
通过上述四步操作,新的
ChannelHandlerContext
被插入到tail
节点之前,使得它成为新的最后一个有效节点。维护双向链表:
ChannelPipeline
维护的是一个双向链表,通过prev
和next
指针,可以在链表中正向或反向遍历节点,从而处理事件流。线程安全性:
这些操作通常会在同步块中执行,以保证多线程环境下链表结构的安全性。
这一方法的核心在于维护
ChannelPipeline
的链表结构,确保新添加的ChannelHandler
能够正确地插入到事件处理流程中。
7. callHandlerAdded0
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。代码如下:
1: private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
2: try {
3: // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
4: // any pipeline events ctx.handler() will miss them because the state will not allow it.
5: // 设置 AbstractChannelHandlerContext 已添加
6: ctx.setAddComplete();
7: // 回调 ChannelHandler 添加完成( added )事件
8: ctx.handler().handlerAdded(ctx);
9: } catch (Throwable t) {
10: // 发生异常,移除该节点
11: boolean removed = false;
12: try {
13: remove0(ctx); // 移除
14: try {
15: ctx.handler().handlerRemoved(ctx); // 回调 ChannelHandler 移除完成( removed )事件
16: } finally {
17: ctx.setRemoved(); // 标记节点已移除
18: }
19: removed = true; // 标记移除成功
20: } catch (Throwable t2) {
21: if (logger.isWarnEnabled()) {
22: logger.warn("Failed to remove a handler: " + ctx.name(), t2);
23: }
24: }
25:
26: // 触发异常的传播
27: if (removed) {
28: fireExceptionCaught(new ChannelPipelineException(
29: ctx.handler().getClass().getName() +
30: ".handlerAdded() has thrown an exception; removed.", t));
31: } else {
32: fireExceptionCaught(new ChannelPipelineException(
33: ctx.handler().getClass().getName() +
34: ".handlerAdded() has thrown an exception; also failed to remove.", t));
35: }
36: }
37: }
第 6 行:调用
AbstractChannelHandlerContext#setAddComplete()
方法,设置 AbstractChannelHandlerContext 已添加。第 8 行:调用
ChannelHandler#handlerAdded(AbstractChannelHandlerContext)
方法,回调 ChannelHandler 添加完成( added )事件。一般来说,通过这个方法,来初始化 ChannelHandler 。注意,因为这个方法的执行在 EventLoop 的线程中,所以要尽量避免执行时间过长。第 9 行:发生异常。
第 10 至 24 行:移除该节点( ChannelHandler )。详细解析,见 《Netty 源码解析 —— ChannelPipeline(三)之移除 ChannelHandler》 。
😈 所以,
ChannelHandler#handlerAdded(AbstractChannelHandlerContext)
方法的执行异常时,将被移除。
第 26 至 35 行:触发异常的传播。详细解析,见 《Netty 源码解析 —— ChannelPipeline(六)之异常事件的传播》 。
在 Netty 中,
ChannelHandlerContext
提供了生命周期回调方法,其中一个重要的方法是handlerAdded0
。当一个ChannelHandler
被添加到ChannelPipeline
时,会调用此方法。下面是关于callHandlerAdded0
方法的详细解释:
callHandlerAdded0
方法
callHandlerAdded0
方法通常是在ChannelHandlerContext
的内部实现中调用的,它的主要目的是通知ChannelHandler
它已经被添加到ChannelPipeline
中。这使得ChannelHandler
可以在被添加后执行一些初始化操作。方法签名
callHandlerAdded0
方法的签名大致如下(请注意,这是简化后的示例,实际源码可能略有不同):void callHandlerAdded0() { final ChannelHandler handler = this.handler; if (handler != null) { handler.handlerAdded(this); } }
工作原理
当一个
ChannelHandler
被添加到ChannelPipeline
中时,Netty 会调用ChannelHandlerContext
的callHandlerAdded0
方法。如果ChannelHandlerContext
的handler
字段不为空,则调用handler.handlerAdded(ChannelHandlerContext)
方法。使用示例
下面是一个简单的示例,展示了如何在自定义的
ChannelHandler
中重写handlerAdded
方法:public class MyChannelHandler extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("Handler added to the pipeline: " + ctx.name()); // 这里可以进行一些初始化操作 } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 处理接收到的消息 } }
当
MyChannelHandler
被添加到ChannelPipeline
时,handlerAdded
方法将会被调用。如何触发
callHandlerAdded0
callHandlerAdded0
通常在ChannelPipeline
的addLast
或addFirst
方法被调用后触发。例如:ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("myHandler", new MyChannelHandler());
在这个例子中,
MyChannelHandler
被添加到ChannelPipeline
后,callHandlerAdded0
方法会在内部被调用。总结
callHandlerAdded0
方法是在ChannelHandlerContext
中调用的,用于通知ChannelHandler
已经被添加到ChannelPipeline
中。当
ChannelHandler
被添加后,可以通过重写handlerAdded
方法来执行初始化逻辑。此方法通常是在
ChannelPipeline
的addLast
或addFirst
方法被调用后触发的。如果您有任何疑问或需要进一步的帮助,请随时告诉我。
callHandlerAdded0
方法在 Netty 中用于通知ChannelHandler
它已被添加到ChannelPipeline
中。这是ChannelPipeline
管理ChannelHandler
生命周期的关键部分之一。方法背景
在 Netty 中,每个
ChannelHandler
在被添加到ChannelPipeline
时,都会经历一系列的回调操作,这些回调是由callHandlerAdded0
和callHandlerRemoved0
方法触发的。callHandlerAdded0
方法是在ChannelHandler
被添加到管道中后立即调用的,用于执行ChannelHandler
的初始化逻辑。方法签名
private void callHandlerAdded0(AbstractChannelHandlerContext ctx) { try { // 调用 ChannelHandler 的 handlerAdded 方法 ctx.handler().handlerAdded(ctx); // 设置上下文为已添加状态 ctx.setAddComplete(); } catch (Throwable t) { // 如果添加过程中发生异常,调用异常处理逻辑 boolean removed = false; try { remove(ctx); removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { // 如果成功移除,则触发异常处理 fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } }
详细解读
ctx.handler().handlerAdded(ctx);
:
调用当前
ChannelHandler
的handlerAdded
方法,传递当前的上下文ctx
。这是
ChannelHandler
的生命周期方法之一,用于在处理器被添加到ChannelPipeline
后进行初始化。不同的
ChannelHandler
可能会在这个方法中执行不同的初始化逻辑,例如分配资源、设置初始状态等。
ctx.setAddComplete();
:
标记该
ChannelHandlerContext
已完成添加操作。这个方法通常是为了设置一些状态,确保后续操作可以正确执行。异常处理:
在
handlerAdded
方法中,如果发生异常,代码会尝试将该ChannelHandler
从ChannelPipeline
中移除,防止系统进入不一致的状态。如果移除操作成功,则抛出一个
ChannelPipelineException
,并在管道中传播该异常。如果移除操作失败,则会记录日志并抛出更严重的异常。
作用与意义
生命周期管理:
callHandlerAdded0
是ChannelHandler
生命周期管理的重要一环,通过这个方法,可以确保每个ChannelHandler
在被添加到ChannelPipeline
后得到正确的初始化。异常处理:
该方法中对异常的处理逻辑确保了在
ChannelHandler
初始化失败时,可以安全地将其移除,防止管道进入不可预期的状态。上下文状态:
通过
ctx.setAddComplete()
,确保上下文的状态是与处理器的实际情况一致的,从而避免后续操作出现问题。这个方法在 Netty 的
ChannelPipeline
中起到了保证ChannelHandler
添加过程的完整性和安全性的作用,使得整个管道的处理逻辑更加健壮。
为什么要回调
在 Netty 中,回调机制是框架设计的一部分,尤其是在
ChannelPipeline
中通过回调来管理ChannelHandler
的生命周期和处理各种网络事件。回调的使用有几个关键原因:1. 解耦与灵活性
解耦:通过回调,Netty 可以将事件处理逻辑与事件触发机制解耦。
ChannelPipeline
只负责事件的传播,而具体的处理逻辑则由各个ChannelHandler
自己定义。这样一来,ChannelPipeline
可以非常灵活地处理各种不同类型的ChannelHandler
,而不需要了解它们的内部实现。灵活性:开发者可以通过实现不同的
ChannelHandler
接口来定制自己的处理逻辑,并将其添加到ChannelPipeline
中。Netty 通过回调机制自动触发这些处理逻辑,而不需要硬编码逻辑在核心框架中。2. 生命周期管理
初始化与清理:每个
ChannelHandler
在添加到ChannelPipeline
时,Netty 会回调handlerAdded
方法。这给了ChannelHandler
一个机会去执行一些初始化操作,例如分配资源、配置状态等。同样地,当ChannelHandler
被移除时,Netty 会回调handlerRemoved
方法,用于资源清理和状态重置。安全性:通过这些生命周期回调,Netty 可以确保
ChannelHandler
在整个生命周期内都能正确管理自己的状态和资源,减少内存泄漏或状态不一致的风险。3. 异步与并发模型的支持
异步处理:Netty 是一个高度异步的框架,回调机制非常适合这种异步模型。通过回调,Netty 可以在事件发生时通知相关的处理器,并在事件处理完成后继续执行其他任务,而不需要阻塞当前线程。
并发控制:回调机制还可以与
EventLoop
或EventExecutor
一起使用,以确保回调是在正确的线程中执行。这对于并发处理和避免线程安全问题至关重要。4. 事件驱动编程模型
响应特定事件:在网络编程中,不同的事件需要不同的处理逻辑。例如,连接建立、消息接收、连接断开等事件都需要不同的处理方式。通过回调,Netty 可以在这些事件发生时通知相应的
ChannelHandler
,使其可以对特定事件做出反应。简化逻辑:回调机制使得事件驱动编程变得简单和直观,开发者只需要专注于事件的处理逻辑,而不必关心事件的触发和管理。
5. 扩展性
插拔式设计:通过回调机制,Netty 的
ChannelPipeline
支持插拔式设计,开发者可以随时添加或移除ChannelHandler
,以调整处理逻辑。这种扩展性使得 Netty 能够适应各种不同的应用场景,从简单的协议处理到复杂的多协议栈处理。总结
回调机制在 Netty 中的应用,主要是为了提供解耦、灵活性、生命周期管理、异步处理支持,以及简化事件驱动编程模型。它使得 Netty 在设计上更加模块化、可扩展,并且能够轻松处理复杂的网络协议和应用场景。
在 Netty 中,回调机制(如
handlerAdded
,handlerRemoved
,exceptionCaught
等方法)是非常重要的设计模式之一,它允许开发者在特定的时间点执行自定义的逻辑。下面详细解释为什么需要回调以及回调的好处:1. 生命周期管理
初始化和清理:
handlerAdded
: 当一个ChannelHandler
被添加到ChannelPipeline
中时,handlerAdded
方法会被调用。这为开发者提供了一个机会来执行初始化操作,比如设置内部状态、注册定时器等。
handlerRemoved
: 相反地,当一个ChannelHandler
从ChannelPipeline
中被移除时,handlerRemoved
方法会被调用。这可以用来释放资源、取消定时器等。2. 错误处理
异常捕获:
exceptionCaught
: 当在处理事件时发生异常时,exceptionCaught
方法会被调用。这允许开发者记录错误、关闭连接或者采取其他适当的措施来处理异常情况。3. 状态同步
状态检查:
channelActive
和channelInactive
: 这些方法分别在通道变得活跃(即连接建立成功)和通道变得不活跃(即连接关闭)时被调用。这对于同步状态非常有用,例如,在通道变得活跃时开始接收数据,或者在通道变得不活跃时释放资源。4. 自定义逻辑
灵活扩展:
回调方法允许开发者根据自己的需求扩展 Netty 的行为,而不必修改框架本身的代码。这提高了系统的灵活性和可扩展性。
5. 事件驱动编程
非阻塞式编程模型:
Netty 是基于事件驱动的非阻塞式编程模型构建的。回调机制是这种编程模型的核心,因为它允许程序在特定事件发生时执行代码,而无需主动轮询或等待。
6. 异步操作
异步处理:
通过回调,开发者可以编写异步代码,这意味着处理可以在事件发生时立即开始,而不是等待某个操作完成。
示例
下面是一个简单的示例,展示如何使用回调来处理通道激活和异常捕获:
public class MyChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Channel is active."); // 这里可以执行初始化操作 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("Exception caught: " + cause.getMessage()); // 这里可以记录异常、关闭连接等 cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 处理接收到的消息 } }
总结
回调机制是 Netty 中非常重要的一部分,它使得开发者能够轻松地控制和扩展框架的行为。通过使用回调,您可以编写更加灵活、高效和可靠的网络应用程序。如果您有任何具体的问题或需要更深入的解释,请随时提问。
当然可以。让我们更详细地讨论
ChannelHandler
的生命周期管理,特别是handlerAdded
和handlerRemoved
方法。生命周期管理
1. 初始化和清理
handlerAdded
方法:
何时调用:
当一个
ChannelHandler
被添加到ChannelPipeline
中时,handlerAdded
方法会被调用。用途:
这个方法提供了一个机会来执行初始化操作。
开发者可以利用这个方法来设置
ChannelHandler
的内部状态,例如初始化计数器、设置标志、注册定时器等。这也是配置任何依赖于
ChannelHandlerContext
的资源的好时机,例如订阅事件或启动后台任务。示例:
public class MyChannelHandler extends ChannelInboundHandlerAdapter { private ScheduledFuture<?> scheduledFuture; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("Handler added to the pipeline: " + ctx.name()); // 注册一个定时器,每隔一段时间执行某个任务 scheduledFuture = ctx.executor().scheduleAtFixedRate( new Runnable() { @Override public void run() { // 执行任务 } }, 0, 10, TimeUnit.SECONDS); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 处理接收到的消息 } }
handlerRemoved
方法:
何时调用:
当一个
ChannelHandler
从ChannelPipeline
中被移除时,handlerRemoved
方法会被调用。用途:
这个方法提供了一个机会来执行清理操作。
开发者可以利用这个方法来释放任何在
handlerAdded
方法中创建的资源,例如取消定时器、关闭文件句柄等。示例:
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("Handler removed from the pipeline: " + ctx.name()); // 取消定时器 if (scheduledFuture != null) { scheduledFuture.cancel(false); } }
总结
handlerAdded
:
当
ChannelHandler
被添加到ChannelPipeline
时调用。用于执行初始化操作。
handlerRemoved
:
当
ChannelHandler
从ChannelPipeline
中被移除时调用。用于执行清理操作。
通过这些方法,您可以确保您的
ChannelHandler
在整个生命周期内正确地管理资源,从而提高应用程序的健壮性和性能。如果您有任何进一步的问题或需要具体的代码示例,请随时提问。
8. PendingHandlerCallback
PendingHandlerCallback ,实现 Runnable 接口,等待添加 ChannelHandler 回调抽象类。代码如下:
PendingHandlerCallback 是 DefaultChannelPipeline 的内部静态类。
private abstract static class PendingHandlerCallback implements Runnable {
/**
* AbstractChannelHandlerContext 节点
*/
final AbstractChannelHandlerContext ctx;
/**
* 下一个回调 PendingHandlerCallback 对象
*/
PendingHandlerCallback next;
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
/**
* 执行方法
*/
abstract void execute();
}
通过
ctx
和next
字段,形成回调链。#execute()
抽象方法,通过实现它,执行回调逻辑。
PendingHandlerCallback
是 Netty 内部的一个类,用于在ChannelPipeline
中管理ChannelHandler
的添加和移除操作,尤其是在需要异步处理这些操作时。它的设计旨在确保ChannelHandler
的添加和移除过程可以在正确的线程中执行,并且顺序不会出错。背景
在 Netty 中,
ChannelPipeline
是一个双向链表,负责管理ChannelHandler
的链式调用。由于 Netty 是一个异步框架,ChannelHandler
的添加和移除可能并不会立即生效,尤其是在多线程环境下。为了解决这个问题,Netty 引入了PendingHandlerCallback
,用于管理这些可能被推迟的操作。
PendingHandlerCallback
类
PendingHandlerCallback
是一个抽象类,其设计目的是为ChannelHandler
的添加和移除提供一种延迟执行的机制。当需要将一个ChannelHandler
添加到ChannelPipeline
中,或者从中移除时,如果当前无法立即执行这些操作,就会将它们包装成PendingHandlerCallback
,然后稍后在合适的时机执行。核心代码与实现
abstract class PendingHandlerCallback { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; } abstract void execute(); }
属性
ctx
: 持有当前ChannelHandlerContext
的引用,用于执行实际的添加或移除操作。
next
: 指向下一个PendingHandlerCallback
,形成一个链表结构,这样多个待执行的回调可以依次进行。方法
execute()
: 抽象方法,需要在子类中实现,表示具体的操作逻辑。根据子类的不同,这个方法会执行具体的ChannelHandler
添加或移除操作。常见子类
PendingHandlerCallback
有两个常见的子类,分别用于处理ChannelHandler
的添加和移除:
PendingHandlerAddedTask
:
这个子类用于延迟执行
ChannelHandler
的添加操作。在调用
execute()
方法时,它会触发ChannelHandler
的handlerAdded()
方法,完成ChannelHandler
的添加过程。
PendingHandlerRemovedTask
:
这个子类用于延迟执行
ChannelHandler
的移除操作。在调用
execute()
方法时,它会触发ChannelHandler
的handlerRemoved()
方法,完成ChannelHandler
的移除过程。使用场景
异步操作: 如果
ChannelPipeline
在某个非EventLoop
线程中执行添加或移除操作时,需要将这些操作推迟到EventLoop
线程中执行,以确保线程安全。在这种情况下,操作会被封装成PendingHandlerCallback
,然后在合适的时机被执行。事件顺序: 由于 Netty 是异步事件驱动的,
ChannelHandler
的添加和移除操作必须保证事件顺序不被打乱。PendingHandlerCallback
通过链表结构来确保这些操作按顺序执行。作用与意义
线程安全: 通过
PendingHandlerCallback
,Netty 可以保证ChannelHandler
的添加和移除操作在正确的线程中进行,避免了多线程操作带来的不确定性。顺序执行: 它确保了
ChannelHandler
的生命周期事件(如handlerAdded
和handlerRemoved
)按正确的顺序执行,防止事件错乱。简化开发: 对于开发者而言,
PendingHandlerCallback
的引入简化了在多线程环境下管理ChannelPipeline
的复杂性,使得添加和移除ChannelHandler
变得更安全和可靠。总结
PendingHandlerCallback
是 Netty 中用于延迟执行ChannelHandler
添加和移除操作的重要机制。它通过保证这些操作的线程安全和顺序执行,为ChannelPipeline
的稳定性提供了基础支持。"Pending" 是一个英语词汇,意思是“待处理的”或“未决的”。在计算机科学和软件工程中,"pending" 常用来描述那些已经被触发但尚未完成或尚未执行的任务、操作或事件。这个词表示某个动作或操作已经被安排或请求,但还没有实际执行或完成。
在
PendingHandlerCallback
中的含义在 Netty 框架中,
PendingHandlerCallback
类中的 "Pending" 表示该回调操作(如添加或移除ChannelHandler
)已经被请求,但由于某些原因,它还未立即执行,需要等待一个合适的时机(通常是在适当的线程环境中)再去执行。这种设计能够确保操作顺序的正确性和线程安全性。一些例子
Pending Task:
一个“待处理任务”可能是已经被加入到任务队列中的操作,但还没有被调度器执行。例如,在 GUI 程序中,一个点击事件可能会将某个任务加入到事件队列中,任务状态就是 "pending",直到该任务被执行。
Pending Request:
在网络通信中,一个“待处理请求”可能是一个已经发送到服务器但尚未得到响应的 HTTP 请求。在这个状态下,请求是 "pending" 的,等待服务器的处理。
Pending Approval:
在审批流程中,一个“待审批”项可能是已经提交但尚未被审核和批准的文档或请求。
总结
"Pending" 强调的是某个动作或事件处于等待执行的状态。它反映了一个已经发生的触发或请求,但由于需要等待某个条件满足或者等待被调度,因此还没有实际执行。
为什么会有 PendingHandlerCallback 呢?
因为 ChannelHandler 添加到 pipeline 中,会触发 ChannelHandler 的添加完成( added )事件,并且该事件需要在 Channel 所属的 EventLoop 中执行。
但是 Channel 并未注册在 EventLoop 上时,需要暂时将“触发 ChannelHandler 的添加完成( added )事件”的逻辑,作为一个 PendingHandlerCallback 进行“缓存”。在 Channel 注册到 EventLoop 上时,进行回调执行。
PendingHandlerCallback 有两个实现类:
PendingHandlerAddedTask
PendingHandlerRemovedTask
本文只分享 PendingHandlerAddedTask 的代码实现。
PendingHandlerCallback
的存在,主要是为了处理在ChannelHandler
被添加或移除时,可能由于Channel
尚未注册到EventLoop
上,导致这些操作无法立即执行的情况。设计背景
在 Netty 中,
ChannelHandler
的添加和移除操作必须在Channel
所属的EventLoop
线程中执行,因为这些操作可能会影响事件的处理顺序和线程安全性。然而,有些情况下,Channel
还未注册到EventLoop
,这时就需要一种机制来“缓存”这些操作,等待Channel
注册完成后再执行。PendingHandlerCallback
就是为了解决这个问题而设计的。两个实现类
PendingHandlerAddedTask
:
这个类用于处理
ChannelHandler
的添加操作。当ChannelHandler
需要添加到ChannelPipeline
中,但Channel
还未注册到EventLoop
上时,PendingHandlerAddedTask
会将这次添加操作缓存起来,直到Channel
注册完成后再执行handlerAdded()
方法,通知ChannelHandler
已经成功添加。
PendingHandlerRemovedTask
:
这个类则用于处理
ChannelHandler
的移除操作。如果在ChannelHandler
需要从ChannelPipeline
中移除时,Channel
尚未注册到EventLoop
,PendingHandlerRemovedTask
会将移除操作缓存起来,待Channel
注册到EventLoop
后,再执行handlerRemoved()
方法,完成ChannelHandler
的移除。关键点总结
确保线程安全: Netty 通过
PendingHandlerCallback
确保ChannelHandler
的添加和移除操作都在EventLoop
线程中执行,从而避免多线程环境下的线程安全问题。保证事件顺序: 通过延迟执行,
PendingHandlerCallback
确保ChannelPipeline
中的事件处理顺序不会被打乱,保持整个网络事件处理的逻辑一致性。缓存机制: 当
Channel
尚未准备好执行这些操作时,PendingHandlerCallback
提供了一种缓存机制,等待条件满足时再执行操作。这个设计使得 Netty 在处理复杂的异步事件时,能够保持高效和安全。
8.1 PendingHandlerAddedTask
PendingHandlerAddedTask 实现 PendingHandlerCallback 抽象类,用于回调添加 ChannelHandler 节点。代码如下:
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
// 在 EventLoop 的线程中,回调 ChannelHandler added 事件
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
// 提交 EventLoop 中,执行回调 ChannelHandler added 事件
try {
executor.execute(this); // <1>
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
// 发生异常,进行移除
remove0(ctx);
// 标记 AbstractChannelHandlerContext 为已移除
ctx.setRemoved();
}
}
}
}
在
#execute()
实现方法中,我们可以看到,和#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法的【第 28 至 45 行】的代码比较类似,目的是,在 EventLoop 的线程中,执行#callHandlerAdded0(AbstractChannelHandlerContext)
方法,回调 ChannelHandler 添加完成( added )事件。<1>
处,为什么 PendingHandlerAddedTask 可以直接提交到 EventLoop 中呢?因为 PendingHandlerAddedTask 是个 Runnable ,这也就是为什么 PendingHandlerCallback 实现 Runnable 接口的原因。
下面开始分享的方法,属于 DefaultChannelPipeline 类。
8.2 callHandlerCallbackLater
#callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
方法,添加 PendingHandlerCallback 回调。代码如下:
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
*
* We only keep the head because it is expected that the list is used infrequently and its size is small.
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
* complexity.
*
* 准备添加 ChannelHandler 的回调
*
* @see #callHandlerCallbackLater(AbstractChannelHandlerContext, boolean)
*/
private PendingHandlerCallback pendingHandlerCallbackHead;
1: private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
2: assert !registered;
3:
4: // 创建 PendingHandlerCallback 对象
5: PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
6: PendingHandlerCallback pending = pendingHandlerCallbackHead;
7: // 若原 pendingHandlerCallbackHead 不存在,则赋值给它
8: if (pending == null) {
9: pendingHandlerCallbackHead = task;
10: // 若原 pendingHandlerCallbackHead 已存在,则最后一个回调指向新创建的回调
11: } else {
12: // Find the tail of the linked-list.
13: while (pending.next != null) {
14: pending = pending.next;
15: }
16: pending.next = task;
17: }
18: }
added
方法参数,表示是否是添加 ChannelHandler 的回调。所以在【第 5 行】的代码,根据added
是否为true
,创建 PendingHandlerAddedTask 或 PendingHandlerRemovedTask 对象。在本文中,当然创建的是 PendingHandlerAddedTask 。第 7 至 17 行:将创建的 PendingHandlerCallback 对象,“添加”到
pendingHandlerCallbackHead
中。
8.3 invokeHandlerAddedIfNeeded
#invokeHandlerAddedIfNeeded()
方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。它被两个方法所调用:
AbstractUnsafe#register0(ChannelPromise promise)
方法原因是:
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
例如 ServerBootstrap 通过 ChannelInitializer 注册自定义的 ChannelHandler 到 pipeline 上的情况。
调用栈如下图
HeadContext#channelRegistered(ChannelHandlerContext ctx)
方法。
笔者调试下来,对于 Netty NIO Server 和 NIO Client 貌似没啥作用,因为已经在
AbstractUnsafe#register0(ChannelPromise promise)
中触发。胖友也可以自己调试下。调用栈如下图
#invokeHandlerAddedIfNeeded()
方法,代码如下:
/**
* 是否首次注册
*
* {@link #invokeHandlerAddedIfNeeded()}
*/
private boolean firstRegistration = true;
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop(); // 必须在 EventLoop 的线程中
// 仅有首次注册有效 <1>
if (firstRegistration) {
// 标记非首次注册
firstRegistration = false;
// 执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件 // <2>
// We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
// that were added before the registration was done.
callHandlerAddedForAllHandlers();
}
}
<1>
处,仅有首次注册有效(firstRegistration = true
) 时。而后,标记firstRegistration = false
。这也就是笔者为什么说,
HeadContext#channelRegistered(ChannelHandlerContext ctx)
方法对这个方法的调用,是没有效果的。
<2>
处,调用#callHandlerAddedForAllHandlers()
方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。代码如下:
1: private void callHandlerAddedForAllHandlers() {
2: final PendingHandlerCallback pendingHandlerCallbackHead;
3: // 获得 pendingHandlerCallbackHead
4: synchronized (this) {
5: assert !registered;
6:
7: // This Channel itself was registered.
8: registered = true; // 标记已注册
9:
10: pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
11: // Null out so it can be GC'ed.
12: this.pendingHandlerCallbackHead = null; // 置空,help gc
13: }
14:
15: // 顺序向下,执行 PendingHandlerCallback 的回调
16: // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
17: // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
18: // the EventLoop.
19: PendingHandlerCallback task = pendingHandlerCallbackHead;
20: while (task != null) {
21: task.execute();
22: task = task.next;
23: }
24: }
第 3 至 13 行:获得
pendingHandlerCallbackHead
变量。第 8 行:标记
registered = true
,表示已注册。第 10 至 12 行:置空对象的
pendingHandlerCallbackHead
属性,help GC 。使用
synchronized
的原因,和#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
的【第 16 至 26 行】的代码需要对pendingHandlerCallbackHead
互斥,避免并发修改的问题。
第 15 至 23 行:顺序循环向下,调用
PendingHandlerCallback#execute()
方法,执行 PendingHandlerCallback 的回调,从而将 ChannelHandler 添加到 pipeline 中。这里不适用
synchronized
的原因,看英文注释哈。
添加 ChannelHandler 到 pipeline 中的代码,大部分的比较简单。比较复杂的可能是,「8. PendingHandlerCallback」 中,调用的过程涉及回调,所以理解上稍微可能困难。胖友可以多多调试进行解决噢。
推荐阅读文章:
闪电侠 《Netty 源码分析之 pipeline(一)》
Hypercube 《自顶向下深入分析 Netty(七)–ChannelPipeline 源码实现》