1. 首页
  2. Netty源码解析
  3. Netty源码分析之 EventLoop续

Netty源码分析之 EventLoop续

  • 发布于 2024-08-22
  • 16 次阅读

Netty 源码解析 —— EventLoop(四)之 EventLoop 运行

本文详细解析了 Netty 中 NioEventLoop 的运行原理,重点介绍了 NioEventLoop#run() 方法的实现,包括 SelectStrategy 的选择、Selector 的 select 操作以及任务的处理等关键步骤,并分析了 NIO Selector 空轮询的 Bug 和解决方法。

NioEventLoop 是 Netty 中用于处理 IO 事件和用户任务的单线程事件循环实现。

NioEventLoop 的 run() 方法通过死循环处理各种任务,包括处理 Channel 感兴趣的就绪 IO 事件、运行普通任务和定时任务等。

NioEventLoop 使用 SelectStrategy 来控制 select 操作的策略,默认情况下会根据当前是否有任务来决定是否阻塞 select。

NioEventLoop 为了解决 NIO Selector 空轮询的 Bug,采用了重建 Selector 对象的方式来避免 CPU 100% 的问题。

NioEventLoop 的 wakeup 标记用于减少 Selector#wakeup() 方法的调用次数,避免不必要的唤醒操作。

NioEventLoop 通过 mpsc 队列来存储任务,可以保证多线程生产任务和单线程消费任务的效率。

NioEventLoop 通过 ioRatio 属性来控制 IO 事件处理时间在总时间中的占比。

好的,我将为您概述 Netty 中与 Selector 相关的关键组件,并逐一解释它们的作用和实现细节。

1. 概述

Netty 是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在处理 I/O 操作时,Netty 使用了 NIO(New I/O)技术来提高性能。Netty 的 NioEventLoop 类负责处理 I/O 事件,其中包括了 Selector 的管理和优化。

2. SelectorTuple

SelectorTupleNioEventLoop 中的一个私有静态内部类,用于封装一个或两个 Selector 对象。它可以包含一个原始的 Selector 对象 (unwrappedSelector) 和一个可能经过优化的 Selector 对象 (selector)。当 Selector 需要优化时,这两个字段可以指向不同的对象;否则,它们指向同一个对象。

3. openSelector

openSelector 方法是 NioEventLoop 中的一个私有方法,用于创建一个新的 Selector 对象,并对其进行可能的优化。如果不需要优化,它会直接返回一个包含原始 SelectorSelectorTuple 对象。如果需要优化,它会尝试替换 Selector 内部使用的 Set,以便提高性能。

4. SelectedSelectionKeySet

SelectedSelectionKeySet 是一个继承自 AbstractSet 的内部类,用于存储已就绪的 SelectionKey。它使用一个 SelectionKey 数组和一个计数器 size 来跟踪已就绪的 SelectionKey。这种方法相对于标准的 HashSet 更加高效,因为它减少了内存消耗并降低了 add 操作的时间复杂度。

5. SelectedSelectionKeySetSelector

SelectedSelectionKeySetSelector 是一个扩展自 Selector 的类,它使用 SelectedSelectionKeySet 替换原始 SelectorselectedKeys 集合。这个类在每次调用 select 相关的方法之前都会重置 SelectedSelectionKeySet,以确保每次选择操作后都有一个全新的就绪事件列表。

6. rebuildSelector

rebuildSelectorNioEventLoop 中的一个方法,用于重建 Selector。当需要重新创建 Selector 时,例如在关闭一个 Selector 后,这个方法会被调用。它会先关闭当前的 Selector,然后通过 openSelector 方法创建一个新的 Selector

6.1 rebuildSelector0

rebuildSelector0rebuildSelector 方法的一部分,它包含了重建 Selector 的具体逻辑。这个方法会首先关闭当前的 Selector,然后创建一个新的 Selector,并重新注册所有相关的 Channel

7. processSelectedKeys

processSelectedKeys 方法用于处理 Selector 选择出来的 SelectionKey。它有几种不同的实现,根据 Selector 是否进行了优化来决定使用哪种实现。

7.1 processSelectedKeysOptimized

Selector 进行了优化时,使用 processSelectedKeysOptimized 方法来处理已就绪的 SelectionKey。这个方法利用了 SelectedSelectionKeySet 的优化。

7.2 processSelectedKeysPlain

Selector 没有进行优化时,使用 processSelectedKeysPlain 方法来处理已就绪的 SelectionKey。这个方法使用了标准的 SelectorselectedKeys 集合。

7.3 processSelectedKey

这是一个辅助方法,用于处理单个 SelectionKey。它会根据 SelectionKey 的就绪操作来调用相应的处理器方法。

8. NioTask

NioTask 是一个接口,用于定义如何处理 Selector 的 I/O 事件。它并不是 Netty 的一部分,而是 Java NIO 中的一个概念。Netty 自身并没有实现这个接口,但在处理 NIO 事件时,可能会涉及到类似的概念。

8.1 register

register 方法用于注册一个 NioTaskSelector 上,以便当特定的 I/O 事件发生时可以执行相应的任务。

8.2 invokeChannelUnregistered

invokeChannelUnregistered 方法用于当一个 ChannelSelector 中取消注册时执行相应的处理。

8.3 processSelectedKey

processSelectedKey 方法用于处理单个 SelectionKey 的就绪事件。

好的,我将为您概述 Netty 中与 Selector 相关的关键组件,并逐一解释它们的作用和实现细节。

1. 概述

Netty 是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在处理 I/O 操作时,Netty 使用了 NIO(New I/O)技术来提高性能。Netty 的 NioEventLoop 类负责处理 I/O 事件,其中包括了 Selector 的管理和优化。

2. SelectorTuple

SelectorTupleNioEventLoop 中的一个私有静态内部类,用于封装一个或两个 Selector 对象。它可以包含一个原始的 Selector 对象 (unwrappedSelector) 和一个可能经过优化的 Selector 对象 (selector)。当 Selector 需要优化时,这两个字段可以指向不同的对象;否则,它们指向同一个对象。

3. openSelector

openSelector 方法是 NioEventLoop 中的一个私有方法,用于创建一个新的 Selector 对象,并对其进行可能的优化。如果不需要优化,它会直接返回一个包含原始 SelectorSelectorTuple 对象。如果需要优化,它会尝试替换 Selector 内部使用的 Set,以便提高性能。

4. SelectedSelectionKeySet

SelectedSelectionKeySet 是一个继承自 AbstractSet 的内部类,用于存储已就绪的 SelectionKey。它使用一个 SelectionKey 数组和一个计数器 size 来跟踪已就绪的 SelectionKey。这种方法相对于标准的 HashSet 更加高效,因为它减少了内存消耗并降低了 add 操作的时间复杂度。

5. SelectedSelectionKeySetSelector

SelectedSelectionKeySetSelector 是一个扩展自 Selector 的类,它使用 SelectedSelectionKeySet 替换原始 SelectorselectedKeys 集合。这个类在每次调用 select 相关的方法之前都会重置 SelectedSelectionKeySet,以确保每次选择操作后都有一个全新的就绪事件列表。

6. rebuildSelector

rebuildSelectorNioEventLoop 中的一个方法,用于重建 Selector。当需要重新创建 Selector 时,例如在关闭一个 Selector 后,这个方法会被调用。它会先关闭当前的 Selector,然后通过 openSelector 方法创建一个新的 Selector

6.1 rebuildSelector0

rebuildSelector0rebuildSelector 方法的一部分,它包含了重建 Selector 的具体逻辑。这个方法会首先关闭当前的 Selector,然后创建一个新的 Selector,并重新注册所有相关的 Channel

7. processSelectedKeys

processSelectedKeys 方法用于处理 Selector 选择出来的 SelectionKey。它有几种不同的实现,根据 Selector 是否进行了优化来决定使用哪种实现。

7.1 processSelectedKeysOptimized

Selector 进行了优化时,使用 processSelectedKeysOptimized 方法来处理已就绪的 SelectionKey。这个方法利用了 SelectedSelectionKeySet 的优化。

7.2 processSelectedKeysPlain

Selector 没有进行优化时,使用 processSelectedKeysPlain 方法来处理已就绪的 SelectionKey。这个方法使用了标准的 SelectorselectedKeys 集合。

7.3 processSelectedKey

这是一个辅助方法,用于处理单个 SelectionKey。它会根据 SelectionKey 的就绪操作来调用相应的处理器方法。

8. NioTask

NioTask 是一个接口,用于定义如何处理 Selector 的 I/O 事件。它并不是 Netty 的一部分,而是 Java NIO 中的一个概念。Netty 自身并没有实现这个接口,但在处理 NIO 事件时,可能会涉及到类似的概念。

8.1 register

register 方法用于注册一个 NioTaskSelector 上,以便当特定的 I/O 事件发生时可以执行相应的任务。

8.2 invokeChannelUnregistered

invokeChannelUnregistered 方法用于当一个 ChannelSelector 中取消注册时执行相应的处理。

8.3 processSelectedKey

processSelectedKey 方法用于处理单个 SelectionKey 的就绪事件。

在 Netty 中,EventLoop 是负责处理 I/O 事件和执行任务的核心组件。EventLoop 的运行机制涉及到事件循环、任务调度和线程管理等多个方面。本文将对 EventLoop 的运行机制进行详细解析。

EventLoop 的基本结构

EventLoop 是一个接口,它定义了事件循环的基本操作。实际的实现类是 NioEventLoop,它负责处理 NIO 事件和执行任务。

EventLoop 的运行机制

1. 事件循环

EventLoop 的核心是事件循环,它会不断地从 Selector 中获取已就绪的事件,并处理这些事件。事件循环的伪代码如下:

while (!stop) {
    selector.select();
    processSelectedKeys();
}

在这个循环中,selector.select() 会阻塞等待事件的发生。当有事件发生时,processSelectedKeys() 方法会被调用,处理这些事件。

2. 任务调度

除了处理 I/O 事件,EventLoop 还负责执行任务。任务可以分为两种:普通任务和延迟任务。普通任务会被立即执行,而延迟任务会在指定的延迟时间后执行。

任务调度的伪代码如下:

void execute(Runnable task) {
    if (isInEventLoop()) {
        scheduleTask(task);
    } else {
        queueTask(task);
    }
}
​
void scheduleTask(Runnable task) {
    tasks.add(task);
    if (is忙()) {
        wakeup();
    }
}
​
void queueTask(Runnable task) {
    pendingTasks.add(task);
    if (!isInEventLoop() && !isShuttingDown()) {
        executor.execute(this::runAllTasks);
    }
}

execute() 方法中,首先判断当前线程是否在 EventLoop 中。如果在,则将任务添加到任务队列中;否则,将任务添加到待处理任务队列中,并尝试唤醒 EventLoop。

3. 线程管理

EventLoopGroup 是一组 EventLoop 的集合,它负责管理和调度 EventLoop。Netty 提供了多种 EventLoopGroup 的实现,如 DefaultEventLoopGroupNioEventLoopGroup 等。

EventLoopGroup 的运行机制如下:

void start() {
    for (EventLoop eventLoop : eventLoops) {
        eventLoop.start();
    }
}
​
void shutdown() {
    for (EventLoop eventLoop : eventLoops) {
        eventLoop.shutdown();
    }
}

start() 方法中,遍历所有的 EventLoop 并启动它们。在 shutdown() 方法中,遍历所有的 EventLoop 并关闭它们。

总结

EventLoop 是 Netty 中的核心组件,负责处理 I/O 事件和执行任务。它的运行机制包括事件循环、任务调度和线程管理等方面。通过深入了解 EventLoop 的运行机制,可以更好地理解 Netty 的内部工作原理,并在实际开发中更好地利用 Netty 的特性。

Netty 是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,EventLoop 是非常核心的一个组件,它负责处理 I/O 事件以及执行用户定义的 Runnable 任务。

EventLoop 的启动与运行

1. EventLoop 的创建

  • EventLoopEventLoopGroup 的一部分,通常通过 EventLoopGroupnext() 方法获取。

  • EventLoopGroup 负责管理多个 EventLoop 实例,并且能够根据一定的策略选择一个合适的 EventLoop 来处理 I/O 操作。

2. EventLoop 的内部机制

  • Selector:每个 EventLoop 对应一个或多个 Selector,用来监听注册在其上的 Channel 的 I/O 事件。

  • Channel:每个 Channel 注册到一个 EventLoop 上,由该 EventLoop 负责处理该 Channel 的所有 I/O 事件。

  • TaskQueueScheduledTaskQueueEventLoop 内部有两个队列,一个是用来存放普通任务的 TaskQueue,另一个是存放定时任务的 ScheduledTaskQueue。这两个队列中的任务由 EventLoop 在适当的时机执行。

3. EventLoop 的运行循环

  • EventLoop 的核心是一个无限循环,通常称为 “事件循环” 或 “运行循环”。

  • 在这个循环中,EventLoop 会轮询 Selector 来检查是否有就绪的 I/O 事件,如果有,则调用相应的处理器来处理这些事件。

  • 处理完 I/O 事件后,EventLoop 会处理 TaskQueueScheduledTaskQueue 中的任务。

4. EventLoop 的核心方法

  • run():这是 EventLoop 的主要运行方法,在启动 EventLoop 时被调用。

  • register():用于将 Channel 注册到 EventLoopSelector 上。

  • execute():用于提交任务到 EventLoop 的队列中进行执行。

  • schedule():用于提交定时任务到 EventLoopScheduledTaskQueue 中。

示例代码

以下是一个简化的 EventLoop 的运行循环的伪代码示例:

public class EventLoop extends Thread {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    
    public EventLoop(Selector selector) {
        this.selector = selector;
    }
​
    public void register(Channel channel, int interestOps) {
        // Register the channel with the selector
    }
​
    public void execute(Runnable task) {
        // Add the task to the queue and wake up the loop if necessary
    }
​
    @Override
    public void run() {
        while (true) {
            // Select ready keys from the selector
            selector.select();
​
            // Process selected keys
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove(); // Remove the key to prevent duplicate processing
                // Process the event based on the key's readiness
            }
​
            // Execute tasks in the task queue
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
            }
        }
    }
}

请注意,上述代码是简化后的示例,实际的 EventLoop 实现要复杂得多,并且包含了更多的细节和优化措施。如果你对 Netty 的源码感兴趣,建议深入阅读 Netty 的官方文档和源码以获得更全面的理解。

1. 概述

本文我们分享 EventLoop 的运行相关代码的实现。

因为 EventLoop 的运行主要是通过 NioEventLoop 的 #run() 方法实现,考虑到内容相对的完整性,在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 一文中,我们并未分享 NioEventLoop 的初始化,所以本文也会分享这部分的内容。

OK ,还是老样子,自上而下的方式,一起来看看 NioEventLoop 的代码实现。

本文的重点在 「2.9 run」 和 「2.12 select」 中。

2. NioEventLoop

io.netty.channel.nio.NioEventLoop ,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。

2.1 static

static 代码块中,初始化了 NioEventLoop 的静态属性们。代码如下:

/**
 * TODO 1007 NioEventLoop cancel
 */
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

/**
 * 是否禁用 SelectionKey 的优化,默认开启
 */
private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/**
 * 少于该 N 值,不开启空轮询重建新的 Selector 对象的功能
 */
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
/**
 * NIO Selector 空轮询该 N 次后,重建新的 Selector 对象
 */
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

