1. 首页
  2. Netty源码解析
  3. Netty 源码解析 —— ChannelPipeline

Netty 源码解析 —— ChannelPipeline

  • 发布于 2024-08-23
  • 24 次阅读

Netty 源码解析 —— ChannelPipeline(一)之初始化

  • Netty 是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,ChannelPipeline 是一个非常核心的概念,它定义了处理入站和出站事件的顺序。

    初始化 ChannelPipeline

    1. ChannelInitializer

    ChannelInitializer 是一个抽象类,通常用于在连接建立之后初始化 Channel。它有一个默认的方法 initChannel(Channel ch),在这个方法中你可以添加不同的处理器到 ChannelPipeline 上。

    示例代码如下:

    public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    ​
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            // 添加编码器
            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
            
            // 添加解码器
            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
            
            // 添加业务逻辑处理器
            pipeline.addLast(new MyServerHandler());
        }
    }

    2. ChannelPipeline 的创建

    当你创建一个新的 Channel 时,ChannelPipeline 会被自动创建并关联到这个 Channel。你可以通过调用 channel.pipeline() 获取到这个 ChannelPipeline 实例。

    3. 添加处理器

    可以通过 ChannelPipelineaddLast 方法来添加处理器。例如:

    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast("handler1", new MyHandler1());
    pipeline.addLast("handler2", new MyHandler2());

    这里 "handler1""handler2" 分别是处理器的名字,名字需要唯一。MyHandler1MyHandler2 需要实现 ChannelInboundHandlerChannelOutboundHandler 接口。

    4. 处理器的执行顺序

    • 入站:对于入站消息,ChannelPipeline 会从头开始按顺序调用每一个处理器,直到没有更多的处理器为止。

    • 出站:对于出站消息,ChannelPipeline 会从最后一个处理器开始逆序调用每一个处理器,直到没有更多的处理器为止。

    具体流程

    1. 当客户端与服务端建立连接后,ChannelInitializerinitChannel 方法被调用。

    2. initChannel 方法中,向 ChannelPipeline 添加处理器。

    3. 这些处理器会根据它们被添加的顺序来决定它们处理事件的顺序。

    以上就是对 Netty 中 ChannelPipeline 初始化的基本介绍。如果你想要深入了解 ChannelPipeline 的内部实现和工作原理,可以进一步阅读 Netty 的源代码,特别是 ChannelPipelineChannelHandlerContext 以及相关的处理器类。

Netty 是一个基于 Java 的异步事件驱动的网络应用框架,广泛应用于高性能网络服务器和客户端开发。在 Netty 中,ChannelPipeline 是一个核心组件,负责处理和管理与通道(Channel)相关的所有处理器(ChannelHandler)。

下面是对 Netty 源码中 ChannelPipeline 的初始化过程的解析。

一、ChannelPipeline 简介

ChannelPipeline 是一个处理器链(pipeline),它内部维护了一组 ChannelHandler,这些处理器负责处理入站和出站的数据流。每个 Channel 都会关联一个 ChannelPipeline,而 ChannelPipeline 则会包含多个 ChannelHandlerContext,每个 ChannelHandlerContext 都与一个 ChannelHandler 关联。

二、ChannelPipeline 的初始化

1. 初始化方式

在 Netty 中,ChannelPipeline 的初始化通常发生在 Channel 初始化时。具体来说,ChannelPipeline 是在 Channel 的构造函数中被创建的。

以下是 AbstractChannel 类中的构造函数部分源码:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();  // 初始化 ChannelPipeline
}

在上面的代码中,可以看到 pipeline 是通过 newChannelPipeline() 方法来创建的。

2. newChannelPipeline() 方法

newChannelPipeline() 方法是一个抽象方法,不同的 Channel 实现类会有自己的实现。例如,在 NioSocketChannel 中的实现如下:

@Override
protected ChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipelineChannelPipeline 的默认实现。

3. DefaultChannelPipeline 的构造函数

接下来,我们来看一下 DefaultChannelPipeline 的构造函数:

public DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
​
    tail = new TailContext(this);
    head = new HeadContext(this);
​
    head.next = tail;
    tail.prev = head;
}

在这个构造函数中,DefaultChannelPipeline 做了以下几件事情:

  • Channel 引用:首先,通过 ObjectUtil.checkNotNull() 方法确保 channel 非空,并将其保存。

  • SucceededFuture 和 VoidPromise:为 ChannelPipeline 创建了两个特殊的 ChannelFuture 实例:succeededFuturevoidPromise,它们在后续的操作中会被频繁使用。

  • 头尾节点初始化:接下来,DefaultChannelPipeline 初始化了两个关键的处理器上下文(ChannelHandlerContext)实例:HeadContextTailContext,分别对应处理链的头部和尾部。它们是 ChannelPipeline 内部的链表结构的两端节点。

  • 链表连接:最后,构造函数将 HeadContextTailContext 连接起来,形成一个双向链表结构。

三、处理器链的结构

DefaultChannelPipeline 中,处理器链(ChannelHandler 的链表)是一个双向链表结构。HeadContextTailContext 作为链表的两端,它们之间可以插入其他的 ChannelHandlerContext,这些 ChannelHandlerContext 会包装实际的 ChannelHandler

  • HeadContext 是默认处理链的头部,它处理出站事件,并且会将入站事件传递给下一个处理器。

  • TailContext 是默认处理链的尾部,它处理入站事件的结束,并且会将出站事件传递给前一个处理器。

四、总结

在 Netty 中,ChannelPipeline 的初始化过程主要围绕 DefaultChannelPipeline 展开。DefaultChannelPipeline 通过构造函数将头尾节点(HeadContextTailContext)进行初始化,并建立起处理器链的基础结构。理解 ChannelPipeline 的初始化过程,对于后续深入理解 Netty 的数据流处理机制至关重要。

在后续的解析中,我们会进一步探讨如何向 ChannelPipeline 中添加、删除 ChannelHandler,以及数据在 ChannelPipeline 中的流转过程。

ChannelInitializer 是 Netty 提供的一个抽象类,主要用于在新的 Channel 被注册到 EventLoop 之后,对其进行初始化。ChannelInitializer 的主要职责是配置 ChannelPipeline,即将各种 ChannelHandler 添加到 ChannelPipeline 中。它通常用于服务器端和客户端的初始化场景。

ChannelInitializer 的基本使用

ChannelInitializer 的核心方法是 initChannel(Channel ch)。这个方法是一个抽象方法,必须在子类中实现。在这个方法中,你可以向 ChannelPipeline 中添加各种 ChannelHandler,这些处理器将负责处理该 Channel 上的入站和出站数据。

以下是一个简单的示例,展示了如何使用 ChannelInitializer

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
​
        // 添加各种处理器
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(new MyInboundHandler());
        pipeline.addLast(new MyOutboundHandler());
    }
}

在上面的代码中:

  • MyChannelInitializer 继承了 ChannelInitializer,并实现了 initChannel 方法。

  • initChannel 方法中,通过 ch.pipeline() 获取当前 ChannelChannelPipeline

  • 然后,使用 pipeline.addLast() 方法,将各种处理器(ChannelHandler)添加到 ChannelPipeline 中。

ChannelInitializer 的工作流程

ChannelInitializer 的工作流程如下:

  1. Channel 注册:当 Channel 注册到 EventLoop 时,Netty 会调用 ChannelInitializerinitChannel(Channel ch) 方法。

  2. 初始化 Pipeline:在 initChannel(Channel ch) 方法中,开发者可以根据需要,将各种 ChannelHandler 添加到 ChannelPipeline 中。

  3. 自动移除:在 initChannel(Channel ch) 方法完成执行后,Netty 会自动将 ChannelInitializerChannelPipeline 中移除。这是因为 ChannelInitializer 仅用于初始化 ChannelPipeline,在初始化完成后就不再需要了。

ChannelInitializer 的应用场景

ChannelInitializer 在实际开发中有广泛的应用,特别是在以下场景中:

  1. 服务器端初始化:通常在服务器端的 ServerBootstrap 中使用 ChannelInitializer 来初始化每个新连接的 Channel。例如:

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new MyChannelInitializer());

    在这个例子中,每当有新的客户端连接时,MyChannelInitializer 会被调用来初始化客户端的 Channel

  2. 客户端初始化:在客户端使用 Bootstrap 时,也可以通过 ChannelInitializer 来初始化 Channel。例如:

    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .handler(new MyChannelInitializer());

    这里,MyChannelInitializer 会初始化客户端的 Channel,通常用于添加出站请求的编码器和入站响应的解码器。

总结

ChannelInitializer 是 Netty 提供的一个便捷工具,用于在 Channel 注册后初始化其 ChannelPipeline。通过实现 initChannel(Channel ch) 方法,开发者可以轻松地将所需的 ChannelHandler 添加到 ChannelPipeline 中。ChannelInitializer 的自动移除机制确保了它在初始化完成后不会对 ChannelPipeline 造成多余的负担。

本文主要介绍 Netty 中 ChannelPipeline 的初始化过程,包括 ChannelPipeline 接口定义、Inbound 和 Outbound 事件的特性以及 DefaultChannelPipeline、ChannelHandlerContext、HeadContext、TailContext 和 DefaultChannelHandlerContext 类的实现细节。

Key Takeaways

  • ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API。

  • DefaultChannelPipeline 是 ChannelPipeline 接口的默认实现类,也是唯一实现类。

  • ChannelHandlerContext 是 ChannelPipeline 中的节点,每个节点包含一个 ChannelHandler、它的上下节点以及其他上下文。

  • HeadContext 是 DefaultChannelPipeline 的头部节点,实现 ChannelOutboundHandler 接口,负责处理出站事件。

  • TailContext 是 DefaultChannelPipeline 的尾部节点,实现 ChannelInboundHandler 接口,负责处理入站事件。

  • DefaultChannelHandlerContext 内嵌一个 ChannelHandler 对象,负责处理事件的具体逻辑。

  • Outbound 事件是请求事件,由 Channel 发起,由 Unsafe 处理;Inbound 事件是通知事件,由 Unsafe 发起,由 TailContext 处理。

1. 概述

在 《Netty 源码分析 —— Netty 简介(二)之核心组件》 中,对 EventLoopGroup 和 EventLoop 做了定义,我们再来回顾下:

ChannelPipeline 为 ChannelHandler 的,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline ,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。

因为 ChannelPipeline 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 ChannelPipeline 的初始化。也因此,本文更多是体现 ChannelPipeline 的整体性,所以不会过多介绍每个类的具体的每个方法的实现。

2. ChannelPipeline

io.netty.channel.ChannelPipeline ,继承 ChannelInboundInvoker、ChannelOutboundInvoker、Iterable 接口,Channel Pipeline 接口。代码如下:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    // ========== 添加 ChannelHandler 相关 ==========
    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addFirst(ChannelHandler... handlers);
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline addLast(ChannelHandler... handlers);
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    // ========== 移除 ChannelHandler 相关 ==========
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler remove(String name);
    <T extends ChannelHandler> T remove(Class<T> handlerType);
    ChannelHandler removeFirst();
    ChannelHandler removeLast();
    
    // ========== 替换 ChannelHandler 相关 ==========
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);

    // ========== 查询 ChannelHandler 相关 ==========
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler last();
    ChannelHandlerContext lastContext();
    ChannelHandler get(String name);
    <T extends ChannelHandler> T get(Class<T> handlerType);
    ChannelHandlerContext context(ChannelHandler handler);
    ChannelHandlerContext context(String name);
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
    List<String> names();

    // ========== Channel 相关 ==========
    Channel channel();

    // ========== ChannelInboundInvoker 相关 ==========    
    @Override
    ChannelPipeline fireChannelRegistered();
    @Override
    ChannelPipeline fireChannelUnregistered();
    @Override
    ChannelPipeline fireChannelActive();
    @Override
    ChannelPipeline fireChannelInactive();
    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);
    @Override
    ChannelPipeline fireUserEventTriggered(Object event);
    @Override
    ChannelPipeline fireChannelRead(Object msg);
    @Override
    ChannelPipeline fireChannelReadComplete();
    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    // ========== ChannelOutboundInvoker 相关 ==========    
    @Override
    ChannelPipeline flush();
    
}

虽然接口的方法比较多,笔者做了归类如下:

  • ChannelHandler 的增删改查的相关方法。

  • Channel 的相关方法,目前只有一个。

  • 继承自 ChannelInboundInvoker 的相关方法。

  • 继承自 ChannelOutboundInvoker 的相关方法。

有可能会疑惑为什么继承 Iterable 接口?因为 ChannelPipeline 是 ChannelHandler 的

ChannelPipeline 的类图如下:

2.1 ChannelInboundInvoker

io.netty.channel.ChannelInboundInvoker ,Channel Inbound Invoker( 调用者 ) 接口。代码如下:

ChannelPipeline fireChannelRegistered();
ChannelPipeline fireChannelUnregistered();
ChannelPipeline fireChannelActive();
ChannelPipeline fireChannelInactive();
ChannelPipeline fireExceptionCaught(Throwable cause);
ChannelPipeline fireUserEventTriggered(Object event);
ChannelPipeline fireChannelRead(Object msg);
ChannelPipeline fireChannelReadComplete();
ChannelPipeline fireChannelWritabilityChanged();
  • 通知 Channel 事件的接口方法。

2.2 ChannelOutboundInvoker

io.netty.channel.ChannelOutboundInvoker ,Channel Outbound Invoker( 调用者 ) 接口。代码如下:

// ========== Channel 操作相关 ==========    
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);

// ========== Promise 相关 ==========    
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
  • 发起 Channel 操作的接口方法。

  • 创建 Promise 对象的接口方法。

2.3 Outbound v.s Inbound 事件

在 《Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (二)》 中,看到一个比较不错的总结:

因为要加一些注释,所以暂时不使用引用。

