Netty 源码解析 —— EventLoop(四)之 EventLoop 运行
本文详细解析了 Netty 中 NioEventLoop 的运行原理,重点介绍了 NioEventLoop#run() 方法的实现,包括 SelectStrategy 的选择、Selector 的 select 操作以及任务的处理等关键步骤,并分析了 NIO Selector 空轮询的 Bug 和解决方法。
NioEventLoop 是 Netty 中用于处理 IO 事件和用户任务的单线程事件循环实现。
NioEventLoop 的 run() 方法通过死循环处理各种任务,包括处理 Channel 感兴趣的就绪 IO 事件、运行普通任务和定时任务等。
NioEventLoop 使用 SelectStrategy 来控制 select 操作的策略,默认情况下会根据当前是否有任务来决定是否阻塞 select。
NioEventLoop 为了解决 NIO Selector 空轮询的 Bug,采用了重建 Selector 对象的方式来避免 CPU 100% 的问题。
NioEventLoop 的 wakeup 标记用于减少 Selector#wakeup() 方法的调用次数,避免不必要的唤醒操作。
NioEventLoop 通过 mpsc 队列来存储任务,可以保证多线程生产任务和单线程消费任务的效率。
NioEventLoop 通过 ioRatio 属性来控制 IO 事件处理时间在总时间中的占比。
好的,我将为您概述 Netty 中与
Selector
相关的关键组件,并逐一解释它们的作用和实现细节。1. 概述
Netty 是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在处理 I/O 操作时,Netty 使用了 NIO(New I/O)技术来提高性能。Netty 的
NioEventLoop
类负责处理 I/O 事件,其中包括了Selector
的管理和优化。2. SelectorTuple
SelectorTuple
是NioEventLoop
中的一个私有静态内部类,用于封装一个或两个Selector
对象。它可以包含一个原始的Selector
对象 (unwrappedSelector
) 和一个可能经过优化的Selector
对象 (selector
)。当Selector
需要优化时,这两个字段可以指向不同的对象;否则,它们指向同一个对象。3. openSelector
openSelector
方法是NioEventLoop
中的一个私有方法,用于创建一个新的Selector
对象,并对其进行可能的优化。如果不需要优化,它会直接返回一个包含原始Selector
的SelectorTuple
对象。如果需要优化,它会尝试替换Selector
内部使用的Set
,以便提高性能。4. SelectedSelectionKeySet
SelectedSelectionKeySet
是一个继承自AbstractSet
的内部类,用于存储已就绪的SelectionKey
。它使用一个SelectionKey
数组和一个计数器size
来跟踪已就绪的SelectionKey
。这种方法相对于标准的HashSet
更加高效,因为它减少了内存消耗并降低了add
操作的时间复杂度。5. SelectedSelectionKeySetSelector
SelectedSelectionKeySetSelector
是一个扩展自Selector
的类,它使用SelectedSelectionKeySet
替换原始Selector
的selectedKeys
集合。这个类在每次调用select
相关的方法之前都会重置SelectedSelectionKeySet
,以确保每次选择操作后都有一个全新的就绪事件列表。6. rebuildSelector
rebuildSelector
是NioEventLoop
中的一个方法,用于重建Selector
。当需要重新创建Selector
时,例如在关闭一个Selector
后,这个方法会被调用。它会先关闭当前的Selector
,然后通过openSelector
方法创建一个新的Selector
。6.1 rebuildSelector0
rebuildSelector0
是rebuildSelector
方法的一部分,它包含了重建Selector
的具体逻辑。这个方法会首先关闭当前的Selector
,然后创建一个新的Selector
,并重新注册所有相关的Channel
。7. processSelectedKeys
processSelectedKeys
方法用于处理Selector
选择出来的SelectionKey
。它有几种不同的实现,根据Selector
是否进行了优化来决定使用哪种实现。7.1 processSelectedKeysOptimized
当
Selector
进行了优化时,使用processSelectedKeysOptimized
方法来处理已就绪的SelectionKey
。这个方法利用了SelectedSelectionKeySet
的优化。7.2 processSelectedKeysPlain
当
Selector
没有进行优化时,使用processSelectedKeysPlain
方法来处理已就绪的SelectionKey
。这个方法使用了标准的Selector
的selectedKeys
集合。7.3 processSelectedKey
这是一个辅助方法,用于处理单个
SelectionKey
。它会根据SelectionKey
的就绪操作来调用相应的处理器方法。8. NioTask
NioTask
是一个接口,用于定义如何处理Selector
的 I/O 事件。它并不是 Netty 的一部分,而是 Java NIO 中的一个概念。Netty 自身并没有实现这个接口,但在处理 NIO 事件时,可能会涉及到类似的概念。8.1 register
register
方法用于注册一个NioTask
到Selector
上,以便当特定的 I/O 事件发生时可以执行相应的任务。8.2 invokeChannelUnregistered
invokeChannelUnregistered
方法用于当一个Channel
从Selector
中取消注册时执行相应的处理。8.3 processSelectedKey
processSelectedKey
方法用于处理单个SelectionKey
的就绪事件。
好的,我将为您概述 Netty 中与
Selector
相关的关键组件,并逐一解释它们的作用和实现细节。1. 概述
Netty 是一个高性能的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在处理 I/O 操作时,Netty 使用了 NIO(New I/O)技术来提高性能。Netty 的
NioEventLoop
类负责处理 I/O 事件,其中包括了Selector
的管理和优化。2. SelectorTuple
SelectorTuple
是NioEventLoop
中的一个私有静态内部类,用于封装一个或两个Selector
对象。它可以包含一个原始的Selector
对象 (unwrappedSelector
) 和一个可能经过优化的Selector
对象 (selector
)。当Selector
需要优化时,这两个字段可以指向不同的对象;否则,它们指向同一个对象。3. openSelector
openSelector
方法是NioEventLoop
中的一个私有方法,用于创建一个新的Selector
对象,并对其进行可能的优化。如果不需要优化,它会直接返回一个包含原始Selector
的SelectorTuple
对象。如果需要优化,它会尝试替换Selector
内部使用的Set
,以便提高性能。4. SelectedSelectionKeySet
SelectedSelectionKeySet
是一个继承自AbstractSet
的内部类,用于存储已就绪的SelectionKey
。它使用一个SelectionKey
数组和一个计数器size
来跟踪已就绪的SelectionKey
。这种方法相对于标准的HashSet
更加高效,因为它减少了内存消耗并降低了add
操作的时间复杂度。5. SelectedSelectionKeySetSelector
SelectedSelectionKeySetSelector
是一个扩展自Selector
的类,它使用SelectedSelectionKeySet
替换原始Selector
的selectedKeys
集合。这个类在每次调用select
相关的方法之前都会重置SelectedSelectionKeySet
,以确保每次选择操作后都有一个全新的就绪事件列表。6. rebuildSelector
rebuildSelector
是NioEventLoop
中的一个方法,用于重建Selector
。当需要重新创建Selector
时,例如在关闭一个Selector
后,这个方法会被调用。它会先关闭当前的Selector
,然后通过openSelector
方法创建一个新的Selector
。6.1 rebuildSelector0
rebuildSelector0
是rebuildSelector
方法的一部分,它包含了重建Selector
的具体逻辑。这个方法会首先关闭当前的Selector
,然后创建一个新的Selector
,并重新注册所有相关的Channel
。7. processSelectedKeys
processSelectedKeys
方法用于处理Selector
选择出来的SelectionKey
。它有几种不同的实现,根据Selector
是否进行了优化来决定使用哪种实现。7.1 processSelectedKeysOptimized
当
Selector
进行了优化时,使用processSelectedKeysOptimized
方法来处理已就绪的SelectionKey
。这个方法利用了SelectedSelectionKeySet
的优化。7.2 processSelectedKeysPlain
当
Selector
没有进行优化时,使用processSelectedKeysPlain
方法来处理已就绪的SelectionKey
。这个方法使用了标准的Selector
的selectedKeys
集合。7.3 processSelectedKey
这是一个辅助方法,用于处理单个
SelectionKey
。它会根据SelectionKey
的就绪操作来调用相应的处理器方法。8. NioTask
NioTask
是一个接口,用于定义如何处理Selector
的 I/O 事件。它并不是 Netty 的一部分,而是 Java NIO 中的一个概念。Netty 自身并没有实现这个接口,但在处理 NIO 事件时,可能会涉及到类似的概念。8.1 register
register
方法用于注册一个NioTask
到Selector
上,以便当特定的 I/O 事件发生时可以执行相应的任务。8.2 invokeChannelUnregistered
invokeChannelUnregistered
方法用于当一个Channel
从Selector
中取消注册时执行相应的处理。8.3 processSelectedKey
processSelectedKey
方法用于处理单个SelectionKey
的就绪事件。
在 Netty 中,EventLoop 是负责处理 I/O 事件和执行任务的核心组件。EventLoop 的运行机制涉及到事件循环、任务调度和线程管理等多个方面。本文将对 EventLoop 的运行机制进行详细解析。
EventLoop 的基本结构
EventLoop 是一个接口,它定义了事件循环的基本操作。实际的实现类是
NioEventLoop
,它负责处理 NIO 事件和执行任务。EventLoop 的运行机制
1. 事件循环
EventLoop 的核心是事件循环,它会不断地从 Selector 中获取已就绪的事件,并处理这些事件。事件循环的伪代码如下:
while (!stop) { selector.select(); processSelectedKeys(); }
在这个循环中,
selector.select()
会阻塞等待事件的发生。当有事件发生时,processSelectedKeys()
方法会被调用,处理这些事件。2. 任务调度
除了处理 I/O 事件,EventLoop 还负责执行任务。任务可以分为两种:普通任务和延迟任务。普通任务会被立即执行,而延迟任务会在指定的延迟时间后执行。
任务调度的伪代码如下:
void execute(Runnable task) { if (isInEventLoop()) { scheduleTask(task); } else { queueTask(task); } } void scheduleTask(Runnable task) { tasks.add(task); if (is忙()) { wakeup(); } } void queueTask(Runnable task) { pendingTasks.add(task); if (!isInEventLoop() && !isShuttingDown()) { executor.execute(this::runAllTasks); } }
在
execute()
方法中,首先判断当前线程是否在 EventLoop 中。如果在,则将任务添加到任务队列中;否则,将任务添加到待处理任务队列中,并尝试唤醒 EventLoop。3. 线程管理
EventLoopGroup 是一组 EventLoop 的集合,它负责管理和调度 EventLoop。Netty 提供了多种 EventLoopGroup 的实现,如
DefaultEventLoopGroup
、NioEventLoopGroup
等。EventLoopGroup 的运行机制如下:
void start() { for (EventLoop eventLoop : eventLoops) { eventLoop.start(); } } void shutdown() { for (EventLoop eventLoop : eventLoops) { eventLoop.shutdown(); } }
在
start()
方法中,遍历所有的 EventLoop 并启动它们。在shutdown()
方法中,遍历所有的 EventLoop 并关闭它们。总结
EventLoop 是 Netty 中的核心组件,负责处理 I/O 事件和执行任务。它的运行机制包括事件循环、任务调度和线程管理等方面。通过深入了解 EventLoop 的运行机制,可以更好地理解 Netty 的内部工作原理,并在实际开发中更好地利用 Netty 的特性。
Netty 是一个高性能的异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,
EventLoop
是非常核心的一个组件,它负责处理 I/O 事件以及执行用户定义的 Runnable 任务。EventLoop 的启动与运行
1. EventLoop 的创建
EventLoop
是EventLoopGroup
的一部分,通常通过EventLoopGroup
的next()
方法获取。
EventLoopGroup
负责管理多个EventLoop
实例,并且能够根据一定的策略选择一个合适的EventLoop
来处理 I/O 操作。2. EventLoop 的内部机制
Selector:每个
EventLoop
对应一个或多个Selector
,用来监听注册在其上的 Channel 的 I/O 事件。Channel:每个
Channel
注册到一个EventLoop
上,由该EventLoop
负责处理该Channel
的所有 I/O 事件。TaskQueue 和 ScheduledTaskQueue:
EventLoop
内部有两个队列,一个是用来存放普通任务的TaskQueue
,另一个是存放定时任务的ScheduledTaskQueue
。这两个队列中的任务由EventLoop
在适当的时机执行。3. EventLoop 的运行循环
EventLoop
的核心是一个无限循环,通常称为 “事件循环” 或 “运行循环”。在这个循环中,
EventLoop
会轮询Selector
来检查是否有就绪的 I/O 事件,如果有,则调用相应的处理器来处理这些事件。处理完 I/O 事件后,
EventLoop
会处理TaskQueue
和ScheduledTaskQueue
中的任务。4. EventLoop 的核心方法
run():这是
EventLoop
的主要运行方法,在启动EventLoop
时被调用。register():用于将
Channel
注册到EventLoop
的Selector
上。execute():用于提交任务到
EventLoop
的队列中进行执行。schedule():用于提交定时任务到
EventLoop
的ScheduledTaskQueue
中。示例代码
以下是一个简化的
EventLoop
的运行循环的伪代码示例:public class EventLoop extends Thread { private final Selector selector; private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(); public EventLoop(Selector selector) { this.selector = selector; } public void register(Channel channel, int interestOps) { // Register the channel with the selector } public void execute(Runnable task) { // Add the task to the queue and wake up the loop if necessary } @Override public void run() { while (true) { // Select ready keys from the selector selector.select(); // Process selected keys Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); // Remove the key to prevent duplicate processing // Process the event based on the key's readiness } // Execute tasks in the task queue Runnable task; while ((task = taskQueue.poll()) != null) { task.run(); } } } }
请注意,上述代码是简化后的示例,实际的
EventLoop
实现要复杂得多,并且包含了更多的细节和优化措施。如果你对 Netty 的源码感兴趣,建议深入阅读 Netty 的官方文档和源码以获得更全面的理解。
1. 概述
本文我们分享 EventLoop 的运行相关代码的实现。
因为 EventLoop 的运行主要是通过 NioEventLoop 的 #run()
方法实现,考虑到内容相对的完整性,在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 一文中,我们并未分享 NioEventLoop 的初始化,所以本文也会分享这部分的内容。
OK ,还是老样子,自上而下的方式,一起来看看 NioEventLoop 的代码实现。
本文的重点在 「2.9 run」 和 「2.12 select」 中。
2. NioEventLoop
io.netty.channel.nio.NioEventLoop
,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。
2.1 static
在 static
代码块中,初始化了 NioEventLoop 的静态属性们。代码如下:
/**
* TODO 1007 NioEventLoop cancel
*/
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* 是否禁用 SelectionKey 的优化,默认开启
*/
private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
/**
* 少于该 N 值,不开启空轮询重建新的 Selector 对象的功能
*/
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
/**
* NIO Selector 空轮询该 N 次后,重建新的 Selector 对象
*/
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
static {
// 解决 Selector#open() 方法 // <1>
final String key = "sun.nio.ch.bugLevel";
final String buglevel = SystemPropertyUtil.get(key);
if (buglevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}
// 初始化
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}
-
CLEANUP_INTERVAL
属性,TODO 1007 NioEventLoop cancel -
DISABLE_KEYSET_OPTIMIZATION
属性,是否禁用 SelectionKey 的优化,默认开启。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。 -
SELECTOR_AUTO_REBUILD_THRESHOLD
属性,NIO Selector 空轮询该 N 次后,重建新的 Selector 对象,用以解决 JDK NIO 的 epoll 空轮询 Bug 。-
MIN_PREMATURE_SELECTOR_RETURNS
属性,少于该 N 值,不开启空轮询重建新的 Selector 对象的功能。
-
-
<1>
处,解决Selector#open()
方法,发生 NullPointException 异常。详细解析,见 http://bugs.sun.com/view_bug.do?bug_id=6427854 和 https://github.com/netty/netty/issues/203 。 -
<2>
处,初始化SELECTOR_AUTO_REBUILD_THRESHOLD
属性。默认 512 。
2.2 构造方法
/**
* The NIO {@link Selector}.
*
* 包装的 Selector 对象,经过优化
*
* {@link #openSelector()}
*/
private Selector selector;
/**
* 未包装的 Selector 对象
*/
private Selector unwrappedSelector;
/**
* 注册的 SelectionKey 集合。Netty 自己实现,经过优化。
*/
private SelectedSelectionKeySet selectedKeys;
/**
* SelectorProvider 对象,用于创建 Selector 对象
*/
private final SelectorProvider provider;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*
* 唤醒标记。因为唤醒方法 {@link Selector#wakeup()} 开销比较大,通过该标识,减少调用。
*
* @see #wakeup(boolean)
* @see #run()
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* Select 策略
*
* @see #select(boolean)
*/
private final SelectStrategy selectStrategy;
/**
* 处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例
*/
private volatile int ioRatio = 50;
/**
* 取消 SelectionKey 的数量
*
* TODO 1007 NioEventLoop cancel
*/
private int cancelledKeys;
/**
* 是否需要再次 select Selector 对象
*
* TODO 1007 NioEventLoop cancel
*/
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建 Selector 对象 <1>
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
-
Selector 相关:
-
unwrappedSelector
属性,未包装的 NIO Selector 对象。 -
selector
属性,包装的 NIO Selector 对象。Netty 对 NIO Selector 做了优化。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。 -
selectedKeys
属性,注册的 NIO SelectionKey 集合。Netty 自己实现,经过优化。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。 -
provider
属性,NIO SelectorProvider 对象,用于创建 NIO Selector 对象。 -
在
<1>
处,调用#openSelector()
方法,创建 NIO Selector 对象。
-
-
wakenUp
属性,唤醒标记。因为唤醒方法Selector#wakeup()
开销比较大,通过该标识,减少调用。详细解析,见 「2.8 wakeup」 。 -
selectStrategy
属性,Select 策略。详细解析,见 「2.10 SelectStrategy」 。 -
ioRatio
属性,在 NioEventLoop 中,会三种类型的任务:1) Channel 的就绪的 IO 事件;2) 普通任务;3) 定时任务。而ioRatio
属性,处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例。 -
取消 SelectionKey 相关:
-
cancelledKeys
属性, 取消 SelectionKey 的数量。TODO 1007 NioEventLoop cancel -
needsToSelectAgain
属性,是否需要再次 select Selector 对象。TODO 1007 NioEventLoop cancel
-
2.3 openSelector
#openSelector()
方法,创建 NIO Selector 对象。
考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
2.4 rebuildSelector
#rebuildSelector()
方法,重建 NIO Selector 对象。
考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
2.5 newTaskQueue
#newTaskQueue(int maxPendingTasks)
方法,创建任务队列。代码如下:
该方法覆写父类的该方法。
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
调用 PlatformDependent#newMpscQueue(...)
方法,创建 mpsc 队列。我们来看看代码注释对 mpsc 队列的描述:
Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single consumer (one thread!).
-
mpsc 是 multiple producers and a single consumer 的缩写。
-
mpsc 是对多线程生产任务,单线程消费任务的消费,恰好符合 NioEventLoop 的情况。
-
详细解析,见后续文章。当然,着急的胖友,可以先看看 《原理剖析(第 012 篇)Netty 之无锁队列 MpscUnboundedArrayQueue 原理分析》 。
2.6 pendingTasks
#pendingTasks()
方法,获得待执行的任务数量。代码如下:
该方法覆写父类的该方法。
@Override
public int pendingTasks() {
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
// See https://github.com/netty/netty/issues/5297
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
}
}
-
因为 MpscQueue 仅允许单消费,所以获得队列的大小,仅允许在 EventLoop 的线程中调用。
2.7 setIoRatio
#setIoRatio(int ioRatio)
方法,设置 ioRatio
属性。代码如下:
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
setIoRatio(int ioRatio)
方法用于设置 EventLoop 中用于 I/O 操作的时间占比。这个方法允许用户自定义 EventLoop 在处理 I/O 任务和非 I/O 任务之间的时间分配比例。参数说明:
int ioRatio
:表示 I/O 操作所占的时间百分比。取值范围为 1 到 100,其中 1 表示事件循环几乎全部时间都用于非 I/O 任务,而 100 表示事件循环几乎全部时间都用于 I/O 任务。默认值为 50,表示事件循环在 I/O 任务和非 I/O 任务之间平均分配时间。方法实现:
首先检查传入的
ioRatio
参数是否在有效范围内(1 到 100),如果不在有效范围内,则抛出IllegalArgumentException
异常。如果参数有效,则将
ioRatio
赋值给类的成员变量this.ioRatio
。示例:
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); ((NioEventLoop) eventLoopGroup.next()).setIoRatio(75);
在这个示例中,我们创建了一个
NioEventLoopGroup
实例,并获取其第一个NioEventLoop
实例。然后调用setIoRatio(75)
方法,将 I/O 操作的时间占比设置为 75%,即事件循环将有 75% 的时间用于处理 I/O 任务,剩余 25% 的时间用于处理非 I/O 任务。
setIoRatio(int ioRatio)
方法是EventLoop
中的一个配置方法,用于设置EventLoop
在处理 I/O 任务与非 I/O 任务(如计算密集型任务)之间的时间分配比例。这个方法允许开发者根据应用的具体需求来调整 I/O 操作和其他任务之间的平衡。参数说明
ioRatio:这是一个整数参数,表示希望
EventLoop
在每次循环中花费在 I/O 操作上的时间百分比。取值范围为 1 到 100。默认值
默认情况下,
ioRatio
的值为 50,这意味着EventLoop
将尝试花费相等的时间来进行 I/O 操作和执行其他任务。工作原理
当
EventLoop
执行其主循环时,它会处理两部分的工作:I/O 事件:这包括从
Selector
中轮询就绪的 I/O 事件并处理这些事件。非 I/O 任务:这包括执行
TaskQueue
和ScheduledTaskQueue
中的任务。
setIoRatio
设置的值决定了EventLoop
在处理完 I/O 事件之后,应该花多少时间去处理非 I/O 任务。例如,如果ioRatio
设置为 80%,那么EventLoop
在处理完 I/O 事件之后,将只执行足够多的非 I/O 任务使得总的执行时间占整个循环周期的 20%。代码实现
以下是
EventLoop
中setIoRatio
方法的实现示例:public class EventLoop extends Thread { private volatile int ioRatio = 50; // Default value /** * Sets the percentage of the desired amount of time spent for I/O in the event loop. * The default value is 50, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. * * @param ioRatio the desired percentage of time spent for I/O in the event loop. */ public void setIoRatio(int ioRatio) { if (ioRatio <= 0 || ioRatio > 100) { throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } this.ioRatio = ioRatio; } // ... 其他方法 ... @Override public void run() { while (true) { // Select ready keys from the selector selector.select(); // Process selected keys // ... // Execute tasks in the task queue long ioTimeNanos = System.nanoTime(); Runnable task; while ((task = taskQueue.poll()) != null) { task.run(); } long nonIoTimeNanos = System.nanoTime() - ioTimeNanos; // Adjust non-io time according to ioRatio long totalCycleTimeNanos = System.nanoTime() - lastCycleStartTimeNanos; long expectedNonIoTimeNanos = totalCycleTimeNanos * (100 - ioRatio) / 100; if (nonIoTimeNanos > expectedNonIoTimeNanos) { // If we spent too much time in non-io tasks, sleep to make up for it long sleepTimeNanos = nonIoTimeNanos - expectedNonIoTimeNanos; if (sleepTimeNanos > 0) { Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleepTimeNanos)); } } lastCycleStartTimeNanos = System.nanoTime(); } } }
注意事项
如果
ioRatio
设置得过高,可能会导致非 I/O 任务积压,影响系统的响应能力。如果
ioRatio
设置得过低,可能会降低 I/O 操作的效率。在实际使用中,需要根据应用程序的具体情况来调整
ioRatio
的值。通过调整
ioRatio
的值,可以有效地控制EventLoop
在 I/O 密集型操作和 CPU 密集型操作之间的平衡,从而优化应用程序的性能。设置
ioRatio
对网络性能的影响主要体现在以下几个方面:
I/O 密集型任务与非 I/O 密集型任务的平衡:
ioRatio
参数决定了 EventLoop 在处理 I/O 任务(如读写网络数据)和非 I/O 任务(如执行用户代码、处理业务逻辑等)之间的时间分配。设置合适的ioRatio
可以帮助你在不同类型的任务之间找到一个平衡点,从而提高整体性能。响应时间:如果
ioRatio
设置得过高,EventLoop 将花费更多时间在 I/O 操作上,可能导致非 I/O 任务的响应时间变长。相反,如果ioRatio
设置得过低,EventLoop 将花费更多时间在非 I/O 任务上,可能导致 I/O 操作的延迟增加。吞吐量:合适的
ioRatio
设置可以提高系统的吞吐量。如果 I/O 密集型任务较多,提高ioRatio
可能会提高网络传输速率;如果非 I/O 密集型任务较多,降低ioRatio
可能会提高业务处理速度。资源利用率:设置合适的
ioRatio
可以提高系统资源的利用率。例如,在 I/O 密集型任务较多的场景下,提高io负载因子
可以充分利用 CPU 资源,提高系统性能。总之,设置合适的
ioRatio
对网络性能有很大影响。在实际应用中,你需要根据业务需求和系统特点来调整ioRatio
参数,以达到最佳性能。可以通过压力测试和性能监控来评估不同ioRatio
设置下的系统表现,从而找到最佳的配置。要监控 EventLoop 中 I/O 和非 I/O 任务的实际执行时间,可以使用以下方法:
使用内置的监控工具:Netty 提供了一些内置的监控工具,可以帮助您收集和查看 EventLoop 中 I/O 和非 I/O 任务的执行时间。例如,可以使用
ChannelMetricsHandler
来收集通道级别的指标,包括 I/O 操作的时间。public class MetricsServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ChannelMetricsHandler()); // ... } }
然后,可以通过 JMX(Java Management Extensions)或其他监控工具查看收集到的指标。
自定义监控逻辑:如果您需要更详细的监控数据,可以自定义监控逻辑。例如,可以在 I/O 和非 I/O 任务的开始和结束时记录时间戳,然后计算执行时间。
public class CustomChannelHandler extends ChannelInboundHandlerAdapter { private final AtomicLong ioTime = new AtomicLong(); private final AtomicLong nonIoTime = new AtomicLong(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { long startTime = System.nanoTime(); try { super.channelRead(ctx, msg); } finally { long endTime = System.nanoTime(); nonIoTime.addAndGet(endTime - startTime); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof io.netty.channel.EventLoop.IoEvent) { long startTime = System.nanoTime(); try { super.userEventTriggered(ctx, evt); } finally { long endTime = System.nanoTime(); ioTime.addAndGet(endTime - startTime); } } else { super.userEventTriggered(ctx, evt); } } public long getIoTime() { return ioTime.get(); } public long getNonIoTime() { return nonIoTime.get(); } }
在这个示例中,我们创建了一个自定义的
ChannelInboundHandlerAdapter
,用于记录 I/O 和非 I/O 任务的执行时间。通过调用getIoTime()
和getNonIoTime()
方法,可以获取累计的 I/O 和非 I/O 任务执行时间。请注意,这些方法可能会对性能产生一定影响,因此在生产环境中使用时需要谨慎。在实际应用中,可以根据需求选择合适的监控方法。
2.8 wakeup
#wakeup(boolean inEventLoop)
方法,唤醒线程。代码如下:
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) { // <2>
selector.wakeup(); // <1>
}
}
-
<1>
处,因为 NioEventLoop 的线程阻塞,主要是调用Selector#select(long timeout)
方法,阻塞等待有 Channel 感兴趣的 IO 事件,或者超时。所以需要调用Selector#wakeup()
方法,进行唤醒 Selector 。 -
<2>
处,因为Selector#wakeup()
方法的唤醒操作是开销比较大的操作,并且每次重复调用相当于重复唤醒。所以,通过wakenUp
属性,通过 CAS 修改false => true
,保证有且仅有进行一次唤醒。 -
当然,详细的解析,可以结合 「2.9 run」 一起看,这样会更加清晰明了。
2.9 run
#run()
方法,NioEventLoop 运行,处理任务。这是本文最重要的方法。代码如下:
1: @Override
2: protected void run() {
3: for (;;) {
4: try {
5: switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
6: case SelectStrategy.CONTINUE: // 默认实现下,不存在这个情况。
7: continue;
8: case SelectStrategy.SELECT:
9: // 重置 wakenUp 标记为 false
10: // 选择( 查询 )任务
11: select(wakenUp.getAndSet(false));
12:
13: // 'wakenUp.compareAndSet(false, true)' is always evaluated
14: // before calling 'selector.wakeup()' to reduce the wake-up
15: // overhead. (Selector.wakeup() is an expensive operation.)
16: //
17: // However, there is a race condition in this approach.
18: // The race condition is triggered when 'wakenUp' is set to
19: // true too early.
20: //
21: // 'wakenUp' is set to true too early if:
22: // 1) Selector is waken up between 'wakenUp.set(false)' and
23: // 'selector.select(...)'. (BAD)
24: // 2) Selector is waken up between 'selector.select(...)' and
25: // 'if (wakenUp.get()) { ... }'. (OK)
26: //
27: // In the first case, 'wakenUp' is set to true and the
28: // following 'selector.select(...)' will wake up immediately.
29: // Until 'wakenUp' is set to false again in the next round,
30: // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
31: // any attempt to wake up the Selector will fail, too, causing
32: // the following 'selector.select(...)' call to block
33: // unnecessarily.
34: //
35: // To fix this problem, we wake up the selector again if wakenUp
36: // is true immediately after selector.select(...).
37: // It is inefficient in that it wakes up the selector for both
38: // the first case (BAD - wake-up required) and the second case
39: // (OK - no wake-up required).
40:
41: // 唤醒。原因,见上面中文注释
42: if (wakenUp.get()) {
43: selector.wakeup();
44: }
45: // fall through
46: default:
47: }
48:
49: // TODO 1007 NioEventLoop cancel 方法
50: cancelledKeys = 0;
51: needsToSelectAgain = false;
52:
53: final int ioRatio = this.ioRatio;
54: if (ioRatio == 100) {
55: try {
56: // 处理 Channel 感兴趣的就绪 IO 事件
57: processSelectedKeys();
58: } finally {
59: // 运行所有普通任务和定时任务,不限制时间
60: // Ensure we always run tasks.
61: runAllTasks();
62: }
63: } else {
64: final long ioStartTime = System.nanoTime();
65: try {
66: // 处理 Channel 感兴趣的就绪 IO 事件
67: processSelectedKeys();
68: } finally {
69: // 运行所有普通任务和定时任务,限制时间
70: // Ensure we always run tasks.
71: final long ioTime = System.nanoTime() - ioStartTime;
72: runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
73: }
74: }
75: } catch (Throwable t) {
76: handleLoopException(t);
77: }
78: // TODO 1006 EventLoop 优雅关闭
79: // Always handle shutdown even if the loop processing threw an exception.
80: try {
81: if (isShuttingDown()) {
82: closeAll();
83: if (confirmShutdown()) {
84: return;
85: }
86: }
87: } catch (Throwable t) {
88: handleLoopException(t);
89: }
90: }
91: }
-
第 3 行:“死”循环,直到 NioEventLoop 关闭,即【第 78 至 89 行】的代码。
-
第 5 行:调用
SelectStrategy#calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
方法,获得使用的 select 策略。详细解析,先跳到 「2.10 SelectStrategy」 中研究。😈 看完回来。-
我们知道
SelectStrategy#calculateStrategy(...)
方法,有 3 种返回的情况。 -
第 6 至 7 行:第一种,
SelectStrategy.CONTINUE
,默认实现下,不存在这个情况。 -
第 8 至 44 行:第二种,
SelectStrategy.SELECT
,进行 Selector 阻塞 select 。-
第 11 行:重置
wakeUp
标识为false
,并返回修改前的值。 -
第 11 行:调用
#select(boolean oldWakeUp)
方法,选择( 查询 )任务。直接看这个方法不能完全表达出该方法的用途,所以详细解析,见 「2.12 select」 。 -
第 41 至 44 行:若唤醒标识
wakeup
为true
时,调用Selector#wakeup()
方法,唤醒 Selector 。可能看到此处,很多胖友会和我一样,一脸懵逼。实际上,耐下性子,答案在上面的英文注释中。笔者来简单解析下:-
1)在
wakenUp.getAndSet(false)
和#select(boolean oldWakeUp)
之间,在标识wakeUp
设置为false
时,在#select(boolean oldWakeUp)
方法中,正在调用Selector#select(...)
方法,处于阻塞中。 -
2)此时,有另外的线程调用了
#wakeup()
方法,会将标记wakeUp
设置为true
,并唤醒Selector#select(...)
方法的阻塞等待。 -
3)标识
wakeUp
为true
,所以再有另外的线程调用#wakeup()
方法,都无法唤醒Selector#select(...)
。为什么呢?因为#wakeup()
的 CAS 修改false => true
会失败,导致无法调用Selector#wakeup()
方法。 -
解决方式:所以在
#select(boolean oldWakeUp)
执行完后,增加了【第 41 至 44 行】来解决。 -
😈😈😈 整体比较绕,胖友结合实现代码 + 英文注释,再好好理解下。
-
-
-
第 46 行:第三种,
>= 0
,已经有可以处理的任务,直接向下。
-
-
第 49 至 51 行:TODO 1007 NioEventLoop cancel 方法
-
第 53 至 74 行:根据
ioRatio
的配置不同,分成略有差异的 2 种:-
第一种,
ioRatio
为 100 ,则不考虑时间占比的分配。-
第 57 行:调用
#processSelectedKeys()
方法,处理 Channel 感兴趣的就绪 IO 事件。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。 -
第 58 至 62 行:调用
#runAllTasks()
方法,运行所有普通任务和定时任务,不限制时间。详细解析,见 《Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
-
-
第二种,
ioRatio
为< 100
,则考虑时间占比的分配。-
第 64 行:记录当前时间。
-
第 67 行:和【第 57 行】的代码一样。
-
第 71 至 72 行:🙂 比较巧妙的方式,是不是和胖友之前认为的不太一样。它是以
#processSelectedKeys()
方法的执行时间作为基准,计算#runAllTasks(long timeoutNanos)
方法可执行的时间。 -
第 72 行:调用 #runAllTasks(long timeoutNanos)` 方法,运行所有普通任务和定时任务,限制时间。
-
-
-
第 75 至 77 行:当发生异常时,调用
#handleLoopException(Throwable t)
方法,处理异常。代码如下:
private static void handleLoopException(Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
-
第 78 至 89 行:TODO 1006 EventLoop 优雅关闭
-
总的来说,
#run()
的执行过程,就是如下一张图:
2.10 SelectStrategy
io.netty.channel.SelectStrategy
,选择( select )策略接口。代码如下:
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*
* 表示使用阻塞 select 的策略。
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*
* 表示需要进行重试的策略。
*/
int CONTINUE = -2;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
-
calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
接口方法有 3 种返回的情况:-
SELECT
,-1
,表示使用阻塞 select 的策略。 -
CONTINUE
,-2
,表示需要进行重试的策略。实际上,默认情况下,不会返回CONTINUE
的策略。 -
>= 0
,表示不需要 select ,目前已经有可以执行的任务了。
-
2.10.1 DefaultSelectStrategy
io.netty.channel.DefaultSelectStrategy
,实现 SelectStrategy 接口,默认选择策略实现类。代码如下:
final class DefaultSelectStrategy implements SelectStrategy {
/**
* 单例
*/
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
当 hasTasks
为 true
,表示当前已经有任务,所以调用 IntSupplier#get()
方法,返回当前 Channel 新增的 IO 就绪事件的数量。代码如下:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
io.netty.util.IntSupplier
,代码如下:
public interface IntSupplier {
/**
* Gets a result.
*
* @return a result
*/
int get() throws Exception;
}
-
类似 Java 自带的
Callable<Int>
。
-
IntSupplier 在 NioEventLoop 中的实现为
selectNowSupplier
属性。在它的内部会调用#selectNow()
方法。详细解析,见 「2.11 selectNow」 。 -
实际上,这里不调用
IntSupplier#get()
方法,也是可以的。只不过考虑到,可以通过#selectNow()
方法,无阻塞的 select Channel 是否有感兴趣的就绪事件。
-
当
hasTasks
为false
时,直接返回SelectStrategy.SELECT
,进行阻塞 select Channel 感兴趣的就绪 IO 事件。
2.11 selectNow
#selectNow()
方法,代码如下:
int selectNow() throws IOException {
try {
return selector.selectNow(); // <1>
} finally {
// restore wakeup state if needed <2>
if (wakenUp.get()) {
selector.wakeup();
}
}
}
-
<1>
处,调用Selector#selectorNow()
方法,立即( 无阻塞 )返回 Channel 新增的感兴趣的就绪 IO 事件数量。 -
<2>
处,若唤醒标识wakeup
为true
时,调用Selector#wakeup()
方法,唤醒 Selector 。因为<1>
处的Selector#selectorNow()
会使用我们对 Selector 的唤醒,所以需要进行复原。有一个冷知道,可能有胖友不知道:注意,如果有其它线程调用了
#wakeup()
方法,但当前没有线程阻塞在#select()
方法上,下个调用#select()
方法的线程会立即被唤醒。😈 有点神奇。
2.12 select
#select(boolean oldWakenUp)
方法,选择( 查询 )任务。这是本文最重要的方法。代码如下:
1: private void select(boolean oldWakenUp) throws IOException {
2: // 记录下 Selector 对象
3: Selector selector = this.selector;
4: try {
5: // select 计数器
6: int selectCnt = 0; // cnt 为 count 的缩写
7: // 记录当前时间,单位:纳秒
8: long currentTimeNanos = System.nanoTime();
9: // 计算 select 截止时间,单位:纳秒。
10: long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
11:
12: for (;;) {
13: // 计算本次 select 的超时时长,单位:毫秒。
14: // + 500000L 是为了四舍五入
15: // / 1000000L 是为了纳秒转为毫秒
16: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
17: // 如果超时时长,则结束 select
18: if (timeoutMillis <= 0) {
19: if (selectCnt == 0) { // 如果是首次 select ,selectNow 一次,非阻塞
20: selector.selectNow();
21: selectCnt = 1;
22: }
23: break;
24: }
25:
26: // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
27: // Selector#wakeup. So we need to check task queue again before executing select operation.
28: // If we don't, the task might be pended until select operation was timed out.
29: // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
30: // 若有新的任务加入
31: if (hasTasks() && wakenUp.compareAndSet(false, true)) {
32: // selectNow 一次,非阻塞
33: selector.selectNow();
34: // 重置 select 计数器
35: selectCnt = 1;
36: break;
37: }
38:
39: // 阻塞 select ,查询 Channel 是否有就绪的 IO 事件
40: int selectedKeys = selector.select(timeoutMillis);
41: // select 计数器 ++
42: selectCnt ++;
43:
44: // 结束 select ,如果满足下面任一一个条件
45: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
46: // - Selected something,
47: // - waken up by user, or
48: // - the task queue has a pending task.
49: // - a scheduled task is ready for processing
50: break;
51: }
52: // 线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。
53: if (Thread.interrupted()) {
54: // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
55: // As this is most likely a bug in the handler of the user or it's client library we will
56: // also log it.
57: //
58: // See https://github.com/netty/netty/issues/2426
59: if (logger.isDebugEnabled()) {
60: logger.debug("Selector.select() returned prematurely because " +
61: "Thread.currentThread().interrupt() was called. Use " +
62: "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
63: }
64: selectCnt = 1;
65: break;
66: }
67:
68: // 记录当前时间
69: long time = System.nanoTime();
70: // 符合 select 超时条件,重置 selectCnt 为 1
71: if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
72: // timeoutMillis elapsed without anything selected.
73: selectCnt = 1;
74: // 不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建
75: } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
76: selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
77: // The selector returned prematurely many times in a row.
78: // Rebuild the selector to work around the problem.
79: logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector);
80:
81: // 重建 Selector 对象
82: rebuildSelector();
83: // 修改下 Selector 对象
84: selector = this.selector;
85:
86: // Select again to populate selectedKeys.
87: // 立即 selectNow 一次,非阻塞
88: selector.selectNow();
89: // 重置 selectCnt 为 1
90: selectCnt = 1;
91: // 结束 select
92: break;
93: }
94:
95: currentTimeNanos = time;
96: }
97:
98: if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
99: if (logger.isDebugEnabled()) {
100: logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
101: }
102: }
103: } catch (CancelledKeyException e) {
104: if (logger.isDebugEnabled()) {
105: logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e);
106: }
107: // Harmless exception - log anyway
108: }
109: }
-
第 3 行:获得使用的 Selector 对象,不需要每次访问使用
volatile
修饰的selector
属性。 -
第 6 行:获得 select 操作的计数器。主要用于记录 Selector 空轮询次数,所以每次在正在轮询完成( 例如:轮询超时 ),则重置
selectCnt
为 1 。 -
第 8 行:记录当前时间,单位:纳秒。
-
第 10 行:计算 select 操作的截止时间,单位:纳秒。
-
#delayNanos(currentTimeNanos)
方法返回的为下一个定时任务距离现在的时间,如果不存在定时任务,则默认返回 1000 ms 。该方法的详细解析,见后续文章。
-
-
第 12 行:“死”循环,直到符合如下任一一种情况后结束:
-
select 操作超时,对应【第 18 至 24 行】。
-
若有新的任务加入,对应【第 26 至 37 行】。
-
查询到任务或者唤醒,对应【第 45 至 51 行】。
-
线程被异常打断,对应【第 52 至 66 行】。
-
发生 NIO 空轮询的 Bug 后重建 Selector 对象后,对应【第 75 至 93 行】。
-
-
第 16 行:计算本次 select 的超时时长,单位:毫秒。因为【第 40 行】的
Selector#select(timeoutMillis)
方法,可能因为各种情况结束,所以需要循环,并且每次重新计算超时时间。至于+ 500000L
和/ 1000000L
的用途,看下代码注释。 -
第 17 至 24 行:如果超过 select 超时时长,则结束 select 。
-
第 19 至 21 行:如果是首次 select ,则调用
Selector#selectNow()
方法,获得非阻塞的 Channel 感兴趣的就绪的 IO 事件,并重置selectCnt
为 1 。
-
-
第 26 至 37 行:若有新的任务加入。这里实际要分成两种情况:
-
第一种,提交的任务的类型是 NonWakeupRunnable ,那么它并不会调用
#wakeup()
方法,原因胖友自己看#execute(Runnable task)
思考下。Netty 在#select()
方法的设计上,能尽快执行任务。此时如果标记wakeup
为false
,说明符合这种情况,直接结束 select 。 -
第二种,提交的任务的类型不是 NonWakeupRunnable ,那么在
#run()
方法的【第 8 至 11 行】的wakenUp.getAndSet(false)
之前,发起了一次#wakeup()
方法,那么因为wakenUp.getAndSet(false)
会将标记wakeUp
设置为false
,所以就能满足hasTasks() && wakenUp.compareAndSet(false, true)
的条件。-
这个解释,就和【第 27 至 28 行】的英文注释
So we need to check task queue again before executing select operation.If we don't, the task might be pended until select operation was timed out.
有出入了?这是为什么呢?因为 Selector 被提前 wakeup 了,所以下一次 Selector 的 select 是被直接唤醒结束的。
-
-
第 33 行:虽然已经发现任务,但是还是调用
Selector#selectNow()
方法,非阻塞的获取一次 Channel 新增的就绪的 IO 事件。 -
对应 Github 的代码提交为 https://github.com/lightningMan/netty/commit/f44f3e7926f1676315ae86d0f18bdd9b95681d9f 。
-
-
第 40 行:调用
Selector#select(timeoutMillis)
方法,阻塞 select ,获得 Channel 新增的就绪的 IO 事件的数量。 -
第 42 行:select 计数器加 1 。
-
第 44 至 51 行:如果满足下面任一一个条件,结束 select :
-
selectedKeys != 0
时,表示有 Channel 新增的就绪的 IO 事件,所以结束 select ,很好理解。 -
oldWakenUp || wakenUp.get()
时,表示 Selector 被唤醒,所以结束 select 。 -
hasTasks() || hasScheduledTasks()
,表示有普通任务或定时任务,所以结束 select 。 -
那么剩余的情况,主要是 select 超时或者发生空轮询,即【第 68 至 93 行】的代码。
-
-
第 52 至 66 行:线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。感兴趣的胖友,可以看看 https://github.com/netty/netty/issues/2426 。
-
第 69 行:记录当前时间。
-
第 70 至 73 行:若满足
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
,说明到达此处时,Selector 是超时 select ,那么是正常的,所以重置selectCnt
为 1 。 -
第 74 至 93 行:不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建。这就是 Netty 判断发生 NIO Selector 空轮询的方式,N ( 默认 512 )次 select 并未阻塞超时这么长,那么就认为发生 NIO Selector 空轮询。过多的 NIO Selector 将会导致 CPU 100% 。
-
第 82 行:调用
#rebuildSelector()
方法,重建 Selector 对象。 -
第 84 行:重新获得使用的 Selector 对象。
-
第 86 至 90 行:同【第 20 至 21 行】的代码。
-
第 92 行:结束 select 。
-
-
-
第 95 行:记录新的当前时间,用于【第 16 行】,重新计算本次 select 的超时时长。
总的来说还是比较简单的,比较困难的,在于对标记 wakeup
的理解。真的是,细思极恐!!!
推荐阅读文章:
Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件
在 Netty 中,
EventLoop
负责处理 I/O 事件以及执行用户定义的任务。对于 I/O 事件的处理,主要包括从Selector
中轮询就绪的 I/O 事件,并调用相应的处理器来处理这些事件。下面我们将详细探讨EventLoop
如何处理 I/O 事件。EventLoop 处理 I/O 事件的过程
1. EventLoop 的启动
EventLoop
的启动通常是在创建EventLoopGroup
并调用EventLoopGroup
的next()
方法时完成的。每个
EventLoop
对象都有一个关联的Selector
,用于监听注册在其上的Channel
的 I/O 事件。2. Channel 注册
当创建一个新的
Channel
时,它会被注册到一个EventLoop
上。注册过程涉及将
Channel
的SelectionKey
添加到Selector
中,并设置相应的事件兴趣集合(例如OP_READ
,OP_WRITE
)。3. 事件循环
EventLoop
的核心是一个无限循环,通常称为 “事件循环” 或 “运行循环”。在这个循环中,
EventLoop
会调用Selector#select()
方法来轮询就绪的 I/O 事件。如果有事件就绪,
Selector
会返回一个包含就绪事件的SelectionKey
集合。4. 处理 I/O 事件
获取到就绪的
SelectionKey
集合后,EventLoop
会遍历这些SelectionKey
,并根据每个SelectionKey
的就绪事件类型调用相应的处理方法。例如,如果
SelectionKey
表示读事件已准备好 (OP_READ
),则会调用Channel
的read()
方法来处理读事件。同样地,如果
SelectionKey
表示写事件已准备好 (OP_WRITE
),则会调用Channel
的write()
方法来处理写事件。5. 任务执行
在处理完 I/O 事件之后,
EventLoop
会检查TaskQueue
和ScheduledTaskQueue
,执行其中的任务。示例代码
下面是一个简化的
EventLoop
处理 I/O 事件的伪代码示例:public class EventLoop extends Thread { private final Selector selector; private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(); private final Queue<ScheduledFuture<?>> scheduledTaskQueue = new LinkedBlockingQueue<>(); public EventLoop(Selector selector) { this.selector = selector; } public void register(Channel channel, int interestOps) { // Register the channel with the selector } public void execute(Runnable task) { // Add the task to the queue and wake up the loop if necessary } public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) { // Schedule the task to be executed after the specified delay } @Override public void run() { while (true) { // Select ready keys from the selector selector.select(); // Process selected keys Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); // Remove the key to prevent duplicate processing // Process the event based on the key's readiness if (key.isReadable()) { // Handle read event Channel ch = (Channel) key.channel(); ch.pipeline().fireChannelRead(...); } else if (key.isWritable()) { // Handle write event Channel ch = (Channel) key.channel(); ch.pipeline().fireChannelWrite(...); } } // Execute tasks in the task queue Runnable task; while ((task = taskQueue.poll()) != null) { task.run(); } // Execute scheduled tasks ScheduledFuture<?> scheduledTask; while ((scheduledTask = scheduledTaskQueue.poll()) != null) { scheduledTask.run(); } } } }
注意事项
EventLoop
在处理 I/O 事件时,通常会调用Channel
的Pipeline
来处理具体的事件,而Pipeline
中包含了多个Handler
。
Pipeline
提供了一个灵活的方式来处理事件,可以添加多个Handler
来处理读写事件,比如编码器、解码器、业务逻辑处理器等。
EventLoop
在处理 I/O 事件时需要确保线程安全,特别是当从外部唤醒或提交任务时。通过以上步骤,
EventLoop
可以有效地处理 I/O 事件,并执行相关的任务,从而实现高效的网络通信处理。
在 Netty 中,EventLoop 是负责处理 I/O 事件和执行任务的核心组件。在前面的文章中,我们已经了解了 EventLoop 的基本结构和运行机制。本文将对 EventLoop 处理 I/O 事件的详细过程进行解析。
EventLoop 处理 I/O 事件的过程
1. 选择器(Selector)准备
在 EventLoop 的
run()
方法中,首先会调用selector.selectNow()
或selector.select(timeout)
方法来获取已就绪的事件。这些事件包括连接建立、数据可读、连接关闭等。int selectedKeysCount = selector.selectNow(); if (selectedKeysCount == 0) { continue; }
2. 处理已就绪的事件
获取到已就绪的事件后,EventLoop 会遍历这些事件,并根据事件类型进行相应的处理。
Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { // 处理连接建立事件 } else if (key.isReadable()) { // 处理数据可读事件 } else if (key.isWritable()) { // 处理数据可写事件 } else if (key.isConnectable()) { // 处理连接完成事件 } else if (key.isClosed()) { // 处理连接关闭事件 } keyIterator.remove(); }
3. 处理 I/O 事件
针对不同的事件类型,EventLoop 会调用相应的处理器进行处理。以下是一些常见的事件处理逻辑:
连接建立事件(isAcceptable):当有新的连接请求时,会调用
ServerSocketChannel.accept()
方法接受连接,并将新连接的SocketChannel
注册到 Selector 上。if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocket.classicAccept(); if (socketChannel != null) { configureAndRegister(socketChannel); } }
数据可读事件(isReadable):当通道中有数据可读时,会调用
SocketChannel.read()
方法读取数据,并将数据传递给 ChannelPipeline 进行处理。if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int bytesRead = socketChannel.read(buffer); if (bytesRead > 0) { buffer.flip(); channel.pipeline().fireChannelRead(buffer); } else if (bytesRead == -1) { closeChannel(socketChannel); } }
数据可写事件(isWritable):当通道可写时,会调用
SocketChannel.write()
方法将数据写入通道。if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); List<ByteBuf> pendingData = socketChannel.unsafe().outboundBuffer(); for (ByteBuf buf : pendingData) { int bytesWritten = socketChannel.write(buf); if (bytesWritten < 0 || buf.isReadOnly()) { closeChannel(socketChannel); break; } } }
连接完成事件(isConnectable):当连接建立完成时,会调用
SocketChannel.finishConnect()
方法完成连接。if (key.isConnectable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); if (socketChannel.finishConnect()) { channel.pipeline().fireChannelActive(); } else { closeChannel(socketChannel); } }
连接关闭事件(isClosed):当连接关闭时,会调用
SocketChannel.close()
方法关闭通道。if (key.isClosed()) { SocketChannel socketChannel = (SocketChannel) key.channel(); closeChannel(socketChannel); }
总结
EventLoop 处理 I/O 事件的过程包括选择器准备、处理已就绪的事件和处理 I/O 事件等步骤。针对不同的事件类型,EventLoop 会调用相应的处理器进行处理。通过深入了解 EventLoop 处理 I/O 事件的过程,可以更好地理解 Netty 的内部工作原理,并在实际开发中更好地利用 Netty 的特性。
本文主要介绍了 Netty 的 NioEventLoop 类中处理 IO 事件相关代码的实现,包括如何创建 Selector 对象、优化 SelectionKey 集合、重建 Selector 对象以及处理就绪的 IO 事件等。
Key Takeaways
NioEventLoop 类使用 SelectorTuple 来封装 Selector 对象,并通过 SelectedSelectionKeySet 和 SelectedSelectionKeySetSelector 优化 SelectionKey 集合。
NioEventLoop 类提供了 openSelector 方法,用于创建 Selector 对象,并通过反射机制优化 Selector 对象的 selectedKeys 集合。
当 NIO Selector 发生 epoll bug 时,NioEventLoop 类会调用 rebuildSelector 方法重建 Selector 对象。
NioEventLoop 类使用 processSelectedKeysOptimized 方法处理 Channel 新增就绪的 IO 事件,并根据 attachment 类型选择不同的处理方式。
NioEventLoop 类提供了 register 方法,用于注册任意 SelectableChannel 到 Selector 上,并通过 NioTask 接口处理事件。
NioEventLoop 类使用 processSelectedKey 方法处理 Channel 就绪的 IO 事件,包括完成连接、写入数据和读取数据等。
NioTask 接口定义了 channelReady 和 channelUnregistered 方法,分别用于处理 Channel IO 就绪的事件和 Channel 取消注册的事件。
因为在 《Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 中,#openSelector()
和 #rebuildSelector()
方法并未做分享,所以我们先来一起看看。
2. SelectorTuple
SelectorTuple ,Selector 元组。代码如下:
SelectorTuple 内嵌在 NioEventLoop
private static final class SelectorTuple {
/**
* 未包装的 Selector 对象
*/
final Selector unwrappedSelector;
/**
* 未包装的 Selector 对象
*/
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
SelectorTuple
类是 Netty 中NioEventLoop
的一部分,用于处理基于 NIO(New I/O)的网络操作。这个类的设计目的是为了处理 Java NIO 中的一个已知问题,即Selector
的 WakenUp 机制可能会导致的 bug。让我们分析一下
SelectorTuple
类:
成员变量:
unwrappedSelector
: 这个字段代表了原始的、未被包装的Selector
对象。通常情况下,这个Selector
是直接由Selector.open()
创建得到的。
selector
: 这个字段同样是一个Selector
对象,但它可能与unwrappedSelector
相同,也可能不同。当Selector
被包装以解决某些问题时,这个字段就变得有意义了。构造函数:
第一个构造函数只有一个参数
unwrappedSelector
,它创建了一个SelectorTuple
实例,其中selector
和unwrappedSelector
指向同一个对象。第二个构造函数接受两个参数,允许创建一个
SelectorTuple
实例,在这种情况下,selector
和unwrappedSelector
可以指向不同的Selector
对象。使用场景
在 Netty 中,
SelectorTuple
主要用于解决 Java NIOSelector
的一些问题,比如:
Wakeup Bug: 当一个
Selector
被唤醒(wakeup
方法被调用),然后立即进行选择操作(select
或selectNow
方法被调用),可能会导致某些事件丢失或不被正确处理的问题。多线程访问问题: 在多线程环境中,如果多个线程同时访问同一个
Selector
,可能会出现竞态条件。为了解决这些问题,Netty 可能会创建一个包装过的
Selector
,这个包装器能够确保即使在上述情况下也能正确处理事件。SelectorTuple
的selector
字段就可能指向这样一个包装过的Selector
。示例
在
NioEventLoop
中,SelectorTuple
通常是在initialize
方法中创建的。如果需要的话,Netty 会使用一个WrappedSelectionKey
和WrappedSelector
来包装原始的Selector
。这些包装类可以帮助解决上述提到的问题。总结
SelectorTuple
在 Netty 中的作用是为了确保Selector
的正确性和健壮性,特别是在并发和复杂网络环境中。通过提供一个可选的包装Selector
,Netty 能够更好地控制和管理底层的 I/O 操作。
SelectorTuple
是 Netty 中的一个内部类,用于封装Selector
对象。在 Netty 的 NIO 事件循环(NioEventLoop
)中,SelectorTuple
被用来管理和操作底层的 I/O 多路复用器(即Selector
)。Selector
是 Java NIO 的核心组件之一,它允许单个线程处理多个网络连接。
SelectorTuple
类有两个构造函数:
一个接受单个
Selector
对象作为参数。在这种情况下,unwrappedSelector
和selector
都指向同一个Selector
实例。另一个构造函数接受两个
Selector
对象作为参数。这通常用于包装或替换底层的Selector
,例如在使用自定义的SelectorProvider
或者在进行某些性能优化时。
SelectorTuple
类的两个主要字段是:
unwrappedSelector
:这是原始的、未经过任何包装的Selector
对象。它通常用于执行底层的 I/O 多路复用操作。
selector
:这是可能被包装过的Selector
对象。在某些情况下,Netty 可能会对Selector
进行包装,以添加额外的功能或行为。在 Netty 的
NioEventLoop
中,SelectorTuple
被用来管理 I/O 事件的分发和处理。NioEventLoop
会使用Selector
来监视多个通道(Channel
)上的 I/O 事件(如可读、可写等),并在事件发生时触发相应的处理器。总之,
SelectorTuple
是 Netty 中用于封装和管理Selector
对象的一个简单类。它允许 Netty 在其内部实现中对Selector
进行灵活的操作和扩展。
3. openSelector
#openSelector()
方法,创建 Selector 对象。代码如下:
1: private SelectorTuple openSelector() {
2: // 创建 Selector 对象,作为 unwrappedSelector
3: final Selector unwrappedSelector;
4: try {
5: unwrappedSelector = provider.openSelector();
6: } catch (IOException e) {
7: throw new ChannelException("failed to open a new selector", e);
8: }
9:
10: // 禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
11: if (DISABLE_KEYSET_OPTIMIZATION) {
12: return new SelectorTuple(unwrappedSelector);
13: }
14:
15: // 获得 SelectorImpl 类
16: Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
17: @Override
18: public Object run() {
19: try {
20: return Class.forName(
21: "sun.nio.ch.SelectorImpl",
22: false,
23: PlatformDependent.getSystemClassLoader()); // 成功,则返回该类
24: } catch (Throwable cause) {
25: return cause; // 失败,则返回该异常
26: }
27: }
28: });
29:
30: // 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
31: if (!(maybeSelectorImplClass instanceof Class) ||
32: // ensure the current selector implementation is what we can instrument.
33: !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
34: if (maybeSelectorImplClass instanceof Throwable) {
35: Throwable t = (Throwable) maybeSelectorImplClass;
36: logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
37: }
38: return new SelectorTuple(unwrappedSelector);
39: }
40:
41: final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
42:
43: // 创建 SelectedSelectionKeySet 对象
44: final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
45:
46: // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中
47: Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
48: @Override
49: public Object run() {
50: try {
51: // 获得 "selectedKeys" "publicSelectedKeys" 的 Field
52: Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
53: Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
54:
55: // 设置 Field 可访问
56: Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
57: if (cause != null) {
58: return cause;
59: }
60: cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
61: if (cause != null) {
62: return cause;
63: }
64:
65: // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 的 Field 中
66: selectedKeysField.set(unwrappedSelector, selectedKeySet);
67: publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
68: return null;
69: } catch (NoSuchFieldException e) {
70: return e; // 失败,则返回该异常
71: } catch (IllegalAccessException e) {
72: return e; // 失败,则返回该异常
73: }
74: }
75: });
76:
77: // 设置 SelectedSelectionKeySet 对象到 unwrappedSelector 中失败,则直接返回 SelectorTuple 对象。即,selector 也使用 unwrappedSelector 。
78: if (maybeException instanceof Exception) {
79: selectedKeys = null;
80: Exception e = (Exception) maybeException;
81: logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
82: return new SelectorTuple(unwrappedSelector);
83: }
84:
85: // 设置 SelectedSelectionKeySet 对象到 selectedKeys 中
86: selectedKeys = selectedKeySet;
87: logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
88:
89: // 创建 SelectedSelectionKeySetSelector 对象
90: // 创建 SelectorTuple 对象。即,selector 也使用 SelectedSelectionKeySetSelector 对象。
91: return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
92: }
-
第 2 至 8 行:创建 Selector 对象,作为
unwrappedSelector
。 -
第 10 至 13 行:禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象。即,
selector
也使用unwrappedSelector
。 -
第 15 至 28 行:获得 SelectorImpl 类。可以自动过滤掉
AccessController#.doPrivileged(...)
外层代码。在方法内部,调用Class#forName(String name, boolean initialize, ClassLoader loader)
方法,加载sun.nio.ch.SelectorImpl
类。加载成功,则返回该类,否则返回异常。-
第 30 至 39 行: 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,
selector
也使用unwrappedSelector
。
-
-
第 44 行:创建 SelectedSelectionKeySet 对象。这是 Netty 对 Selector 的
selectionKeys
的优化。关于 SelectedSelectionKeySet 的详细实现,见 「4. SelectedSelectionKeySet」 。-
第 46 至 75 行: 设置 SelectedSelectionKeySet 对象到
unwrappedSelector
中的selectedKeys
和publicSelectedKeys
属性。整个过程,笔者已经添加中文注释,胖友自己看下。 -
selectedKeys
和publicSelectedKeys
属性在 SelectorImpl 类中,代码如下:
-
protected HashSet<SelectionKey> keys = new HashSet(); // => publicKeys
private Set<SelectionKey> publicKeys;
protected Set<SelectionKey> selectedKeys = new HashSet(); // => publicSelectedKeys
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) { // 可以无视
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
-
可以看到,
selectedKeys
和publicSelectedKeys
的类型都是 HashSet 。
-
第 77 至 83 行:设置 SelectedSelectionKeySet 对象到
unwrappedSelector
中失败,则直接返回 SelectorTuple 对象。即,selector
也使用unwrappedSelector
。
-
第 86 行:设置 SelectedSelectionKeySet 对象到
selectedKeys
中。在下文,我们会看到,是否成功优化 Selector 对象,是通过selectedKeys
是否成功初始化来判断。 -
第 91 行:创建 SelectedSelectionKeySetSelector 对象。这是 Netty 对 Selector 的优化实现类。关于 SelectedSelectionKeySetSelector 的详细实现,见 「5. SelectedSelectionKeySetSelector」 。
-
第 91 行:创建 SelectorTuple 对象。即,
selector
使用 SelectedSelectionKeySetSelector 对象。😈 总算,创建成功优化的selector
对象了。
这段代码展示了
NioEventLoop
类中的openSelector
方法,其目的是创建一个新的Selector
并对其进行可能的优化。根据上下文,我们可以看到这个方法主要做了以下几件事:
创建原始的
Selector
:
在第 3 行至第 8 行之间,通过
provider.openSelector()
创建了一个新的Selector
对象,并将其存储在unwrappedSelector
变量中。如果创建过程中抛出了IOException
,则会抛出一个ChannelException
。禁用优化检查:
如果
DISABLE_KEYSET_OPTIMIZATION
为true
,那么将不会进行任何额外的优化,并且直接返回一个SelectorTuple
对象,其中unwrappedSelector
和selector
字段都指向同一个Selector
对象(第 10 至 13 行)。尝试优化
Selector
:
如果
DISABLE_KEYSET_OPTIMIZATION
为false
,则尝试对Selector
进行优化。这包括尝试替换Selector
内部使用的Set
以提高性能(第 15 至 92 行)。获取
SelectorImpl
类:
使用
AccessController.doPrivileged
安全地获取SelectorImpl
类(第 16 至 28 行)。这是为了确保代码可以在没有足够的权限的情况下安全运行。检查
SelectorImpl
类:
如果未能成功获取
SelectorImpl
类或者unwrappedSelector
不是SelectorImpl
的实例,则不进行任何优化并直接返回SelectorTuple
对象(第 30 至 39 行)。优化
Selector
:
如果成功获取到了
SelectorImpl
类并且unwrappedSelector
是它的实例,则创建一个新的SelectedSelectionKeySet
对象来替换Selector
内部的selectedKeys
和publicSelectedKeys
字段(第 44 至 54 行)。设置
SelectedSelectionKeySet
:
使用反射设置
SelectedSelectionKeySet
到unwrappedSelector
的内部字段(第 55 至 75 行)。如果设置过程中发生异常,则放弃优化并直接返回SelectorTuple
对象(第 76 至 83 行)。创建
SelectedSelectionKeySetSelector
:
如果优化成功,创建一个
SelectedSelectionKeySetSelector
对象,并将其作为SelectorTuple
的selector
字段返回(第 89 至 92 行)。总结
openSelector
方法的主要目标是创建一个新的Selector
并尝试对其进行优化。这种优化主要是通过替换Selector
内部使用的Set
来实现的,从而提高性能。如果优化失败,方法将直接返回一个简单的SelectorTuple
对象,其中包含原始的Selector
。
4. SelectedSelectionKeySet
io.netty.channel.nio.SelectedSelectionKeySet
,继承 AbstractSet 抽象类,已 select 的 NIO SelectionKey 集合。代码如下:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
/**
* SelectionKey 数组
*/
SelectionKey[] keys;
/**
* 数组可读大小
*/
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024]; // 默认 1024 大小
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
// 添加到数组
keys[size++] = o;
// 超过数组大小上限,进行扩容
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public int size() {
return size;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<SelectionKey> iterator() {
throw new UnsupportedOperationException();
}
void reset() {
reset(0);
}
void reset(int start) {
// 重置数组内容为空
Arrays.fill(keys, start, size, null);
// 重置可读大小为 0
size = 0;
}
private void increaseCapacity() {
// 两倍扩容
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
// 复制老数组到新数组
System.arraycopy(keys, 0, newKeys, 0, size);
// 赋值给老数组
keys = newKeys;
}
}
-
通过
keys
和size
两个属性,实现可重用的数组。 -
#add(SelectionKey o)
方法,添加新 select 到就绪事件的 SelectionKey 到keys
中。当超过数组大小上限时,调用#increaseCapacity()
方法,进行两倍扩容。相比 SelectorImpl 中使用的selectedKeys
所使用的 HashSet 的#add(E e)
方法,事件复杂度从O(lgn)
降低到O(1)
。 -
#reset(...)
方法,每次读取使用完数据,调用该方法,进行重置。 -
因为
#remove(Object o)
、#contains(Object o)
、#iterator()
不会使用到,索性不进行实现。
SelectedSelectionKeySet
类是 Netty 为了优化Selector
的性能而设计的一个类。这个类继承自AbstractSet<SelectionKey>
,用于存储已就绪的SelectionKey
对象。下面是对其关键特性的分析:关键特性
成员变量:
keys
: 一个SelectionKey
类型的数组,用于存储已就绪的SelectionKey
对象。
size
: 一个整数,表示当前keys
数组中有多少个有效的SelectionKey
。构造函数:
构造函数初始化
keys
数组,默认大小为 1024。方法:
add(SelectionKey o)
:
此方法用于添加一个新的
SelectionKey
到集合中。如果
SelectionKey
为null
,则返回false
。否则,将
SelectionKey
添加到keys
数组的末尾,并增加size
。如果
size
达到keys
数组的长度,则调用increaseCapacity()
方法进行数组扩容。
size()
:
返回当前
SelectedSelectionKeySet
中有效SelectionKey
的数量。
remove(Object o)
:
由于不需要移除功能,此方法总是返回
false
。
contains(Object o)
:
由于不需要查询功能,此方法总是返回
false
。
iterator()
:
抛出
UnsupportedOperationException
,表明此集合不允许迭代访问。
reset()
和reset(int start)
:
reset()
方法用于清空整个keys
数组,并将size
重置为 0。
reset(int start)
方法用于从指定位置开始清空keys
数组,并将size
重置为指定的位置。
increaseCapacity()
:
此方法用于当
keys
数组满时进行扩容。新数组的大小是原数组大小的两倍。
原数组的内容会被复制到新数组中。
keys
变量会被更新为新数组。性能优化
SelectedSelectionKeySet
的设计旨在提高性能,尤其是在高并发环境下。与传统的HashSet
相比,它有以下优势:
减少内存消耗:
由于
SelectedSelectionKeySet
使用固定大小的数组来存储SelectionKey
,因此相比于HashSet
,它在内存使用上更加高效。减少 CPU 开销:
SelectedSelectionKeySet
的add
方法的时间复杂度为 O(1),这意味着添加SelectionKey
的操作非常快。
HashSet
的add
方法的时间复杂度通常为 O(log n) 或更差,具体取决于哈希冲突的情况。简化操作:
由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。
使用场景
SelectedSelectionKeySet
主要用于 Netty 的Selector
优化中,尤其是当Selector
的内部selectedKeys
字段被替换为此类时。这样,当执行Selector.select()
或Selector.selectNow()
时,已就绪的SelectionKey
将被存储在这个优化后的集合中,从而提高整体的 I/O 处理效率。综上所述,
SelectedSelectionKeySet
是 Netty 中为了提高Selector
性能而设计的一个重要组件,通过使用数组和简单的数组操作来替代HashSet
,实现了更好的性能表现。
SelectedSelectionKeySet
是 Netty 中的一个内部类,用于存储已选择的SelectionKey
集合。这个类继承自AbstractSet<SelectionKey>
,并实现了部分Set
接口的方法。SelectedSelectionKeySet
的主要目的是优化Selector
的selectedKeys
和publicSelected
属性,以提高性能和减少内存开销。以下是
SelectedSelectionKeySet
类的主要特点和方法:主要属性:
keys
:一个SelectionKey
数组,用于存储已选择的SelectionKey
。
size
:数组中实际存储的SelectionKey
的数量。主要方法:
add(SelectionKey o)
:向keys
数组中添加一个新的SelectionKey
。如果数组已满,则调用increaseCapacity()
方法进行扩容。
size()
:返回数组中实际存储的SelectionKey
的数量。
remove(Object o)
和contains(Object o)
:这两个方法在SelectedSelectionKeySet
中没有实现,因为它们不会被使用。
iterator()
:这个方法抛出UnsupportedOperationException
,因为SelectedSelectionKeySet
不支持迭代。
reset()
和reset(int start)
:这两个方法用于重置keys
数组的内容和size
属性。在每次读取完数据后,调用这些方法进行重置。
increaseCapacity()
:当数组已满时,这个方法用于将数组容量扩大一倍。性能优化:
数组实现:与
SelectorImpl
中使用的HashSet
相比,SelectedSelectionKeySet
使用数组来存储SelectionKey
。这使得添加操作的复杂度从 O(log n) 降低到 O(1)。可重用性:通过
reset()
方法,SelectedSelectionKeySet
可以在每次读取完数据后重置,从而实现可重用性,减少内存分配和垃圾回收的开销。总之,
SelectedSelectionKeySet
是 Netty 对Selector
的selectedKeys
和publicSelectedKeys
属性的优化实现。通过使用数组和重置机制,它提高了性能并减少了内存开销。
5. SelectedSelectionKeySetSelector
io.netty.channel.nio.SelectedSelectionKeySetSelector
,基于 Netty SelectedSelectionKeySet 作为 selectionKeys
的 Selector 实现类。代码如下:
final class SelectedSelectionKeySetSelector extends Selector {
/**
* SelectedSelectionKeySet 对象
*/
private final SelectedSelectionKeySet selectionKeys;
/**
* 原始 Java NIO Selector 对象
*/
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
// 重置 selectionKeys
selectionKeys.reset();
// selectNow
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
// 重置 selectionKeys
selectionKeys.reset();
// select
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
// 重置 selectionKeys
selectionKeys.reset();
// select
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
-
除了 select 相关的 3 个方法,每个实现方法,都是基于 Java NIO Selector 对应的方法的调用。
-
select 相关的 3 个方法,在调用对应的 Java NIO Selector 方法之前,会调用
SelectedSelectionKeySet#reset()
方法,重置selectionKeys
。从而实现,每次 select 之后,都是新的已 select 的 NIO SelectionKey 集合。
SelectedSelectionKeySetSelector
是 Netty 中的一个内部类,它基于 Java NIO 的Selector
接口实现,但使用了 Netty 自定义的SelectedSelectionKeySet
作为其selectionKeys
集合。这个类的主要目的是为了优化Selector
的性能,特别是在处理大量 I/O 事件时。以下是
SelectedSelectionKeySetSelector
类的主要特点和方法:主要属性:
selectionKeys
:一个SelectedSelectionKeySet
对象,用于存储已选择的SelectionKey
。
delegate
:原始的 Java NIOSelector
对象,SelectedSelectionKeySetSelector
的大部分方法都会委托给这个对象。主要方法:
isOpen()
、provider()
、keys()
、selectedKeys()
、wakeup()
和close()
:这些方法直接调用delegate
对应的方法,不做任何修改。
selectNow()
、select(long timeout)
和select()
:这三个方法在调用delegate
对应的方法之前,会先调用selectionKeys.reset()
方法重置selectionKeys
。这样做的目的是确保每次select
操作后,selectionKeys
都包含最新的已选择SelectionKey
集合。性能优化:
重置
selectionKeys
:通过在每次select
操作前重置selectionKeys
,SelectedSelectionKeySetSelector
确保了selectionKeys
始终包含最新的已选择SelectionKey
。这避免了在SelectorImpl
中使用HashSet
时可能出现的并发修改问题,同时也减少了内存分配和垃圾回收的开销。基于数组的
SelectedSelectionKeySet
:如前所述,SelectedSelectionKeySet
使用数组来存储SelectionKey
,这比HashSet
更高效,因为它避免了HashSet
的 O(log n) 添加复杂度,提供了 O(1) 的添加复杂度。总的来说,
SelectedSelection
KeySetSelector是 Netty 对 Java NIO
Selector的一个优化实现,它通过使用自定义的
SelectedSelectionKeySet` 和重置机制,提高了处理大量 I/O 事件时的性能。
重置机制可以提高性能的原因主要体现在以下几个方面:
避免并发修改问题:
在多线程环境下,如果多个线程同时修改
selectionKeys
集合,可能会导致并发修改问题。通过重置机制,在每次select
操作前清空selectionKeys
,可以避免这种问题,确保selectionKeys
始终包含最新的已选择SelectionKey
。减少内存分配和垃圾回收开销:
如果不使用重置机制,每次
select
操作后,旧的SelectionKey
集合不会被立即回收,可能会导致内存占用不断增加。通过重置机制,旧的SelectionKey
集合会被立即清空,释放内存,从而减少垃圾回收的开销。提高数据访问效率:
使用数组实现的
SelectedSelectionKeySet
比HashSet
更高效,因为它避免了HashSet
的 O(log n) 添加复杂度,提供了 O(1) 的添加复杂度。通过重置机制,可以确保SelectedSelectionKeySet
始终处于最佳状态,提高数据访问效率。简化代码逻辑:
通过重置机制,可以简化代码逻辑,避免在每次
select
操作后手动清理selectionKeys
。这使得代码更简洁,更容易维护。提高可扩展性:
使用重置机制,可以更容易地扩展和优化
SelectedSelectionKeySet
和SelectedSelectionKeySetSelector
。例如,可以在reset
方法中添加更多的逻辑,以实现更高级的功能,如统计已选择的SelectionKey
数量等。总之,重置机制通过避免并发修改问题、减少内存分配和垃圾回收开销、提高数据访问效率、简化代码逻辑和提高可扩展性等方式,提高了处理大量 I/O 事件时的性能。
SelectedSelectionKeySetSelector
类是 Netty 中的一个内部类,它扩展了标准的Selector
类,并使用SelectedSelectionKeySet
来优化选择操作。以下是对此类的关键特性的分析:关键特性
成员变量:
selectionKeys
: 一个SelectedSelectionKeySet
对象,用于存储已就绪的SelectionKey
。
delegate
: 一个原始的Selector
对象,用于委托实际的选择操作。构造函数:
构造函数接受一个
Selector
对象和一个SelectedSelectionKeySet
对象,并将它们分别赋值给delegate
和selectionKeys
成员变量。方法:
isOpen()
:
返回
delegate
的isOpen()
方法的结果。
provider()
:
返回
delegate
的provider()
方法的结果。
keys()
:
返回
delegate
的keys()
方法的结果。
selectedKeys()
:
返回
delegate
的selectedKeys()
方法的结果。
selectNow()
:
在调用
delegate
的selectNow()
方法之前,先调用selectionKeys.reset()
方法来重置SelectedSelectionKeySet
。返回
delegate
的selectNow()
方法的结果。
select(long timeout)
:
在调用
delegate
的select(timeout)
方法之前,先调用selectionKeys.reset()
方法来重置SelectedSelectionKeySet
。返回
delegate
的select(timeout)
方法的结果。
select()
:
在调用
delegate
的select()
方法之前,先调用selectionKeys.reset()
方法来重置SelectedSelectionKeySet
。返回
delegate
的select()
方法的结果。
wakeup()
:
返回
delegate
的wakeup()
方法的结果。
close()
:
调用
delegate
的close()
方法。性能优化
SelectedSelectionKeySetSelector
的设计旨在提高性能,尤其是在高并发环境下。与标准的Selector
相比,它有以下优势:
减少内存消耗:
由于
SelectedSelectionKeySet
使用固定大小的数组来存储SelectionKey
,因此相比于标准的Selector
使用的HashSet
,它在内存使用上更加高效。减少 CPU 开销:
SelectedSelectionKeySet
的add
方法的时间复杂度为 O(1),这意味着添加SelectionKey
的操作非常快。标准的
Selector
的selectedKeys()
方法通常需要遍历一个HashSet
,其时间复杂度为 O(n)。简化操作:
由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。
使用场景
SelectedSelectionKeySetSelector
主要用于 Netty 的Selector
优化中。当创建Selector
时,如果条件允许,Netty 会创建一个SelectedSelectionKeySetSelector
来代替标准的Selector
。这样,当执行select
或selectNow
时,已就绪的SelectionKey
将被存储在这个优化后的集合中,从而提高整体的 I/O 处理效率。综上所述,
SelectedSelectionKeySetSelector
是 Netty 中为了提高Selector
性能而设计的一个重要组件,通过使用优化过的SelectedSelectionKeySet
来替代标准的HashSet
,实现了更好的性能表现。
您总结得很准确!
SelectedSelectionKeySetSelector
确实是 Netty 对 Java NIOSelector
的一种优化实现。它通过使用自定义的SelectedSelectionKeySet
和重置机制,显著提高了处理大量 I/O 事件时的性能。下面是对这一优化的一些详细解释:优化点
使用
SelectedSelectionKeySet
:
SelectedSelectionKeySet
是一个针对SelectionKey
的优化容器,它使用一个数组来存储已就绪的SelectionKey
,而不是使用HashSet
。与
HashSet
相比,这种方法减少了内存消耗,并且提供了更快的添加操作(O(1) 时间复杂度)。重置机制:
每次执行
select
、selectNow
或带有超时的select
方法之前,都会调用SelectedSelectionKeySet#reset()
方法来清空已就绪的SelectionKey
集合。这样可以确保每次选择操作后都有一个全新的就绪事件列表,从而避免了多次遍历旧的事件列表,进一步提高了性能。
性能提升
减少内存消耗:
SelectedSelectionKeySet
使用固定大小的数组来存储SelectionKey
,相比于HashSet
,它在内存使用上更加高效。减少 CPU 开销:
SelectedSelectionKeySet
的add
方法的时间复杂度为 O(1),这意味着添加SelectionKey
的操作非常快。标准的
Selector
的selectedKeys()
方法通常需要遍历一个HashSet
,其时间复杂度为 O(n)。简化操作:
由于不需要支持删除或查找操作,因此可以省略相应的逻辑,进一步提高性能。
使用场景
高并发环境:
在处理大量连接和 I/O 事件的场景下,如 Web 服务器或游戏服务器,这种优化特别有用。
减少内存消耗和提高性能对于维持系统的稳定性和响应能力至关重要。
总结
SelectedSelectionKeySetSelector
通过使用自定义的SelectedSelectionKeySet
和重置机制,为 Netty 提供了一种更高效的Selector
实现方式。这种方式在处理大量 I/O 事件时能够显著提高性能,尤其是在高并发的网络应用中。通过减少内存消耗和提高 CPU 效率,Netty 能够更好地应对现代网络应用的需求。
重置机制在
SelectedSelectionKeySetSelector
中的作用是确保每次选择操作 (select
,selectNow
,select(long timeout)
) 都能从一个干净的状态开始。这是因为每次选择操作完成后,Selector
的selectedKeys()
集合会保留上次选择操作的结果,直到这些结果被显式地清理或处理。这种机制可能会导致性能问题,特别是当有大量的SelectionKey
时。问题背景
在 Java NIO 中,
Selector
的selectedKeys()
方法返回的是一个Set
,这个Set
包含了所有已准备好 I/O 操作的SelectionKey
。但是,这个Set
在选择操作完成之后并不会自动清空。这意味着如果应用程序没有显式地处理这些SelectionKey
,例如通过调用Set.clear()
方法,那么这些SelectionKey
会一直保留在Set
中,这可能导致以下问题:
内存消耗增加:
随着时间的推移,越来越多的
SelectionKey
会被累积在selectedKeys()
集合中,导致内存占用增加。CPU 开销增加:
每次执行选择操作时,
Selector
需要遍历selectedKeys()
集合来确定哪些SelectionKey
已经准备好。随着集合的增长,这个过程会变得更加耗时。不必要的工作:
应用程序可能会处理不再相关的
SelectionKey
,因为它们可能已经处理过但没有被清除。重置机制的优势
SelectedSelectionKeySetSelector
通过在每次选择操作之前调用SelectedSelectionKeySet#reset()
方法来解决这些问题。这种方法有几个关键优势:
确保干净状态:
每次选择操作前都会清空
SelectedSelectionKeySet
,确保只处理最新的就绪事件。减少内存消耗:
清空
SelectedSelectionKeySet
避免了累积大量的SelectionKey
,从而减少了内存占用。减少 CPU 开销:
清空
SelectedSelectionKeySet
后,每次选择操作只需要处理新就绪的SelectionKey
,而不是之前遗留下来的旧事件,从而减少了 CPU 的开销。提高性能:
通过减少不必要的工作和减少遍历的次数,整体性能得到了提高。
总结
重置机制通过确保每次选择操作都能从一个干净的状态开始,有效地解决了累积事件的问题。这不仅减少了内存消耗,还减少了 CPU 的开销,从而提高了处理大量 I/O 事件时的性能。这种机制对于高性能的网络应用程序来说非常重要,因为它有助于保持系统的响应能力和稳定性。
6. rebuildSelector
#rebuildSelector()
方法,重建 Selector 对象。代码如下:
该方法用于 NIO Selector 发生 epoll bug 时,重建 Selector 对象。
😈 突然又找到一个讨论,可以看看 《JDK 1.7 及以下 NIO 的 epoll bug》 和 《应用服务器中对JDK的epoll空转bug的处理》 。
public void rebuildSelector() {
// 只允许在 EventLoop 的线程中执行
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}只允许在 EventLoop 的线程中,调用 #rebuildSelector0() 方法,重建 Selector 对象。
6.1 rebuildSelector0
#rebuildSelector0()
方法,重建 Selector 对象。代码如下:
1: private void rebuildSelector0() {
2: final Selector oldSelector = selector;
3: if (oldSelector == null) {
4: return;
5: }
6:
7: // 创建新的 Selector 对象
8: final SelectorTuple newSelectorTuple;
9: try {
10: newSelectorTuple = openSelector();
11: } catch (Exception e) {
12: logger.warn("Failed to create a new Selector.", e);
13: return;
14: }
15:
16: // Register all channels to the new Selector.
17: // 将注册在 NioEventLoop 上的所有 Channel ,注册到新创建 Selector 对象上
18: int nChannels = 0; // 计算重新注册成功的 Channel 数量
19: for (SelectionKey key: oldSelector.keys()) {
20: Object a = key.attachment();
21: try {
22: if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
23: continue;
24: }
25:
26: int interestOps = key.interestOps();
27: // 取消老的 SelectionKey
28: key.cancel();
29: // 将 Channel 注册到新的 Selector 对象上
30: SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
31: // 修改 Channel 的 selectionKey 指向新的 SelectionKey 上
32: if (a instanceof AbstractNioChannel) {
33: // Update SelectionKey
34: ((AbstractNioChannel) a).selectionKey = newKey;
35: }
36:
37: // 计数 ++
38: nChannels ++;
39: } catch (Exception e) {
40: logger.warn("Failed to re-register a Channel to the new Selector.", e);
41: // 关闭发生异常的 Channel
42: if (a instanceof AbstractNioChannel) {
43: AbstractNioChannel ch = (AbstractNioChannel) a;
44: ch.unsafe().close(ch.unsafe().voidPromise());
45: // 调用 NioTask 的取消注册事件
46: } else {
47: @SuppressWarnings("unchecked")
48: NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
49: invokeChannelUnregistered(task, key, e);
50: }
51: }
52: }
53:
54: // 修改 selector 和 unwrappedSelector 指向新的 Selector 对象
55: selector = newSelectorTuple.selector;
56: unwrappedSelector = newSelectorTuple.unwrappedSelector;
57:
58: // 关闭老的 Selector 对象
59: try {
60: // time to close the old selector as everything else is registered to the new one
61: oldSelector.close();
62: } catch (Throwable t) {
63: if (logger.isWarnEnabled()) {
64: logger.warn("Failed to close the old Selector.", t);
65: }
66: }
67:
68: if (logger.isInfoEnabled()) {
69: logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
70: }
71: }
-
第 7 行:调用
#openSelector()
方法,创建新的 Selector 对象。 -
第 16 至 52 行:遍历老的 Selector 对象的
selectionKeys
,将注册在 NioEventLoop 上的所有 Channel ,注册到新创建 Selector 对象上。-
第 22 至 24 行:校验 SelectionKey 有效,并且 Java NIO Channel 并未注册在新的 Selector 对象上。
-
第 28 行:调用
SelectionKey#cancel()
方法,取消老的 SelectionKey 。 -
第 30 行:将 Java NIO Channel 注册到新的 Selector 对象上,返回新的 SelectionKey 对象。
-
第 31 至 35 行:修改 Channel 的
selectionKey
指向新的 SelectionKey 对象 -
第 39 至 51 行:当发生异常时候,根据不同的 SelectionKey 的
attachment
来判断处理方式:-
第 41 至 44 行:当
attachment
是 Netty NIO Channel 时,调用Unsafe#close(ChannelPromise promise)
方法,关闭发生异常的 Channel 。 -
第 45 至 50 行:当
attachment
是 Netty NioTask 时,调用#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
方法,通知 Channel 取消注册。详细解析,见 「8. NioTask」 。
-
-
-
第 54 至 56 行:修改
selector
和unwrappedSelector
指向新的 Selector 对象。 -
第 58 至 66 行:调用
Selector#close()
方法,关闭老的 Selector 对象。
总的来说,#rebuildSelector()
方法,相比 #openSelector()
方法,主要是需要将老的 Selector 对象的“数据”复制到新的 Selector 对象上,并关闭老的 Selector 对象。
7. processSelectedKeys
在 #run()
方法中,会调用 #processSelectedKeys()
方法,处理 Channel 新增就绪的 IO 事件。代码如下:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
7.1 processSelectedKeysOptimized
#processSelectedKeysOptimized()
方法,基于 Netty SelectedSelectionKeySetSelector ,处理 Channel 新增就绪的 IO 事件。代码如下:
从方法名,我们也可以看出,这是个经过优化的实现。
1: private void processSelectedKeysOptimized() {
2: // 遍历数组
3: for (int i = 0; i < selectedKeys.size; ++i) {
4: final SelectionKey k = selectedKeys.keys[i];
5: // null out entry in the array to allow to have it GC'ed once the Channel close
6: // See https://github.com/netty/netty/issues/2363
7: selectedKeys.keys[i] = null;
8:
9: final Object a = k.attachment();
10:
11: // 处理一个 Channel 就绪的 IO 事件
12: if (a instanceof AbstractNioChannel) {
13: processSelectedKey(k, (AbstractNioChannel) a);
14: // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
15: } else {
16: @SuppressWarnings("unchecked")
17: NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
18: processSelectedKey(k, task);
19: }
20:
21: // TODO 1007 NioEventLoop cancel 方法
22: if (needsToSelectAgain) {
23: // null out entries in the array to allow to have it GC'ed once the Channel close
24: // See https://github.com/netty/netty/issues/2363
25: selectedKeys.reset(i + 1);
26:
27: selectAgain();
28: i = -1;
29: }
30: }
31: }
-
第 3 行:循环
selectedKeys
数组。-
第 4 至 7 行:置空,原因见 https://github.com/netty/netty/issues/2363 。
-
第 11 至 13 行:当
attachment
是 Netty NIO Channel 时,调用#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。 -
第 14 至 19 行:当
attachment
是 Netty NioTask 时,调用#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。 -
第 21 至 29 行:TODO 1007 NioEventLoop cancel 方法
-
7.2 processSelectedKeysPlain
#processSelectedKeysOptimized()
方法,基于 Java NIO 原生 Selecotr ,处理 Channel 新增就绪的 IO 事件。代码如下:
总体和
#processSelectedKeysOptimized()
方法类似。
1: private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
2: // check if the set is empty and if so just return to not create garbage by
3: // creating a new Iterator every time even if there is nothing to process.
4: // See https://github.com/netty/netty/issues/597
5: if (selectedKeys.isEmpty()) {
6: return;
7: }
8:
9: // 遍历 SelectionKey 迭代器
10: Iterator<SelectionKey> i = selectedKeys.iterator();
11: for (;;) {
12: // 获得 SelectionKey 对象
13: final SelectionKey k = i.next();
14: // 从迭代器中移除
15: i.remove();
16:
17: final Object a = k.attachment();
18: // 处理一个 Channel 就绪的 IO 事件
19: if (a instanceof AbstractNioChannel) {
20: processSelectedKey(k, (AbstractNioChannel) a);
21: // 使用 NioTask 处理一个 Channel 就绪的 IO 事件
22: } else {
23: @SuppressWarnings("unchecked")
24: NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
25: processSelectedKey(k, task);
26: }
27:
28: // 无下一个节点,结束
29: if (!i.hasNext()) {
30: break;
31: }
32:
33: // TODO 1007 NioEventLoop cancel 方法
34: if (needsToSelectAgain) {
35: selectAgain();
36: selectedKeys = selector.selectedKeys();
37:
38: // Create the iterator again to avoid ConcurrentModificationException
39: if (selectedKeys.isEmpty()) {
40: break;
41: } else {
42: i = selectedKeys.iterator();
43: }
44: }
45: }
46: }
-
第 10 至 11 行:遍历 SelectionKey 迭代器。
-
第 12 至 15 行:获得下一个 SelectionKey 对象,并从迭代器中移除。
-
第 18 至 20 行:当
attachment
是 Netty NIO Channel 时,调用#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。 -
第 21 至 26 行:当
attachment
是 Netty NioTask 时,调用#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。 -
第 33 至 44 行:TODO 1007 NioEventLoop cancel 方法
-
7.3 processSelectedKey
#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。代码如下:
1: private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
2: // 如果 SelectionKey 是不合法的,则关闭 Channel
3: final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
4: if (!k.isValid()) {
5: final EventLoop eventLoop;
6: try {
7: eventLoop = ch.eventLoop();
8: } catch (Throwable ignored) {
9: // If the channel implementation throws an exception because there is no event loop, we ignore this
10: // because we are only trying to determine if ch is registered to this event loop and thus has authority
11: // to close ch.
12: return;
13: }
14: // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
15: // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
16: // still healthy and should not be closed.
17: // See https://github.com/netty/netty/issues/5125
18: if (eventLoop != this) {
19: return;
20: }
21: // close the channel if the key is not valid anymore
22: unsafe.close(unsafe.voidPromise());
23: return;
24: }
25:
26: try {
27: // 获得就绪的 IO 事件的 ops
28: int readyOps = k.readyOps();
29:
30: // OP_CONNECT 事件就绪
31: // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
32: // the NIO JDK channel implementation may throw a NotYetConnectedException.
33: if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
34: // 移除对 OP_CONNECT 感兴趣
35: // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
36: // See https://github.com/netty/netty/issues/924
37: int ops = k.interestOps();
38: ops &= ~SelectionKey.OP_CONNECT;
39: k.interestOps(ops);
40: // 完成连接
41: unsafe.finishConnect();
42: }
43:
44: // OP_WRITE 事件就绪
45: // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
46: if ((readyOps & SelectionKey.OP_WRITE) != 0) {
47: // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
48: // 向 Channel 写入数据
49: ch.unsafe().forceFlush();
50: }
51:
52: // SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪
53: // readyOps == 0 是对 JDK Bug 的处理,防止空的死循环
54: // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
55: // to a spin loop
56: if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
57: unsafe.read();
58: }
59: } catch (CancelledKeyException ignored) {
60: // 发生异常,关闭 Channel
61: unsafe.close(unsafe.voidPromise());
62: }
63: }
-
第 2 至 24 行:如果 SelectionKey 是不合法的,则关闭 Channel 。
-
第 30 至 42 行:如果对
OP_CONNECT
事件就绪:-
第 34 至 39 行:移除对
OP_CONNECT
的感兴趣,即不再监听连接事件。 -
【重要】第 41 行:调用
Unsafe#finishConnect()
方法,完成连接。后续的逻辑,对应 《Netty 源码分析 —— 启动(二)之客户端》 的 「3.6.4 finishConnect」 小节。
-
-
第 44 至 50 行:如果对
OP_WRITE
事件就绪,调用Unsafe#forceFlush()
方法,向 Channel 写入数据。在完成写入数据后,会移除对OP_WRITE
的感兴趣。想要提前了解的胖友,可以自己看下AbstractNioByteChannel#clearOpWrite()
和AbstractNioMessageChannel#doWrite(ChannelOutboundBuffer in)
方法。 -
第 52 至 58 行:如果对
OP_READ
或OP_ACCEPT
事件就绪:调用Unsafe#read()
方法,处理读或者者接受客户端连接的事件。
8. NioTask
io.netty.channel.nio.NioTask
,用于自定义 Nio 事件处理接口。对于每个 Nio 事件,可以认为是一个任务( Task ),代码如下:
public interface NioTask<C extends SelectableChannel> {
/**
* Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
*/
void channelReady(C ch, SelectionKey key) throws Exception;
/**
* Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
* this {@link NioTask} will not be notified anymore.
*
* @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
* the event loop has been shut down.
*/
void channelUnregistered(C ch, Throwable cause) throws Exception;
}
-
#channelReady(C ch, SelectionKey key)
方法,处理 Channel IO 就绪的事件。相当于说,我们可以通过实现该接口方法,实现 「7.3 processSelectedKey」 的逻辑。 -
#channelUnregistered(C ch, Throwable cause)
方法,Channel 取消注册。一般来说,我们可以通过实现该接口方法,关闭 Channel 。
😈 实际上,NioTask 在 Netty 自身中并未有相关的实现类。所以对 NioTask 不感兴趣的,可以跳过本小节。另外,NioTask 是在 Allow a user to access the Selector of an EventLoop 中有相关的讨论。
8.1 register
#register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
方法,注册 Java NIO Channel ( 不一定需要通过 Netty 创建的 Channel )到 Selector 上,相当于说,也注册到了 EventLoop 上。代码如下:
/**
* Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
* of this event loop. Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
* be executed by this event loop when the {@link SelectableChannel} is ready.
*/
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
if (ch == null) {
throw new NullPointerException("ch");
}
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps must be non-zero.");
}
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
}
if (task == null) {
throw new NullPointerException("task");
}
if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}
// <1>
try {
ch.register(selector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
-
<1>
处,调用SelectableChannel#register(Selector sel, int ops, Object att)
方法,注册 Java NIO Channel 到 Selector 上。这里我们可以看到,attachment
为 NioTask 对象,而不是 Netty Channel 对象。
8.2 invokeChannelUnregistered
#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
方法,执行 Channel 取消注册。代码如下:
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
try {
task.channelUnregistered(k.channel(), cause);
} catch (Exception e) {
logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
}
}
-
在方法内部,调用
NioTask#channelUnregistered()
方法,执行 Channel 取消注册。
8.3 processSelectedKey
#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask ,自定义实现 Channel 处理 Channel IO 就绪的事件。代码如下:
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0; // 未执行
try {
// 调用 NioTask 的 Channel 就绪事件
task.channelReady(k.channel(), k);
state = 1; // 执行成功
} catch (Exception e) {
// SelectionKey 取消
k.cancel();
// 执行 Channel 取消注册
invokeChannelUnregistered(task, k, e);
state = 2; // 执行异常
} finally {
switch (state) {
case 0:
// SelectionKey 取消
k.cancel();
// 执行 Channel 取消注册
invokeChannelUnregistered(task, k, null);
break;
case 1:
// SelectionKey 不合法,则执行 Channel 取消注册
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
简单小文一篇,没什么太大难度的一篇。
如果有不理解的地方,也可以看看下面的文章:
-
Hypercube 《自顶向下深入分析 Netty(四)–EventLoop-2》
Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务1. 概述
本文分析了 Netty EventLoop 中 runAllTasks 方法的实现,重点关注了普通任务的执行过程和超时机制。
Key Takeaways
EventLoop 执行普通任务时,会调用 runAllTasks 方法,该方法会循环执行所有任务直到完成或超过时间限制。
EventLoop 每隔 64 个任务检查一次执行时间,因为 nanoTime() 方法相对比较耗时。
EventLoop 会调用 fetchFromScheduledTaskQueue 方法将定时任务队列中到达可执行时间的任务添加到普通任务队列中。
EventLoop 会调用 afterRunningAllTasks 方法执行所有任务完成后的后续操作,例如批量提交写入操作。
批量提交写入功能的 Handler 会在 EventLoop 每次循环执行时添加一个任务到 tailTasks 队列中,以实现批量提交操作。
批量提交写入功能可以减少 pipeline 的执行次数,提高吞吐量,尤其是在高并发场景下。
在 Netty 中,
EventLoop
是负责处理 I/O 事件的核心组件之一,同时也是处理普通任务的基础。普通任务指的是那些不需要与 I/O 事件直接关联的任务,例如定时任务、计算任务等。本节将详细介绍EventLoop
如何处理这些普通任务。1. 概述
EventLoop
是 Netty 中的一个核心组件,它负责处理 I/O 事件以及执行非 I/O 相关的任务。在 Netty 中,EventLoop
是通过EventLoopGroup
创建的,而EventLoopGroup
可以包含多个EventLoop
实例。每个EventLoop
实例通常绑定到一个线程,并且负责处理该线程上的所有任务。2. Task Queue
EventLoop
维护了一个TaskQueue
来存放待处理的任务。TaskQueue
是一个线程安全的队列,可以容纳各种类型的任务,包括 I/O 事件处理任务和普通任务。3. 提交任务
用户可以通过
EventLoop
的execute()
或者submit()
方法提交任务。这些方法最终会将任务添加到TaskQueue
中。4. 处理任务
EventLoop
在它的主循环中会不断地从TaskQueue
中取出任务并执行。这个过程通常是在EventLoop
的run()
方法中实现的,该方法是一个无限循环,直到EventLoop
被关闭为止。5. EventLoop 的 run() 方法
EventLoop
的run()
方法是其核心部分,它包含了处理任务的主要逻辑。这个方法会在EventLoop
对应的线程中运行,不断执行以下步骤:
调用
select()
方法来获取已准备好的 I/O 事件。调用
processSelectedKeys()
方法来处理这些 I/O 事件。执行
TaskQueue
中的任务。6. TaskQueue 的实现
TaskQueue
通常实现为一个线程安全的队列,例如LinkedBlockingQueue
或者ArrayBlockingQueue
。这样的实现保证了多线程环境下的安全性,并且可以高效地管理任务。7. 处理普通任务
当
EventLoop
完成 I/O 事件的处理后,它会检查TaskQueue
是否有等待执行的任务。如果有,它会从队列中取出任务并执行。8. 任务调度
EventLoop
支持立即执行的任务和延迟执行的任务。对于延迟执行的任务,EventLoop
会使用一个ScheduledTaskQueue
来存放这些任务,并在适当的时间执行它们。9. ScheduledTaskQueue
ScheduledTaskQueue
是一个专门用于存放延迟任务的队列。它通常实现为一个优先级队列,例如PriorityBlockingQueue
,这样可以确保任务按预定的时间顺序被执行。10. 示例代码
以下是一个简单的示例,展示如何向
EventLoop
提交一个普通任务:EventLoop eventLoop = ...; // 获取 EventLoop 的实例 // 提交一个普通任务 eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Hello from a task!"); } });
总结
EventLoop
不仅负责处理 I/O 事件,也负责执行普通任务。通过维护一个TaskQueue
和一个ScheduledTaskQueue
,EventLoop
能够有效地管理不同类型的任务,并确保它们在适当的时机得到执行。这种设计使得 Netty 能够高效地处理各种任务,无论是 I/O 相关还是非 I/O 相关的。如果您需要更详细的解释或代码示例,请告诉我具体的部分,我可以为您提供更深入的解释。
本文我们分享 EventLoop 的执行任务相关代码的实现。对应如下图的紫条 run tasks 部分:
EventLoop 执行的任务分成普通任务和定时任务,考虑到内容切分的更细粒度,本文近仅仅分享【普通任务】的部分。
2. runAllTasks 带超时
在 #run()
方法中,会调用 #runAllTasks(long timeoutNanos)
方法,执行所有任务直到完成所有,或者超过执行时间上限。代码如下:
1: protected boolean runAllTasks(long timeoutNanos) {
2: // 从定时任务获得到时间的任务
3: fetchFromScheduledTaskQueue();
4: // 获得队头的任务
5: Runnable task = pollTask();
6: // 获取不到,结束执行
7: if (task == null) {
8: // 执行所有任务完成的后续方法
9: afterRunningAllTasks();
10: return false;
11: }
12:
13: // 计算执行任务截止时间
14: final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
15: long runTasks = 0; // 执行任务计数
16: long lastExecutionTime;
17: // 循环执行任务
18: for (;;) {
19: // 执行任务
20: safeExecute(task);
21:
22: // 计数 +1
23: runTasks ++;
24:
25: // 每隔 64 个任务检查一次时间,因为 nanoTime() 是相对费时的操作
26: // 64 这个值当前是硬编码的,无法配置,可能会成为一个问题。
27: // Check timeout every 64 tasks because nanoTime() is relatively expensive.
28: // XXX: Hard-coded value - will make it configurable if it is really a problem.
29: if ((runTasks & 0x3F) == 0) {
30: // 重新获得时间
31: lastExecutionTime = ScheduledFutureTask.nanoTime();
32: // 超过任务截止时间,结束
33: if (lastExecutionTime >= deadline) {
34: break;
35: }
36: }
37:
38: // 获得队头的任务
39: task = pollTask();
40: // 获取不到,结束执行
41: if (task == null) {
42: // 重新获得时间
43: lastExecutionTime = ScheduledFutureTask.nanoTime();
44: break;
45: }
46: }
47:
48: // 执行所有任务完成的后续方法
49: afterRunningAllTasks();
50:
51: // 设置最后执行时间
52: this.lastExecutionTime = lastExecutionTime;
53: return true;
54: }
-
方法的返回值,表示是否执行过任务。因为,任务队列可能为空,那么就会返回
false
,表示没有执行过任务。 -
第 3 行:调用
#fetchFromScheduledTaskQueue()
方法,将定时任务队列scheduledTaskQueue
到达可执行的任务,添加到任务队列taskQueue
中。通过这样的方式,定时任务得以被执行。详细解析,见 《Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。 -
第 5 行:首次调用
#pollTask()
方法,获得队头的任务。详细解析,胖友先跳到 「4. pollTask」 。-
第 6 至 11 行:获取不到任务,结束执行,并返回
false
。-
第 9 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。详细解析,见 「5. afterRunningAllTasks」 。
-
-
-
第 14 行:计算执行任务截止时间。其中,
ScheduledFutureTask#nanoTime()
方法,我们可以暂时理解成,获取当前的时间,单位为纳秒。详细解析,见 《Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。 -
第 17 至 46 行:循环执行任务。
-
第 20 行:【重要】调用
#safeExecute(Runnable task)
方法,执行任务。 -
第 23 行:计算
runTasks
加一。 -
第 29 至 36 行:每隔 64 个任务检查一次时间,因为
System#nanoTime()
是相对费时的操作。也因此,超过执行时间上限是“近似的”,而不是绝对准确。-
第 31 行:调用
ScheduledFutureTask#nanoTime()
方法,获取当前的时间。 -
第 32 至 35 行:超过执行时间上限,结束执行。
-
-
第 39 行:再次调用
#pollTask()
方法,获得队头的任务。-
第 41 至 45 行:获取不到,结束执行。
-
第 43 行:调用
ScheduledFutureTask#nanoTime()
方法,获取当前的时间,作为最终的.lastExecutionTime
,即【第 52 行】的代码。
-
-
-
第 49 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。 -
第 53 行:返回
true
,表示有执行任务。
3. runAllTasks
在 #run()
方法中,会调用 #runAllTasks()
方法,执行所有任务直到完成所有。代码如下:
1: protected boolean runAllTasks() {
2: assert inEventLoop();
3: boolean fetchedAll;
4: boolean ranAtLeastOne = false; // 是否执行过任务
5:
6: do {
7: // 从定时任务获得到时间的任务
8: fetchedAll = fetchFromScheduledTaskQueue();
9: // 执行任务队列中的所有任务
10: if (runAllTasksFrom(taskQueue)) {
11: // 若有任务执行,则标记为 true
12: ranAtLeastOne = true;
13: }
14: } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
15:
16: // 如果执行过任务,则设置最后执行时间
17: if (ranAtLeastOne) {
18: lastExecutionTime = ScheduledFutureTask.nanoTime();
19: }
20:
21: // 执行所有任务完成的后续方法
22: afterRunningAllTasks();
23: return ranAtLeastOne;
24: }
-
第 4 行:
ranAtLeastOne
,标记是否执行过任务。 -
第 6 至 14 行:调用
#fetchFromScheduledTaskQueue()
方法,将定时任务队列scheduledTaskQueue
到达可执行的任务,添加到任务队列taskQueue
中。但是实际上,任务队列taskQueue
是有队列大小上限的,因此使用while
循环,直到没有到达可执行的任务为止。-
第 10 行:调用
#runAllTasksFrom(taskQueue)
方法,执行任务队列中的所有任务。代码如下:
-
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 获得队头的任务
Runnable task = pollTaskFrom(taskQueue);
// 获取不到,结束执行,返回 false
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 获得队头的任务
task = pollTaskFrom(taskQueue);
// 获取不到,结束执行,返回 true
if (task == null) {
return true;
}
}
}
-
代码比较简单,和
#runAllTasks(long timeoutNanos))
方法的代码,大体是相似的。
-
第 12 行:若有任务被执行,则标记
ranAtLeastOne
为true
。
-
第 16 至 19 行:如果执行过任务,则设置最后执行时间。
-
第 22 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。 -
第 23 行:返回是否执行过任务。和
#runAllTasks(long timeoutNanos))
方法的返回是一致的。
4. pollTask
#pollTask()
方法,获得队头的任务。代码如下:
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) { // <2>
// 获得并移除队首元素。如果获得不到,返回 null
Runnable task = taskQueue.poll(); // <1>
// 忽略 WAKEUP_TASK 任务,因为是空任务
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
-
<1>
处,调用Queue#poll()
方法,获得并移除队首元素。如果获得不到,返回 null 。注意,这个操作是非阻塞的。如果不知道,请 Google 重新学习下。 -
<2>
处,因为获得的任务可能是WAKEUP_TASK
,所以需要通过循环来跳过。
Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务
1. 概述
本文接 《Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 ,分享【处理定时任务】的部分。
因为 AbstractScheduledEventExecutor 在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 并未分享,并且它是本文的处理定时任务的前置,所以本文先写这部分内容。
2. ScheduledFutureTask
io.netty.util.concurrent.ScheduledFutureTask
,实现 ScheduledFuture、PriorityQueueNode 接口,继承 PromiseTask 抽象类,Netty 定时任务。
也有文章喜欢把“定时任务”叫作“调度任务”,意思是相同的,本文统一使用“定时任务”。
2.1 静态属性
/**
* 任务序号生成器,通过 AtomicLong 实现递增发号
*/
private static final AtomicLong nextTaskId = new AtomicLong();
/**
* 定时任务时间起点
*/
private static final long START_TIME = System.nanoTime();
-
nextTaskId
静态属性,任务序号生成器,通过 AtomicLong 实现递增发号。 -
START_TIME
静态属性,定时任务时间起点。在 ScheduledFutureTask 中,定时任务的执行时间,都是基于START_TIME
做相对时间。😈 至于为什么使用相对时间?笔者暂时没有搞清楚。-
笔者也搜索了下和
System.nanoTime()
相关的内容,唯一能看的是 《System.nanoTime() 的隐患》 ,但是应该不是这个原因。因为是定时调度,我改了系统时间也没关系
存的是距离下次调度还要多长时间
不受系统时间影响
最大的好处
-
2.2 nanoTime
#nanoTime()
静态方法,获得当前时间,这个是相对 START_TIME
来算的。代码如下:
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
-
这是个重要的方法,后续很多方法都会调用到它。
2.3 deadlineNanos
#deadlineNanos(long delay)
静态方法,获得任务执行时间,这个也是相对 START_TIME
来算的。代码如下:
/**
* @param delay 延迟时长,单位:纳秒
* @return 获得任务执行时间,也是相对 {@link #START_TIME} 来算的。
* 实际上,返回的结果,会用于 {@link #deadlineNanos} 字段
*/
static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow 防御性编程
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
2.4 构造方法
/**
* 任务编号
*/
private final long id = nextTaskId.getAndIncrement();
/**
* 任务执行时间,即到了该时间,该任务就会被执行
*/
private long deadlineNanos;
/**
* 任务执行周期
*
* =0 - 只执行一次
* >0 - 按照计划执行时间计算
* <0 - 按照实际执行时间计算
*
* 推荐阅读文章 https://blog.csdn.net/gtuu0123/article/details/6040159
*/
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
/**
* 队列编号
*/
private int queueIndex = INDEX_NOT_IN_QUEUE;
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Runnable runnable, V result, long nanoTime) {
this(executor, toCallable(runnable, result), nanoTime);
}
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
-
每个字段比较简单,胖友看上面的注释。
2.5 delayNanos
#delayNanos(...)
方法,获得距离指定时间,还要多久可执行。代码如下:
/**
* @return 距离当前时间,还要多久可执行。若为负数,直接返回 0
*/
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
/**
* @param currentTimeNanos 指定时间
* @return 距离指定时间,还要多久可执行。若为负数,直接返回 0
*/
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
2.6 run
#run()
方法,执行定时任务。代码如下:
1: @Override
2: public void run() {
3: assert executor().inEventLoop();
4: try {
5: if (periodNanos == 0) {
6: // 设置任务不可取消
7: if (setUncancellableInternal()) {
8: // 执行任务
9: V result = task.call();
10: // 通知任务执行成功
11: setSuccessInternal(result);
12: }
13: } else {
14: // 判断任务并未取消
15: // check if is done as it may was cancelled
16: if (!isCancelled()) {
17: // 执行任务
18: task.call();
19: if (!executor().isShutdown()) {
20: // 计算下次执行时间
21: long p = periodNanos;
22: if (p > 0) {
23: deadlineNanos += p;
24: } else {
25: deadlineNanos = nanoTime() - p;
26: }
27: // 判断任务并未取消
28: if (!isCancelled()) {
29: // 重新添加到任务队列,等待下次定时执行
30: // scheduledTaskQueue can never be null as we lazy init it before submit the task!
31: Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
32: ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
33: assert scheduledTaskQueue != null;
34: scheduledTaskQueue.add(this);
35: }
36: }
37: }
38: }
39: // 发生异常,通知任务执行失败
40: } catch (Throwable cause) {
41: setFailureInternal(cause);
42: }
43: }
-
第 3 行:校验,必须在 EventLoop 的线程中。
-
根据不同的任务执行周期
periodNanos
,在执行任务会略有不同。当然,大体是相同的。 -
第 5 至 12 行:执行周期为“只执行一次”的定时任务。
-
第 7 行:调用
PromiseTask#setUncancellableInternal()
方法,设置任务不可取消。具体的方法实现,我们在后续关于 Promise 的文章中分享。 -
第 9 行:【重要】调用
Callable#call()
方法,执行任务。 -
第 11 行:调用
PromiseTask#setSuccessInternal(V result)
方法,回调通知注册在定时任务上的监听器。为什么能这么做呢?因为 ScheduledFutureTask 继承了 PromiseTask 抽象类。
-
-
第 13 至 38 行:执行周期为“固定周期”的定时任务。
-
第 16 行:调用
DefaultPromise#isCancelled()
方法,判断任务是否已经取消。这一点,和【第 7 行】的代码,是不同的。具体的方法实现,我们在后续关于 Promise 的文章中分享。 -
第 18 行:【重要】调用
Callable#call()
方法,执行任务。 -
第 19 行:判断 EventExecutor 并未关闭。
-
第 20 至 26 行:计算下次定时执行的时间。不同的执行
fixed
方式,计算方式不同。其中【第 25 行】的- p
的代码,因为p
是负数,所以通过负负得正来计算。另外,这块会修改定时任务的deadlineNanos
属性,从而变成新的定时任务执行时间。 -
第 28 行:和【第 16 行】的代码是一致的。
-
第 29 至 34 行:重新添加到定时任务队列
scheduledTaskQueue
中,等待下次定时执行。
-
-
第 39 至 42 行:发生异常,调用
PromiseTask#setFailureInternal(Throwable cause)
方法,回调通知注册在定时任务上的监听器。
2.7 cancel
有两个方法,可以取消定时任务。代码如下:
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
// 取消成功,移除出定时任务队列
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}
// 移除任务
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}