static {
    // 解决 Selector#open() 方法 // <1>
    final String key = "sun.nio.ch.bugLevel";
    final String buglevel = SystemPropertyUtil.get(key);
    if (buglevel == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(key, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            logger.debug("Unable to get/set System Property: " + key, e);
        }
    }

    // 初始化
    int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
        selectorAutoRebuildThreshold = 0;
    }
    SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
  • CLEANUP_INTERVAL 属性,TODO 1007 NioEventLoop cancel

  • DISABLE_KEYSET_OPTIMIZATION 属性,是否禁用 SelectionKey 的优化,默认开启。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

  • SELECTOR_AUTO_REBUILD_THRESHOLD 属性,NIO Selector 空轮询该 N 次后,重建新的 Selector 对象,用以解决 JDK NIO 的 epoll 空轮询 Bug 。

    • MIN_PREMATURE_SELECTOR_RETURNS 属性,少于该 N 值,不开启空轮询重建新的 Selector 对象的功能。

  • <1> 处,解决 Selector#open() 方法,发生 NullPointException 异常。详细解析,见 http://bugs.sun.com/view_bug.do?bug_id=6427854https://github.com/netty/netty/issues/203

  • <2> 处,初始化 SELECTOR_AUTO_REBUILD_THRESHOLD 属性。默认 512 。

2.2 构造方法

/**
 * The NIO {@link Selector}.
 *
 * 包装的 Selector 对象,经过优化
 *
 * {@link #openSelector()}
 */
private Selector selector;
/**
 * 未包装的 Selector 对象
 */
private Selector unwrappedSelector;
/**
 * 注册的 SelectionKey 集合。Netty 自己实现,经过优化。
 */
private SelectedSelectionKeySet selectedKeys;
/**
 * SelectorProvider 对象,用于创建 Selector 对象
 */
private final SelectorProvider provider;

/**
 * Boolean that controls determines if a blocked Selector.select should
 * break out of its selection process. In our case we use a timeout for
 * the select method and the select method will block for that time unless
 * waken up.
 *
 * 唤醒标记。因为唤醒方法 {@link Selector#wakeup()} 开销比较大,通过该标识,减少调用。
 *
 * @see #wakeup(boolean)
 * @see #run() 
 */
private final AtomicBoolean wakenUp = new AtomicBoolean();
/**
 * Select 策略
 *
 * @see #select(boolean)
 */
private final SelectStrategy selectStrategy;
/**
 * 处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例
 */
private volatile int ioRatio = 50;
/**
 * 取消 SelectionKey 的数量
 *
 * TODO 1007 NioEventLoop cancel
 */
private int cancelledKeys;
/**
 * 是否需要再次 select Selector 对象
 *
 * TODO 1007 NioEventLoop cancel
 */
private boolean needsToSelectAgain;

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 创建 Selector 对象 <1>
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
  • Selector 相关:

    • unwrappedSelector 属性,未包装的 NIO Selector 对象。

    • selector 属性,包装的 NIO Selector 对象。Netty 对 NIO Selector 做了优化。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

    • selectedKeys 属性,注册的 NIO SelectionKey 集合。Netty 自己实现,经过优化。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

    • provider 属性,NIO SelectorProvider 对象,用于创建 NIO Selector 对象。

    • <1> 处,调用 #openSelector() 方法,创建 NIO Selector 对象。

  • wakenUp 属性,唤醒标记。因为唤醒方法 Selector#wakeup() 开销比较大,通过该标识,减少调用。详细解析,见 「2.8 wakeup」 。

  • selectStrategy 属性,Select 策略。详细解析,见 「2.10 SelectStrategy」 。

  • ioRatio 属性,在 NioEventLoop 中,会三种类型的任务:1) Channel 的就绪的 IO 事件;2) 普通任务;3) 定时任务。而 ioRatio 属性,处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例。

  • 取消 SelectionKey 相关:

    • cancelledKeys 属性, 取消 SelectionKey 的数量。TODO 1007 NioEventLoop cancel

    • needsToSelectAgain 属性,是否需要再次 select Selector 对象。TODO 1007 NioEventLoop cancel

2.3 openSelector

#openSelector() 方法,创建 NIO Selector 对象。

考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

2.4 rebuildSelector

#rebuildSelector() 方法,重建 NIO Selector 对象。

考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

2.5 newTaskQueue

#newTaskQueue(int maxPendingTasks) 方法,创建任务队列。代码如下:

该方法覆写父类的该方法。

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    // This event loop never calls takeTask()
    return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

调用 PlatformDependent#newMpscQueue(...) 方法,创建 mpsc 队列。我们来看看代码注释对 mpsc 队列的描述:

Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single consumer (one thread!).

2.6 pendingTasks

#pendingTasks() 方法,获得待执行的任务数量。代码如下:

该方法覆写父类的该方法。

@Override
public int pendingTasks() {
    // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
    // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
    // See https://github.com/netty/netty/issues/5297
    if (inEventLoop()) {
        return super.pendingTasks();
    } else {
        return submit(pendingTasksCallable).syncUninterruptibly().getNow();
    }
}
  • 因为 MpscQueue 仅允许单消费,所以获得队列的大小,仅允许在 EventLoop 的线程中调用。

2.7 setIoRatio

#setIoRatio(int ioRatio) 方法,设置 ioRatio 属性。代码如下:

/**
 * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 */
public void setIoRatio(int ioRatio) {
    if (ioRatio <= 0 || ioRatio > 100) {
        throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
}

setIoRatio(int ioRatio) 方法用于设置 EventLoop 中用于 I/O 操作的时间占比。这个方法允许用户自定义 EventLoop 在处理 I/O 任务和非 I/O 任务之间的时间分配比例。

参数说明:

  • int ioRatio:表示 I/O 操作所占的时间百分比。取值范围为 1 到 100,其中 1 表示事件循环几乎全部时间都用于非 I/O 任务,而 100 表示事件循环几乎全部时间都用于 I/O 任务。默认值为 50,表示事件循环在 I/O 任务和非 I/O 任务之间平均分配时间。

方法实现:

  1. 首先检查传入的 ioRatio 参数是否在有效范围内(1 到 100),如果不在有效范围内,则抛出 IllegalArgumentException 异常。

  2. 如果参数有效,则将 ioRatio 赋值给类的成员变量 this.ioRatio

示例:

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
((NioEventLoop) eventLoopGroup.next()).setIoRatio(75);

在这个示例中,我们创建了一个 NioEventLoopGroup 实例,并获取其第一个 NioEventLoop 实例。然后调用 setIoRatio(75) 方法,将 I/O 操作的时间占比设置为 75%,即事件循环将有 75% 的时间用于处理 I/O 任务,剩余 25% 的时间用于处理非 I/O 任务。

setIoRatio(int ioRatio) 方法是 EventLoop 中的一个配置方法,用于设置 EventLoop 在处理 I/O 任务与非 I/O 任务(如计算密集型任务)之间的时间分配比例。这个方法允许开发者根据应用的具体需求来调整 I/O 操作和其他任务之间的平衡。

参数说明

ioRatio:这是一个整数参数,表示希望 EventLoop 在每次循环中花费在 I/O 操作上的时间百分比。取值范围为 1 到 100。

默认值

默认情况下,ioRatio 的值为 50,这意味着 EventLoop 将尝试花费相等的时间来进行 I/O 操作和执行其他任务。

工作原理

EventLoop 执行其主循环时,它会处理两部分的工作:

I/O 事件:这包括从 Selector 中轮询就绪的 I/O 事件并处理这些事件。

非 I/O 任务:这包括执行 TaskQueueScheduledTaskQueue 中的任务。

setIoRatio 设置的值决定了 EventLoop 在处理完 I/O 事件之后,应该花多少时间去处理非 I/O 任务。例如,如果 ioRatio 设置为 80%,那么 EventLoop 在处理完 I/O 事件之后,将只执行足够多的非 I/O 任务使得总的执行时间占整个循环周期的 20%。

代码实现

以下是 EventLoopsetIoRatio 方法的实现示例:

public class EventLoop extends Thread {
    private volatile int ioRatio = 50; // Default value
    
    /**
     * Sets the percentage of the desired amount of time spent for I/O in the event loop.
     * The default value is 50, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
     *
     * @param ioRatio the desired percentage of time spent for I/O in the event loop.
     */
    public void setIoRatio(int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
        }
        this.ioRatio = ioRatio;
    }
    
    // ... 其他方法 ...
    
    @Override
    public void run() {
        while (true) {
            // Select ready keys from the selector
            selector.select();
​
            // Process selected keys
            // ...
​
            // Execute tasks in the task queue
            long ioTimeNanos = System.nanoTime();
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
            }
            long nonIoTimeNanos = System.nanoTime() - ioTimeNanos;
​
            // Adjust non-io time according to ioRatio
            long totalCycleTimeNanos = System.nanoTime() - lastCycleStartTimeNanos;
            long expectedNonIoTimeNanos = totalCycleTimeNanos * (100 - ioRatio) / 100;
            if (nonIoTimeNanos > expectedNonIoTimeNanos) {
                // If we spent too much time in non-io tasks, sleep to make up for it
                long sleepTimeNanos = nonIoTimeNanos - expectedNonIoTimeNanos;
                if (sleepTimeNanos > 0) {
                    Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleepTimeNanos));
                }
            }
            lastCycleStartTimeNanos = System.nanoTime();
        }
    }
}

注意事项

如果 ioRatio 设置得过高,可能会导致非 I/O 任务积压,影响系统的响应能力。

如果 ioRatio 设置得过低,可能会降低 I/O 操作的效率。

在实际使用中,需要根据应用程序的具体情况来调整 ioRatio 的值。

通过调整 ioRatio 的值,可以有效地控制 EventLoop 在 I/O 密集型操作和 CPU 密集型操作之间的平衡,从而优化应用程序的性能。

设置 ioRatio 对网络性能的影响主要体现在以下几个方面:

  1. I/O 密集型任务与非 I/O 密集型任务的平衡ioRatio 参数决定了 EventLoop 在处理 I/O 任务(如读写网络数据)和非 I/O 任务(如执行用户代码、处理业务逻辑等)之间的时间分配。设置合适的 ioRatio 可以帮助你在不同类型的任务之间找到一个平衡点,从而提高整体性能。

  2. 响应时间:如果 ioRatio 设置得过高,EventLoop 将花费更多时间在 I/O 操作上,可能导致非 I/O 任务的响应时间变长。相反,如果 ioRatio 设置得过低,EventLoop 将花费更多时间在非 I/O 任务上,可能导致 I/O 操作的延迟增加。

  3. 吞吐量:合适的 ioRatio 设置可以提高系统的吞吐量。如果 I/O 密集型任务较多,提高 ioRatio 可能会提高网络传输速率;如果非 I/O 密集型任务较多,降低 ioRatio 可能会提高业务处理速度。

  4. 资源利用率:设置合适的 ioRatio 可以提高系统资源的利用率。例如,在 I/O 密集型任务较多的场景下,提高 io负载因子 可以充分利用 CPU 资源,提高系统性能。

总之,设置合适的 ioRatio 对网络性能有很大影响。在实际应用中,你需要根据业务需求和系统特点来调整 ioRatio 参数,以达到最佳性能。可以通过压力测试和性能监控来评估不同 ioRatio 设置下的系统表现,从而找到最佳的配置。

要监控 EventLoop 中 I/O 和非 I/O 任务的实际执行时间,可以使用以下方法:

  1. 使用内置的监控工具:Netty 提供了一些内置的监控工具,可以帮助您收集和查看 EventLoop 中 I/O 和非 I/O 任务的执行时间。例如,可以使用 ChannelMetricsHandler 来收集通道级别的指标,包括 I/O 操作的时间。

public class MetricsServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ChannelMetricsHandler());
        // ...
    }
}

然后,可以通过 JMX(Java Management Extensions)或其他监控工具查看收集到的指标。

  1. 自定义监控逻辑:如果您需要更详细的监控数据,可以自定义监控逻辑。例如,可以在 I/O 和非 I/O 任务的开始和结束时记录时间戳,然后计算执行时间。

public class CustomChannelHandler extends ChannelInboundHandlerAdapter {
    private final AtomicLong ioTime = new AtomicLong();
    private final AtomicLong nonIoTime = new AtomicLong();
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        long startTime = System.nanoTime();
        try {
            super.channelRead(ctx, msg);
        } finally {
            long endTime = System.nanoTime();
            nonIoTime.addAndGet(endTime - startTime);
        }
    }
​
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof io.netty.channel.EventLoop.IoEvent) {
            long startTime = System.nanoTime();
            try {
                super.userEventTriggered(ctx, evt);
            } finally {
                long endTime = System.nanoTime();
                ioTime.addAndGet(endTime - startTime);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
​
    public long getIoTime() {
        return ioTime.get();
    }
​
    public long getNonIoTime() {
        return nonIoTime.get();
    }
}

在这个示例中,我们创建了一个自定义的 ChannelInboundHandlerAdapter,用于记录 I/O 和非 I/O 任务的执行时间。通过调用 getIoTime()getNonIoTime() 方法,可以获取累计的 I/O 和非 I/O 任务执行时间。

请注意,这些方法可能会对性能产生一定影响,因此在生产环境中使用时需要谨慎。在实际应用中,可以根据需求选择合适的监控方法。

2.8 wakeup

#wakeup(boolean inEventLoop) 方法,唤醒线程。代码如下:

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) { // <2>
        selector.wakeup(); // <1>
    }
}
  • <1> 处,因为 NioEventLoop 的线程阻塞,主要是调用 Selector#select(long timeout) 方法,阻塞等待有 Channel 感兴趣的 IO 事件,或者超时。所以需要调用 Selector#wakeup() 方法,进行唤醒 Selector 。

  • <2> 处,因为 Selector#wakeup() 方法的唤醒操作是开销比较大的操作,并且每次重复调用相当于重复唤醒。所以,通过 wakenUp 属性,通过 CAS 修改 false => true ,保证有且仅有进行一次唤醒。

  • 当然,详细的解析,可以结合 「2.9 run」 一起看,这样会更加清晰明了。

2.9 run