对于 Outbound 事件

  • Outbound 事件是【请求】事件(由 Connect 发起一个请求, 并最终由 Unsafe 处理这个请求)

  • Outbound 事件的发起者是 Channel

  • Outbound 事件的处理者是 Unsafe

  • Outbound 事件在 Pipeline 中的传输方向是 tail -> head

  • 旁白:Outbound 翻译为“出站”,所以从 tail( 尾 )到 head( 头 )也合理。

    至于什么是 headtail ,等看了具体的 ChannelPipeline 实现类 DefaultChannelPipeline 再说。

  • 在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用 ctx.xxx (例如 ctx.connect ) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

  • Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

对于 Inbound 事件

  • Inbound 事件是【通知】事件, 当某件事情已经就绪后, 通知上层.

  • Inbound 事件发起者是 Unsafe

  • Inbound 事件的处理者是 TailContext, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.

  • Inbound 事件在 Pipeline 中传输方向是 head( 头 ) -> tail( 尾 )

  • 旁白:Inbound 翻译为“入站”,所以从 head( 头 )到 tail( 尾 )也合理。

  • 在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用 ctx.fireIN_EVT (例如 ctx.fireChannelActive ) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

  • Inbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT

Outbound 和 Inbound 事件十分的镜像, 并且 Context 与 Handler 直接的调用关系是否容易混淆, 因此读者在阅读这里的源码时, 需要特别的注意。

3. DefaultChannelPipeline

io.netty.channel.DefaultChannelPipeline ,实现 ChannelPipeline 接口,默认 ChannelPipeline 实现类。😈 实际上,也只有这个实现类。

3.1 静态属性

/**
 * {@link #head} 的名字
 */
private static final String HEAD_NAME = generateName0(HeadContext.class);
/**
 * {@link #tail} 的名字
 */
private static final String TAIL_NAME = generateName0(TailContext.class);

/**
 * 名字({@link AbstractChannelHandlerContext#name})缓存 ,基于 ThreadLocal ,用于生成在线程中唯一的名字。
 */
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() {

    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }

};

/**
 * {@link #estimatorHandle} 的原子更新器
 */
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
        AtomicReferenceFieldUpdater.newUpdater(
                DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");

HEAD_NAMETAIL_NAME 静态属性,通过调用 #generateName0(Class<?> handlerType) 方法,生成对应的名字。代码如下:

private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}
  • HEAD_NAME = "HeadContext#0"TAIL_NAME= "TailContext#0"

  • nameCaches 静态属性,名字( AbstractChannelHandlerContext.name )缓存 ,基于 ThreadLocal ,用于生成在线程中唯一的名字。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。

  • ESTIMATOR 静态属性,estimatorHandle 属性的原子更新器。

3.2 构造方法

/**
 * Head 节点
 */
final AbstractChannelHandlerContext head;
/**
 * Tail 节点
 */
final AbstractChannelHandlerContext tail;

/**
 * 所属 Channel 对象
 */
private final Channel channel;
/**
 * 成功的 Promise 对象
 */
private final ChannelFuture succeededFuture;
/**
 * 不进行通知的 Promise 对象
 *
 * 用于一些方法执行,需要传入 Promise 类型的方法参数,但是不需要进行通知,就传入该值
 *
 * @see io.netty.channel.AbstractChannel.AbstractUnsafe#safeSetSuccess(ChannelPromise) 
 */
private final VoidChannelPromise voidPromise;
/**
 * TODO 1008 DefaultChannelPipeline 字段用途
 */
private final boolean touch = ResourceLeakDetector.isEnabled();

/**
 * 子执行器集合。
 *
 * 默认情况下,ChannelHandler 使用 Channel 所在的 EventLoop 作为执行器。
 * 但是如果有需要,也可以自定义执行器。详细解析,见 {@link #childExecutor(EventExecutorGroup)} 。
 */
private Map<EventExecutorGroup, EventExecutor> childExecutors;
/**
 * TODO 1008 DefaultChannelPipeline 字段用途
 */
private volatile MessageSizeEstimator.Handle estimatorHandle;
/**
 * 是否首次注册
 */
private boolean firstRegistration = true;

/**
 * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
 * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
 *
 * We only keep the head because it is expected that the list is used infrequently and its size is small.
 * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
 * complexity.
 * 
 * 准备添加 ChannelHandler 的回调
 */
private PendingHandlerCallback pendingHandlerCallbackHead;

/**
 * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
 * change.
 * Channel 是否已注册
 */
private boolean registered;

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    // succeededFuture 的创建
    succeededFuture = new SucceededChannelFuture(channel, null);
    // voidPromise 的创建
    voidPromise =  new VoidChannelPromise(channel, true);

    // 创建 Tail 及诶点
    tail = new TailContext(this); // <1>
    // 创建 Head 节点
    head = new HeadContext(this); // <2>

    // 相互指向 <3>
    head.next = tail;
    tail.prev = head;
}
  • head 属性,Head 节点,在构造方法的 <1> 处初始化。详细解析,见 「4.2 HeadContext」 。

  • tail 节点,Tail 节点,在构造方法的 <2> 处初始化。详细解析,见 「4.3 TailContext」 。

  • 在构造方法的 <3> 处,head 节点向指向 tail 节点,tail 节点向指向 head 节点,从而形成相互的指向。即如下图所示:

FROM 《netty 源码分析之 pipeline(一)》

pipeline 节点链(默认)

  • pipeline 中的节点的数据结构是 ChannelHandlerContext 类。每个 ChannelHandlerContext 包含一个 ChannelHandler、它的上下节点( 从而形成 ChannelHandler 链 )、以及其他上下文。详细解析,见 「4. ChannelHandlerContext」 。

  • 默认情况下,pipeline 有 headtail 节点,形成默认的 ChannelHandler 链。而我们可以在它们之间,加入自定义的 ChannelHandler 节点。如下图所示:

  • childExecutors 属性,子执行器集合。默认情况下,ChannelHandler 使用 Channel 所在的 EventLoop 作为执行器。

    • 但是如果有需要,也可以自定义执行器。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。

  • pendingHandlerCallbackHead 属性,准备添加 ChannelHandler 的回调。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。

  • registered 属性,Channel 是否已注册。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。

  • firstRegistration 属性,是否首次注册。详细解析,见 《Netty 源码解析 —— Pipeline(二)之添加 ChannelHandler》 。

3.3 其他方法

DefaultChannelPipeline 中的其他方法,详细解析,见后续的文章。

4. ChannelHandlerContext

ChannelHandlerContext 是 Netty 中的一个非常重要的概念,它是 ChannelPipeline 中处理器的上下文环境。每个处理器都有一个对应的 ChannelHandlerContext 实例,这个实例提供了访问 ChannelChannelPipeline 以及其他相关操作的能力。

ChannelHandlerContext 的作用

  1. 访问 ChannelPipelineChannelHandlerContext 可以用来访问所属的 ChannelPipeline,进而访问其他处理器或者改变处理器链。

  2. 访问 Channel:通过 ChannelHandlerContext 可以访问到当前处理器所绑定的 Channel

  3. 发送事件:可以使用 ChannelHandlerContext 发送各种事件,包括入站事件和出站事件。

  4. 调用上下游处理器:可以调用上下游的处理器来处理特定的事件或消息。

  5. 属性管理ChannelHandlerContext 实现了 AttributeMap 接口,允许存储和检索键值对,这对于传递状态信息特别有用。

  6. 生命周期管理ChannelHandlerContext 还支持处理器的生命周期管理,例如 ChannelHandlerContextfireUserEventTriggered 方法可以用来触发用户自定义事件。

ChannelHandlerContext 的使用

下面是一些 ChannelHandlerContext 的常见使用方法:

  1. 发送数据:可以使用 writeAndFlush 方法发送数据。

    ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, World!", CharsetUtil.UTF_8));
  2. 触发事件:可以使用 fireUserEventTriggered 方法触发用户自定义事件。

    ctx.fireUserEventTriggered(new UserEvent());
  3. 访问上下游处理器

    • 上游处理器:可以通过 invokeFireUserEventTriggered 方法来调用上游处理器。

      ctx.invokeFireUserEventTriggered(new UserEvent());
    • 下游处理器:可以通过 fireUserEventTriggered 方法来调用下游处理器。

      ctx.fireUserEventTriggered(new UserEvent());
  4. 访问 ChannelPipeline

    • 获取 ChannelPipeline:通过 ctx.pipeline() 方法。

    • 获取特定处理器:通过 ctx.pipeline().get(String name) 方法。

  5. 属性管理

    • 获取属性:通过 ctx.attr(AttributeKey<T>) 方法。

    • 设置属性:通过 ctx.attr(AttributeKey<T>).set(T value) 方法。

示例代码

假设我们有一个简单的 ChannelHandlerContext 的使用示例:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
​
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
​
    private static final AttributeKey<Integer> VISIT_COUNT_KEY = AttributeKey.valueOf("visitCount");
​
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Server received: " + msg);
​
        // 设置属性
        ctx.attr(VISIT_COUNT_KEY).setIfAbsent(0);
        int count = ctx.attr(VISIT_COUNT_KEY).getAndIncrement();
​
        // 发送响应
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, this is message " + count, CharsetUtil.UTF_8));
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在这个示例中,我们使用 ChannelHandlerContext 来发送响应,并且使用 AttributeKey 来记录客户端访问次数。

总结来说,ChannelHandlerContext 是 Netty 中处理器的上下文环境,提供了访问 ChannelChannelPipeline 的能力,同时还支持发送事件、管理属性等功能。

ChannelHandlerContext 是 Netty 中的一个关键接口,表示 ChannelPipeline 中每个 ChannelHandler 所在的上下文(节点)。它继承了以下三个接口:

  • ChannelInboundInvoker

  • ChannelOutboundInvoker

  • AttributeMap

ChannelHandlerContext 作为 ChannelPipeline 中的一个节点,负责管理 ChannelHandler,并在 ChannelPipeline 中提供入站和出站事件的传递能力。

ChannelHandlerContext 的作用

ChannelHandlerContext 的主要作用包括:

  1. ChannelHandler 的关联:每个 ChannelHandlerChannelPipeline 中都有一个对应的 ChannelHandlerContext,这个上下文不仅关联了处理器,还维护了其在 ChannelPipeline 中的位置。

  2. 事件传播ChannelHandlerContext 提供了方法来将事件传播到 ChannelPipeline 中的下一个处理器。它支持两类事件传播:

    • 入站事件(Inbound Events):例如读操作、解码、异常处理等。这些事件从头部 HeadContext 开始依次向下传播。

    • 出站事件(Outbound Events):例如写操作、编码等。这些事件从尾部 TailContext 开始依次向上传播。

  3. 属性管理:通过 AttributeMap 接口,ChannelHandlerContext 可以为 ChannelHandler 提供一个存储自定义属性的容器,这些属性可以在处理不同事件时共享。

ChannelHandlerContext 的方法

ChannelHandlerContext 提供了一些重要的方法用于事件的传播和处理,以下是一些关键方法:

1. 入站事件传播方法

  • fireChannelRead(Object msg):将数据传递给 ChannelPipeline 中的下一个 ChannelInboundHandlerchannelRead 方法。

  • fireChannelActive():将 Channel 激活事件(如连接建立)传递给下一个 ChannelInboundHandlerchannelActive 方法。

  • fireExceptionCaught(Throwable cause):将异常传递给 ChannelPipeline 中的下一个 ChannelInboundHandlerexceptionCaught 方法。

2. 出站事件传播方法

  • write(Object msg):请求将数据写出到远程 Channel,会触发 ChannelPipeline 中的下一个 ChannelOutboundHandlerwrite 方法。

  • flush():请求刷新之前 write 的数据到远程 Channel

  • close():请求关闭 Channel,会触发 ChannelPipeline 中的下一个 ChannelOutboundHandlerclose 方法。

3. 属性管理方法

  • attr(AttributeKey<T> key):获取或创建与 ChannelHandlerContext 相关联的属性。

  • hasAttr(AttributeKey<T> key):检查 ChannelHandlerContext 是否具有指定的属性。

ChannelHandlerContext 的使用示例

以下是一个简单的 ChannelHandler 示例,展示了如何在处理入站和出站事件时使用 ChannelHandlerContext 传播事件:

public class MyHandler extends ChannelInboundHandlerAdapter {
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Received message: " + msg);
​
        // 继续传播事件给下一个处理器
        ctx.fireChannelRead(msg);
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
​
        // 关闭通道
        ctx.close();
    }
}
​
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
​
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("Writing message: " + msg);
​
        // 继续传播事件给下一个处理器
        ctx.write(msg, promise);
    }
​
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Flushing channel");
​
        // 继续传播事件给下一个处理器
        ctx.flush();
    }
}

在这个例子中:

  • MyHandler 是一个入站处理器,接收消息后打印并继续传递。

  • MyOutboundHandler 是一个出站处理器,写入消息时打印并继续传递。

总结

ChannelHandlerContext 是 Netty 中 ChannelPipeline 的关键节点,负责管理 ChannelHandler 并在处理器链中传播事件。它不仅提供了丰富的 API 用于事件传播,还支持属性的存储和访问,使得开发者可以灵活地处理网络数据流。理解 ChannelHandlerContext 的工作机制,对于编写高效的 Netty 应用程序至关重要。

io.netty.channel.ChannelHandlerContext ,继承 ChannelInboundInvoker、ChannelOutboundInvoker、AttributeMap 接口,ChannelHandler Context( 上下文 )接口,作为 ChannelPipeline 中的节点。代码如下:

// ========== Context 相关 ==========
String name();
Channel channel();
EventExecutor executor();
ChannelHandler handler();
ChannelPipeline pipeline();
boolean isRemoved(); // 是否已经移除

// ========== ByteBuf 相关 ==========    
ByteBufAllocator alloc();

// ========== ChannelInboundInvoker 相关 ==========    
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();

// ========== ChannelOutboundInvoker 相关 ==========
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();

// ========== AttributeMap 相关 ==========
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);
@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);