#run() 方法,NioEventLoop 运行,处理任务。这是本文最重要的方法。代码如下:

 1: @Override
 2: protected void run() {
 3:     for (;;) {
 4:         try {
 5:             switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 6:                 case SelectStrategy.CONTINUE: // 默认实现下,不存在这个情况。
 7:                     continue;
 8:                 case SelectStrategy.SELECT:
 9:                     // 重置 wakenUp 标记为 false
10:                     // 选择( 查询 )任务
11:                     select(wakenUp.getAndSet(false));
12: 
13:                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
14:                     // before calling 'selector.wakeup()' to reduce the wake-up
15:                     // overhead. (Selector.wakeup() is an expensive operation.)
16:                     //
17:                     // However, there is a race condition in this approach.
18:                     // The race condition is triggered when 'wakenUp' is set to
19:                     // true too early.
20:                     //
21:                     // 'wakenUp' is set to true too early if:
22:                     // 1) Selector is waken up between 'wakenUp.set(false)' and
23:                     //    'selector.select(...)'. (BAD)
24:                     // 2) Selector is waken up between 'selector.select(...)' and
25:                     //    'if (wakenUp.get()) { ... }'. (OK)
26:                     //
27:                     // In the first case, 'wakenUp' is set to true and the
28:                     // following 'selector.select(...)' will wake up immediately.
29:                     // Until 'wakenUp' is set to false again in the next round,
30:                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
31:                     // any attempt to wake up the Selector will fail, too, causing
32:                     // the following 'selector.select(...)' call to block
33:                     // unnecessarily.
34:                     //
35:                     // To fix this problem, we wake up the selector again if wakenUp
36:                     // is true immediately after selector.select(...).
37:                     // It is inefficient in that it wakes up the selector for both
38:                     // the first case (BAD - wake-up required) and the second case
39:                     // (OK - no wake-up required).
40: 
41:                     // 唤醒。原因,见上面中文注释
42:                     if (wakenUp.get()) {
43:                         selector.wakeup();
44:                     }
45:                     // fall through
46:                 default:
47:             }
48: 
49:             // TODO 1007 NioEventLoop cancel 方法
50:             cancelledKeys = 0;
51:             needsToSelectAgain = false;
52: 
53:             final int ioRatio = this.ioRatio;
54:             if (ioRatio == 100) {
55:                 try {
56:                     // 处理 Channel 感兴趣的就绪 IO 事件
57:                     processSelectedKeys();
58:                 } finally {
59:                     // 运行所有普通任务和定时任务,不限制时间
60:                     // Ensure we always run tasks.
61:                     runAllTasks();
62:                 }
63:             } else {
64:                 final long ioStartTime = System.nanoTime();
65:                 try {
66:                     // 处理 Channel 感兴趣的就绪 IO 事件
67:                     processSelectedKeys();
68:                 } finally {
69:                     // 运行所有普通任务和定时任务,限制时间
70:                     // Ensure we always run tasks.
71:                     final long ioTime = System.nanoTime() - ioStartTime;
72:                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
73:                 }
74:             }
75:         } catch (Throwable t) {
76:             handleLoopException(t);
77:         }
78:         // TODO 1006 EventLoop 优雅关闭
79:         // Always handle shutdown even if the loop processing threw an exception.
80:         try {
81:             if (isShuttingDown()) {
82:                 closeAll();
83:                 if (confirmShutdown()) {
84:                     return;
85:                 }
86:             }
87:         } catch (Throwable t) {
88:             handleLoopException(t);
89:         }
90:     }
91: }
  • 第 3 行:“死”循环,直到 NioEventLoop 关闭,即【第 78 至 89 行】的代码。

  • 第 5 行:调用 SelectStrategy#calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) 方法,获得使用的 select 策略。详细解析,先跳到 「2.10 SelectStrategy」 中研究。😈 看完回来。

    • 我们知道 SelectStrategy#calculateStrategy(...) 方法,有 3 种返回的情况。

    • 第 6 至 7 行:第一种,SelectStrategy.CONTINUE ,默认实现下,不存在这个情况。

    • 第 8 至 44 行:第二种,SelectStrategy.SELECT ,进行 Selector 阻塞 select 。

      • 第 11 行:重置 wakeUp 标识为 false ,并返回修改前的值。

      • 第 11 行:调用 #select(boolean oldWakeUp) 方法,选择( 查询 )任务。直接看这个方法不能完全表达出该方法的用途,所以详细解析,见 「2.12 select」 。

      • 第 41 至 44 行:若唤醒标识 wakeuptrue 时,调用 Selector#wakeup() 方法,唤醒 Selector 。可能看到此处,很多胖友会和我一样,一脸懵逼。实际上,耐下性子,答案在上面的英文注释中。笔者来简单解析下:

        • 1)在 wakenUp.getAndSet(false)#select(boolean oldWakeUp) 之间,在标识 wakeUp 设置为 false 时,在 #select(boolean oldWakeUp) 方法中,正在调用 Selector#select(...) 方法,处于阻塞中。

        • 2)此时,有另外的线程调用了 #wakeup() 方法,会将标记 wakeUp 设置为 true ,并唤醒 Selector#select(...) 方法的阻塞等待。

        • 3)标识 wakeUptrue ,所以再有另外的线程调用 #wakeup() 方法,都无法唤醒 Selector#select(...) 。为什么呢?因为 #wakeup() 的 CAS 修改 false => true失败,导致无法调用 Selector#wakeup() 方法。

        • 解决方式:所以在 #select(boolean oldWakeUp) 执行完后,增加了【第 41 至 44 行】来解决。

        • 😈😈😈 整体比较绕,胖友结合实现代码 + 英文注释,再好好理解下。

    • 第 46 行:第三种,>= 0 ,已经有可以处理的任务,直接向下。

  • 第 49 至 51 行:TODO 1007 NioEventLoop cancel 方法

  • 第 53 至 74 行:根据 ioRatio 的配置不同,分成略有差异的 2 种:

    • 第一种,ioRatio 为 100 ,则不考虑时间占比的分配。

      • 第 57 行:调用 #processSelectedKeys() 方法,处理 Channel 感兴趣的就绪 IO 事件。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

      • 第 58 至 62 行:调用 #runAllTasks() 方法,运行所有普通任务和定时任务,不限制时间。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。

    • 第二种,ioRatio< 100 ,则考虑时间占比的分配。

      • 第 64 行:记录当前时间。

      • 第 67 行:和【第 57 行】的代码一样

      • 第 71 至 72 行:🙂 比较巧妙的方式,是不是和胖友之前认为的不太一样。它是以 #processSelectedKeys() 方法的执行时间作为基准,计算 #runAllTasks(long timeoutNanos) 方法可执行的时间。

      • 第 72 行:调用 #runAllTasks(long timeoutNanos)` 方法,运行所有普通任务和定时任务,限制时间

  • 第 75 至 77 行:当发生异常时,调用 #handleLoopException(Throwable t) 方法,处理异常。代码如下:

private static void handleLoopException(Throwable t) {
    logger.warn("Unexpected exception in the selector loop.", t);

    // Prevent possible consecutive immediate failures that lead to
    // excessive CPU consumption.
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Ignore.
    }
}
  • 第 78 至 89 行:TODO 1006 EventLoop 优雅关闭

  • 总的来说,#run() 的执行过程,就是如下一张图:

2.10 SelectStrategy

io.netty.channel.SelectStrategy ,选择( select )策略接口。代码如下:

public interface SelectStrategy {

    /**
     * Indicates a blocking select should follow.
     *
     * 表示使用阻塞 select 的策略。
     */
    int SELECT = -1;
    /**
     * Indicates the IO loop should be retried, no blocking select to follow directly.
     *
     * 表示需要进行重试的策略。
     */
    int CONTINUE = -2;

    /**
     * The {@link SelectStrategy} can be used to steer the outcome of a potential select
     * call.
     *
     * @param selectSupplier The supplier with the result of a select result.
     * @param hasTasks true if tasks are waiting to be processed.
     * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
     *         the next step should be to not select but rather jump back to the IO loop and try
     *         again. Any value >= 0 is treated as an indicator that work needs to be done.
     */
    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
    
}
  • calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) 接口方法有 3 种返回的情况:

    • SELECT-1 ,表示使用阻塞 select 的策略。

    • CONTINUE-2,表示需要进行重试的策略。实际上,默认情况下,不会返回 CONTINUE 的策略。

    • >= 0 ,表示不需要 select ,目前已经有可以执行的任务了。

2.10.1 DefaultSelectStrategy

io.netty.channel.DefaultSelectStrategy ,实现 SelectStrategy 接口,默认选择策略实现类。代码如下:

final class DefaultSelectStrategy implements SelectStrategy {

    /**
     * 单例
     */
    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() { }

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

}

hasTaskstrue ,表示当前已经有任务,所以调用 IntSupplier#get() 方法,返回当前 Channel 新增的 IO 就绪事件的数量。代码如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

io.netty.util.IntSupplier ,代码如下:

public interface IntSupplier {

    /**
     * Gets a result.
     *
     * @return a result
     */
    int get() throws Exception;

}
  • 类似 Java 自带的 Callable<Int>

  • IntSupplier 在 NioEventLoop 中的实现为 selectNowSupplier 属性。在它的内部会调用 #selectNow() 方法。详细解析,见 「2.11 selectNow」 。

  • 实际上,这里不调用 IntSupplier#get() 方法,也是可以的。只不过考虑到,可以通过 #selectNow() 方法,无阻塞的 select Channel 是否有感兴趣的就绪事件。

  • hasTasksfalse 时,直接返回 SelectStrategy.SELECT ,进行阻塞 select Channel 感兴趣的就绪 IO 事件。

2.11 selectNow

#selectNow() 方法,代码如下:

int selectNow() throws IOException {
    try {
        return selector.selectNow(); // <1>
    } finally {
        // restore wakeup state if needed <2>
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}
  • <1> 处,调用 Selector#selectorNow() 方法,立即( 无阻塞 )返回 Channel 新增的感兴趣的就绪 IO 事件数量。

  • <2> 处,若唤醒标识 wakeuptrue 时,调用 Selector#wakeup() 方法,唤醒 Selector 。因为 <1> 处的 Selector#selectorNow() 会使用我们对 Selector 的唤醒,所以需要进行复原。有一个冷知道,可能有胖友不知道:

    注意,如果有其它线程调用了 #wakeup() 方法,但当前没有线程阻塞在 #select() 方法上,下个调用 #select() 方法的线程会立即被唤醒。😈 有点神奇。

2.12 select

#select(boolean oldWakenUp) 方法,选择( 查询 )任务。这是本文最重要的方法。代码如下:

  1: private void select(boolean oldWakenUp) throws IOException {
  2:     // 记录下 Selector 对象
  3:     Selector selector = this.selector;
  4:     try {
  5:         // select 计数器
  6:         int selectCnt = 0; // cnt 为 count 的缩写
  7:         // 记录当前时间,单位:纳秒
  8:         long currentTimeNanos = System.nanoTime();
  9:         // 计算 select 截止时间,单位:纳秒。
 10:         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
 11: 
 12:         for (;;) {
 13:             // 计算本次 select 的超时时长,单位:毫秒。
 14:             // + 500000L 是为了四舍五入
 15:             // / 1000000L 是为了纳秒转为毫秒
 16:             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
 17:             // 如果超时时长,则结束 select
 18:             if (timeoutMillis <= 0) {
 19:                 if (selectCnt == 0) { // 如果是首次 select ,selectNow 一次,非阻塞
 20:                     selector.selectNow();
 21:                     selectCnt = 1;
 22:                 }
 23:                 break;
 24:             }
 25: 
 26:             // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
 27:             // Selector#wakeup. So we need to check task queue again before executing select operation.
 28:             // If we don't, the task might be pended until select operation was timed out.
 29:             // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
 30:             // 若有新的任务加入
 31:             if (hasTasks() && wakenUp.compareAndSet(false, true)) {
 32:                 // selectNow 一次,非阻塞
 33:                 selector.selectNow();
 34:                 // 重置 select 计数器
 35:                 selectCnt = 1;
 36:                 break;
 37:             }
 38: 
 39:             // 阻塞 select ,查询 Channel 是否有就绪的 IO 事件
 40:             int selectedKeys = selector.select(timeoutMillis);
 41:             // select 计数器 ++
 42:             selectCnt ++;
 43: 
 44:             // 结束 select ,如果满足下面任一一个条件
 45:             if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
 46:                 // - Selected something,
 47:                 // - waken up by user, or
 48:                 // - the task queue has a pending task.
 49:                 // - a scheduled task is ready for processing
 50:                 break;
 51:             }
 52:             // 线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。
 53:             if (Thread.interrupted()) {
 54:                 // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
 55:                 // As this is most likely a bug in the handler of the user or it's client library we will
 56:                 // also log it.
 57:                 //
 58:                 // See https://github.com/netty/netty/issues/2426
 59:                 if (logger.isDebugEnabled()) {
 60:                     logger.debug("Selector.select() returned prematurely because " +
 61:                             "Thread.currentThread().interrupt() was called. Use " +
 62:                             "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
 63:                 }
 64:                 selectCnt = 1;
 65:                 break;
 66:             }
 67: 
 68:             // 记录当前时间
 69:             long time = System.nanoTime();
 70:             // 符合 select 超时条件,重置 selectCnt 为 1
 71:             if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
 72:                 // timeoutMillis elapsed without anything selected.
 73:                 selectCnt = 1;
 74:             // 不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建
 75:             } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
 76:                     selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
 77:                 // The selector returned prematurely many times in a row.
 78:                 // Rebuild the selector to work around the problem.
 79:                 logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
 80: 
 81:                 // 重建 Selector 对象
 82:                 rebuildSelector();
 83:                 // 修改下 Selector 对象
 84:                 selector = this.selector;
 85: 
 86:                 // Select again to populate selectedKeys.
 87:                 // 立即 selectNow 一次,非阻塞
 88:                 selector.selectNow();
 89:                 // 重置 selectCnt 为 1
 90:                 selectCnt = 1;
 91:                 // 结束 select
 92:                 break;
 93:             }
 94: 
 95:             currentTimeNanos = time;
 96:         }
 97: 
 98:         if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
 99:             if (logger.isDebugEnabled()) {
100:                 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
101:             }
102:         }
103:     } catch (CancelledKeyException e) {
104:         if (logger.isDebugEnabled()) {
105:             logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
106:         }
107:         // Harmless exception - log anyway
108:     }
109: }
  • 第 3 行:获得使用的 Selector 对象,不需要每次访问使用 volatile 修饰的 selector 属性。

  • 第 6 行:获得 select 操作的计数器。主要用于记录 Selector 空轮询次数,所以每次在正在轮询完成( 例如:轮询超时 ),则重置 selectCnt 为 1 。

  • 第 8 行:记录当前时间,单位:纳秒。

  • 第 10 行:计算 select 操作的截止时间,单位:纳秒。

    • #delayNanos(currentTimeNanos) 方法返回的为下一个定时任务距离现在的时间,如果不存在定时任务,则默认返回 1000 ms 。该方法的详细解析,见后续文章。

  • 第 12 行:“死”循环,直到符合如下任一一种情况后结束

    1. select 操作超时,对应【第 18 至 24 行】。

    2. 若有新的任务加入,对应【第 26 至 37 行】。

    3. 查询到任务或者唤醒,对应【第 45 至 51 行】。

    4. 线程被异常打断,对应【第 52 至 66 行】。

    5. 发生 NIO 空轮询的 Bug 后重建 Selector 对象后,对应【第 75 至 93 行】。

  • 第 16 行:计算本次 select 的超时时长,单位:毫秒。因为【第 40 行】的 Selector#select(timeoutMillis) 方法,可能因为各种情况结束,所以需要循环,并且每次重新计算超时时间。至于 + 500000L/ 1000000L 的用途,看下代码注释。

  • 第 17 至 24 行:如果超过 select 超时时长,则结束 select 。

    • 第 19 至 21 行:如果是首次 select ,则调用 Selector#selectNow() 方法,获得非阻塞的 Channel 感兴趣的就绪的 IO 事件,并重置 selectCnt 为 1 。

  • 第 26 至 37 行:若有新的任务加入。这里实际要分成两种情况:

    • 第一种,提交的任务的类型是 NonWakeupRunnable ,那么它并不会调用 #wakeup() 方法,原因胖友自己看 #execute(Runnable task) 思考下。Netty 在 #select() 方法的设计上,能尽快执行任务。此时如果标记 wakeupfalse ,说明符合这种情况,直接结束 select 。

    • 第二种,提交的任务的类型不是 NonWakeupRunnable ,那么在 #run() 方法的【第 8 至 11 行】的 wakenUp.getAndSet(false) 之前,发起了一次 #wakeup() 方法,那么因为 wakenUp.getAndSet(false) 会将标记 wakeUp 设置为 false ,所以就能满足 hasTasks() && wakenUp.compareAndSet(false, true) 的条件。

      • 这个解释,就和【第 27 至 28 行】的英文注释 So we need to check task queue again before executing select operation.If we don't, the task might be pended until select operation was timed out. 有出入了?这是为什么呢?因为 Selector 被提前 wakeup 了,所以下一次 Selector 的 select 是被直接唤醒结束的。

    • 第 33 行:虽然已经发现任务,但是还是调用 Selector#selectNow() 方法,非阻塞的获取一次 Channel 新增的就绪的 IO 事件。

    • 对应 Github 的代码提交为 https://github.com/lightningMan/netty/commit/f44f3e7926f1676315ae86d0f18bdd9b95681d9f

  • 第 40 行:调用 Selector#select(timeoutMillis) 方法,阻塞 select ,获得 Channel 新增的就绪的 IO 事件的数量。

  • 第 42 行:select 计数器加 1 。

  • 第 44 至 51 行:如果满足下面任一一个条件,结束 select :

    1. selectedKeys != 0 时,表示有 Channel 新增的就绪的 IO 事件,所以结束 select ,很好理解。

    2. oldWakenUp || wakenUp.get() 时,表示 Selector 被唤醒,所以结束 select 。

    3. hasTasks() || hasScheduledTasks() ,表示有普通任务或定时任务,所以结束 select 。

    4. 那么剩余的情况,主要是 select 超时或者发生空轮询,即【第 68 至 93 行】的代码。

  • 第 52 至 66 行:线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。感兴趣的胖友,可以看看 https://github.com/netty/netty/issues/2426

  • 第 69 行:记录当前时间。

    • 第 70 至 73 行:若满足 time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos ,说明到达此处时,Selector 是超时 select ,那么是正常的,所以重置 selectCnt 为 1 。

    • 第 74 至 93 行:不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建。这就是 Netty 判断发生 NIO Selector 空轮询的方式,N ( 默认 512 )次 select 并未阻塞超时这么长,那么就认为发生 NIO Selector 空轮询。过多的 NIO Selector 将会导致 CPU 100% 。

      • 第 82 行:调用 #rebuildSelector() 方法,重建 Selector 对象。

      • 第 84 行:重新获得使用的 Selector 对象。

      • 第 86 至 90 行:同【第 20 至 21 行】的代码。

      • 第 92 行:结束 select 。

  • 第 95 行:记录新的当前时间,用于【第 16 行】,重新计算本次 select 的超时时长。

总的来说还是比较简单的,比较困难的,在于对标记 wakeup 的理解。真的是,细思极恐!!!

推荐阅读文章:

Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件

在 Netty 中,EventLoop 负责处理 I/O 事件以及执行用户定义的任务。对于 I/O 事件的处理,主要包括从 Selector 中轮询就绪的 I/O 事件,并调用相应的处理器来处理这些事件。下面我们将详细探讨 EventLoop 如何处理 I/O 事件。

EventLoop 处理 I/O 事件的过程

1. EventLoop 的启动

  • EventLoop 的启动通常是在创建 EventLoopGroup 并调用 EventLoopGroupnext() 方法时完成的。

  • 每个 EventLoop 对象都有一个关联的 Selector,用于监听注册在其上的 Channel 的 I/O 事件。

2. Channel 注册

  • 当创建一个新的 Channel 时,它会被注册到一个 EventLoop 上。

  • 注册过程涉及将 ChannelSelectionKey 添加到 Selector 中,并设置相应的事件兴趣集合(例如 OP_READ, OP_WRITE)。

3. 事件循环

  • EventLoop 的核心是一个无限循环,通常称为 “事件循环” 或 “运行循环”。

  • 在这个循环中,EventLoop 会调用 Selector#select() 方法来轮询就绪的 I/O 事件。

  • 如果有事件就绪,Selector 会返回一个包含就绪事件的 SelectionKey 集合。

4. 处理 I/O 事件

  • 获取到就绪的 SelectionKey 集合后,EventLoop 会遍历这些 SelectionKey,并根据每个 SelectionKey 的就绪事件类型调用相应的处理方法。

  • 例如,如果 SelectionKey 表示读事件已准备好 (OP_READ),则会调用 Channelread() 方法来处理读事件。

  • 同样地,如果 SelectionKey 表示写事件已准备好 (OP_WRITE),则会调用 Channelwrite() 方法来处理写事件。

5. 任务执行

  • 在处理完 I/O 事件之后,EventLoop 会检查 TaskQueueScheduledTaskQueue,执行其中的任务。

示例代码

下面是一个简化的 EventLoop 处理 I/O 事件的伪代码示例:

public class EventLoop extends Thread {
    private final Selector selector;
    private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Queue<ScheduledFuture<?>> scheduledTaskQueue = new LinkedBlockingQueue<>();
​
    public EventLoop(Selector selector) {
        this.selector = selector;
    }
​
    public void register(Channel channel, int interestOps) {
        // Register the channel with the selector
    }
​
    public void execute(Runnable task) {
        // Add the task to the queue and wake up the loop if necessary
    }
​
    public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
        // Schedule the task to be executed after the specified delay
    }
​
    @Override
    public void run() {
        while (true) {
            // Select ready keys from the selector
            selector.select();
​
            // Process selected keys
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove(); // Remove the key to prevent duplicate processing
​
                // Process the event based on the key's readiness
                if (key.isReadable()) {
                    // Handle read event
                    Channel ch = (Channel) key.channel();
                    ch.pipeline().fireChannelRead(...);
                } else if (key.isWritable()) {
                    // Handle write event
                    Channel ch = (Channel) key.channel();
                    ch.pipeline().fireChannelWrite(...);
                }
            }
​
            // Execute tasks in the task queue
            Runnable task;
            while ((task = taskQueue.poll()) != null) {
                task.run();
            }
​
            // Execute scheduled tasks
            ScheduledFuture<?> scheduledTask;
            while ((scheduledTask = scheduledTaskQueue.poll()) != null) {
                scheduledTask.run();
            }
        }
    }
}

注意事项

  • EventLoop 在处理 I/O 事件时,通常会调用 ChannelPipeline 来处理具体的事件,而 Pipeline 中包含了多个 Handler

  • Pipeline 提供了一个灵活的方式来处理事件,可以添加多个 Handler 来处理读写事件,比如编码器、解码器、业务逻辑处理器等。

  • EventLoop 在处理 I/O 事件时需要确保线程安全,特别是当从外部唤醒或提交任务时。

通过以上步骤,EventLoop 可以有效地处理 I/O 事件,并执行相关的任务,从而实现高效的网络通信处理。

在 Netty 中,EventLoop 是负责处理 I/O 事件和执行任务的核心组件。在前面的文章中,我们已经了解了 EventLoop 的基本结构和运行机制。本文将对 EventLoop 处理 I/O 事件的详细过程进行解析。

EventLoop 处理 I/O 事件的过程

1. 选择器(Selector)准备

在 EventLoop 的 run() 方法中,首先会调用 selector.selectNow()selector.select(timeout) 方法来获取已就绪的事件。这些事件包括连接建立、数据可读、连接关闭等。

int selectedKeysCount = selector.selectNow();
if (selectedKeysCount == 0) {
    continue;
}

2. 处理已就绪的事件

获取到已就绪的事件后,EventLoop 会遍历这些事件,并根据事件类型进行相应的处理。

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if (key.isAcceptable()) {
        // 处理连接建立事件
    } else if (key.isReadable()) {
        // 处理数据可读事件
    } else if (key.isWritable()) {
        // 处理数据可写事件
    } else if (key.isConnectable()) {
        // 处理连接完成事件
    } else if (key.isClosed()) {
        // 处理连接关闭事件
    }
    keyIterator.remove();
}

3. 处理 I/O 事件

针对不同的事件类型,EventLoop 会调用相应的处理器进行处理。以下是一些常见的事件处理逻辑:

  • 连接建立事件(isAcceptable):当有新的连接请求时,会调用 ServerSocketChannel.accept() 方法接受连接,并将新连接的 SocketChannel 注册到 Selector 上。

if (key.isAcceptable()) {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    SocketChannel socketChannel = serverSocket.classicAccept();
    if (socketChannel != null) {
        configureAndRegister(socketChannel);
    }
}
  • 数据可读事件(isReadable):当通道中有数据可读时,会调用 SocketChannel.read() 方法读取数据,并将数据传递给 ChannelPipeline 进行处理。

if (key.isReadable()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    int bytesRead = socketChannel.read(buffer);
    if (bytesRead > 0) {
        buffer.flip();
        channel.pipeline().fireChannelRead(buffer);
    } else if (bytesRead == -1) {
        closeChannel(socketChannel);
    }
}
  • 数据可写事件(isWritable):当通道可写时,会调用 SocketChannel.write() 方法将数据写入通道。

if (key.isWritable()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    List<ByteBuf> pendingData = socketChannel.unsafe().outboundBuffer();
    for (ByteBuf buf : pendingData) {
        int bytesWritten = socketChannel.write(buf);
        if (bytesWritten < 0 || buf.isReadOnly()) {
            closeChannel(socketChannel);
            break;
        }
    }
}
  • 连接完成事件(isConnectable):当连接建立完成时,会调用 SocketChannel.finishConnect() 方法完成连接。

if (key.isConnectable()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    if (socketChannel.finishConnect()) {
        channel.pipeline().fireChannelActive();
    } else {
        closeChannel(socketChannel);
    }
}
  • 连接关闭事件(isClosed):当连接关闭时,会调用 SocketChannel.close() 方法关闭通道。

if (key.isClosed()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    closeChannel(socketChannel);
}

总结

EventLoop 处理 I/O 事件的过程包括选择器准备、处理已就绪的事件和处理 I/O 事件等步骤。针对不同的事件类型,EventLoop 会调用相应的处理器进行处理。通过深入了解 EventLoop 处理 I/O 事件的过程,可以更好地理解 Netty 的内部工作原理,并在实际开发中更好地利用 Netty 的特性。

本文主要介绍了 Netty 的 NioEventLoop 类中处理 IO 事件相关代码的实现,包括如何创建 Selector 对象、优化 SelectionKey 集合、重建 Selector 对象以及处理就绪的 IO 事件等。

Key Takeaways

  • NioEventLoop 类使用 SelectorTuple 来封装 Selector 对象,并通过 SelectedSelectionKeySet 和 SelectedSelectionKeySetSelector 优化 SelectionKey 集合。

  • NioEventLoop 类提供了 openSelector 方法,用于创建 Selector 对象,并通过反射机制优化 Selector 对象的 selectedKeys 集合。

  • 当 NIO Selector 发生 epoll bug 时,NioEventLoop 类会调用 rebuildSelector 方法重建 Selector 对象。

  • NioEventLoop 类使用 processSelectedKeysOptimized 方法处理 Channel 新增就绪的 IO 事件,并根据 attachment 类型选择不同的处理方式。

  • NioEventLoop 类提供了 register 方法,用于注册任意 SelectableChannel 到 Selector 上,并通过 NioTask 接口处理事件。

  • NioEventLoop 类使用 processSelectedKey 方法处理 Channel 就绪的 IO 事件,包括完成连接、写入数据和读取数据等。

  • NioTask 接口定义了 channelReady 和 channelUnregistered 方法,分别用于处理 Channel IO 就绪的事件和 Channel 取消注册的事件。

因为在 《Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 中,#openSelector()#rebuildSelector() 方法并未做分享,所以我们先来一起看看。

2. SelectorTuple

SelectorTuple ,Selector 元组。代码如下:

SelectorTuple 内嵌在 NioEventLoop

private static final class SelectorTuple {

    /**
     * 未包装的 Selector 对象
     */
    final Selector unwrappedSelector;
    /**
     * 未包装的 Selector 对象
     */
    final Selector selector;

    SelectorTuple(Selector unwrappedSelector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = unwrappedSelector;
    }

    SelectorTuple(Selector unwrappedSelector, Selector selector) {
        this.unwrappedSelector = unwrappedSelector;
        this.selector = selector;
    }

}

SelectorTuple 类是 Netty 中 NioEventLoop 的一部分,用于处理基于 NIO(New I/O)的网络操作。这个类的设计目的是为了处理 Java NIO 中的一个已知问题,即 Selector 的 WakenUp 机制可能会导致的 bug。

让我们分析一下 SelectorTuple 类:

  1. 成员变量:

    • unwrappedSelector: 这个字段代表了原始的、未被包装的 Selector 对象。通常情况下,这个 Selector 是直接由 Selector.open() 创建得到的。

    • selector: 这个字段同样是一个 Selector 对象,但它可能与 unwrappedSelector 相同,也可能不同。当 Selector 被包装以解决某些问题时,这个字段就变得有意义了。

  2. 构造函数:

    • 第一个构造函数只有一个参数 unwrappedSelector,它创建了一个 SelectorTuple 实例,其中 selectorunwrappedSelector 指向同一个对象。

    • 第二个构造函数接受两个参数,允许创建一个 SelectorTuple 实例,在这种情况下,selectorunwrappedSelector 可以指向不同的 Selector 对象。

使用场景

在 Netty 中,SelectorTuple 主要用于解决 Java NIO Selector 的一些问题,比如:

  • Wakeup Bug: 当一个 Selector 被唤醒(wakeup 方法被调用),然后立即进行选择操作(selectselectNow 方法被调用),可能会导致某些事件丢失或不被正确处理的问题。

  • 多线程访问问题: 在多线程环境中,如果多个线程同时访问同一个 Selector,可能会出现竞态条件。

为了解决这些问题,Netty 可能会创建一个包装过的 Selector,这个包装器能够确保即使在上述情况下也能正确处理事件。SelectorTupleselector 字段就可能指向这样一个包装过的 Selector

示例

NioEventLoop 中,SelectorTuple 通常是在 initialize 方法中创建的。如果需要的话,Netty 会使用一个 WrappedSelectionKeyWrappedSelector 来包装原始的 Selector。这些包装类可以帮助解决上述提到的问题。

总结

SelectorTuple 在 Netty 中的作用是为了确保 Selector 的正确性和健壮性,特别是在并发和复杂网络环境中。通过提供一个可选的包装 Selector,Netty 能够更好地控制和管理底层的 I/O 操作。

SelectorTuple 是 Netty 中的一个内部类,用于封装 Selector 对象。在 Netty 的 NIO 事件循环(NioEventLoop)中,SelectorTuple 被用来管理和操作底层的 I/O 多路复用器(即 Selector)。Selector 是 Java NIO 的核心组件之一,它允许单个线程处理多个网络连接。

SelectorTuple 类有两个构造函数:

  1. 一个接受单个 Selector 对象作为参数。在这种情况下,unwrappedSelectorselector 都指向同一个 Selector 实例。

  2. 另一个构造函数接受两个 Selector 对象作为参数。这通常用于包装或替换底层的 Selector,例如在使用自定义的 SelectorProvider 或者在进行某些性能优化时。

SelectorTuple 类的两个主要字段是:

  • unwrappedSelector:这是原始的、未经过任何包装的 Selector 对象。它通常用于执行底层的 I/O 多路复用操作。

  • selector:这是可能被包装过的 Selector 对象。在某些情况下,Netty 可能会对 Selector 进行包装,以添加额外的功能或行为。

在 Netty 的 NioEventLoop 中,SelectorTuple 被用来管理 I/O 事件的分发和处理。NioEventLoop 会使用 Selector 来监视多个通道(Channel)上的 I/O 事件(如可读、可写等),并在事件发生时触发相应的处理器。

总之,SelectorTuple 是 Netty 中用于封装和管理 Selector 对象的一个简单类。它允许 Netty 在其内部实现中对 Selector 进行灵活的操作和扩展。

3. openSelector