虽然接口的方法比较多,笔者做了归类如下:

  • Context 相关的接口方法。

  • 继承自 ChannelInboundInvoker 的相关方法,和 ChannelPipeline 一样

  • 继承自 ChannelOutboundInvoker 的相关方法,和 ChannelPipeline 一样

  • 继承自 AttributeMap 的相关方法,实际上已经废弃( @Deprecated )了,不再从 ChannelHandlerContext 中获取,而是从 Channel 中获取。

ChannelHandlerContext 的类图如下:

😈 类图中的 AttributeMap 和 DefaultAttributeMap 可以无视。

4.1 AbstractChannelHandlerContext

AbstractChannelHandlerContext 是 Netty 中 ChannelHandlerContext 的抽象实现类,是所有 ChannelHandlerContext 实现的基础类。它在 Netty 的 ChannelPipeline 中扮演了核心角色,负责管理 ChannelHandler 的执行顺序,并提供了事件在 ChannelPipeline 中传播的基础设施。

AbstractChannelHandlerContext 的作用

AbstractChannelHandlerContext 主要负责以下几个方面:

  1. 双向链表节点AbstractChannelHandlerContextChannelPipeline 中充当一个双向链表节点,它通过 prevnext 引用相邻的上下文,使得 ChannelHandler 可以按照顺序进行执行。

  2. 事件传播机制AbstractChannelHandlerContext 实现了事件传播的核心逻辑,包括入站事件(Inbound Events)和出站事件(Outbound Events)的传播。

  3. 状态管理:它管理 ChannelHandler 的状态(如是否被添加到 ChannelPipeline 中、是否是入站/出站处理器等),并在适当的时候触发相应的事件。

  4. 异步执行支持AbstractChannelHandlerContext 支持在不同的线程或事件循环中执行 ChannelHandler 的方法调用。这通过检查 ChannelHandler 是否需要在不同的 EventLoop 上执行来实现。

AbstractChannelHandlerContext 的关键字段

AbstractChannelHandlerContext 包含了一些关键字段,这些字段在事件传播和处理器管理中起着重要作用:

  • pipeline:表示当前上下文所属的 ChannelPipeline,通过它可以访问整个处理器链。

  • prevnext:分别表示当前上下文的前一个和后一个节点,用于维护处理器链的顺序。

  • handler:与当前上下文关联的 ChannelHandler,用于实际处理事件。

  • executor:用于在特定的 EventExecutor 中执行 ChannelHandler 的方法。如果 executornull,则表示该处理器将在默认的 EventLoop 中执行。

  • inboundoutbound:布尔值,用于指示当前上下文是否处理入站或出站事件。

AbstractChannelHandlerContext 的关键方法

1. 事件传播方法

AbstractChannelHandlerContext 提供了多个方法用于在 ChannelPipeline 中传播事件:

  • fireChannelRead(Object msg):将读事件传递给下一个 ChannelInboundHandlerContext

  • fireChannelActive():将通道激活事件传递给下一个 ChannelInboundHandlerContext

  • fireExceptionCaught(Throwable cause):将异常事件传递给下一个 ChannelInboundHandlerContext

  • write(Object msg, ChannelPromise promise):将写请求传递给下一个 ChannelOutboundHandlerContext

  • flush():将刷新请求传递给下一个 ChannelOutboundHandlerContext

2. 执行管理方法

  • invokeChannelRead():调用当前 ChannelHandlerchannelRead 方法。如果处理器需要在不同的线程中执行,该方法会安排异步执行。

  • invokeWrite():调用当前 ChannelHandlerwrite 方法,同样支持异步执行。

  • invokeFlush():调用当前 ChannelHandlerflush 方法。

AbstractChannelHandlerContext 的工作流程

  1. 事件接收:当 ChannelPipeline 接收到某个事件时,会从 HeadContextTailContext 开始,逐步调用链表中的每个 ChannelHandlerContext

  2. 事件传播AbstractChannelHandlerContext 根据当前事件类型(入站或出站),调用对应的传播方法(如 fireChannelReadwrite),并将事件传递给下一个处理器。

  3. 处理器执行:在事件传播过程中,AbstractChannelHandlerContext 会判断当前处理器是否需要在不同的 EventExecutor 中执行。如果需要,则安排异步执行,否则直接在当前线程中执行处理器的方法。

  4. 状态管理:在事件传播和处理器执行过程中,AbstractChannelHandlerContext 会更新处理器的状态,并在适当的时候触发相应的生命周期事件(如 handlerAddedhandlerRemoved)。

AbstractChannelHandlerContext 在 Netty 中的地位

AbstractChannelHandlerContext 是 Netty 处理链机制的核心,它为 ChannelPipeline 中的事件传播提供了基础设施,并确保每个 ChannelHandler 都能够按照预期的顺序和线程模型执行。通过管理链表节点、执行上下文和事件传播逻辑,AbstractChannelHandlerContext 实现了 Netty 的高效且灵活的处理器链架构。

理解 AbstractChannelHandlerContext 对于掌握 Netty 的内部运作原理至关重要,尤其是在处理复杂的异步事件流时。

AbstractChannelHandlerContext 是 Netty 中的一个抽象类,它实现了 ChannelHandlerContext 接口,并提供了 ChannelHandlerContext 的基本实现。AbstractChannelHandlerContext 的主要目的是为了简化 ChannelHandlerContext 的实现,并提供了一些默认的行为和方法实现。

AbstractChannelHandlerContext 的作用

  1. 简化实现AbstractChannelHandlerContext 提供了一个基础实现,使得开发者在编写自己的处理器时不必实现所有方法,只需要重写必要的部分即可。

  2. 通用行为:它包含了 ChannelHandlerContext 的一些通用行为,如事件的转发、属性管理等。

  3. 生命周期管理:提供了处理器生命周期管理的钩子方法,如 handlerAdded, handlerRemoved 等。

  4. 简化调用:提供了对上下游处理器的调用方法,如 fireChannelRead, fireExceptionCaught 等。

AbstractChannelHandlerContext 的使用

通常情况下,你不需要直接使用 AbstractChannelHandlerContext,而是通过实现 ChannelInboundHandlerChannelOutboundHandler 接口来创建自己的处理器。这些处理器会在 ChannelPipeline 中被创建,并由 Netty 自动提供一个 ChannelHandlerContext 的实例。

示例代码

下面是一个简单的例子,展示如何使用 AbstractChannelHandlerContext 的实现类 SimpleChannelInboundHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
​
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
​
    private static final AttributeKey<Integer> VISIT_COUNT_KEY = AttributeKey.valueOf("visitCount");
​
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("Server received: " + msg);
​
        // 设置属性
        ctx.attr(VISIT_COUNT_KEY).setIfAbsent(0);
        int count = ctx.attr(VISIT_COUNT_KEY).getAndIncrement();
​
        // 发送响应
        ctx.writeAndFlush("Hello, this is message " + count);
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在这个例子中,MyServerHandler 继承了 SimpleChannelInboundHandler,而 SimpleChannelInboundHandler 实际上是 AbstractChannelInboundHandler 的一个子类,它简化了 ChannelInboundHandler 的实现。

AbstractChannelHandlerContext 的内部实现

AbstractChannelHandlerContext 内部包含了一些关键的成员变量和方法:

  1. 成员变量

    • final ChannelPipeline pipeline: 当前处理器所在的 ChannelPipeline

    • final Channel channel: 当前处理器所绑定的 Channel

    • final ChannelHandler handler: 当前处理器实例。

    • final String name: 处理器的名称。

    • final boolean addBefore: 标记处理器添加的位置信息(在内部实现中用于优化)。

  2. 构造方法

    • AbstractChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler, String name, boolean addBefore):

      • pipeline: 当前处理器所在的 ChannelPipeline

      • handler: 当前处理器实例。

      • name: 处理器的名称。

      • addBefore: 是否将处理器添加到 ChannelPipeline 的前面。

  3. 常用方法

    • Channel channel(): 返回当前处理器所绑定的 Channel

    • ChannelPipeline pipeline(): 返回当前处理器所在的 ChannelPipeline

    • ChannelHandler handler(): 返回当前处理器实例。

    • String name(): 返回处理器的名称。

    • boolean isRemoved(): 判断处理器是否已经被移除。

    • void fireChannelRegistered(): 触发 channelRegistered 事件。

    • void fireChannelUnregistered(): 触发 channelUnregistered 事件。

    • void fireChannelActive(): 触发 channelActive 事件。

    • void fireChannelInactive(): 触发 channelInactive 事件。

    • void fireExceptionCaught(Throwable cause): 触发 exceptionCaught 事件。

    • void fireUserEventTriggered(Object event): 触发 userEventTriggered 事件。

总结

AbstractChannelHandlerContext 是 Netty 中 ChannelHandlerContext 的一个抽象实现,主要用于简化处理器的实现。在实际开发中,我们通常不直接使用 AbstractChannelHandlerContext,而是通过继承 SimpleChannelInboundHandlerSimpleChannelOutboundHandler 来创建具体的处理器。这些处理器会自动获得 ChannelHandlerContext 的实例,用于与 ChannelPipelineChannel 交互。

io.netty.channel.AbstractChannelHandlerContext ,实现 ChannelHandlerContext、ResourceLeakHint 接口,继承 DefaultAttributeMap 类,ChannelHandlerContext 抽象基类。

4.1.1 静态属性

/**
 * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
 * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int INIT = 0; // 初始化
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
 */
private static final int ADD_PENDING = 1; // 添加准备中
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
 */
private static final int ADD_COMPLETE = 2; // 已添加
/**
 * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int REMOVE_COMPLETE = 3; // 已移除

/**
 * {@link #handlerState} 的原子更新器
 */
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

// ========== 非静态属性 ==========

/**
 * 处理器状态
 */
private volatile int handlerState = INIT;

handlerState 属性( 非静态属性,放这里主要是为了统一讲 ),处理器状态。共有 4 种状态。状态变迁如下图:


  • 详细解析,见 「4.1.3 setAddComplete」、「4.1.4 setRemoved」、「4.1.5 setAddPending」 中。

  • HANDLER_STATE_UPDATER 静态属性,handlerState 的原子更新器。

4.1.2 构造方法

/**
 * 上一个节点
 */
volatile AbstractChannelHandlerContext next;
/**
 * 下一个节点
 */
volatile AbstractChannelHandlerContext prev;
/**
 * 是否为 inbound
 */
private final boolean inbound;
/**
 * 是否为 outbound
 */
private final boolean outbound;
/**
 * 所属 pipeline
 */
private final DefaultChannelPipeline pipeline;
/**
 * 名字
 */
private final String name;
/**
 * 是否使用有序的 EventExecutor ( {@link #executor} ),即 OrderedEventExecutor
 */
private final boolean ordered;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
/**
 * EventExecutor 对象
 */
final EventExecutor executor;
/**
 * 成功的 Promise 对象
 */
private ChannelFuture succeededFuture;

// Lazily instantiated tasks used to trigger events to a handler with different executor. 懒加载
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
/**
 * 执行 Channel ReadComplete 事件的任务
 */
private Runnable invokeChannelReadCompleteTask;
/**
 * 执行 Channel Read 事件的任务
 */
private Runnable invokeReadTask;
/**
 * 执行 Channel WritableStateChanged 事件的任务
 */
private Runnable invokeChannelWritableStateChangedTask;
/**
 * 执行 flush 事件的任务
 */
private Runnable invokeFlushTask;
/**
 * 处理器状态
 */
private volatile int handlerState = INIT;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor; // <1>
}
  • nextprev 属性,分别记录上、下一个节点。

  • Handler 相关属性:

    • 在 AbstractChannelHandlerContext 抽象类中,按照我们上文的分享,应该会看到一个类型为 ChannelHandler 的处理器,但是实际并不是这样。而是,😈 我们下文 DefaultChannelHandlerContext、TailContext、HeadContext 见。

    • inboundoutbound 属性,分别是否为 Inbound、Outbound 处理器。

    • name 属性,处理器名字。

    • handlerState 属性,处理器状态,初始为 INIT

  • executor 属性,EventExecutor 对象

    • ordered 属性,是否使用有序的 executor,即 OrderedEventExecutor ,在构造方法的 <1> 处理的初始化。

  • pipeline 属性,所属 DefaultChannelPipeline 对象。

4.1.3 setAddComplete

#setAddComplete() 方法,设置 ChannelHandler 添加完成。完成后,状态有两种结果:

  1. REMOVE_COMPLETE

  2. ADD_COMPLETE

代码如下:

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
        // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
        // exposing ordering guarantees.
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
  • 循环 + CAS 保证多线程下的安全变更 handlerState 属性。

4.1.4 setRemoved

#setRemoved() 方法,设置 ChannelHandler 已移除。代码如下:

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}

4.1.5 setAddPending

#setAddPending() 方法,设置 ChannelHandler 准备添加中。代码如下:

final void setAddPending() {
    boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
}
  • 当且仅当 INIT 可修改为 ADD_PENDING 。理论来说,这是一个绝对会成功的操作,原因见英文注释。

4.1.6 其他方法

AbstractChannelHandlerContext 中的其他方法,详细解析,见后续的文章。

4.2 HeadContext

HeadContext ,实现 ChannelOutboundHandler、ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 头节点 Context 实现类。

HeadContext 是 DefaultChannelPipeline 的内部类。

4.2.1 构造方法

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true); // <1>
    unsafe = pipeline.channel().unsafe(); // <2>
    setAddComplete(); // <3>
}
  • <1> 处,调用父 AbstractChannelHandlerContext 的构造方法,设置 inbound = falseoutbound = true

  • <2> 处,使用 Channel 的 Unsafe 作为 unsafe 属性。HeadContext 实现 ChannelOutboundHandler 接口的方法,都会调用 Unsafe 对应的方法。代码如下:

@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.bind(localAddress, promise);
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.disconnect(promise);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    unsafe.deregister(promise);
}

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
  • 这也就是为什么设置 outbound = true 的原因。

  • <3> 处,调用 #setAddComplete() 方法,设置 ChannelHandler 添加完成。此时,handlerStatus 会变成 ADD_COMPLETE 状态。