#openSelector() 方法,创建 Selector 对象。代码如下:

 1: private SelectorTuple openSelector() {
 2:     // 创建 Selector 对象,作为 unwrappedSelector
 3:     final Selector unwrappedSelector;
 4:     try {
 5:         unwrappedSelector = provider.openSelector();
 6:     } catch (IOException e) {
 7:         throw new ChannelException("failed to open a new selector", e);
 8:     }
 9: 
10:     // 禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
11:     if (DISABLE_KEYSET_OPTIMIZATION) {
12:         return new SelectorTuple(unwrappedSelector);
13:     }
14: 
15:     // 获得 SelectorImpl 类
16:     Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
17:         @Override
18:         public Object run() {
19:             try {
20:                 return Class.forName(
21:                         "sun.nio.ch.SelectorImpl",
22:                         false,
23:                         PlatformDependent.getSystemClassLoader()); // 成功,则返回该类
24:             } catch (Throwable cause) {
25:                 return cause; // 失败,则返回该异常
26:             }
27:         }
28:     });
29: 
30:     // 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
31:     if (!(maybeSelectorImplClass instanceof Class) ||
32:             // ensure the current selector implementation is what we can instrument.
33:             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
34:         if (maybeSelectorImplClass instanceof Throwable) {
35:             Throwable t = (Throwable) maybeSelectorImplClass;
36:             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
37:         }
38:         return new SelectorTuple(unwrappedSelector);
39:     }
40: 
41:     final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
42: 
43:     // 创建 SelectedSelectionKeySet 对象
44:     final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
45: 
46:     // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中
47:     Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
48:         @Override
49:         public Object run() {
50:             try {
51:                 // 获得 "selectedKeys" "publicSelectedKeys" 的 Field
52:                 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
53:                 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
54: 
55:                 // 设置 Field 可访问
56:                 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
57:                 if (cause != null) {
58:                     return cause;
59:                 }
60:                 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
61:                 if (cause != null) {
62:                     return cause;
63:                 }
64: 
65:                 // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 的 Field 中
66:                 selectedKeysField.set(unwrappedSelector, selectedKeySet);
67:                 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
68:                 return null;
69:             } catch (NoSuchFieldException e) {
70:                 return e; // 失败,则返回该异常
71:             } catch (IllegalAccessException e) {
72:                 return e; // 失败,则返回该异常
73:             }
74:         }
75:     });
76: 
77:     // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
78:     if (maybeException instanceof Exception) {
79:         selectedKeys = null;
80:         Exception e = (Exception) maybeException;
81:         logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
82:         return new SelectorTuple(unwrappedSelector);
83:     }
84: 
85:     // 设置 SelectedSelectionKeySet 对象到 selectedKeys 中
86:     selectedKeys = selectedKeySet;
87:     logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
88: 
89:     // 创建 SelectedSelectionKeySetSelector 对象
90:     // 创建 SelectorTuple 对象。即,selector 也使用 SelectedSelectionKeySetSelector 对象。
91:     return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
92: }
  • 第 2 至 8 行:创建 Selector 对象,作为 unwrappedSelector

  • 第 10 至 13 行:禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector

  • 第 15 至 28 行:获得 SelectorImpl 类。可以自动过滤掉 AccessController#.doPrivileged(...) 外层代码。在方法内部,调用 Class#forName(String name, boolean initialize, ClassLoader loader) 方法,加载 sun.nio.ch.SelectorImpl 类。加载成功,则返回该类,否则返回异常。

    • 第 30 至 39 行: 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector

  • 第 44 行:创建 SelectedSelectionKeySet 对象。这是 Netty 对 Selector 的 selectionKeys 的优化。关于 SelectedSelectionKeySet 的详细实现,见 「4. SelectedSelectionKeySet」 。

    • 第 46 至 75 行: 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中的 selectedKeyspublicSelectedKeys 属性。整个过程,笔者已经添加中文注释,胖友自己看下。

    • selectedKeyspublicSelectedKeys 属性在 SelectorImpl 类中,代码如下:

protected HashSet<SelectionKey> keys = new HashSet(); // => publicKeys
private Set<SelectionKey> publicKeys;

protected Set<SelectionKey> selectedKeys = new HashSet(); // => publicSelectedKeys
private Set<SelectionKey> publicSelectedKeys;

protected SelectorImpl(SelectorProvider var1) {
    super(var1);
    if (Util.atBugLevel("1.4")) { // 可以无视
        this.publicKeys = this.keys;
        this.publicSelectedKeys = this.selectedKeys;
    } else {
        this.publicKeys = Collections.unmodifiableSet(this.keys);
        this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
    }

}
  • 可以看到,selectedKeyspublicSelectedKeys 的类型都是 HashSet 。

  • 第 77 至 83 行:设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector

  • 第 86 行:设置 SelectedSelectionKeySet 对象到 selectedKeys 中。在下文,我们会看到,是否成功优化 Selector 对象,是通过 selectedKeys 是否成功初始化来判断。

  • 第 91 行:创建 SelectedSelectionKeySetSelector 对象。这是 Netty 对 Selector 的优化实现类。关于 SelectedSelectionKeySetSelector 的详细实现,见 「5. SelectedSelectionKeySetSelector」 。

  • 第 91 行:创建 SelectorTuple 对象。即,selector 使用 SelectedSelectionKeySetSelector 对象。😈 总算,创建成功优化的 selector 对象了。

这段代码展示了 NioEventLoop 类中的 openSelector 方法,其目的是创建一个新的 Selector 并对其进行可能的优化。根据上下文,我们可以看到这个方法主要做了以下几件事:

  1. 创建原始的 Selector:

    • 在第 3 行至第 8 行之间,通过 provider.openSelector() 创建了一个新的 Selector 对象,并将其存储在 unwrappedSelector 变量中。如果创建过程中抛出了 IOException,则会抛出一个 ChannelException

  2. 禁用优化检查:

    • 如果 DISABLE_KEYSET_OPTIMIZATIONtrue,那么将不会进行任何额外的优化,并且直接返回一个 SelectorTuple 对象,其中 unwrappedSelectorselector 字段都指向同一个 Selector 对象(第 10 至 13 行)。

  3. 尝试优化 Selector:

    • 如果 DISABLE_KEYSET_OPTIMIZATIONfalse,则尝试对 Selector 进行优化。这包括尝试替换 Selector 内部使用的 Set 以提高性能(第 15 至 92 行)。

  4. 获取 SelectorImpl 类:

    • 使用 AccessController.doPrivileged 安全地获取 SelectorImpl 类(第 16 至 28 行)。这是为了确保代码可以在没有足够的权限的情况下安全运行。

  5. 检查 SelectorImpl 类:

    • 如果未能成功获取 SelectorImpl 类或者 unwrappedSelector 不是 SelectorImpl 的实例,则不进行任何优化并直接返回 SelectorTuple 对象(第 30 至 39 行)。

  6. 优化 Selector:

    • 如果成功获取到了 SelectorImpl 类并且 unwrappedSelector 是它的实例,则创建一个新的 SelectedSelectionKeySet 对象来替换 Selector 内部的 selectedKeyspublicSelectedKeys 字段(第 44 至 54 行)。

  7. 设置 SelectedSelectionKeySet:

    • 使用反射设置 SelectedSelectionKeySetunwrappedSelector 的内部字段(第 55 至 75 行)。如果设置过程中发生异常,则放弃优化并直接返回 SelectorTuple 对象(第 76 至 83 行)。

  8. 创建 SelectedSelectionKeySetSelector:

    • 如果优化成功,创建一个 SelectedSelectionKeySetSelector 对象,并将其作为 SelectorTupleselector 字段返回(第 89 至 92 行)。

总结

openSelector 方法的主要目标是创建一个新的 Selector 并尝试对其进行优化。这种优化主要是通过替换 Selector 内部使用的 Set 来实现的,从而提高性能。如果优化失败,方法将直接返回一个简单的 SelectorTuple 对象,其中包含原始的 Selector

4. SelectedSelectionKeySet

io.netty.channel.nio.SelectedSelectionKeySet ,继承 AbstractSet 抽象类,已 select 的 NIO SelectionKey 集合。代码如下:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    /**
     * SelectionKey 数组
     */
    SelectionKey[] keys;
    /**
     * 数组可读大小
     */
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024]; // 默认 1024 大小
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        // 添加到数组
        keys[size++] = o;

        // 超过数组大小上限,进行扩容
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        // 重置数组内容为空
        Arrays.fill(keys, start, size, null);
        // 重置可读大小为 0
        size = 0;
    }

    private void increaseCapacity() {
        // 两倍扩容
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        // 复制老数组到新数组
        System.arraycopy(keys, 0, newKeys, 0, size);
        // 赋值给老数组
        keys = newKeys;
    }

}
  • 通过 keyssize 两个属性,实现可重用的数组。

  • #add(SelectionKey o) 方法,添加新 select 到就绪事件的 SelectionKey 到 keys 中。当超过数组大小上限时,调用 #increaseCapacity() 方法,进行两倍扩容。相比 SelectorImpl 中使用的 selectedKeys 所使用的 HashSet 的 #add(E e) 方法,事件复杂度从 O(lgn) 降低O(1)

  • #reset(...) 方法,每次读取使用完数据,调用该方法,进行重置。

  • 因为 #remove(Object o)#contains(Object o)#iterator() 不会使用到,索性不进行实现。

SelectedSelectionKeySet 类是 Netty 为了优化 Selector 的性能而设计的一个类。这个类继承自 AbstractSet<SelectionKey>,用于存储已就绪的 SelectionKey 对象。下面是对其关键特性的分析:

关键特性

  1. 成员变量:

    • keys: 一个 SelectionKey 类型的数组,用于存储已就绪的 SelectionKey 对象。

    • size: 一个整数,表示当前 keys 数组中有多少个有效的 SelectionKey

  2. 构造函数:

    • 构造函数初始化 keys 数组,默认大小为 1024。

  3. 方法:

    • add(SelectionKey o):

      • 此方法用于添加一个新的 SelectionKey 到集合中。

      • 如果 SelectionKeynull,则返回 false

      • 否则,将 SelectionKey 添加到 keys 数组的末尾,并增加 size

      • 如果 size 达到 keys 数组的长度,则调用 increaseCapacity() 方法进行数组扩容。

    • size():

      • 返回当前 SelectedSelectionKeySet 中有效 SelectionKey 的数量。

    • remove(Object o):

      • 由于不需要移除功能,此方法总是返回 false

    • contains(Object o):

      • 由于不需要查询功能,此方法总是返回 false

    • iterator():

      • 抛出 UnsupportedOperationException,表明此集合不允许迭代访问。

    • reset()reset(int start):

      • reset() 方法用于清空整个 keys 数组,并将 size 重置为 0。

      • reset(int start) 方法用于从指定位置开始清空 keys 数组,并将 size 重置为指定的位置。

    • increaseCapacity():

      • 此方法用于当 keys 数组满时进行扩容。

      • 新数组的大小是原数组大小的两倍。

      • 原数组的内容会被复制到新数组中。

      • keys 变量会被更新为新数组。

性能优化

SelectedSelectionKeySet 的设计旨在提高性能,尤其是在高并发环境下。与传统的 HashSet 相比,它有以下优势:

  • 减少内存消耗:

    • 由于 SelectedSelectionKeySet 使用固定大小的数组来存储 SelectionKey,因此相比于 HashSet,它在内存使用上更加高效。

  • 减少 CPU 开销:

    • SelectedSelectionKeySetadd 方法的时间复杂度为 O(1),这意味着添加 SelectionKey 的操作非常快。

    • HashSetadd 方法的时间复杂度通常为 O(log n) 或更差,具体取决于哈希冲突的情况。

  • 简化操作:

    • 由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。

使用场景

SelectedSelectionKeySet 主要用于 Netty 的 Selector 优化中,尤其是当 Selector 的内部 selectedKeys 字段被替换为此类时。这样,当执行 Selector.select()Selector.selectNow() 时,已就绪的 SelectionKey 将被存储在这个优化后的集合中,从而提高整体的 I/O 处理效率。

综上所述,SelectedSelectionKeySet 是 Netty 中为了提高 Selector 性能而设计的一个重要组件,通过使用数组和简单的数组操作来替代 HashSet,实现了更好的性能表现。

SelectedSelectionKeySet 是 Netty 中的一个内部类,用于存储已选择的 SelectionKey 集合。这个类继承自 AbstractSet<SelectionKey>,并实现了部分 Set 接口的方法。SelectedSelectionKeySet 的主要目的是优化 SelectorselectedKeyspublicSelected 属性,以提高性能和减少内存开销。

以下是 SelectedSelectionKeySet 类的主要特点和方法:

主要属性:

  • keys:一个 SelectionKey 数组,用于存储已选择的 SelectionKey

  • size:数组中实际存储的 SelectionKey 的数量。

主要方法:

  • add(SelectionKey o):向 keys 数组中添加一个新的 SelectionKey。如果数组已满,则调用 increaseCapacity() 方法进行扩容。

  • size():返回数组中实际存储的 SelectionKey 的数量。

  • remove(Object o)contains(Object o):这两个方法在 SelectedSelectionKeySet 中没有实现,因为它们不会被使用。

  • iterator():这个方法抛出 UnsupportedOperationException,因为 SelectedSelectionKeySet 不支持迭代。

  • reset()reset(int start):这两个方法用于重置 keys 数组的内容和 size 属性。在每次读取完数据后,调用这些方法进行重置。

  • increaseCapacity():当数组已满时,这个方法用于将数组容量扩大一倍。

性能优化:

  • 数组实现:与 SelectorImpl 中使用的 HashSet 相比,SelectedSelectionKeySet 使用数组来存储 SelectionKey。这使得添加操作的复杂度从 O(log n) 降低到 O(1)。

  • 可重用性:通过 reset() 方法,SelectedSelectionKeySet 可以在每次读取完数据后重置,从而实现可重用性,减少内存分配和垃圾回收的开销。

总之,SelectedSelectionKeySet 是 Netty 对 SelectorselectedKeyspublicSelectedKeys 属性的优化实现。通过使用数组和重置机制,它提高了性能并减少了内存开销。

5. SelectedSelectionKeySetSelector

io.netty.channel.nio.SelectedSelectionKeySetSelector ,基于 Netty SelectedSelectionKeySet 作为 selectionKeys 的 Selector 实现类。代码如下:

final class SelectedSelectionKeySetSelector extends Selector {

    /**
     * SelectedSelectionKeySet 对象
     */
    private final SelectedSelectionKeySet selectionKeys;
    /**
     * 原始 Java NIO Selector 对象
     */
    private final Selector delegate;

    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
        this.delegate = delegate;
        this.selectionKeys = selectionKeys;
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public SelectorProvider provider() {
        return delegate.provider();
    }

    @Override
    public Set<SelectionKey> keys() {
        return delegate.keys();
    }

    @Override
    public Set<SelectionKey> selectedKeys() {
        return delegate.selectedKeys();
    }

    @Override
    public int selectNow() throws IOException {
        // 重置 selectionKeys
        selectionKeys.reset();
        // selectNow
        return delegate.selectNow();
    }

    @Override
    public int select(long timeout) throws IOException {
        // 重置 selectionKeys
        selectionKeys.reset();
        // select
        return delegate.select(timeout);
    }

    @Override
    public int select() throws IOException {
        // 重置 selectionKeys
        selectionKeys.reset();
        // select
        return delegate.select();
    }

    @Override
    public Selector wakeup() {
        return delegate.wakeup();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

}
  • 除了 select 相关的 3 个方法,每个实现方法,都是基于 Java NIO Selector 对应的方法的调用。

  • select 相关的 3 个方法,在调用对应的 Java NIO Selector 方法之前,会调用 SelectedSelectionKeySet#reset() 方法,重置 selectionKeys 。从而实现,每次 select 之后,都是新的已 select 的 NIO SelectionKey 集合。

SelectedSelectionKeySetSelector 是 Netty 中的一个内部类,它基于 Java NIO 的 Selector 接口实现,但使用了 Netty 自定义的 SelectedSelectionKeySet 作为其 selectionKeys 集合。这个类的主要目的是为了优化 Selector 的性能,特别是在处理大量 I/O 事件时。

以下是 SelectedSelectionKeySetSelector 类的主要特点和方法:

主要属性:

  • selectionKeys:一个 SelectedSelectionKeySet 对象,用于存储已选择的 SelectionKey

  • delegate:原始的 Java NIO Selector 对象,SelectedSelectionKeySetSelector 的大部分方法都会委托给这个对象。

主要方法:

  • isOpen()provider()keys()selectedKeys()wakeup()close():这些方法直接调用 delegate 对应的方法,不做任何修改。

  • selectNow()select(long timeout)select():这三个方法在调用 delegate 对应的方法之前,会先调用 selectionKeys.reset() 方法重置 selectionKeys。这样做的目的是确保每次 select 操作后,selectionKeys 都包含最新的已选择 SelectionKey 集合。

性能优化:

  • 重置 selectionKeys:通过在每次 select 操作前重置 selectionKeysSelectedSelectionKeySetSelector 确保了 selectionKeys 始终包含最新的已选择 SelectionKey。这避免了在 SelectorImpl 中使用 HashSet 时可能出现的并发修改问题,同时也减少了内存分配和垃圾回收的开销。