4.2.2 handler

#handler() 方法,返回自己作为 Context 的 ChannelHandler 。代码如下:

@Override
public ChannelHandler handler() {
    return this;
}
  • 因为 HeadContext ,实现 ChannelOutboundHandler、ChannelInboundHandler 接口,而它们本身就是 ChannelHandler 。

4.2.3 其他方法

HeadContext 中的其他方法,详细解析,见后续的文章。

4.3 TailContext

TailContext ,实现 ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 尾节点 Context 实现类。

TailContext 是 DefaultChannelPipeline 的内部类。

4.3.1 构造方法

TailContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, TAIL_NAME, true, false); // <1>
    setAddComplete(); // <2>
}
  • <1> 处,调用父 AbstractChannelHandlerContext 的构造方法,设置 inbound = trueoutbound = false ,和 HeadContext 相反

  • <2> 处,调用 #setAddComplete() 方法,设置 ChannelHandler 添加完成。此时,handlerStatus 会变成 ADD_COMPLETE 状态。

4.3.2 handler

#handler() 方法,返回自己作为 Context 的 ChannelHandler 。代码如下:

@Override
public ChannelHandler handler() {
    return this;
}
  • 因为 HeadContext ,实现 ChannelInboundHandler 接口,而它们本身就是 ChannelHandler 。

4.3.3 其他方法

TailContext 中的其他方法,详细解析,见后续的文章。

4.4 DefaultChannelHandlerContext

io.netty.channel.DefaultChannelHandlerContext ,实现 AbstractChannelHandlerContext 抽象类。代码如下:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); // <1>
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler; // <2>
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }

}
  • 不同于 HeadContext、TailContext,它们自身就是一个 Context 的同时,也是一个 ChannelHandler 。而 DefaultChannelHandlerContext 是内嵌 一个 ChannelHandler 对象,即 handler 。这个属性通过构造方法传入,在 <2> 处进行赋值。

  • <1> 处,调用父 AbstractChannelHandlerContext 的构造方法,通过判断传入的 handler 是否为 ChannelInboundHandler 和 ChannelOutboundHandler 来分别判断是否为 inboundoutbound

推荐阅读如下文章:

Netty 源码解析 —— ChannelPipeline(二)之添加 ChannelHandler

在 Netty 中,ChannelPipeline 是处理网络事件的核心组件。它是一个 ChannelHandler 的链,每个 ChannelHandler 负责处理或拦截传入的 I/O 事件或操作。当你添加一个 ChannelHandlerChannelPipeline 时,它会插入到这条处理链中,从而在指定的位置处理特定类型的事件。

ChannelPipeline 的结构

ChannelPipeline 本质上是一个双向链表,链表中的每一个节点都是一个 ChannelHandlerContext 对象,它包装了 ChannelHandler。这些节点可以通过名字或类型来相互引用和操作。

添加 ChannelHandler 的方法

Netty 提供了多种方法将 ChannelHandler 添加到 ChannelPipeline 中:

  1. addFirst 方法:将 ChannelHandler 添加到 ChannelPipeline 的头部。

    pipeline.addFirst("handler1", new MyChannelHandler());
  2. addLast 方法:将 ChannelHandler 添加到 ChannelPipeline 的尾部。

    pipeline.addLast("handler2", new MyChannelHandler());
  3. addBefore 方法:将 ChannelHandler 添加到指定的 ChannelHandler 之前。

    pipeline.addBefore("handler2", "handler1.5", new MyChannelHandler());
  4. addAfter 方法:将 ChannelHandler 添加到指定的 ChannelHandler 之后。

    pipeline.addAfter("handler1", "handler1.5", new MyChannelHandler());

ChannelHandler 添加的实现原理

当调用 addFirstaddLastaddBeforeaddAfter 方法时,ChannelPipeline 内部会创建一个新的 ChannelHandlerContext 对象,并将其插入到双向链表中。这些方法都是通过 AbstractChannelHandlerContext 及其子类实现的。

以下是 addLast 方法的简化流程:

  1. 创建 ChannelHandlerContext: Netty 会根据你提供的 ChannelHandler 创建一个新的 ChannelHandlerContext,这个对象将包含指向前一个和后一个 ChannelHandlerContext 的引用。

  2. 链表的插入操作: 新创建的 ChannelHandlerContext 会被插入到链表的尾部,前一个节点的 next 指针指向这个新的节点,而新节点的 prev 指针指向前一个节点。

  3. 事件传播: 一旦 ChannelHandler 被添加到 ChannelPipeline,所有传入的事件将从这个 ChannelHandler 开始被处理,如果没有被当前的 ChannelHandler 处理,它将传递给链中的下一个 ChannelHandler

实际代码解析

public final class DefaultChannelPipeline implements ChannelPipeline {
​
    // 省略其他代码
​
    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        // 获取 Channel 的 Unsafe 对象,用于执行底层的操作
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 将 handler 包装为一个 ChannelHandlerContext
            newCtx = newContext(null, name, handler);
            // 将该 context 添加到链表尾部
            addLast0(newCtx);
        }
        // 如果 Channel 已经注册,则立即调用 handlerAdded 方法
        callHandlerAdded0(newCtx);
        return this;
    }
​
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        // 获取尾部节点的引用
        AbstractChannelHandlerContext prev = tail.prev;
        // 进行双向链表的插入操作
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
​
    private static AbstractChannelHandlerContext newContext(
        DefaultChannelPipeline pipeline, String name, ChannelHandler handler) {
        // 省略部分代码:检查 name 的合法性
        return new DefaultChannelHandlerContext(
                pipeline, null, name, handler);
    }
​
    // 省略其他代码
}

从代码可以看出,DefaultChannelPipeline 中的 addLast 方法通过同步块来确保线程安全,然后创建一个 ChannelHandlerContext,并将它插入到链表的尾部。添加完成后,还会调用 callHandlerAdded0 方法来触发 handlerAdded 回调。

小结

Netty 的 ChannelPipeline 通过一种灵活而高效的方式管理 ChannelHandler,它采用双向链表的结构来组织这些处理器,并提供了多种方法在不同位置插入 ChannelHandler。通过这种机制,Netty 可以高效地处理网络事件,提供高度可扩展和可定制的网络应用开发框架。

ChannelPipeline 是 Netty 中一个非常重要的组件,它负责管理一系列的 ChannelHandler 并定义了这些处理器的执行顺序。当一个事件(例如数据读取或写入、异常发生等)触发时,这个事件会沿着 ChannelPipeline 传递给每一个注册的 ChannelHandler

添加 ChannelHandler

添加方式

ChannelPipeline 提供了多种方法来添加 ChannelHandler,以下是一些常用的方法:

  1. addLast: 将指定的 ChannelHandler 添加到 ChannelPipeline 的尾部。

  2. addFirst: 将指定的 ChannelHandler 添加到 ChannelPipeline 的头部。

  3. addBefore: 在指定的 ChannelHandler 前面插入一个新的 ChannelHandler

  4. addAfter: 在指定的 ChannelHandler 后面插入一个新的 ChannelHandler

  5. addLast/First: 这些方法可以接受多个 ChannelHandler,允许你一次添加多个处理器。

每个 ChannelHandlerChannelPipeline 中都有一个唯一的名称,这样可以通过名字来引用和操作处理器。

示例代码

以下是一个简单的示例,展示如何使用 ChannelPipeline 添加不同的 ChannelHandler

public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server received: " + msg);
        ctx.fireChannelRead(msg);
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
​
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        
        // 添加编码器和解码器
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
​
        // 添加业务逻辑处理器
        pipeline.addLast("handler1", new MyServerHandler());
        pipeline.addLast("handler2", new MyServerHandler());
    }
}

在这个例子中,我们创建了一个名为 MyServerInitializer 的类,该类继承了 ChannelInitializer。在 initChannel 方法中,我们通过 addLast 方法向 ChannelPipeline 添加了三个 ChannelHandlerStringDecoderStringEncoder 和两个 MyServerHandler 实例。

注意事项

  • 执行顺序ChannelHandler 的执行顺序取决于它们被添加到 ChannelPipeline 的顺序。

  • 双向处理ChannelPipeline 支持两种类型的处理器:ChannelInboundHandlerChannelOutboundHandler。前者处理入站事件(如接收到的数据),后者处理出站事件(如要发送的数据)。你可以使用同一个处理器实现这两种接口来处理双向通信。

  • 共享实例:多个 Channel 可以共享相同的 ChannelHandler 实例,但每个 Channel 都有自己的 ChannelPipeline

了解这些基础知识后,你可以更深入地研究 ChannelPipelineChannelHandler 的具体工作原理以及如何有效地利用它们来构建高性能的应用程序。

1. 概述

本文我们来分享,添加 ChannelHandler 到 pipeline 中的代码具体实现。

在 《Netty 源码解析 —— ChannelPipeline(一)之初始化》 中,我们看到 ChannelPipeline 定义了一大堆添加 ChannelHandler 的接口方法:

ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
  • 考虑到实际当中,我们使用 #addLast(ChannelHandler... handlers) 方法较多,所以本文只分享这个方法的具体实现。

2. addLast

#addLast(ChannelHandler... handlers) 方法,添加任意数量的 ChannelHandler 对象。代码如下:

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h); // <1>
    }

    return this;
}
  • <1> 处,调用 #addLast(EventExecutorGroup group, String name, ChannelHandler handler) 方法,添加一个 ChannelHandler 对象到 pipeline 中。

#addLast(EventExecutorGroup group, String name, ChannelHandler handler) 方法,代码如下:

 1: @Override
 2: @SuppressWarnings("Duplicates")
 3: public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
 4:     final AbstractChannelHandlerContext newCtx;
 5:     synchronized (this) { // 同步,为了防止多线程并发操作 pipeline 底层的双向链表
 6:         // 检查是否有重复 handler
 7:         checkMultiplicity(handler);
 8: 
 9:         // 创建节点名
10:         // 创建节点
11:         newCtx = newContext(group, filterName(name, handler), handler);
12: 
13:         // 添加节点
14:         addLast0(newCtx);
15: 
16:         // <1> pipeline 暂未注册,添加回调。再注册完成后,执行回调。详细解析,见 {@link #invokeHandlerAddedIfNeeded} 方法。
17:         // If the registered is false it means that the channel was not registered on an eventloop yet.
18:         // In this case we add the context to the pipeline and add a task that will call
19:         // ChannelHandler.handlerAdded(...) once the channel is registered.
20:         if (!registered) {
21:             // 设置 AbstractChannelHandlerContext 准备添加中
22:             newCtx.setAddPending();
23:             // 添加 PendingHandlerCallback 回调
24:             callHandlerCallbackLater(newCtx, true);
25:             return this;
26:         }
27: 
28:         // <2> 不在 EventLoop 的线程中,提交 EventLoop 中,执行回调用户方法
29:         EventExecutor executor = newCtx.executor();
30:         if (!executor.inEventLoop()) {
31:             // 设置 AbstractChannelHandlerContext 准备添加中
32:             newCtx.setAddPending();
33:             // 提交 EventLoop 中,执行回调 ChannelHandler added 事件
34:             executor.execute(new Runnable() {
35:                 @Override
36:                 public void run() {
37:                     callHandlerAdded0(newCtx);
38:                 }
39:             });
40:             return this;
41:         }
42:     }
43: 
44:     // <3> 回调 ChannelHandler added 事件
45:     callHandlerAdded0(newCtx);
46:     return this;
47: }
  • 第 5 行:synchronized 同步,为了防止多线程并发操作 pipeline 底层的双向链表。

  • 第 7 行:调用 #checkMultiplicity(ChannelHandler) 方法,校验是否重复的 ChannelHandler 。详细解析,见 「3. checkMultiplicity」 。

  • 第 11 行:调用 #filterName(String name, ChannelHandler handler) 方法,获得 ChannelHandler 的名字。详细解析,见 「4. filterName」 。

  • 第 11 行:调用 #newContext(EventExecutorGroup group, String name, ChannelHandler handler) 方法,创建 DefaultChannelHandlerContext 节点。详细解析,见 「5. newContext」 。

  • 第 14 行:#addLast0(AbstractChannelHandlerContext newCtx) 方法,添加到最后一个节点。详细解析,见 「6. addLast0」 。

  • ========== 后续分成 3 种情况 ==========

  • <1>

  • 第 20 行:Channel 并未注册。这种情况,发生于 ServerBootstrap 启动的过程中。在 ServerBootstrap#init(Channel channel) 方法中,会添加 ChannelInitializer 对象到 pipeline 中,恰好此时 Channel 并未注册。

  • 第 22 行:调用 AbstractChannelHandlerContext#setAddPending() 方法,设置 AbstractChannelHandlerContext 准备添加中

  • 第 24 行:调用 #callHandlerCallbackLater(AbstractChannelHandlerContext, added) 方法,添加 PendingHandlerAddedTask 回调。在 Channel 注册完成后,执行该回调。详细解析,见 「8. PendingHandlerCallback」 。

  • <2>

  • 第 30 行:不在 EventLoop 的线程中。

  • 第 32 行:调用 AbstractChannelHandlerContext#setAddPending() 方法,设置 AbstractChannelHandlerContext 准备添加中

  • 第 34 至 39 行:提交 EventLoop 中,调用 #callHandlerAdded0(AbstractChannelHandlerContext) 方法,执行回调 ChannelHandler 添加完成( added )事件。详细解析,见 「7. callHandlerAdded0」 。

  • <3>

  • 这种情况,是 <2> 在 EventLoop 的线程中的版本。也因为此,已经确认在 EventLoop 的线程中,所以不需要在 synchronized 中。

  • 第 45 行:和【第 37 行】的代码一样,调用 #callHandlerAdded0(AbstractChannelHandlerContext) 方法,执行回调 ChannelHandler 添加完成( added )事件。

3. checkMultiplicity