  • 基于数组的 SelectedSelectionKeySet:如前所述,SelectedSelectionKeySet 使用数组来存储 SelectionKey,这比 HashSet 更高效,因为它避免了 HashSet 的 O(log n) 添加复杂度,提供了 O(1) 的添加复杂度。

总的来说,SelectedSelectionKeySetSelector是 Netty 对 Java NIOSelector的一个优化实现,它通过使用自定义的SelectedSelectionKeySet` 和重置机制,提高了处理大量 I/O 事件时的性能。

重置机制可以提高性能的原因主要体现在以下几个方面:

  1. 避免并发修改问题

    • 在多线程环境下,如果多个线程同时修改 selectionKeys 集合,可能会导致并发修改问题。通过重置机制,在每次 select 操作前清空 selectionKeys,可以避免这种问题,确保 selectionKeys 始终包含最新的已选择 SelectionKey

  2. 减少内存分配和垃圾回收开销

    • 如果不使用重置机制,每次 select 操作后,旧的 SelectionKey 集合不会被立即回收,可能会导致内存占用不断增加。通过重置机制,旧的 SelectionKey 集合会被立即清空,释放内存,从而减少垃圾回收的开销。

  3. 提高数据访问效率

    • 使用数组实现的 SelectedSelectionKeySetHashSet 更高效,因为它避免了 HashSet 的 O(log n) 添加复杂度,提供了 O(1) 的添加复杂度。通过重置机制,可以确保 SelectedSelectionKeySet 始终处于最佳状态,提高数据访问效率。

  4. 简化代码逻辑

    • 通过重置机制,可以简化代码逻辑,避免在每次 select 操作后手动清理 selectionKeys。这使得代码更简洁,更容易维护。

  5. 提高可扩展性

    • 使用重置机制,可以更容易地扩展和优化 SelectedSelectionKeySetSelectedSelectionKeySetSelector。例如,可以在 reset 方法中添加更多的逻辑,以实现更高级的功能,如统计已选择的 SelectionKey 数量等。

总之,重置机制通过避免并发修改问题、减少内存分配和垃圾回收开销、提高数据访问效率、简化代码逻辑和提高可扩展性等方式,提高了处理大量 I/O 事件时的性能。

SelectedSelectionKeySetSelector 类是 Netty 中的一个内部类,它扩展了标准的 Selector 类,并使用 SelectedSelectionKeySet 来优化选择操作。以下是对此类的关键特性的分析:

关键特性

  1. 成员变量:

    • selectionKeys: 一个 SelectedSelectionKeySet 对象,用于存储已就绪的 SelectionKey

    • delegate: 一个原始的 Selector 对象,用于委托实际的选择操作。

  2. 构造函数:

    • 构造函数接受一个 Selector 对象和一个 SelectedSelectionKeySet 对象,并将它们分别赋值给 delegateselectionKeys 成员变量。

  3. 方法:

    • isOpen():

      • 返回 delegateisOpen() 方法的结果。

    • provider():

      • 返回 delegateprovider() 方法的结果。

    • keys():

      • 返回 delegatekeys() 方法的结果。

    • selectedKeys():

      • 返回 delegateselectedKeys() 方法的结果。

    • selectNow():

      • 在调用 delegateselectNow() 方法之前,先调用 selectionKeys.reset() 方法来重置 SelectedSelectionKeySet

      • 返回 delegateselectNow() 方法的结果。

    • select(long timeout):

      • 在调用 delegateselect(timeout) 方法之前,先调用 selectionKeys.reset() 方法来重置 SelectedSelectionKeySet

      • 返回 delegateselect(timeout) 方法的结果。

    • select():

      • 在调用 delegateselect() 方法之前,先调用 selectionKeys.reset() 方法来重置 SelectedSelectionKeySet

      • 返回 delegateselect() 方法的结果。

    • wakeup():

      • 返回 delegatewakeup() 方法的结果。

    • close():

      • 调用 delegateclose() 方法。

性能优化

SelectedSelectionKeySetSelector 的设计旨在提高性能,尤其是在高并发环境下。与标准的 Selector 相比,它有以下优势:

  • 减少内存消耗:

    • 由于 SelectedSelectionKeySet 使用固定大小的数组来存储 SelectionKey,因此相比于标准的 Selector 使用的 HashSet,它在内存使用上更加高效。

  • 减少 CPU 开销:

    • SelectedSelectionKeySetadd 方法的时间复杂度为 O(1),这意味着添加 SelectionKey 的操作非常快。

    • 标准的 SelectorselectedKeys() 方法通常需要遍历一个 HashSet,其时间复杂度为 O(n)。

  • 简化操作:

    • 由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。

使用场景

SelectedSelectionKeySetSelector 主要用于 Netty 的 Selector 优化中。当创建 Selector 时,如果条件允许,Netty 会创建一个 SelectedSelectionKeySetSelector 来代替标准的 Selector。这样,当执行 selectselectNow 时,已就绪的 SelectionKey 将被存储在这个优化后的集合中,从而提高整体的 I/O 处理效率。

综上所述,SelectedSelectionKeySetSelector 是 Netty 中为了提高 Selector 性能而设计的一个重要组件,通过使用优化过的 SelectedSelectionKeySet 来替代标准的 HashSet,实现了更好的性能表现。

您总结得很准确!SelectedSelectionKeySetSelector 确实是 Netty 对 Java NIO Selector 的一种优化实现。它通过使用自定义的 SelectedSelectionKeySet 和重置机制,显著提高了处理大量 I/O 事件时的性能。下面是对这一优化的一些详细解释:

优化点

  1. 使用 SelectedSelectionKeySet:

    • SelectedSelectionKeySet 是一个针对 SelectionKey 的优化容器,它使用一个数组来存储已就绪的 SelectionKey,而不是使用 HashSet

    • HashSet 相比,这种方法减少了内存消耗,并且提供了更快的添加操作(O(1) 时间复杂度)。

  2. 重置机制:

    • 每次执行 selectselectNow 或带有超时的 select 方法之前,都会调用 SelectedSelectionKeySet#reset() 方法来清空已就绪的 SelectionKey 集合。

    • 这样可以确保每次选择操作后都有一个全新的就绪事件列表,从而避免了多次遍历旧的事件列表,进一步提高了性能。

性能提升

  • 减少内存消耗:

    • SelectedSelectionKeySet 使用固定大小的数组来存储 SelectionKey,相比于 HashSet,它在内存使用上更加高效。

  • 减少 CPU 开销:

    • SelectedSelectionKeySetadd 方法的时间复杂度为 O(1),这意味着添加 SelectionKey 的操作非常快。

    • 标准的 SelectorselectedKeys() 方法通常需要遍历一个 HashSet,其时间复杂度为 O(n)。

  • 简化操作:

    • 由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。

使用场景

  • 高并发环境:

    • 在处理大量连接和 I/O 事件的场景下,如 Web 服务器或游戏服务器,这种优化特别有用。

    • 减少内存消耗和提高性能对于维持系统的稳定性和响应能力至关重要。

总结

SelectedSelectionKeySetSelector 通过使用自定义的 SelectedSelectionKeySet 和重置机制,为 Netty 提供了一种更高效的 Selector 实现方式。这种方式在处理大量 I/O 事件时能够显著提高性能,尤其是在高并发的网络应用中。通过减少内存消耗和提高 CPU 效率,Netty 能够更好地应对现代网络应用的需求。

重置机制在 SelectedSelectionKeySetSelector 中的作用是确保每次选择操作 (select, selectNow, select(long timeout)) 都能从一个干净的状态开始。这是因为每次选择操作完成后,SelectorselectedKeys() 集合会保留上次选择操作的结果,直到这些结果被显式地清理或处理。这种机制可能会导致性能问题,特别是当有大量的 SelectionKey 时。

问题背景

在 Java NIO 中,SelectorselectedKeys() 方法返回的是一个 Set,这个 Set 包含了所有已准备好 I/O 操作的 SelectionKey。但是,这个 Set 在选择操作完成之后并不会自动清空。这意味着如果应用程序没有显式地处理这些 SelectionKey,例如通过调用 Set.clear() 方法,那么这些 SelectionKey 会一直保留在 Set 中,这可能导致以下问题:

  1. 内存消耗增加:

    • 随着时间的推移,越来越多的 SelectionKey 会被累积在 selectedKeys() 集合中,导致内存占用增加。

  2. CPU 开销增加:

    • 每次执行选择操作时,Selector 需要遍历 selectedKeys() 集合来确定哪些 SelectionKey 已经准备好。随着集合的增长,这个过程会变得更加耗时。

  3. 不必要的工作:

    • 应用程序可能会处理不再相关的 SelectionKey,因为它们可能已经处理过但没有被清除。

重置机制的优势

SelectedSelectionKeySetSelector 通过在每次选择操作之前调用 SelectedSelectionKeySet#reset() 方法来解决这些问题。这种方法有几个关键优势:

  1. 确保干净状态:

    • 每次选择操作前都会清空 SelectedSelectionKeySet,确保只处理最新的就绪事件。

  2. 减少内存消耗:

    • 清空 SelectedSelectionKeySet 避免了累积大量的 SelectionKey,从而减少了内存占用。

  3. 减少 CPU 开销:

    • 清空 SelectedSelectionKeySet 后,每次选择操作只需要处理新就绪的 SelectionKey,而不是之前遗留下来的旧事件,从而减少了 CPU 的开销。

  4. 提高性能:

    • 通过减少不必要的工作和减少遍历的次数,整体性能得到了提高。

总结

重置机制通过确保每次选择操作都能从一个干净的状态开始,有效地解决了累积事件的问题。这不仅减少了内存消耗,还减少了 CPU 的开销,从而提高了处理大量 I/O 事件时的性能。这种机制对于高性能的网络应用程序来说非常重要,因为它有助于保持系统的响应能力和稳定性。

6. rebuildSelector

#rebuildSelector() 方法,重建 Selector 对象。代码如下:

该方法用于 NIO Selector 发生 epoll bug 时,重建 Selector 对象。

😈 突然又找到一个讨论,可以看看 《JDK 1.7 及以下 NIO 的 epoll bug》《应用服务器中对JDK的epoll空转bug的处理》

public void rebuildSelector() {
    // 只允许在 EventLoop 的线程中执行
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}只允许在 EventLoop 的线程中,调用 #rebuildSelector0() 方法,重建 Selector 对象。

6.1 rebuildSelector0

#rebuildSelector0() 方法,重建 Selector 对象。代码如下:

 1: private void rebuildSelector0() {
 2:     final Selector oldSelector = selector;
 3:     if (oldSelector == null) {
 4:         return;
 5:     }
 6: 
 7:     // 创建新的 Selector 对象
 8:     final SelectorTuple newSelectorTuple;
 9:     try {
10:         newSelectorTuple = openSelector();
11:     } catch (Exception e) {
12:         logger.warn("Failed to create a new Selector.", e);
13:         return;
14:     }
15: 
16:     // Register all channels to the new Selector.
17:     // 将注册在 NioEventLoop 上的所有 Channel ,注册到新创建 Selector 对象上
18:     int nChannels = 0; // 计算重新注册成功的 Channel 数量
19:     for (SelectionKey key: oldSelector.keys()) {
20:         Object a = key.attachment();
21:         try {
22:             if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
23:                 continue;
24:             }
25: 
26:             int interestOps = key.interestOps();
27:             // 取消老的 SelectionKey
28:             key.cancel();
29:             // 将 Channel 注册到新的 Selector 对象上
30:             SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
31:             // 修改 Channel 的 selectionKey 指向新的 SelectionKey 上
32:             if (a instanceof AbstractNioChannel) {
33:                 // Update SelectionKey
34:                 ((AbstractNioChannel) a).selectionKey = newKey;
35:             }
36: 
37:             // 计数 ++
38:             nChannels ++;
39:         } catch (Exception e) {
40:             logger.warn("Failed to re-register a Channel to the new Selector.", e);
41:             // 关闭发生异常的 Channel
42:             if (a instanceof AbstractNioChannel) {
43:                 AbstractNioChannel ch = (AbstractNioChannel) a;
44:                 ch.unsafe().close(ch.unsafe().voidPromise());
45:             // 调用 NioTask 的取消注册事件
46:             } else {
47:                 @SuppressWarnings("unchecked")
48:                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
49:                 invokeChannelUnregistered(task, key, e);
50:             }
51:         }
52:     }
53: 
54:     // 修改 selector 和 unwrappedSelector 指向新的 Selector 对象
55:     selector = newSelectorTuple.selector;
56:     unwrappedSelector = newSelectorTuple.unwrappedSelector;
57: 
58:     // 关闭老的 Selector 对象
59:     try {
60:         // time to close the old selector as everything else is registered to the new one
61:         oldSelector.close();
62:     } catch (Throwable t) {
63:         if (logger.isWarnEnabled()) {
64:             logger.warn("Failed to close the old Selector.", t);
65:         }
66:     }
67: 
68:     if (logger.isInfoEnabled()) {
69:         logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
70:     }
71: }
  • 第 7 行:调用 #openSelector() 方法,创建新的 Selector 对象。

  • 第 16 至 52 行:遍历的 Selector 对象的 selectionKeys ,将注册在 NioEventLoop 上的所有 Channel ,注册到创建 Selector 对象上。

    • 第 22 至 24 行:校验 SelectionKey 有效,并且 Java NIO Channel 并未注册在的 Selector 对象上。

    • 第 28 行:调用 SelectionKey#cancel() 方法,取消的 SelectionKey 。

    • 第 30 行:将 Java NIO Channel 注册到的 Selector 对象上,返回的 SelectionKey 对象。

    • 第 31 至 35 行:修改 Channel 的 selectionKey 指向的 SelectionKey 对象

    • 第 39 至 51 行:当发生异常时候,根据不同的 SelectionKey 的 attachment 来判断处理方式:

      • 第 41 至 44 行:当 attachment 是 Netty NIO Channel 时,调用 Unsafe#close(ChannelPromise promise) 方法,关闭发生异常的 Channel 。

      • 第 45 至 50 行:当 attachment 是 Netty NioTask 时,调用 #invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) 方法,通知 Channel 取消注册。详细解析,见 「8. NioTask」

  • 第 54 至 56 行:修改 selectorunwrappedSelector 指向的 Selector 对象。

  • 第 58 至 66 行:调用 Selector#close() 方法,关闭的 Selector 对象。

总的来说,#rebuildSelector() 方法,相比 #openSelector() 方法,主要是需要将老的 Selector 对象的“数据”复制到新的 Selector 对象上,并关闭老的 Selector 对象。

7. processSelectedKeys

#run() 方法中,会调用 #processSelectedKeys() 方法,处理 Channel 新增就绪的 IO 事件。代码如下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

7.1 processSelectedKeysOptimized

#processSelectedKeysOptimized() 方法,基于 Netty SelectedSelectionKeySetSelector ,处理 Channel 新增就绪的 IO 事件。代码如下:

从方法名,我们也可以看出,这是个经过优化的实现。

 1: private void processSelectedKeysOptimized() {
 2:     // 遍历数组
 3:     for (int i = 0; i < selectedKeys.size; ++i) {
 4:         final SelectionKey k = selectedKeys.keys[i];
 5:         // null out entry in the array to allow to have it GC'ed once the Channel close
 6:         // See https://github.com/netty/netty/issues/2363
 7:         selectedKeys.keys[i] = null;
 8: 
 9:         final Object a = k.attachment();
10: 
11:         // 处理一个 Channel 就绪的 IO 事件
12:         if (a instanceof AbstractNioChannel) {
13:             processSelectedKey(k, (AbstractNioChannel) a);
14:         // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
15:         } else {
16:             @SuppressWarnings("unchecked")
17:             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
18:             processSelectedKey(k, task);
19:         }
20: 
21:         // TODO 1007 NioEventLoop cancel 方法
22:         if (needsToSelectAgain) {
23:             // null out entries in the array to allow to have it GC'ed once the Channel close
24:             // See https://github.com/netty/netty/issues/2363
25:             selectedKeys.reset(i + 1);
26: 
27:             selectAgain();
28:             i = -1;
29:         }
30:     }
31: }
  • 第 3 行:循环 selectedKeys 数组。

    • 第 4 至 7 行:置空,原因见 https://github.com/netty/netty/issues/2363

    • 第 11 至 13 行:当 attachment 是 Netty NIO Channel 时,调用 #processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。

    • 第 14 至 19 行:当 attachment 是 Netty NioTask 时,调用 #processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) 方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。

    • 第 21 至 29 行:TODO 1007 NioEventLoop cancel 方法

7.2 processSelectedKeysPlain

#processSelectedKeysOptimized() 方法,基于 Java NIO 原生 Selecotr ,处理 Channel 新增就绪的 IO 事件。代码如下:

总体和 #processSelectedKeysOptimized() 方法类似

 1: private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
 2:     // check if the set is empty and if so just return to not create garbage by
 3:     // creating a new Iterator every time even if there is nothing to process.
 4:     // See https://github.com/netty/netty/issues/597
 5:     if (selectedKeys.isEmpty()) {
 6:         return;
 7:     }
 8: 
 9:     // 遍历 SelectionKey 迭代器
10:     Iterator<SelectionKey> i = selectedKeys.iterator();
11:     for (;;) {
12:         // 获得 SelectionKey 对象
13:         final SelectionKey k = i.next();
14:         // 从迭代器中移除
15:         i.remove();
16: 
17:         final Object a = k.attachment();
18:         // 处理一个 Channel 就绪的 IO 事件
19:         if (a instanceof AbstractNioChannel) {
20:             processSelectedKey(k, (AbstractNioChannel) a);
21:         // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
22:         } else {
23:             @SuppressWarnings("unchecked")
24:             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
25:             processSelectedKey(k, task);
26:         }
27: 
28:         // 无下一个节点,结束
29:         if (!i.hasNext()) {
30:             break;
31:         }
32: 
33:         // TODO 1007 NioEventLoop cancel 方法
34:         if (needsToSelectAgain) {
35:             selectAgain();
36:             selectedKeys = selector.selectedKeys();
37: 
38:             // Create the iterator again to avoid ConcurrentModificationException
39:             if (selectedKeys.isEmpty()) {
40:                 break;
41:             } else {
42:                 i = selectedKeys.iterator();
43:             }
44:         }
45:     }
46: }
  • 第 10 至 11 行:遍历 SelectionKey 迭代器

    • 第 12 至 15 行:获得下一个 SelectionKey 对象,并从迭代器中移除。

    • 第 18 至 20 行:当 attachment 是 Netty NIO Channel 时,调用 #processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。

    • 第 21 至 26 行:当 attachment 是 Netty NioTask 时,调用 #processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) 方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。

    • 第 33 至 44 行:TODO 1007 NioEventLoop cancel 方法

7.3 processSelectedKey

#processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法,处理一个 Channel 就绪的 IO 事件。代码如下:

 1: private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2:     // 如果 SelectionKey 是不合法的,则关闭 Channel
 3:     final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
 4:     if (!k.isValid()) {
 5:         final EventLoop eventLoop;
 6:         try {
 7:             eventLoop = ch.eventLoop();
 8:         } catch (Throwable ignored) {
 9:             // If the channel implementation throws an exception because there is no event loop, we ignore this
10:             // because we are only trying to determine if ch is registered to this event loop and thus has authority
11:             // to close ch.
12:             return;
13:         }
14:         // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
15:         // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
16:         // still healthy and should not be closed.
17:         // See https://github.com/netty/netty/issues/5125
18:         if (eventLoop != this) {
19:             return;
20:         }
21:         // close the channel if the key is not valid anymore
22:         unsafe.close(unsafe.voidPromise());
23:         return;
24:     }
25: 
26:     try {
27:         // 获得就绪的 IO 事件的 ops
28:         int readyOps = k.readyOps();
29: 
30:         // OP_CONNECT 事件就绪
31:         // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
32:         // the NIO JDK channel implementation may throw a NotYetConnectedException.
33:         if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
34:             // 移除对 OP_CONNECT 感兴趣
35:             // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
36:             // See https://github.com/netty/netty/issues/924
37:             int ops = k.interestOps();
38:             ops &= ~SelectionKey.OP_CONNECT;
39:             k.interestOps(ops);
40:             // 完成连接
41:             unsafe.finishConnect();
42:         }
43: 
44:         // OP_WRITE 事件就绪
45:         // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
46:         if ((readyOps & SelectionKey.OP_WRITE) != 0) {
47:             // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
48:             // 向 Channel 写入数据
49:             ch.unsafe().forceFlush();
50:         }
51: 
52:         // SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪
53:         // readyOps == 0 是对 JDK Bug 的处理,防止空的死循环
54:         // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
55:         // to a spin loop
56:         if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
57:             unsafe.read();
58:         }
59:     } catch (CancelledKeyException ignored) {
60:         // 发生异常,关闭 Channel
61:         unsafe.close(unsafe.voidPromise());
62:     }
63: }
  • 第 2 至 24 行:如果 SelectionKey 是不合法的,则关闭 Channel 。

  • 第 30 至 42 行:如果对 OP_CONNECT 事件就绪:

    • 第 34 至 39 行:移除对 OP_CONNECT 的感兴趣,即不再监听连接事件。

    • 【重要】第 41 行:调用 Unsafe#finishConnect() 方法,完成连接。后续的逻辑,对应 《Netty 源码分析 —— 启动(二)之客户端》 的 「3.6.4 finishConnect」 小节。

  • 第 44 至 50 行:如果对 OP_WRITE 事件就绪,调用 Unsafe#forceFlush() 方法,向 Channel 写入数据。在完成写入数据后,会移除对 OP_WRITE 的感兴趣。想要提前了解的胖友,可以自己看下 AbstractNioByteChannel#clearOpWrite()AbstractNioMessageChannel#doWrite(ChannelOutboundBuffer in) 方法。

  • 第 52 至 58 行:如果对 OP_READOP_ACCEPT 事件就绪:调用 Unsafe#read() 方法,处理读或者者接受客户端连接的事件。

8. NioTask

io.netty.channel.nio.NioTask ,用于自定义 Nio 事件处理接口。对于每个 Nio 事件,可以认为是一个任务( Task ),代码如下:

public interface NioTask<C extends SelectableChannel> {

    /**
     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
     */
    void channelReady(C ch, SelectionKey key) throws Exception;

    /**
     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
     * this {@link NioTask} will not be notified anymore.
     *
     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
     *              the event loop has been shut down.
     */
    void channelUnregistered(C ch, Throwable cause) throws Exception;

}
  • #channelReady(C ch, SelectionKey key) 方法,处理 Channel IO 就绪的事件。相当于说,我们可以通过实现该接口方法,实现 「7.3 processSelectedKey」 的逻辑。

  • #channelUnregistered(C ch, Throwable cause) 方法,Channel 取消注册。一般来说,我们可以通过实现该接口方法,关闭 Channel 。

😈 实际上,NioTask 在 Netty 自身中并未有相关的实现类。所以对 NioTask 不感兴趣的,可以跳过本小节。另外,NioTask 是在 Allow a user to access the Selector of an EventLoop 中有相关的讨论。

8.1 register

#register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) 方法,注册 Java NIO Channel ( 不一定需要通过 Netty 创建的 Channel )到 Selector 上,相当于说,也注册到了 EventLoop 上。代码如下:

/**
 * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
 * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
 * be executed by this event loop when the {@link SelectableChannel} is ready.
 */
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
    if (ch == null) {
        throw new NullPointerException("ch");
    }
    if (interestOps == 0) {
        throw new IllegalArgumentException("interestOps must be non-zero.");
    }
    if ((interestOps & ~ch.validOps()) != 0) {
        throw new IllegalArgumentException(
                "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
    }
    if (task == null) {
        throw new NullPointerException("task");
    }

    if (isShutdown()) {
        throw new IllegalStateException("event loop shut down");
    }

    // <1>
    try {
        ch.register(selector, interestOps, task);
    } catch (Exception e) {
        throw new EventLoopException("failed to register a channel", e);
    }
}
  • <1> 处,调用 SelectableChannel#register(Selector sel, int ops, Object att) 方法,注册 Java NIO Channel 到 Selector 上。这里我们可以看到,attachment 为 NioTask 对象,而不是 Netty Channel 对象。

8.2 invokeChannelUnregistered

#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) 方法,执行 Channel 取消注册。代码如下:

private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
    try {
        task.channelUnregistered(k.channel(), cause);
    } catch (Exception e) {
        logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
    }
}
  • 在方法内部,调用 NioTask#channelUnregistered() 方法,执行 Channel 取消注册。

8.3 processSelectedKey

#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) 方法,使用 NioTask ,自定义实现 Channel 处理 Channel IO 就绪的事件。代码如下:

private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
    int state = 0; // 未执行
    try {
        // 调用 NioTask 的 Channel 就绪事件
        task.channelReady(k.channel(), k);
        state = 1; // 执行成功
    } catch (Exception e) {
        // SelectionKey 取消
        k.cancel();
        // 执行 Channel 取消注册
        invokeChannelUnregistered(task, k, e);
        state = 2; // 执行异常
    } finally {
        switch (state) {
        case 0:
            // SelectionKey 取消
            k.cancel();
            // 执行 Channel 取消注册
            invokeChannelUnregistered(task, k, null);
            break;
        case 1:
            // SelectionKey 不合法,则执行 Channel 取消注册
            if (!k.isValid()) { // Cancelled by channelReady()
                invokeChannelUnregistered(task, k, null);
            }
            break;
        }
    }
}

简单小文一篇,没什么太大难度的一篇。

如果有不理解的地方,也可以看看下面的文章:

Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务1. 概述

本文分析了 Netty EventLoop 中 runAllTasks 方法的实现,重点关注了普通任务的执行过程和超时机制。

Key Takeaways

  • EventLoop 执行普通任务时,会调用 runAllTasks 方法,该方法会循环执行所有任务直到完成或超过时间限制。

  • EventLoop 每隔 64 个任务检查一次执行时间,因为 nanoTime() 方法相对比较耗时。

  • EventLoop 会调用 fetchFromScheduledTaskQueue 方法将定时任务队列中到达可执行时间的任务添加到普通任务队列中。

  • EventLoop 会调用 afterRunningAllTasks 方法执行所有任务完成后的后续操作,例如批量提交写入操作。

  • 批量提交写入功能的 Handler 会在 EventLoop 每次循环执行时添加一个任务到 tailTasks 队列中,以实现批量提交操作。

  • 批量提交写入功能可以减少 pipeline 的执行次数,提高吞吐量,尤其是在高并发场景下。

在 Netty 中,EventLoop 是负责处理 I/O 事件的核心组件之一,同时也是处理普通任务的基础。普通任务指的是那些不需要与 I/O 事件直接关联的任务,例如定时任务、计算任务等。本节将详细介绍 EventLoop 如何处理这些普通任务。

1. 概述

EventLoop 是 Netty 中的一个核心组件,它负责处理 I/O 事件以及执行非 I/O 相关的任务。在 Netty 中,EventLoop 是通过 EventLoopGroup 创建的,而 EventLoopGroup 可以包含多个 EventLoop 实例。每个 EventLoop 实例通常绑定到一个线程,并且负责处理该线程上的所有任务。

2. Task Queue

EventLoop 维护了一个 TaskQueue 来存放待处理的任务。TaskQueue 是一个线程安全的队列,可以容纳各种类型的任务,包括 I/O 事件处理任务和普通任务。

3. 提交任务

用户可以通过 EventLoopexecute() 或者 submit() 方法提交任务。这些方法最终会将任务添加到 TaskQueue 中。

4. 处理任务

EventLoop 在它的主循环中会不断地从 TaskQueue 中取出任务并执行。这个过程通常是在 EventLooprun() 方法中实现的,该方法是一个无限循环,直到 EventLoop 被关闭为止。

5. EventLoop 的 run() 方法

EventLooprun() 方法是其核心部分,它包含了处理任务的主要逻辑。这个方法会在 EventLoop 对应的线程中运行,不断执行以下步骤:

  1. 调用 select() 方法来获取已准备好的 I/O 事件。

  2. 调用 processSelectedKeys() 方法来处理这些 I/O 事件。

  3. 执行 TaskQueue 中的任务。

6. TaskQueue 的实现

TaskQueue 通常实现为一个线程安全的队列,例如 LinkedBlockingQueue 或者 ArrayBlockingQueue。这样的实现保证了多线程环境下的安全性,并且可以高效地管理任务。

7. 处理普通任务

EventLoop 完成 I/O 事件的处理后,它会检查 TaskQueue 是否有等待执行的任务。如果有,它会从队列中取出任务并执行。

8. 任务调度

EventLoop 支持立即执行的任务和延迟执行的任务。对于延迟执行的任务,EventLoop 会使用一个 ScheduledTaskQueue 来存放这些任务,并在适当的时间执行它们。

9. ScheduledTaskQueue

ScheduledTaskQueue 是一个专门用于存放延迟任务的队列。它通常实现为一个优先级队列,例如 PriorityBlockingQueue,这样可以确保任务按预定的时间顺序被执行。

10. 示例代码

以下是一个简单的示例,展示如何向 EventLoop 提交一个普通任务:

EventLoop eventLoop = ...; // 获取 EventLoop 的实例
​
// 提交一个普通任务
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello from a task!");
    }
});

总结

EventLoop 不仅负责处理 I/O 事件,也负责执行普通任务。通过维护一个 TaskQueue 和一个 ScheduledTaskQueueEventLoop 能够有效地管理不同类型的任务,并确保它们在适当的时机得到执行。这种设计使得 Netty 能够高效地处理各种任务,无论是 I/O 相关还是非 I/O 相关的。

如果您需要更详细的解释或代码示例,请告诉我具体的部分,我可以为您提供更深入的解释。

本文我们分享 EventLoop 的执行任务相关代码的实现。对应如下图的紫条 run tasks 部分:

EventLoop 执行的任务分成普通任务和定时任务,考虑到内容切分的更细粒度,本文近仅仅分享【普通任务】的部分。

2. runAllTasks 带超时

#run() 方法中,会调用 #runAllTasks(long timeoutNanos) 方法,执行所有任务直到完成所有,或者超过执行时间上限。代码如下:


 1: protected boolean runAllTasks(long timeoutNanos) {
 2:     // 从定时任务获得到时间的任务
 3:     fetchFromScheduledTaskQueue();
 4:     // 获得队头的任务
 5:     Runnable task = pollTask();
 6:     // 获取不到,结束执行
 7:     if (task == null) {
 8:         // 执行所有任务完成的后续方法
 9:         afterRunningAllTasks();
10:         return false;
11:     }
12: 
13:     // 计算执行任务截止时间
14:     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15:     long runTasks = 0; // 执行任务计数
16:     long lastExecutionTime;
17:     // 循环执行任务
18:     for (;;) {
19:         // 执行任务
20:         safeExecute(task);
21: 
22:         // 计数 +1
23:         runTasks ++;
24: 
25:         // 每隔 64 个任务检查一次时间,因为 nanoTime() 是相对费时的操作
26:         // 64 这个值当前是硬编码的,无法配置,可能会成为一个问题。
27:         // Check timeout every 64 tasks because nanoTime() is relatively expensive.
28:         // XXX: Hard-coded value - will make it configurable if it is really a problem.
29:         if ((runTasks & 0x3F) == 0) {
30:             // 重新获得时间
31:             lastExecutionTime = ScheduledFutureTask.nanoTime();
32:             // 超过任务截止时间,结束
33:             if (lastExecutionTime >= deadline) {
34:                 break;
35:             }
36:         }
37: 
38:         // 获得队头的任务
39:         task = pollTask();
40:         // 获取不到,结束执行
41:         if (task == null) {
42:             // 重新获得时间
43:             lastExecutionTime = ScheduledFutureTask.nanoTime();
44:             break;
45:         }
46:     }
47: 
48:     // 执行所有任务完成的后续方法
49:     afterRunningAllTasks();
50: 
51:     // 设置最后执行时间
52:     this.lastExecutionTime = lastExecutionTime;
53:     return true;
54: }
  • 方法的返回值,表示是否执行过任务。因为,任务队列可能为空,那么就会返回 false ,表示没有执行过任务。

  • 第 3 行:调用 #fetchFromScheduledTaskQueue() 方法,将定时任务队列 scheduledTaskQueue 到达可执行的任务,添加到任务队列 taskQueue 中。通过这样的方式,定时任务得以被执行。详细解析,见 《Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。

  • 第 5 行:首次调用 #pollTask() 方法,获得队头的任务。详细解析,胖友先跳到 「4. pollTask」 。

    • 第 6 至 11 行:获取不到任务,结束执行,并返回 false

      • 第 9 行:调用 #afterRunningAllTasks() 方法,执行所有任务完成的后续方法。详细解析,见 「5. afterRunningAllTasks」 。

  • 第 14 行:计算执行任务截止时间。其中,ScheduledFutureTask#nanoTime() 方法,我们可以暂时理解成,获取当前的时间,单位为纳秒。详细解析,见 《Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。

  • 第 17 至 46 行:循环执行任务。

    • 第 20 行:【重要】调用 #safeExecute(Runnable task) 方法,执行任务。

    • 第 23 行:计算 runTasks 加一

    • 第 29 至 36 行:每隔 64 个任务检查一次时间,因为 System#nanoTime()相对费时的操作。也因此,超过执行时间上限是“近似的”,而不是绝对准确。

      • 第 31 行:调用 ScheduledFutureTask#nanoTime() 方法,获取当前的时间。

      • 第 32 至 35 行:超过执行时间上限,结束执行。

    • 第 39 行:再次调用 #pollTask() 方法,获得队头的任务。

      • 第 41 至 45 行:获取不到,结束执行。

      • 第 43 行:调用 ScheduledFutureTask#nanoTime() 方法,获取当前的时间,作为最终.lastExecutionTime ,即【第 52 行】的代码。

  • 第 49 行:调用 #afterRunningAllTasks() 方法,执行所有任务完成的后续方法。

  • 第 53 行:返回 true ,表示有执行任务。

3. runAllTasks

#run() 方法中,会调用 #runAllTasks() 方法,执行所有任务直到完成所有。代码如下:

 1: protected boolean runAllTasks() {
 2:     assert inEventLoop();
 3:     boolean fetchedAll;
 4:     boolean ranAtLeastOne = false; // 是否执行过任务
 5: 
 6:     do {
 7:         // 从定时任务获得到时间的任务
 8:         fetchedAll = fetchFromScheduledTaskQueue();
 9:         // 执行任务队列中的所有任务
10:         if (runAllTasksFrom(taskQueue)) {
11:             // 若有任务执行,则标记为 true
12:             ranAtLeastOne = true;
13:         }
14:     } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
15: 
16:     // 如果执行过任务,则设置最后执行时间
17:     if (ranAtLeastOne) {
18:         lastExecutionTime = ScheduledFutureTask.nanoTime();
19:     }
20: 
21:     // 执行所有任务完成的后续方法
22:     afterRunningAllTasks();
23:     return ranAtLeastOne;
24: }
  • 第 4 行:ranAtLeastOne ,标记是否执行过任务。

  • 第 6 至 14 行:调用 #fetchFromScheduledTaskQueue() 方法,将定时任务队列 scheduledTaskQueue 到达可执行的任务,添加到任务队列 taskQueue 中。但是实际上,任务队列 taskQueue 是有队列大小上限的,因此使用 while 循环,直到没有到达可执行的任务为止。

    • 第 10 行:调用 #runAllTasksFrom(taskQueue) 方法,执行任务队列中的所有任务。代码如下:

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    // 获得队头的任务
    Runnable task = pollTaskFrom(taskQueue);
    // 获取不到,结束执行,返回 false
    if (task == null) {
        return false;
    }
    for (;;) {
        // 执行任务
        safeExecute(task);
        // 获得队头的任务
        task = pollTaskFrom(taskQueue);
        // 获取不到,结束执行,返回 true
        if (task == null) {
            return true;
        }
    }
}
  • 代码比较简单,和 #runAllTasks(long timeoutNanos)) 方法的代码,大体是相似的。

  • 第 12 行:若有任务被执行,则标记 ranAtLeastOnetrue

  • 第 16 至 19 行:如果执行过任务,则设置最后执行时间。

  • 第 22 行:调用 #afterRunningAllTasks() 方法,执行所有任务完成的后续方法。

  • 第 23 行:返回是否执行过任务。和 #runAllTasks(long timeoutNanos)) 方法的返回是一致的。

4. pollTask

#pollTask() 方法,获得队头的任务。代码如下:

protected Runnable pollTask() {
    assert inEventLoop();
    return pollTaskFrom(taskQueue);
}

protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
    for (;;) { // <2>
        // 获得并移除队首元素。如果获得不到,返回 null
        Runnable task = taskQueue.poll(); // <1>
        // 忽略 WAKEUP_TASK 任务,因为是空任务
        if (task == WAKEUP_TASK) {
            continue;
        }
        return task;
    }
}
  • <1> 处,调用 Queue#poll() 方法,获得并移除队首元素。如果获得不到,返回 null 。注意,这个操作是非阻塞的。如果不知道,请 Google 重新学习下。

  • <2> 处,因为获得的任务可能是 WAKEUP_TASK ,所以需要通过循环来跳过。

Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务

1. 概述

本文接 《Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 ,分享【处理定时任务】的部分。

因为 AbstractScheduledEventExecutor 在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 并未分享,并且它是本文的处理定时任务的前置,所以本文先写这部分内容。

2. ScheduledFutureTask

io.netty.util.concurrent.ScheduledFutureTask ,实现 ScheduledFuture、PriorityQueueNode 接口,继承 PromiseTask 抽象类,Netty 定时任务。

也有文章喜欢把“定时任务”叫作“调度任务”,意思是相同的,本文统一使用“定时任务”。

2.1 静态属性

/**
 * 任务序号生成器,通过 AtomicLong 实现递增发号
 */
private static final AtomicLong nextTaskId = new AtomicLong();
/**
 * 定时任务时间起点
 */
private static final long START_TIME = System.nanoTime();
  • nextTaskId 静态属性,任务序号生成器,通过 AtomicLong 实现递增发号。

  • START_TIME 静态属性,定时任务时间起点。在 ScheduledFutureTask 中,定时任务的执行时间,都是基于 START_TIME相对时间。😈 至于为什么使用相对时间?笔者暂时没有搞清楚。

    • 笔者也搜索了下和 System.nanoTime() 相关的内容,唯一能看的是 《System.nanoTime() 的隐患》 ,但是应该不是这个原因。

      因为是定时调度,我改了系统时间也没关系
      存的是距离下次调度还要多长时间
      不受系统时间影响
      最大的好处

2.2 nanoTime

#nanoTime() 静态方法,获得当前时间,这个是相对 START_TIME 来算的。代码如下:

static long nanoTime() {
    return System.nanoTime() - START_TIME;
}
  • 这是个重要的方法,后续很多方法都会调用到它。

2.3 deadlineNanos

#deadlineNanos(long delay) 静态方法,获得任务执行时间,这个也是相对 START_TIME 来算的。代码如下:

/**
 * @param delay 延迟时长,单位:纳秒
 * @return 获得任务执行时间,也是相对 {@link #START_TIME} 来算的。
 *          实际上,返回的结果,会用于 {@link #deadlineNanos} 字段
 */
static long deadlineNanos(long delay) {
    long deadlineNanos = nanoTime() + delay;
    // Guard against overflow 防御性编程
    return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

2.4 构造方法

/**
 * 任务编号
 */
private final long id = nextTaskId.getAndIncrement();
/**
 * 任务执行时间,即到了该时间,该任务就会被执行
 */
private long deadlineNanos;
/**
 * 任务执行周期
 *
 * =0 - 只执行一次
 * >0 - 按照计划执行时间计算
 * <0 - 按照实际执行时间计算
 *
 * 推荐阅读文章 https://blog.csdn.net/gtuu0123/article/details/6040159
 */
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
/**
 * 队列编号
 */
private int queueIndex = INDEX_NOT_IN_QUEUE;

ScheduledFutureTask(
        AbstractScheduledEventExecutor executor,
        Runnable runnable, V result, long nanoTime) {
    this(executor, toCallable(runnable, result), nanoTime);
}

ScheduledFutureTask(
        AbstractScheduledEventExecutor executor,
        Callable<V> callable, long nanoTime, long period) {
    super(executor, callable);
    if (period == 0) {
        throw new IllegalArgumentException("period: 0 (expected: != 0)");
    }
    deadlineNanos = nanoTime;
    periodNanos = period;
}

ScheduledFutureTask(
        AbstractScheduledEventExecutor executor,
        Callable<V> callable, long nanoTime) {
    super(executor, callable);
    deadlineNanos = nanoTime;
    periodNanos = 0;
}
  • 每个字段比较简单,胖友看上面的注释。

2.5 delayNanos

#delayNanos(...) 方法,获得距离指定时间,还要多久可执行。代码如下:

/**
 * @return 距离当前时间,还要多久可执行。若为负数,直接返回 0
 */
public long delayNanos() {
    return Math.max(0, deadlineNanos() - nanoTime());
}

/**
 * @param currentTimeNanos 指定时间
 * @return 距离指定时间,还要多久可执行。若为负数,直接返回 0
 */
public long delayNanos(long currentTimeNanos) {
    return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}

@Override
public long getDelay(TimeUnit unit) {
    return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}

2.6 run

#run() 方法,执行定时任务。代码如下:

 1: @Override
 2: public void run() {
 3:     assert executor().inEventLoop();
 4:     try {
 5:         if (periodNanos == 0) {
 6:             // 设置任务不可取消
 7:             if (setUncancellableInternal()) {
 8:                 // 执行任务
 9:                 V result = task.call();
10:                 // 通知任务执行成功
11:                 setSuccessInternal(result);
12:             }
13:         } else {
14:             // 判断任务并未取消
15:             // check if is done as it may was cancelled
16:             if (!isCancelled()) {
17:                 // 执行任务
18:                 task.call();
19:                 if (!executor().isShutdown()) {
20:                     // 计算下次执行时间
21:                     long p = periodNanos;
22:                     if (p > 0) {
23:                         deadlineNanos += p;
24:                     } else {
25:                         deadlineNanos = nanoTime() - p;
26:                     }
27:                     // 判断任务并未取消
28:                     if (!isCancelled()) {
29:                         // 重新添加到任务队列,等待下次定时执行
30:                         // scheduledTaskQueue can never be null as we lazy init it before submit the task!
31:                         Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
32:                                 ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
33:                         assert scheduledTaskQueue != null;
34:                         scheduledTaskQueue.add(this);
35:                     }
36:                 }
37:             }
38:         }
39:     // 发生异常,通知任务执行失败
40:     } catch (Throwable cause) {
41:         setFailureInternal(cause);
42:     }
43: }
  • 第 3 行:校验,必须在 EventLoop 的线程中。

  • 根据不同的任务执行周期 periodNanos ,在执行任务会略有不同。当然,大体是相同的。

  • 第 5 至 12 行:执行周期为“只执行一次”的定时任务。

    • 第 7 行:调用 PromiseTask#setUncancellableInternal() 方法,设置任务不可取消。具体的方法实现,我们在后续关于 Promise 的文章中分享。

    • 第 9 行:【重要】调用 Callable#call() 方法,执行任务。

    • 第 11 行:调用 PromiseTask#setSuccessInternal(V result) 方法,回调通知注册在定时任务上的监听器。为什么能这么做呢?因为 ScheduledFutureTask 继承了 PromiseTask 抽象类。

  • 第 13 至 38 行:执行周期为“固定周期”的定时任务。

    • 第 16 行:调用 DefaultPromise#isCancelled() 方法,判断任务是否已经取消。这一点,和【第 7 行】的代码,是不同的。具体的方法实现,我们在后续关于 Promise 的文章中分享。

    • 第 18 行:【重要】调用 Callable#call() 方法,执行任务。

    • 第 19 行:判断 EventExecutor 并未关闭。

    • 第 20 至 26 行:计算下次定时执行的时间。不同的执行 fixed 方式,计算方式不同。其中【第 25 行】的 - p 的代码,因为 p 是负数,所以通过负负得正来计算。另外,这块会修改定时任务的 deadlineNanos 属性,从而变成新的定时任务执行时间。

    • 第 28 行:和【第 16 行】的代码是一致的。

    • 第 29 至 34 行:重新添加到定时任务队列 scheduledTaskQueue 中,等待下次定时执行。

  • 第 39 至 42 行:发生异常,调用 PromiseTask#setFailureInternal(Throwable cause) 方法,回调通知注册在定时任务上的监听器。

2.7 cancel

有两个方法,可以取消定时任务。代码如下:

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    boolean canceled = super.cancel(mayInterruptIfRunning);
    // 取消成功,移除出定时任务队列
    if (canceled) {
        ((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
    }
    return canceled;
}

// 移除任务
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
    return super.cancel(mayInterruptIfRunning);
}