在 Netty 中,ChannelPipelineaddLast 和其他添加方法通常会检查是否已经存在具有相同名称的 ChannelHandler。如果存在,则会抛出一个 IllegalArgumentException,这是因为每个 ChannelHandlerChannelPipeline 中必须有唯一的名称。

这个检查可以通过调用 checkMultiplicity 方法来实现。虽然 checkMultiplicity 不是公开的 API,但在内部实现中它用于确保没有重复的处理器名称。下面是一个简化的示例,展示了 ChannelPipeline 如何通过类似的方式来检查处理器名称的唯一性:

public class SimpleChannelPipeline extends AbstractChannelPipeline {
​
    private final Map<String, ChannelHandler> nameToHandlerMap;
​
    public SimpleChannelPipeline() {
        this.nameToHandlerMap = new LinkedHashMap<>();
    }
​
    @Override
    public ChannelPipeline addLast(String name, ChannelHandler handler) {
        checkNotNull(name, "name");
        checkNotNull(handler, "handler");
​
        // Check if the handler with the same name already exists.
        checkMultiplicity(name);
​
        // Add the handler to the pipeline.
        // This is a simplified example and does not reflect the full logic of Netty's implementation.
        nameToHandlerMap.put(name, handler);
        return this;
    }
​
    private void checkMultiplicity(String name) {
        if (nameToHandlerMap.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
}

在这个简化的示例中,checkMultiplicity 方法检查提供的名称是否已经存在于 nameToHandlerMap 中。如果存在,则抛出一个 IllegalArgumentException

内部实现

实际上,在 Netty 中,ChannelPipeline 的实现更为复杂,并且涉及多个层次的检查和优化。例如,ChannelPipeline 会维护一个处理器链表,同时也会有一个映射表来快速查找处理器。此外,Netty 的 ChannelPipeline 还支持动态地添加、移除和替换处理器,而这些操作都需要保证线程安全。

在实际的 Netty 实现中,checkMultiplicity 方法通常会在添加处理器之前被调用,以避免重复添加相同名称的处理器。如果需要深入了解具体的实现细节,建议查看 Netty 的源代码,因为这会提供最准确的信息。

如果你有关于 Netty 的 ChannelPipelineChannelHandler 的具体问题,请随时提问!

checkMultiplicity 是 Netty 内部用于验证 ChannelHandler 能否被多次添加到 ChannelPipeline 的方法。这个方法在添加 ChannelHandlerChannelPipeline 时会被调用,以确保 ChannelHandler 的实例只能在满足其预期的情况下被多次添加。

ChannelHandler 的类型

在 Netty 中,ChannelHandler 有两种主要类型:

  1. @Sharable 注解的 ChannelHandler:标识该 ChannelHandler 可以在多个 ChannelPipeline 中共享,即一个实例可以被多个 ChannelPipeline 或者同一个 ChannelPipeline 多次添加使用。

  2. @SharableChannelHandler:一个实例只能在一个 ChannelPipeline 中使用,不能被重复添加到同一个 ChannelPipeline 中。

checkMultiplicity 方法的作用

checkMultiplicity 方法的主要作用就是检查是否可以将 ChannelHandler 的同一实例多次添加到同一个 ChannelPipeline 中。如果尝试重复添加一个非 @SharableChannelHandler,会抛出异常。

实现代码解析

以下是 DefaultChannelPipeline 类中 checkMultiplicity 方法的实现:

private static void checkMultiplicity(ChannelHandler handler) {
    // 如果 handler 没有 @Sharable 注解,并且已经在别处被使用,则抛出异常
    if (handler instanceof ChannelInboundHandlerAdapter ||
        handler instanceof ChannelOutboundHandlerAdapter) {
        // 如果是 Inbound 或 Outbound Handler Adapter 且不是 Sharable 的
        if (!handler.getClass().isAnnotationPresent(Sharable.class)) {
            if (DefaultChannelHandlerContext.isHandlerAdded(handler)) {
                throw new ChannelPipelineException(
                        handler.getClass().getName() + " is not a @Sharable handler, " +
                        "so can't be added or removed multiple times.");
            }
        }
    }
}

解析

  1. 检查 ChannelHandler 类型:

    • 方法首先检查 ChannelHandler 是否是 ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter 类型的实例。

  2. 判断是否有 @Sharable 注解:

    • 如果 ChannelHandler 的类上没有 @Sharable 注解,则表示该 ChannelHandler 不是可共享的,不能在多个 ChannelPipeline 中使用。

  3. 验证是否已添加

    • 使用 DefaultChannelHandlerContext.isHandlerAdded 方法来检查该 ChannelHandler 是否已经被添加到某个 ChannelPipeline 中。

    • 如果已经添加,则抛出 ChannelPipelineException 异常,表示该 ChannelHandler 不能重复添加。

结论

checkMultiplicity 方法是 Netty 保证 ChannelHandler 正确使用的关键机制,尤其是在涉及到非共享 ChannelHandler 的情况下。它确保每个 ChannelHandler 的实例按照预期的方式使用,防止由于不正确的多次添加导致的潜在问题。通过这种机制,Netty 能够提供更健壮和可靠的网络处理管道。

什么时候能多次添加,为什么

在 Netty 中,通常情况下不允许在同一个 ChannelPipeline 中多次添加具有相同名称的 ChannelHandler。这是因为在 ChannelPipeline 中,每个 ChannelHandler 必须有一个唯一的名称,以确保能够正确地引用和操作这些处理器。

然而,有一些特殊情况允许你多次添加 ChannelHandler,这些情况主要与处理器的类型和用途有关:

  1. 使用不同的名称

    • 即使是相同的 ChannelHandler 类型,只要给它们分配不同的名称,就可以多次添加到同一个 ChannelPipeline 中。例如,可以在 ChannelPipeline 中添加多个 MyServerHandler 实例,只要它们的名字不同即可。

  2. 共享处理器实例

    • 当你需要在多个 ChannelPipeline 之间共享一个 ChannelHandler 实例时,可以将这个实例添加到多个 ChannelPipeline 中,但是每次添加时都需要使用不同的名称。

  3. 使用复合处理器

    • 有时,你可能希望在同一个 ChannelPipeline 中使用多个功能相似的处理器。例如,可以创建一个复合处理器,该处理器包含多个子处理器,然后将这个复合处理器添加到 ChannelPipeline 中。这样,即使这些子处理器是相同的类型,也可以通过这种方式进行管理。

示例

假设你有一个简单的 MyServerHandler 类,并且你想在一个 ChannelPipeline 中添加两个这样的处理器实例:

public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server received: " + msg);
        ctx.fireChannelRead(msg);
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
​
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
​
        // 添加多个 MyServerHandler 实例,但使用不同的名称
        pipeline.addLast("handler1", new MyServerHandler());
        pipeline.addLast("handler2", new MyServerHandler());
    }
}

在这个例子中,尽管添加了两个 MyServerHandler 实例,但由于它们的名字不同,因此这种做法是可行的。每个处理器都会按照它们在 ChannelPipeline 中的顺序依次执行。

总结

  • 推荐做法:一般情况下,你应该为每个 ChannelHandler 使用不同的名称,以便于管理和调试。

  • 避免重复:尽量避免在同一个 ChannelPipeline 中使用相同名称的 ChannelHandler,除非你有明确的理由这样做,并且理解其潜在的影响。

  • 性能考虑:重复添加处理器可能会导致不必要的开销,尤其是在处理器数量较多的情况下。因此,合理设计处理器的结构和逻辑是非常重要的。


在 Netty 中,ChannelHandler 是否能多次添加到 ChannelPipeline 取决于它是否具备线程安全的属性,以及它的设计目标。能够多次添加的 ChannelHandler 通常具有以下特征:

1. 使用 @Sharable 注解

  • ChannelHandler 类使用 @Sharable 注解时,表示这个 ChannelHandler 可以在多个 ChannelPipeline 中共享,或者在同一个 ChannelPipeline 中多次添加。@Sharable 注解表明该 ChannelHandler 实例是线程安全的,可以在多个线程同时处理 I/O 事件,而不会产生并发问题。

@Sharable
public class MySharableHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 处理逻辑
    }
}

多次添加的场景

  • 如果你希望在多个 Channel 中使用同一个 ChannelHandler 实例,可以通过 @Sharable 让这个 ChannelHandler 在多个 ChannelPipeline 中共享。

  • 同样,如果在一个 ChannelPipeline 中需要同一逻辑处理多次(如在不同阶段处理相同的数据),也可以多次添加同一个 @SharableChannelHandler

2. 无状态的 ChannelHandler

  • 无状态的 ChannelHandler 通常可以被多次添加。无状态意味着该 ChannelHandler 没有保存任何与特定连接或数据包相关的上下文数据,因此它可以被多个连接共享。

  • 例如,一个简单的日志记录 ChannelHandler 可以是无状态的,它只会记录传入或传出的数据,并不依赖于任何特定的状态。

多次添加的场景

  • 无状态的 ChannelHandler 通常可以在不同的 ChannelPipeline 中重复使用,因为它们不会因并发访问或不同的连接上下文而出错。

为什么能多次添加?

  • 线程安全@Sharable 和无状态的 ChannelHandler 由于其设计使得它们在并发环境下是线程安全的,因此可以在多个 ChannelPipeline 中重复使用,而不必担心数据竞争或并发问题。

  • 功能复用:多次添加相同的 ChannelHandler 可以减少代码重复。例如,如果你有一个通用的日志记录功能,可能希望在多个位置使用相同的 ChannelHandler 来记录不同阶段的数据。

不能多次添加的情况

  • 状态相关的 ChannelHandler:如果 ChannelHandler 维护了某些状态(如统计信息、连接上下文),并且这些状态与特定的连接或数据流相关联,那么它就不能被多次添加或共享。因为不同连接或数据流的状态不应混淆在一起。

  • @Sharable 注解:如果 ChannelHandler 没有 @Sharable 注解,那么它假定为非线程安全的,且设计上只能在一个 ChannelPipeline 中使用。这种设计是为了防止在多个 ChannelPipeline 中使用同一实例时,出现线程安全问题或数据不一致的问题。

总结

ChannelHandler 只有在标注为 @Sharable 或者是无状态的情况下才能在多个 ChannelPipeline 中多次添加使用。这是因为这样的 ChannelHandler 通常是线程安全的,并且设计上支持在多个上下文中共享使用。如果一个 ChannelHandler 维护了与特定连接或数据流相关的状态,并且没有 @Sharable 注解,那么它就不能被多次添加,以避免并发问题和数据混乱。

#checkMultiplicity(ChannelHandler handler) 方法,校验是否重复的 ChannelHandler 。代码如下:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        // 若已经添加,并且未使用 @Sharable 注解,则抛出异常
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        // 标记已经添加
        h.added = true;
    }
}

在 pipeline 中,一个创建的 ChannelHandler 对象,如果不使用 Netty @Sharable 注解,则只能添加到一个 Channel 的 pipeline 中。所以,如果我们想要重用一个 ChannelHandler 对象( 例如在 Spring 环境中 ),则必须给这个 ChannelHandler 添加 @Sharable 注解。

例如,在 Dubbo 的 com.alibaba.dubbo.remoting.transport.netty.NettyHandler 处理器,它就使用了 @Sharable 注解。

4. filterName

#filterName(String name, ChannelHandler handler) 方法,获得 ChannelHandler 的名字。代码如下:

private String filterName(String name, ChannelHandler handler) {
    if (name == null) { // <1>
        return generateName(handler);
    }
    checkDuplicateName(name); // <2>
    return name;
}
  • <1> 处,若传入默认的名字 name ,则调用 #generateName(ChannelHandler) 方法,根据 ChannelHandler 生成一个唯一的名字。详细解析,见 「4.1 generateName」 。

  • <2> 处,若传入默认的名字 name ,则调用 #checkDuplicateName(String name) 方法,校验名字唯一。详细解析,见 「4.2 checkDuplicateName」 。

4.1 generateName

#generateName(ChannelHandler) 方法,根据 ChannelHandler 生成一个唯一名字。代码如下:

 1: private String generateName(ChannelHandler handler) {
 2:     // 从缓存中查询,是否已经生成默认名字
 3:     Map<Class<?>, String> cache = nameCaches.get();
 4:     Class<?> handlerType = handler.getClass();
 5:     String name = cache.get(handlerType);
 6:     // 若未生成过,进行生成
 7:     if (name == null) {
 8:         name = generateName0(handlerType);
 9:         cache.put(handlerType, name);
10:     }
11: 
12:     // 判断是否存在相同名字的节点
13:     // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
14:     // any name conflicts.  Note that we don't cache the names generated here.
15:     if (context0(name) != null) {
16:         // 若存在,则使用基础名字 + 编号,循环生成,直到一个是唯一的
17:         String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
18:         for (int i = 1;; i ++) {
19:             String newName = baseName + i;
20:             if (context0(newName) == null) { // // 判断是否存在相同名字的节点
21:                 name = newName;
22:                 break;
23:             }
24:         }
25:     }
26:     return name;
27: }
  • 第 2 至 5 行:从缓存 nameCaches 中,查询是否已经生成默认名字。

    • 若未生成过,调用 #generateName0(ChannelHandler) 方法,进行生成。而后,添加到缓存 nameCaches 中。

  • 第 15 行:调用 #context0(String name) 方法,判断是否存在相同名字的节点。代码如下:

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    // 顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}
  • 顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。

  • 第 15 至 25 行:若存在相同名字的节点,则使用基础名字 + 编号,循环生成,直到一个名字是唯一的,然后结束循环。

4.2 checkDuplicateName

#checkDuplicateName(String name) 方法,校验名字唯一。代码如下:

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}
  • 通过调用 #context0(String name) 方法,获得指定名字的节点。若存在节点,意味着不唯一,抛出 IllegalArgumentException 异常。

5. newContext

#newContext(EventExecutorGroup group, String name, ChannelHandler handler) 方法,创建 DefaultChannelHandlerContext 节点。而这个节点,内嵌传入的 ChannelHandler 参数。代码如下:

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group) /** <1> **/, name, handler);
}


newContext
方法的作用是创建一个 DefaultChannelHandlerContext 节点。这个节点是 ChannelPipeline 中的重要组成部分,它封装了一个 ChannelHandler,并在 ChannelPipeline 中扮演处理器节点的角色。

newContext 方法签名

private DefaultChannelHandlerContext newContext(
    EventExecutorGroup group, String name, ChannelHandler handler) {
    // Implementation here
}

参数说明

  • group: 该参数是 EventExecutorGroup 类型,代表了一个线程池,用来执行与该 ChannelHandler 相关的事件。如果传入 null,则会使用 ChannelEventLoop 作为执行环境。

  • name: 这是 ChannelHandlerContext 的名称,通常用于标识每个 ChannelHandler。如果传入 null,则会自动生成一个默认名称。

  • handler: 这是 ChannelHandler 类型的参数,表示需要被封装的实际处理器。它是 DefaultChannelHandlerContext 内部的核心组件,处理管道中的事件和数据。

方法的核心逻辑

  1. 创建 DefaultChannelHandlerContext 实例:

    • 通过调用 new DefaultChannelHandlerContext(...) 来创建一个新的 DefaultChannelHandlerContext 对象。

    • 该对象会关联上面的三个参数,包括执行器(group)、名字(name)和处理器(handler)。

  2. 返回新创建的上下文:

    • 方法最后返回这个新创建的 DefaultChannelHandlerContext 对象。

DefaultChannelHandlerContext 的作用

  • DefaultChannelHandlerContextChannelPipeline 中的一个节点,它连接了 ChannelHandler,并负责在管道中传播事件。

  • 每个 ChannelHandlerContext 都持有一个 ChannelHandler,并且包含指向下一个和上一个节点的指针,这使得事件能够沿着管道流动。

代码示例

以下是一个简化的 newContext 方法的实现示例:

private DefaultChannelHandlerContext newContext(
    EventExecutorGroup group, String name, ChannelHandler handler) {
​
    // 创建并返回 DefaultChannelHandlerContext 实例
    return new DefaultChannelHandlerContext(this, group, name, handler);
}

在 Netty 中,ChannelPipeline 是一个包含了 ChannelHandler 的双向链表,每个 ChannelHandlerContext 代表链表中的一个节点。newContext 方法就是用来创建这些节点的。

通过 newContext 方法,Netty 能够将多个 ChannelHandler 串联起来,形成一个可以处理 IO 操作的管道。

<1> 处,调用 #childExecutor(EventExecutorGroup group) 方法,创建执行器。代码如下:


private EventExecutor childExecutor(EventExecutorGroup group) {
    // <1> 不创建子执行器
    if (group == null) {
        return null;
    }
    // <2> 根据配置项 SINGLE_EVENTEXECUTOR_PER_GROUP ,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }
    // <3> 通过 childExecutors 缓存实现,一个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        // Use size of 4 as most people only use one extra EventExecutor.
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    EventExecutor childExecutor = childExecutors.get(group);
    // 缓存不存在,进行 从 EventExecutorGroup 获得 EventExecutor 执行器
    if (childExecutor == null) {
        childExecutor = group.next();
        childExecutors.put(group, childExecutor); // 进行缓存
    }
    return childExecutor;
}
  • 一共有三种情况:

    • <1> ,当不传入 EventExecutorGroup 时,不创建执行器。即,使用 Channel 所注册的 EventLoop 作为执行器。对于我们日常使用,基本完全都是这种情况。所以,下面两种情况,胖友不理解也是没关系的。

    • <2> ,根据配置项 ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP ,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器。

    • <3> ,通过 childExecutors 缓存实现,每个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器。是否获得相同的 EventExecutor 执行器,这就是 <2><3> 的不同。

  • 注意,创建的是 DefaultChannelHandlerContext 对象。

newContext 方法的实现中,调用 childExecutor(EventExecutorGroup group) 方法是为了确定要为当前的 ChannelHandler 选择哪一个 EventExecutor 来执行相关的任务。这个步骤对于保证 ChannelHandler 在合适的线程上执行至关重要。

方法调用背景

假设在 newContext 方法中,有这样的一段代码:

private DefaultChannelHandlerContext newContext(
    EventExecutorGroup group, String name, ChannelHandler handler) {
​
    // <1> 调用 childExecutor 方法
    EventExecutor executor = childExecutor(group);
​
    // 创建并返回 DefaultChannelHandlerContext 实例
    return new DefaultChannelHandlerContext(this, executor, name, handler);
}

childExecutor(EventExecutorGroup group) 方法

childExecutor(EventExecutorGroup group) 方法的作用是从 EventExecutorGroup 中选择一个合适的 EventExecutor 来执行 ChannelHandler 的任务。

主要逻辑

  1. 检查传入的 group:

    • 如果 groupnull,则使用默认的 EventLoop,这通常是与 Channel 相关联的 EventLoop

    • 如果 group 不为 null,那么从 group 中选择一个具体的 EventExecutor 来执行任务。

  2. 选择合适的执行器:

    • 如果 group 是一个非空的 EventExecutorGroup,则调用其 next() 方法来选择一个 EventExecutor

    • 这个 EventExecutor 将会与 ChannelHandler 绑定,在事件发生时处理它们。

示例实现

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        // 如果 group 为空,则使用当前 Channel 的 EventLoop 作为执行器
        return null;
    } else {
        // 否则从 group 中选择一个具体的 EventExecutor
        return group.next();
    }
}

整体流程

  1. newContext 方法被调用:

    • 传入的 EventExecutorGroupnamehandler 作为参数。

  2. 调用 childExecutor

    • 判断 group 是否为空,决定使用哪个 EventExecutor

  3. 创建 DefaultChannelHandlerContext

    • 使用选定的 EventExecutor 创建新的 DefaultChannelHandlerContext

  4. 返回新创建的上下文

    • 新创建的上下文会包含对 ChannelHandler 的引用,并负责在管道中传播事件。

作用与意义

通过 childExecutor 的调用,Netty 可以灵活地为不同的 ChannelHandler 分配执行线程。这样可以确保每个处理器在合适的执行环境中运行,避免线程安全问题,并提升系统的并发处理能力。

6. addLast0

#addLast0(AbstractChannelHandlerContext newCtx) 方法,添加到最后一个节点。注意,实际上,是添加到 tail 节点之前。代码如下:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    // 获得 tail 节点的前一个节点
    AbstractChannelHandlerContext prev = tail.prev;
    // 新节点,指向 prev 和 tail 节点
    newCtx.prev = prev; // <1>
    newCtx.next = tail; // <2>
    // 在 prev 和 tail ,指向新节点
    prev.next = newCtx; // <3>
    tail.prev = newCtx; // <4>
}

FROM 闪电侠 《Netty 源码分析之pipeline(一)》

  • 用下面这幅图可见简单的表示这段过程,说白了,其实就是一个双向链表的插入操作:

    添加节点过程添加节点过程

  • 操作完毕,该节点就加入到 pipeline 中:

    添加节点之后

在 Netty 中,ChannelPipeline 内部维护了一个双向链表结构来存储所有的 ChannelHandler。当您向 ChannelPipeline 添加一个新的 ChannelHandler 时,实际上是通过双向链表来组织这些处理器的顺序。

您提到的 addLast0 方法是一个内部方法,用于将一个新的 ChannelHandlerContext 添加到链表的末尾,但在此之前的位置。下面是对您提供的代码的详细解释:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    // 获得 tail 节点的前一个节点
    AbstractChannelHandlerContext prev = tail.prev;
    // 新节点,指向 prev 和 tail 节点
    newCtx.prev = prev; // <1>
    newCtx.next = tail; // <2>
    // 在 prev 和 tail ,指向新节点
    prev.next = newCtx; // <3>
    tail.prev = newCtx; // <4>
}

代码解析

  1. 获取前一个节点:

    • AbstractChannelHandlerContext prev = tail.prev; 获取链表中最后一个有效节点(即 tail 节点的前一个节点)。

  2. 更新新节点的指针:

    • newCtx.prev = prev; 设置新节点的 prev 指针指向当前链表的最后一个有效节点。

    • newCtx.next = tail; 设置新节点的 next 指针指向 tail 节点。

  3. 更新前一个节点的指针:

    • prev.next = newCtx; 更新前一个节点的 next 指针,使其指向新节点。

  4. 更新 tail 节点的指针:

    • tail.prev = newCtx; 更新 tail 节点的 prev 指针,使其指向新节点。

通过这样的方式,newCtx 被插入到了链表的末尾,但在 tail 节点之前。这样做的目的是为了保持 tail 节点作为一个特殊的占位符,始终位于链表的末尾,以便于管理和遍历链表。

示例说明

假设链表初始状态如下:

head <-> ctx1 <-> ctx2 <-> ... <-> ctxN <-> tail

当我们调用 addLast0 方法添加一个新的上下文 newCtx 时,链表的状态将变为:

head <-> ctx1 <-> ctx2 <-> ... <-> ctxN <-> newCtx <-> tail

其中 ctxN 是原先链表的最后一个有效节点。

如果您有任何疑问或者需要进一步的解释,请随时提问。

ChannelPipeline 中,addLast0(AbstractChannelHandlerContext newCtx) 方法用于将新的 ChannelHandlerContext 添加到 ChannelPipeline 的末尾,但并非真正的最后一个节点,而是添加在 tail 节点之前。tail 节点是一个哨兵节点,用于表示管道的末端,并且不处理任何事件或数据。

方法分析

private void addLast0(AbstractChannelHandlerContext newCtx) {
    // 获得 tail 节点的前一个节点
    AbstractChannelHandlerContext prev = tail.prev;
​
    // 新节点,指向 prev 和 tail 节点
    newCtx.prev = prev; // <1>
    newCtx.next = tail; // <2>
​
    // 在 prev 和 tail ,指向新节点
    prev.next = newCtx; // <3>
    tail.prev = newCtx; // <4>
}

详细解读

  1. AbstractChannelHandlerContext prev = tail.prev;:

    • 先获取 tail 节点之前的节点 prev,也就是当前管道中真正的最后一个处理器节点。

  2. newCtx.prev = prev; (<1>):

    • 将新节点 newCtxprev 指针指向之前的 prev 节点。

  3. newCtx.next = tail; (<2>):

    • 将新节点 newCtxnext 指针指向 tail 节点。

  4. prev.next = newCtx; (<3>):

    • 将之前的最后一个节点 prevnext 指针指向新节点 newCtx,将其连接到新节点上。

  5. tail.prev = newCtx; (<4>):

    • 最后,将 tail 节点的 prev 指针指向新节点 newCtx,更新链表末端的连接。

作用与意义

  • 链表操作:

    • 通过上述四步操作,新的 ChannelHandlerContext 被插入到 tail 节点之前,使得它成为新的最后一个有效节点。

  • 维护双向链表:

    • ChannelPipeline 维护的是一个双向链表,通过 prevnext 指针,可以在链表中正向或反向遍历节点,从而处理事件流。

  • 线程安全性:

    • 这些操作通常会在同步块中执行,以保证多线程环境下链表结构的安全性。

这一方法的核心在于维护 ChannelPipeline 的链表结构,确保新添加的 ChannelHandler 能够正确地插入到事件处理流程中。

7. callHandlerAdded0

#callHandlerAdded0(AbstractChannelHandlerContext) 方法,执行回调 ChannelHandler 添加完成( added )事件。代码如下:

 1: private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
 2:     try {
 3:         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
 4:         // any pipeline events ctx.handler() will miss them because the state will not allow it.
 5:         // 设置 AbstractChannelHandlerContext 已添加
 6:         ctx.setAddComplete();
 7:         // 回调 ChannelHandler 添加完成( added )事件
 8:         ctx.handler().handlerAdded(ctx);
 9:     } catch (Throwable t) {
10:         // 发生异常,移除该节点
11:         boolean removed = false;
12:         try {
13:             remove0(ctx); // 移除
14:             try {
15:                 ctx.handler().handlerRemoved(ctx); // 回调 ChannelHandler 移除完成( removed )事件
16:             } finally {
17:                 ctx.setRemoved(); // 标记节点已移除
18:             }
19:             removed = true; // 标记移除成功
20:         } catch (Throwable t2) {
21:             if (logger.isWarnEnabled()) {
22:                 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
23:             }
24:         }
25: 
26:         // 触发异常的传播
27:         if (removed) {
28:             fireExceptionCaught(new ChannelPipelineException(
29:                     ctx.handler().getClass().getName() +
30:                     ".handlerAdded() has thrown an exception; removed.", t));
31:         } else {
32:             fireExceptionCaught(new ChannelPipelineException(
33:                     ctx.handler().getClass().getName() +
34:                     ".handlerAdded() has thrown an exception; also failed to remove.", t));
35:         }
36:     }
37: }
  • 第 6 行:调用 AbstractChannelHandlerContext#setAddComplete() 方法,设置 AbstractChannelHandlerContext 已添加。

  • 第 8 行:调用 ChannelHandler#handlerAdded(AbstractChannelHandlerContext) 方法,回调 ChannelHandler 添加完成( added )事件。一般来说,通过这个方法,来初始化 ChannelHandler 。注意,因为这个方法的执行在 EventLoop 的线程中,所以要尽量避免执行时间过长。

  • 第 9 行:发生异常。

    • 第 10 至 24 行:移除该节点( ChannelHandler )。详细解析,见 《Netty 源码解析 —— ChannelPipeline(三)之移除 ChannelHandler》 。

      • 😈 所以,ChannelHandler#handlerAdded(AbstractChannelHandlerContext) 方法的执行异常时,将被移除。

    • 第 26 至 35 行:触发异常的传播。详细解析,见 《Netty 源码解析 —— ChannelPipeline(六)之异常事件的传播》 。

在 Netty 中,ChannelHandlerContext 提供了生命周期回调方法,其中一个重要的方法是 handlerAdded0。当一个 ChannelHandler 被添加到 ChannelPipeline 时,会调用此方法。下面是关于 callHandlerAdded0 方法的详细解释:

callHandlerAdded0 方法

callHandlerAdded0 方法通常是在 ChannelHandlerContext 的内部实现中调用的,它的主要目的是通知 ChannelHandler 它已经被添加到 ChannelPipeline 中。这使得 ChannelHandler 可以在被添加后执行一些初始化操作。

方法签名

callHandlerAdded0 方法的签名大致如下(请注意,这是简化后的示例,实际源码可能略有不同):

void callHandlerAdded0() {
    final ChannelHandler handler = this.handler;
    if (handler != null) {
        handler.handlerAdded(this);
    }
}

工作原理

当一个 ChannelHandler 被添加到 ChannelPipeline 中时,Netty 会调用 ChannelHandlerContextcallHandlerAdded0 方法。如果 ChannelHandlerContexthandler 字段不为空,则调用 handler.handlerAdded(ChannelHandlerContext) 方法。

使用示例

下面是一个简单的示例,展示了如何在自定义的 ChannelHandler 中重写 handlerAdded 方法:

public class MyChannelHandler extends ChannelInboundHandlerAdapter {
​
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Handler added to the pipeline: " + ctx.name());
        // 这里可以进行一些初始化操作
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 处理接收到的消息
    }
}

MyChannelHandler 被添加到 ChannelPipeline 时,handlerAdded 方法将会被调用。

如何触发 callHandlerAdded0

callHandlerAdded0 通常在 ChannelPipelineaddLastaddFirst 方法被调用后触发。例如:

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("myHandler", new MyChannelHandler());

在这个例子中,MyChannelHandler 被添加到 ChannelPipeline 后,callHandlerAdded0 方法会在内部被调用。

总结

  • callHandlerAdded0 方法是在 ChannelHandlerContext 中调用的,用于通知 ChannelHandler 已经被添加到 ChannelPipeline 中。

  • ChannelHandler 被添加后,可以通过重写 handlerAdded 方法来执行初始化逻辑。

  • 此方法通常是在 ChannelPipelineaddLastaddFirst 方法被调用后触发的。

如果您有任何疑问或需要进一步的帮助,请随时告诉我。

callHandlerAdded0 方法在 Netty 中用于通知 ChannelHandler 它已被添加到 ChannelPipeline 中。这是 ChannelPipeline 管理 ChannelHandler 生命周期的关键部分之一。

方法背景

在 Netty 中,每个 ChannelHandler 在被添加到 ChannelPipeline 时,都会经历一系列的回调操作,这些回调是由 callHandlerAdded0callHandlerRemoved0 方法触发的。callHandlerAdded0 方法是在 ChannelHandler 被添加到管道中后立即调用的,用于执行 ChannelHandler 的初始化逻辑。

方法签名

private void callHandlerAdded0(AbstractChannelHandlerContext ctx) {
    try {
        // 调用 ChannelHandler 的 handlerAdded 方法
        ctx.handler().handlerAdded(ctx);
        // 设置上下文为已添加状态
        ctx.setAddComplete();
    } catch (Throwable t) {
        // 如果添加过程中发生异常,调用异常处理逻辑
        boolean removed = false;
        try {
            remove(ctx);
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }
        
        if (removed) {
            // 如果成功移除,则触发异常处理
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}

详细解读

  1. ctx.handler().handlerAdded(ctx);:

    • 调用当前 ChannelHandlerhandlerAdded 方法,传递当前的上下文 ctx

    • 这是 ChannelHandler 的生命周期方法之一,用于在处理器被添加到 ChannelPipeline 后进行初始化。

    • 不同的 ChannelHandler 可能会在这个方法中执行不同的初始化逻辑,例如分配资源、设置初始状态等。

  2. ctx.setAddComplete();:

    • 标记该 ChannelHandlerContext 已完成添加操作。这个方法通常是为了设置一些状态,确保后续操作可以正确执行。

  3. 异常处理:

    • handlerAdded 方法中,如果发生异常,代码会尝试将该 ChannelHandlerChannelPipeline 中移除,防止系统进入不一致的状态。

    • 如果移除操作成功,则抛出一个 ChannelPipelineException,并在管道中传播该异常。

    • 如果移除操作失败,则会记录日志并抛出更严重的异常。

作用与意义

  • 生命周期管理:

    • callHandlerAdded0ChannelHandler 生命周期管理的重要一环,通过这个方法,可以确保每个 ChannelHandler 在被添加到 ChannelPipeline 后得到正确的初始化。

  • 异常处理:

    • 该方法中对异常的处理逻辑确保了在 ChannelHandler 初始化失败时,可以安全地将其移除,防止管道进入不可预期的状态。

  • 上下文状态:

    • 通过 ctx.setAddComplete(),确保上下文的状态是与处理器的实际情况一致的,从而避免后续操作出现问题。

这个方法在 Netty 的 ChannelPipeline 中起到了保证 ChannelHandler 添加过程的完整性和安全性的作用,使得整个管道的处理逻辑更加健壮。

为什么要回调

在 Netty 中,回调机制是框架设计的一部分,尤其是在 ChannelPipeline 中通过回调来管理 ChannelHandler 的生命周期和处理各种网络事件。回调的使用有几个关键原因:

1. 解耦与灵活性

  • 解耦:通过回调,Netty 可以将事件处理逻辑与事件触发机制解耦。ChannelPipeline 只负责事件的传播,而具体的处理逻辑则由各个 ChannelHandler 自己定义。这样一来,ChannelPipeline 可以非常灵活地处理各种不同类型的 ChannelHandler,而不需要了解它们的内部实现。

  • 灵活性:开发者可以通过实现不同的 ChannelHandler 接口来定制自己的处理逻辑,并将其添加到 ChannelPipeline 中。Netty 通过回调机制自动触发这些处理逻辑,而不需要硬编码逻辑在核心框架中。

2. 生命周期管理

  • 初始化与清理:每个 ChannelHandler 在添加到 ChannelPipeline 时,Netty 会回调 handlerAdded 方法。这给了 ChannelHandler 一个机会去执行一些初始化操作,例如分配资源、配置状态等。同样地,当 ChannelHandler 被移除时,Netty 会回调 handlerRemoved 方法,用于资源清理和状态重置。

  • 安全性:通过这些生命周期回调,Netty 可以确保 ChannelHandler 在整个生命周期内都能正确管理自己的状态和资源,减少内存泄漏或状态不一致的风险。

3. 异步与并发模型的支持

  • 异步处理:Netty 是一个高度异步的框架,回调机制非常适合这种异步模型。通过回调,Netty 可以在事件发生时通知相关的处理器,并在事件处理完成后继续执行其他任务,而不需要阻塞当前线程。

  • 并发控制:回调机制还可以与 EventLoopEventExecutor 一起使用,以确保回调是在正确的线程中执行。这对于并发处理和避免线程安全问题至关重要。

4. 事件驱动编程模型

  • 响应特定事件:在网络编程中,不同的事件需要不同的处理逻辑。例如,连接建立、消息接收、连接断开等事件都需要不同的处理方式。通过回调,Netty 可以在这些事件发生时通知相应的 ChannelHandler,使其可以对特定事件做出反应。

  • 简化逻辑:回调机制使得事件驱动编程变得简单和直观,开发者只需要专注于事件的处理逻辑,而不必关心事件的触发和管理。

5. 扩展性

  • 插拔式设计:通过回调机制,Netty 的 ChannelPipeline 支持插拔式设计,开发者可以随时添加或移除 ChannelHandler,以调整处理逻辑。这种扩展性使得 Netty 能够适应各种不同的应用场景,从简单的协议处理到复杂的多协议栈处理。

总结

回调机制在 Netty 中的应用,主要是为了提供解耦、灵活性、生命周期管理、异步处理支持,以及简化事件驱动编程模型。它使得 Netty 在设计上更加模块化、可扩展,并且能够轻松处理复杂的网络协议和应用场景。

在 Netty 中,回调机制(如 handlerAdded, handlerRemoved, exceptionCaught 等方法)是非常重要的设计模式之一,它允许开发者在特定的时间点执行自定义的逻辑。下面详细解释为什么需要回调以及回调的好处:

1. 生命周期管理

  • 初始化和清理:

    • handlerAdded: 当一个 ChannelHandler 被添加到 ChannelPipeline 中时,handlerAdded 方法会被调用。这为开发者提供了一个机会来执行初始化操作,比如设置内部状态、注册定时器等。

    • handlerRemoved: 相反地,当一个 ChannelHandlerChannelPipeline 中被移除时,handlerRemoved 方法会被调用。这可以用来释放资源、取消定时器等。

2. 错误处理

  • 异常捕获:

    • exceptionCaught: 当在处理事件时发生异常时,exceptionCaught 方法会被调用。这允许开发者记录错误、关闭连接或者采取其他适当的措施来处理异常情况。

3. 状态同步

  • 状态检查:

    • channelActivechannelInactive: 这些方法分别在通道变得活跃(即连接建立成功)和通道变得不活跃(即连接关闭)时被调用。这对于同步状态非常有用,例如,在通道变得活跃时开始接收数据,或者在通道变得不活跃时释放资源。

4. 自定义逻辑

  • 灵活扩展:

    • 回调方法允许开发者根据自己的需求扩展 Netty 的行为,而不必修改框架本身的代码。这提高了系统的灵活性和可扩展性。

5. 事件驱动编程

  • 非阻塞式编程模型:

    • Netty 是基于事件驱动的非阻塞式编程模型构建的。回调机制是这种编程模型的核心,因为它允许程序在特定事件发生时执行代码,而无需主动轮询或等待。

6. 异步操作

  • 异步处理:

    • 通过回调,开发者可以编写异步代码,这意味着处理可以在事件发生时立即开始,而不是等待某个操作完成。

示例

下面是一个简单的示例,展示如何使用回调来处理通道激活和异常捕获:

public class MyChannelHandler extends ChannelInboundHandlerAdapter {
​
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel is active.");
        // 这里可以执行初始化操作
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Exception caught: " + cause.getMessage());
        // 这里可以记录异常、关闭连接等
        cause.printStackTrace();
        ctx.close();
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 处理接收到的消息
    }
}

总结

回调机制是 Netty 中非常重要的一部分,它使得开发者能够轻松地控制和扩展框架的行为。通过使用回调,您可以编写更加灵活、高效和可靠的网络应用程序。如果您有任何具体的问题或需要更深入的解释,请随时提问。

当然可以。让我们更详细地讨论 ChannelHandler 的生命周期管理,特别是 handlerAddedhandlerRemoved 方法。

生命周期管理

1. 初始化和清理

  • handlerAdded 方法:

    • 何时调用:

      • 当一个 ChannelHandler 被添加到 ChannelPipeline 中时,handlerAdded 方法会被调用。

    • 用途:

      • 这个方法提供了一个机会来执行初始化操作。

      • 开发者可以利用这个方法来设置 ChannelHandler 的内部状态,例如初始化计数器、设置标志、注册定时器等。

      • 这也是配置任何依赖于 ChannelHandlerContext 的资源的好时机,例如订阅事件或启动后台任务。

    • 示例:

      public class MyChannelHandler extends ChannelInboundHandlerAdapter {
          private ScheduledFuture<?> scheduledFuture;
      ​
          @Override
          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
              System.out.println("Handler added to the pipeline: " + ctx.name());
      ​
              // 注册一个定时器,每隔一段时间执行某个任务
              scheduledFuture = ctx.executor().scheduleAtFixedRate(
                      new Runnable() {
                          @Override
                          public void run() {
                              // 执行任务
                          }
                      }, 0, 10, TimeUnit.SECONDS);
          }
      ​
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              // 处理接收到的消息
          }
      }
  • handlerRemoved 方法:

    • 何时调用:

      • 当一个 ChannelHandlerChannelPipeline 中被移除时,handlerRemoved 方法会被调用。

    • 用途:

      • 这个方法提供了一个机会来执行清理操作。

      • 开发者可以利用这个方法来释放任何在 handlerAdded 方法中创建的资源,例如取消定时器、关闭文件句柄等。

    • 示例:

      @Override
      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
          System.out.println("Handler removed from the pipeline: " + ctx.name());
      ​
          // 取消定时器
          if (scheduledFuture != null) {
              scheduledFuture.cancel(false);
          }
      }

总结

  • handlerAdded:

    • ChannelHandler 被添加到 ChannelPipeline 时调用。

    • 用于执行初始化操作。

  • handlerRemoved:

    • ChannelHandlerChannelPipeline 中被移除时调用。

    • 用于执行清理操作。

通过这些方法,您可以确保您的 ChannelHandler 在整个生命周期内正确地管理资源,从而提高应用程序的健壮性和性能。如果您有任何进一步的问题或需要具体的代码示例,请随时提问。

8. PendingHandlerCallback

PendingHandlerCallback ,实现 Runnable 接口,等待添加 ChannelHandler 回调抽象类。代码如下:

PendingHandlerCallback 是 DefaultChannelPipeline 的内部静态类。

private abstract static class PendingHandlerCallback implements Runnable {

    /**
     * AbstractChannelHandlerContext 节点
     */
    final AbstractChannelHandlerContext ctx;
    /**
     * 下一个回调 PendingHandlerCallback 对象
     */
    PendingHandlerCallback next;

    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    /**
     * 执行方法
     */
    abstract void execute();

}
  • 通过 ctxnext 字段,形成回调链

  • #execute() 抽象方法,通过实现它,执行回调逻辑。


PendingHandlerCallback 是 Netty 内部的一个类,用于在 ChannelPipeline 中管理 ChannelHandler 的添加和移除操作,尤其是在需要异步处理这些操作时。它的设计旨在确保 ChannelHandler 的添加和移除过程可以在正确的线程中执行,并且顺序不会出错。

背景

在 Netty 中,ChannelPipeline 是一个双向链表,负责管理 ChannelHandler 的链式调用。由于 Netty 是一个异步框架,ChannelHandler 的添加和移除可能并不会立即生效,尤其是在多线程环境下。为了解决这个问题,Netty 引入了 PendingHandlerCallback,用于管理这些可能被推迟的操作。

PendingHandlerCallback

PendingHandlerCallback 是一个抽象类,其设计目的是为 ChannelHandler 的添加和移除提供一种延迟执行的机制。当需要将一个 ChannelHandler 添加到 ChannelPipeline 中,或者从中移除时,如果当前无法立即执行这些操作,就会将它们包装成 PendingHandlerCallback,然后稍后在合适的时机执行。

核心代码与实现

abstract class PendingHandlerCallback {
    final AbstractChannelHandlerContext ctx;
    PendingHandlerCallback next;
​
    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
​
    abstract void execute();
}

属性

  • ctx: 持有当前 ChannelHandlerContext 的引用,用于执行实际的添加或移除操作。

  • next: 指向下一个 PendingHandlerCallback,形成一个链表结构,这样多个待执行的回调可以依次进行。

方法

  • execute(): 抽象方法,需要在子类中实现,表示具体的操作逻辑。根据子类的不同,这个方法会执行具体的 ChannelHandler 添加或移除操作。

常见子类

PendingHandlerCallback 有两个常见的子类,分别用于处理 ChannelHandler 的添加和移除:

  1. PendingHandlerAddedTask:

    • 这个子类用于延迟执行 ChannelHandler 的添加操作。

    • 在调用 execute() 方法时,它会触发 ChannelHandlerhandlerAdded() 方法,完成 ChannelHandler 的添加过程。

  2. PendingHandlerRemovedTask:

    • 这个子类用于延迟执行 ChannelHandler 的移除操作。

    • 在调用 execute() 方法时,它会触发 ChannelHandlerhandlerRemoved() 方法,完成 ChannelHandler 的移除过程。

使用场景

  • 异步操作: 如果 ChannelPipeline 在某个非 EventLoop 线程中执行添加或移除操作时,需要将这些操作推迟到 EventLoop 线程中执行,以确保线程安全。在这种情况下,操作会被封装成 PendingHandlerCallback,然后在合适的时机被执行。

  • 事件顺序: 由于 Netty 是异步事件驱动的,ChannelHandler 的添加和移除操作必须保证事件顺序不被打乱。PendingHandlerCallback 通过链表结构来确保这些操作按顺序执行。

作用与意义

  • 线程安全: 通过 PendingHandlerCallback,Netty 可以保证 ChannelHandler 的添加和移除操作在正确的线程中进行,避免了多线程操作带来的不确定性。

  • 顺序执行: 它确保了 ChannelHandler 的生命周期事件(如 handlerAddedhandlerRemoved)按正确的顺序执行,防止事件错乱。

  • 简化开发: 对于开发者而言,PendingHandlerCallback 的引入简化了在多线程环境下管理 ChannelPipeline 的复杂性,使得添加和移除 ChannelHandler 变得更安全和可靠。

总结

PendingHandlerCallback 是 Netty 中用于延迟执行 ChannelHandler 添加和移除操作的重要机制。它通过保证这些操作的线程安全和顺序执行,为 ChannelPipeline 的稳定性提供了基础支持。

"Pending" 是一个英语词汇,意思是“待处理的”或“未决的”。在计算机科学和软件工程中,"pending" 常用来描述那些已经被触发但尚未完成或尚未执行的任务、操作或事件。这个词表示某个动作或操作已经被安排或请求,但还没有实际执行或完成。

PendingHandlerCallback 中的含义

在 Netty 框架中,PendingHandlerCallback 类中的 "Pending" 表示该回调操作(如添加或移除 ChannelHandler)已经被请求,但由于某些原因,它还未立即执行,需要等待一个合适的时机(通常是在适当的线程环境中)再去执行。这种设计能够确保操作顺序的正确性和线程安全性。

一些例子

  1. Pending Task:

    • 一个“待处理任务”可能是已经被加入到任务队列中的操作,但还没有被调度器执行。例如,在 GUI 程序中,一个点击事件可能会将某个任务加入到事件队列中,任务状态就是 "pending",直到该任务被执行。

  2. Pending Request:

    • 在网络通信中,一个“待处理请求”可能是一个已经发送到服务器但尚未得到响应的 HTTP 请求。在这个状态下,请求是 "pending" 的,等待服务器的处理。

  3. Pending Approval:

    • 在审批流程中,一个“待审批”项可能是已经提交但尚未被审核和批准的文档或请求。

总结

"Pending" 强调的是某个动作或事件处于等待执行的状态。它反映了一个已经发生的触发或请求,但由于需要等待某个条件满足或者等待被调度,因此还没有实际执行。

为什么会有 PendingHandlerCallback 呢

因为 ChannelHandler 添加到 pipeline 中,会触发 ChannelHandler 的添加完成( added )事件,并且该事件需要在 Channel 所属的 EventLoop 中执行。

但是 Channel 并未注册在 EventLoop 上时,需要暂时将“触发 ChannelHandler 的添加完成( added )事件”的逻辑,作为一个 PendingHandlerCallback 进行“缓存”。在 Channel 注册到 EventLoop 上时,进行回调执行。


PendingHandlerCallback 有两个实现类:

  • PendingHandlerAddedTask

  • PendingHandlerRemovedTask

本文只分享 PendingHandlerAddedTask 的代码实现。

PendingHandlerCallback 的存在,主要是为了处理在 ChannelHandler 被添加或移除时,可能由于 Channel 尚未注册到 EventLoop 上,导致这些操作无法立即执行的情况。

设计背景

在 Netty 中,ChannelHandler 的添加和移除操作必须在 Channel 所属的 EventLoop 线程中执行,因为这些操作可能会影响事件的处理顺序和线程安全性。然而,有些情况下,Channel 还未注册到 EventLoop,这时就需要一种机制来“缓存”这些操作,等待 Channel 注册完成后再执行。PendingHandlerCallback 就是为了解决这个问题而设计的。

两个实现类

  1. PendingHandlerAddedTask:

    • 这个类用于处理 ChannelHandler 的添加操作。当 ChannelHandler 需要添加到 ChannelPipeline 中,但 Channel 还未注册到 EventLoop 上时,PendingHandlerAddedTask 会将这次添加操作缓存起来,直到 Channel 注册完成后再执行 handlerAdded() 方法,通知 ChannelHandler 已经成功添加。

  2. PendingHandlerRemovedTask:

    • 这个类则用于处理 ChannelHandler 的移除操作。如果在 ChannelHandler 需要从 ChannelPipeline 中移除时,Channel 尚未注册到 EventLoopPendingHandlerRemovedTask 会将移除操作缓存起来,待 Channel 注册到 EventLoop 后,再执行 handlerRemoved() 方法,完成 ChannelHandler 的移除。

关键点总结

  • 确保线程安全: Netty 通过 PendingHandlerCallback 确保 ChannelHandler 的添加和移除操作都在 EventLoop 线程中执行,从而避免多线程环境下的线程安全问题。

  • 保证事件顺序: 通过延迟执行,PendingHandlerCallback 确保 ChannelPipeline 中的事件处理顺序不会被打乱,保持整个网络事件处理的逻辑一致性。

  • 缓存机制: 当 Channel 尚未准备好执行这些操作时,PendingHandlerCallback 提供了一种缓存机制,等待条件满足时再执行操作。

这个设计使得 Netty 在处理复杂的异步事件时,能够保持高效和安全。

8.1 PendingHandlerAddedTask

PendingHandlerAddedTask 实现 PendingHandlerCallback 抽象类,用于回调添加 ChannelHandler 节点。代码如下:

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        // 在 EventLoop 的线程中,回调 ChannelHandler added 事件
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            // 提交 EventLoop 中,执行回调 ChannelHandler added 事件
            try {
                executor.execute(this); // <1>
            } catch (RejectedExecutionException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                            executor, ctx.name(), e);
                }
                // 发生异常,进行移除
                remove0(ctx);
                // 标记 AbstractChannelHandlerContext 为已移除
                ctx.setRemoved();
            }
        }
    }
    
}
  • #execute() 实现方法中,我们可以看到,和 #addLast(EventExecutorGroup group, String name, ChannelHandler handler) 方法的【第 28 至 45 行】的代码比较类似,目的是,在 EventLoop 的线程中,执行 #callHandlerAdded0(AbstractChannelHandlerContext) 方法,回调 ChannelHandler 添加完成( added )事件。

  • <1> 处,为什么 PendingHandlerAddedTask 可以直接提交到 EventLoop 中呢?因为 PendingHandlerAddedTask 是个 Runnable ,这也就是为什么 PendingHandlerCallback 实现 Runnable 接口的原因。

下面开始分享的方法,属于 DefaultChannelPipeline 类。

8.2 callHandlerCallbackLater

#callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) 方法,添加 PendingHandlerCallback 回调。代码如下:

/**
 * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
 * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
 *
 * We only keep the head because it is expected that the list is used infrequently and its size is small.
 * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
 * complexity.
 *
 * 准备添加 ChannelHandler 的回调
 *
 * @see #callHandlerCallbackLater(AbstractChannelHandlerContext, boolean)
 */
private PendingHandlerCallback pendingHandlerCallbackHead;
    
  1: private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
  2:     assert !registered;
  3: 
  4:     // 创建 PendingHandlerCallback 对象
  5:     PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
  6:     PendingHandlerCallback pending = pendingHandlerCallbackHead;
  7:     // 若原 pendingHandlerCallbackHead 不存在,则赋值给它
  8:     if (pending == null) {
  9:         pendingHandlerCallbackHead = task;
 10:     // 若原 pendingHandlerCallbackHead 已存在,则最后一个回调指向新创建的回调
 11:     } else {
 12:         // Find the tail of the linked-list.
 13:         while (pending.next != null) {
 14:             pending = pending.next;
 15:         }
 16:         pending.next = task;
 17:     }
 18: }
  • added 方法参数,表示是否是添加 ChannelHandler 的回调。所以在【第 5 行】的代码,根据 added 是否为 true ,创建 PendingHandlerAddedTask 或 PendingHandlerRemovedTask 对象。在本文中,当然创建的是 PendingHandlerAddedTask 。

  • 第 7 至 17 行:将创建的 PendingHandlerCallback 对象,“添加”到 pendingHandlerCallbackHead 中。

8.3 invokeHandlerAddedIfNeeded

#invokeHandlerAddedIfNeeded() 方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。它被两个方法所调用:

  • AbstractUnsafe#register0(ChannelPromise promise) 方法

    • 原因是:

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
  • 例如 ServerBootstrap 通过 ChannelInitializer 注册自定义的 ChannelHandler 到 pipeline 上的情况。

  • 调用栈如下图

HeadContext#channelRegistered(ChannelHandlerContext ctx) 方法。

  • 笔者调试下来,对于 Netty NIO Server 和 NIO Client 貌似没啥作用,因为已经在 AbstractUnsafe#register0(ChannelPromise promise) 中触发。胖友也可以自己调试下。

  • 调用栈如下图

#invokeHandlerAddedIfNeeded() 方法,代码如下:

/**
 * 是否首次注册
 *
 * {@link #invokeHandlerAddedIfNeeded()}
 */
private boolean firstRegistration = true;

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop(); // 必须在 EventLoop 的线程中
    // 仅有首次注册有效 <1>
    if (firstRegistration) {
        // 标记非首次注册
        firstRegistration = false;
        
        // 执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件 // <2>
        // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        callHandlerAddedForAllHandlers();
    }
}
  • <1> 处,仅有首次注册有效( firstRegistration = true ) 时。而后,标记 firstRegistration = false

    • 这也就是笔者为什么说,HeadContext#channelRegistered(ChannelHandlerContext ctx) 方法对这个方法的调用,是没有效果的。

  • <2> 处,调用 #callHandlerAddedForAllHandlers() 方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。代码如下:

 1: private void callHandlerAddedForAllHandlers() {
 2:     final PendingHandlerCallback pendingHandlerCallbackHead;
 3:     // 获得 pendingHandlerCallbackHead
 4:     synchronized (this) {
 5:         assert !registered;
 6: 
 7:         // This Channel itself was registered.
 8:         registered = true; // 标记已注册
 9: 
10:         pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
11:         // Null out so it can be GC'ed.
12:         this.pendingHandlerCallbackHead = null; // 置空,help gc
13:     }
14: 
15:     // 顺序向下,执行 PendingHandlerCallback 的回调
16:     // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
17:     // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
18:     // the EventLoop.
19:     PendingHandlerCallback task = pendingHandlerCallbackHead;
20:     while (task != null) {
21:         task.execute();
22:         task = task.next;
23:     }
24: }
  • 第 3 至 13 行:获得 pendingHandlerCallbackHead 变量。

    • 第 8 行:标记 registered = true ,表示已注册。

    • 第 10 至 12 行:置空对象的 pendingHandlerCallbackHead 属性,help GC 。

    • 使用 synchronized 的原因,和 #addLast(EventExecutorGroup group, String name, ChannelHandler handler) 的【第 16 至 26 行】的代码需要对 pendingHandlerCallbackHead 互斥,避免并发修改的问题。

  • 第 15 至 23 行:顺序循环向下,调用 PendingHandlerCallback#execute() 方法,执行 PendingHandlerCallback 的回调,从而将 ChannelHandler 添加到 pipeline 中。

    • 这里不适用 synchronized 的原因,看英文注释哈。

添加 ChannelHandler 到 pipeline 中的代码,大部分的比较简单。比较复杂的可能是,「8. PendingHandlerCallback」 中,调用的过程涉及回调,所以理解上稍微可能困难。胖友可以多多调试进行解决噢。

推荐阅读文章:

  • 闪电侠 《Netty 源码分析之 pipeline(一)》

  • Hypercube 《自顶向下深入分析 Netty(七)–ChannelPipeline 源码实现》