Netty 源码解析 —— EventLoop(一)之 Reactor 模型
本文深入解析了 Netty 中 EventLoopGroup 的代码实现,重点讲解了 MultithreadEventExecutorGroup 和 NioEventLoopGroup 的实现细节。
Key Takeaways
EventExecutorGroup 是 EventExecutor ( 事件执行器 )的分组接口,它提供了迭代出 EventExecutor 对象的方法。
MultithreadEventExecutorGroup 是基于多线程的 EventExecutor 分组抽象类,它使用线程池来管理多个 EventExecutor 对象。
NioEventLoopGroup 是 NioEventLoop 的分组实现类,它使用 Java NIO 的 Selector 对象来处理 I/O 事件。
NioEventLoopGroup 的构造方法接受多个参数,包括线程池、SelectorProvider、SelectStrategyFactory 和 RejectedExecutionHandler,用于创建 NioEventLoop 对象。
NioEventLoopGroup 提供了 setIoRatio() 方法,用于设置所有 EventLoop 的 IO 任务占用执行时间的比例。
NioEventLoopGroup 提供了 rebuildSelectors() 方法,用于重建所有 EventLoop 的 Selector 对象。
1. 概述
从本文开始,我们来分享 Netty 非常重要的一个组件 EventLoop 。在看 EventLoop 的具体实现之前,我们先来对 Reactor 模型做个简单的了解。
为什么要了解 Reactor 模型呢?因为 EventLoop 是 Netty 基于 Reactor 模型的思想进行实现。所以理解 Reactor 模型,对于我们理解 EventLoop 会有很大帮助。
我们来看看 Reactor 模型的核心思想:
将关注的 I/O 事件注册到多路复用器上,一旦有 I/O 事件触发,将事件分发到事件处理器中,执行就绪 I/O 事件对应的处理函数中。模型中有三个重要的组件:
多路复用器:由操作系统提供接口,Linux 提供的 I/O 复用接口有select、poll、epoll 。
事件分离器:将多路复用器返回的就绪事件分发到事件处理器中。
事件处理器:处理就绪事件处理函数。
初步一看,Java NIO 符合 Reactor 模型啊?因为 Reactor 有 3 种模型实现:
单 Reactor 单线程模型
单 Reactor 多线程模型
多 Reactor 多线程模型
😈 由于不擅长相对理论文章的内容编写,所以 「2.」、「3.」、「4.」 小节的内容,我决定一本正经的引用基友 wier 的 《【NIO 系列】—— 之Reactor 模型》 。
2. 单 Reactor 单线程模型
示例图如下:
示例代码主要表达大体逻辑,比较奔放。所以,胖友理解大体意思就好。
Reactor 示例代码如下:
/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
private Reactor() throws Exception {
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// attach Acceptor 处理新连接
sk.attach(new Acceptor());
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件处理
dispatch((SelectionKey) (it.next()));
}
}
} catch (IOException ex) {
//do something
}
}
void dispatch(SelectionKey k) {
// 若是连接事件获取是acceptor
// 若是IO读写事件获取是handler
Runnable runnable = (Runnable) (k.attachment());
if (runnable != null) {
runnable.run();
}
}
}
示例的 Handler 的代码实现应该是漏了。胖友脑补一个实现 Runnable 接口的 Handler 类。😈
这是最基础的单 Reactor 单线程模型。
Reactor 线程,负责多路分离套接字。
有新连接到来触发
OP_ACCEPT
事件之后, 交由 Acceptor 进行处理。有 IO 读写事件之后,交给 Handler 处理。
Acceptor 主要任务是构造 Handler 。
在获取到 Client 相关的 SocketChannel 之后,绑定到相应的 Handler 上。
对应的 SocketChannel 有读写事件之后,基于 Reactor 分发,Handler 就可以处理了。
注意,所有的 IO 事件都绑定到 Selector 上,由 Reactor 统一分发。
该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。
3. 单 Reactor 多线程模型
示例图如下:
相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到 IO 的读写事件之后,交由线程池来处理,这样可以减小主 Reactor 的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
MultiThreadHandler 示例代码如下:
/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
//多线程处理业务逻辑
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
在
#read()
和#write()
方法中,提交executorService
线程池,进行处理。
4. 多 Reactor 多线程模型
示例图如下:
第三种模型比起第二种模型,是将 Reactor 分成两部分:
mainReactor 负责监听 ServerSocketChannel ,用来处理客户端新连接的建立,并将建立的客户端的 SocketChannel 指定注册给 subReactor 。
subReactor 维护自己的 Selector ,基于 mainReactor 建立的客户端的 SocketChannel 多路分离 IO 读写事件,读写网络数据。对于业务处理的功能,另外扔给 worker 线程池来完成。
MultiWorkThreadAcceptor 示例代码如下:
/**
* 多work 连接事件Acceptor,处理连接事件
*/
class MultiWorkThreadAcceptor implements Runnable {
// cpu线程数相同多work线程
int workCount = Runtime.getRuntime().availableProcessors();
SubReactor[] workThreadHandlers = new SubReactor[workCount];
volatile int nextHandler = 0;
public MultiWorkThreadAcceptor() {
this.init();
}
public void init() {
nextHandler = 0;
for (int i = 0; i < workThreadHandlers.length; i++) {
try {
workThreadHandlers[i] = new SubReactor();
} catch (Exception e) {
}
}
}
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
synchronized (c) {
// 顺序获取SubReactor,然后注册channel
SubReactor work = workThreadHandlers[nextHandler];
work.registerChannel(c);
nextHandler++;
if (nextHandler >= workThreadHandlers.length) {
nextHandler = 0;
}
}
}
} catch (Exception e) {
}
}
}
SubReactor 示例代码如下:
/**
* 多work线程处理读写业务逻辑
*/
class SubReactor implements Runnable {
final Selector mySelector;
//多线程处理业务逻辑
int workCount =Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(workCount);
public SubReactor() throws Exception {
// 每个SubReactor 一个selector
this.mySelector = SelectorProvider.provider().openSelector();
}
/**
* 注册chanel
*
* @param sc
* @throws Exception
*/
public void registerChannel(SocketChannel sc) throws Exception {
sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
}
@Override
public void run() {
while (true) {
try {
//每个SubReactor 自己做事件分派处理读写事件
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
read();
} else if (key.isWritable()) {
write();
}
}
} catch (Exception e) {
}
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
从代码中,我们可以看到:
mainReactor 主要用来处理网络 IO 连接建立操作,通常,mainReactor 只需要一个,因为它一个线程就可以处理。
subReactor 主要和建立起来的客户端的 SocketChannel 做数据交互和事件业务处理操作。通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理。
此种模式中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大大的提升,支持的可并发客户端数量可达到上百万级别。
一般来说,是达到数十万级别。
关于此种模式的应用,目前有很多优秀的框架已经在应用,比如 Mina 和 Netty 等等。上述中去掉线程池的第三种形式的变种,也是 Netty NIO 的默认模式。
5. Netty NIO 客户端
我们来看看 Netty NIO 客户端的示例代码中,和 EventLoop 相关的代码:
// 创建一个 EventLoopGroup 对象
EventLoopGroup group = new NioEventLoopGroup();
// 创建 Bootstrap 对象
Bootstrap b = new Bootstrap();
// 设置使用的 EventLoopGroup
b.group(group);
对于 Netty NIO 客户端来说,仅创建一个 EventLoopGroup 。
一个 EventLoop 可以对应一个 Reactor 。因为 EventLoopGroup 是 EventLoop 的分组,所以对等理解,EventLoopGroup 是一种 Reactor 的分组。
一个 Bootstrap 的启动,只能发起对一个远程的地址。所以只会使用一个 NIO Selector ,也就是说仅使用一个 Reactor 。即使,我们在声明使用一个 EventLoopGroup ,该 EventLoopGroup 也只会分配一个 EventLoop 对 IO 事件进行处理。
因为 Reactor 模型主要使用服务端的开发中,如果套用在 Netty NIO 客户端中,到底使用了哪一种模式呢?
如果只有一个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 单线程模型】。
如果有多个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 多线程模型】。
那么 Netty NIO 客户端是否能够使用【多 Reactor 多线程模型】呢?😈 创建多个 Netty NIO 客户端,连接同一个服务端。那么多个 Netty 客户端就可以认为符合多 Reactor 多线程模型了。
一般情况下,我们不会这么干。
当然,实际也有这样的示例。例如 Dubbo 或 Motan 这两个 RPC 框架,支持通过配置,同一个 Consumer 对同一个 Provider 实例同时建立多个客户端连接。
6. Netty NIO 服务端
我们来看看 Netty NIO 服务端的示例代码中,和 EventLoop 相关的代码
// 创建两个 EventLoopGroup 对象
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建 boss 线程组 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 worker 线程组 用于进行 SocketChannel 的数据读写
// 创建 ServerBootstrap 对象
ServerBootstrap b = new ServerBootstrap();
// 设置使用的 EventLoopGroup
b.group(bossGroup, workerGroup);
对于 Netty NIO 服务端来说,创建两个 EventLoopGroup 。
bossGroup
对应 Reactor 模式的 mainReactor ,用于服务端接受客户端的连接。比较特殊的是,传入了方法参数nThreads = 1
,表示只使用一个 EventLoop ,即只使用一个 Reactor 。这个也符合我们上面提到的,“通常,mainReactor 只需要一个,因为它一个线程就可以处理”。workerGroup
对应 Reactor 模式的 subReactor ,用于进行 SocketChannel 的数据读写。对于 EventLoopGroup ,如果未传递方法参数nThreads
,表示使用 CPU 个数 Reactor 。这个也符合我们上面提到的,“通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理”。
因为使用两个 EventLoopGroup ,所以符合【多 Reactor 多线程模型】的多 Reactor 的要求。实际在使用时,
workerGroup
在读完数据时,具体的业务逻辑处理,我们会提交到专门的业务逻辑线程池,例如在 Dubbo 或 Motan 这两个 RPC 框架中。这样一来,就完全符合【多 Reactor 多线程模型】。那么可能有胖友可能和我有一样的疑问,
bossGroup
如果配置多个线程,是否可以使用多个 mainReactor 呢?我们来分析一波,一个 Netty NIO 服务端同一时间,只能 bind 一个端口,那么只能使用一个 Selector 处理客户端连接事件。又因为,Selector 操作是非线程安全的,所以无法在多个 EventLoop ( 多个线程 )中,同时操作。所以这样就导致,即使bossGroup
配置多个线程,实际能够使用的也就是一个线程。那么如果一定一定一定要多个 mainReactor 呢?创建多个 Netty NIO 服务端,并绑定多个端口。
如果 Reactor 模式讲解的不够清晰,或者想要更加深入的理解,推荐阅读如下文章:
永顺 《Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)》 里面有几个图不错。
Essviv 《Reactor 模型》 里面的代码示例不错。
xieshuang 《异步网络模型》 内容很高端,一看就是高玩。
Netty 源码解析 —— EventLoop(二)之 EventLoopGroup
1. 概述
在 《 Netty 源码分析 —— Netty 简介(二)之核心组件》 中,对 EventLoopGroup 和 EventLoop 做了定义,我们再来回顾下:
Channel 为Netty 网络操作抽象类,EventLoop 负责处理注册到其上的 Channel 处理 I/O 操作,两者配合参与 I/O 操作。
EventLoopGroup 是一个 EventLoop 的分组,它可以获取到一个或者多个 EventLoop 对象,因此它提供了迭代出 EventLoop 对象的方法。
在 《 Netty 源码分析 —— 启动》 中,我们特别熟悉的一段代码就是:
new NioEventLoopGroup()
,创建一个 EventLoopGroup 对象。EventLoopGroup#register(Channel channel)
,将 Channel 注册到 EventLoopGroup 上。
那么,本文我们分享 EventLoopGroup 的具体代码实现,来一探究竟。
2. 类结构图
EventLoopGroup 的整体类结构如下图:
红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系。
因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。
下面,我们逐层看看每个接口和类的实现代码。
3. EventExecutorGroup
io.netty.util.concurrent.EventExecutorGroup
,实现 Iterable、ScheduledExecutorService 接口,EventExecutor ( 事件执行器 )的分组接口。代码如下:
// ========== 自定义接口 ==========
boolean isShuttingDown();
// 优雅关闭
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
// 选择一个 EventExecutor 对象
EventExecutor next();
// ========== 实现自 Iterable 接口 ==========
@Override
Iterator<EventExecutor> iterator();
// ========== 实现自 ExecutorService 接口 ==========
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();
// ========== 实现自 ScheduledExecutorService 接口 ==========
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
每个接口的方法的意思比较好理解,笔者就不一一赘述了。
比较特殊的是,接口方法返回类型为 Future 不是 Java 原生的
java.util.concurrent.Future
,而是 Netty 自己实现的 Future 接口。详细解析,见后续文章。EventExecutorGroup 自身不执行任务,而是将任务
#submit(...)
或#schedule(...)
给自己管理的 EventExecutor 的分组。至于提交给哪一个 EventExecutor ,一般是通过#next()
方法,选择一个 EventExecutor 。
4. AbstractEventExecutorGroup
io.netty.util.concurrent.AbstractEventExecutorGroup
,实现 EventExecutorGroup 接口,EventExecutor ( 事件执行器 )的分组抽象类。
4.1 submit
#submit(...)
方法,提交一个普通任务到 EventExecutor 中。代码如下:
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
提交的 EventExecutor ,通过
#next()
方法选择。
4.2 schedule
#schedule(...)
方法,提交一个定时任务到 EventExecutor 中。代码如下:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
提交的 EventExecutor ,通过
#next()
方法选择。
4.3 execute
#execute(...)
方法,在 EventExecutor 中执行一个普通任务。代码如下:
@Override
public void execute(Runnable command) {
next().execute(command);
}
执行的 EventExecutor ,通过
#next()
方法选择。看起来
#execute(...)
和#submit(...)
方法有几分相似,具体的差异,由 EventExecutor 的实现决定。
4.4 invokeAll
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务。代码如下:
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return next().invokeAll(tasks, timeout, unit);
}
执行的 EventExecutor ,通过
#next()
方法选择。并且,多个任务使用同一个 EventExecutor 。
4.5 invokeAny
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可。代码如下
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return next().invokeAny(tasks, timeout, unit);
}
执行的 EventExecutor ,通过
#next()
方法选择。并且,多个任务使用同一个 EventExecutor 。
4.6 shutdown
#shutdown(...)
方法,关闭 EventExecutorGroup 。代码如下:
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD /* 2 */, DEFAULT_SHUTDOWN_TIMEOUT /* 15 */, TimeUnit.SECONDS);
}
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
@Deprecated
public abstract void shutdown();
具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法,由子类实现。
5. MultithreadEventExecutorGroup
io.netty.util.concurrent.MultithreadEventExecutorGroup
,继承 AbstractEventExecutorGroup 抽象类,基于多线程的 EventExecutor ( 事件执行器 )的分组抽象类。
5.1 构造方法
/**
* EventExecutor 数组
*/
private final EventExecutor[] children;
/**
* 不可变( 只读 )的 EventExecutor 数组
*
* @see #MultithreadEventExecutorGroup(int, Executor, EventExecutorChooserFactory, Object...)
*/
private final Set<EventExecutor> readonlyChildren;
/**
* 已终止的 EventExecutor 数量
*/
private final AtomicInteger terminatedChildren = new AtomicInteger();
/**
* 用于终止 EventExecutor 的异步 Future
*/
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
/**
* EventExecutor 选择器
*/
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
1: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
2: if (nThreads <= 0) {
3: throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
4: }
5:
6: // 创建执行器
7: if (executor == null) {
8: executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
9: }
10:
11: // 创建 EventExecutor 数组
12: children = new EventExecutor[nThreads];
13:
14: for (int i = 0; i < nThreads; i ++) {
15: boolean success = false; // 是否创建成功
16: try {
17: // 创建 EventExecutor 对象
18: children[i] = newChild(executor, args);
19: // 标记创建成功
20: success = true;
21: } catch (Exception e) {
22: // 创建失败,抛出 IllegalStateException 异常
23: // TODO: Think about if this is a good exception type
24: throw new IllegalStateException("failed to create a child event loop", e);
25: } finally {
26: // 创建失败,关闭所有已创建的 EventExecutor
27: if (!success) {
28: // 关闭所有已创建的 EventExecutor
29: for (int j = 0; j < i; j ++) {
30: children[j].shutdownGracefully();
31: }
32: // 确保所有已创建的 EventExecutor 已关闭
33: for (int j = 0; j < i; j ++) {
34: EventExecutor e = children[j];
35: try {
36: while (!e.isTerminated()) {
37: e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
38: }
39: } catch (InterruptedException interrupted) {
40: // Let the caller handle the interruption.
41: Thread.currentThread().interrupt();
42: break;
43: }
44: }
45: }
46: }
47: }
48:
49: // 创建 EventExecutor 选择器
50: chooser = chooserFactory.newChooser(children);
51:
52: // 创建监听器,用于 EventExecutor 终止时的监听
53: final FutureListener<Object> terminationListener = new FutureListener<Object>() {
54:
55: @Override
56: public void operationComplete(Future<Object> future) throws Exception {
57: if (terminatedChildren.incrementAndGet() == children.length) { // 全部关闭
58: terminationFuture.setSuccess(null); // 设置结果,并通知监听器们。
59: }
60: }
61:
62: };
63: // 设置监听器到每个 EventExecutor 上
64: for (EventExecutor e: children) {
65: e.terminationFuture().addListener(terminationListener);
66: }
67:
68: // 创建不可变( 只读 )的 EventExecutor 数组
69: Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
70: Collections.addAll(childrenSet, children);
71: readonlyChildren = Collections.unmodifiableSet(childrenSet);
72: }
方法参数
executor
,执行器。详细解析,见 「5.2 ThreadPerTaskExecutor」 。第 6 至 9 行:若
executor
为空,则创建执行器。
第 12 行:创建 EventExecutor 数组。
第 18 行:调用
#newChild(Executor executor, Object... args)
方法,创建 EventExecutor 对象,然后设置到数组中。第 21 至 24 行:创建失败,抛出 IllegalStateException 异常。
第 25 至 45 行:创建失败,关闭所有已创建的 EventExecutor 。
第 50 行:调用
EventExecutorChooserFactory#newChooser(EventExecutor[] executors)
方法,创建 EventExecutor 选择器。详细解析,见 「5.3 EventExecutorChooserFactory」 。第 52 至 62 行:创建监听器,用于 EventExecutor 终止时的监听。
第 55 至 60 行:回调的具体逻辑是,当所有 EventExecutor 都终止完成时,通过调用
Future#setSuccess(V result)
方法,通知监听器们。至于为什么设置的值是null
,因为监听器们不关注具体的结果。第 63 至 66 行:设置监听器到每个 EventExecutor 上。
第 68 至 71 行:创建不可变( 只读 )的 EventExecutor 数组。
5.2 ThreadPerTaskExecutor
io.netty.util.concurrent.ThreadPerTaskExecutor
,实现 Executor 接口,每个任务一个线程的执行器实现类。代码如下:
public final class ThreadPerTaskExecutor implements Executor {
/**
* 线程工厂对象
*/
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
/**
* 执行任务
*
* @param command 任务
*/
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
threadFactory
属性,线程工厂对象。Netty 实现自定义的 ThreadFactory 类,为io.netty.util.concurrent.DefaultThreadFactory
。关于 DefaultThreadFactory 比较简单,胖友可以自己看看。#execute(Runnable command)
方法,通过ThreadFactory#newThread(Runnable)
方法,创建一个 Thread ,然后调用Thread#start()
方法,启动线程执行任务。
5.3 EventExecutorChooserFactory
io.netty.util.concurrent.EventExecutorChooserFactory
,EventExecutorChooser 工厂接口。代码如下:
public interface EventExecutorChooserFactory {
/**
* 创建一个 EventExecutorChooser 对象
*
* Returns a new {@link EventExecutorChooser}.
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* EventExecutor 选择器接口
*
* Chooses the next {@link EventExecutor} to use.
*/
@UnstableApi
interface EventExecutorChooser {
/**
* 选择下一个 EventExecutor 对象
*
* Returns the new {@link EventExecutor} to use.
*/
EventExecutor next();
}
}
#newChooser(EventExecutor[] executors)
方法,创建一个 EventExecutorChooser 对象。EventExecutorChooser 接口,EventExecutor 选择器接口。
#next()
方法,选择下一个 EventExecutor 对象。
5.3.1 DefaultEventExecutorChooserFactory
io.netty.util.concurrent.DefaultEventExecutorChooserFactory
,实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类。代码如下
/**
* 单例
*/
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) { // 是否为 2 的幂次方
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
INSTANCE
静态属性,单例。#newChooser(EventExecutor[] executors)
方法,调用#isPowerOfTwo(int val)
方法,判断 EventExecutor 数组的大小是否为 2 的幂次方。若是,创建 PowerOfTwoEventExecutorChooser 对象。详细解析,见 「5.3.3 PowerOfTwoEventExecutorChooser」 。
若否,创建 GenericEventExecutorChooser 对象。详细解析,见 「5.3.2 GenericEventExecutorChooser」 。
#isPowerOfTwo(int val)
方法,为什么(val & -val) == val
可以判断数字是否为 2 的幂次方呢?我们以 8 来举个例子。
8 的二进制为
1000
。-8 的二进制使用补码表示。所以,先求反生成反码为
0111
,然后加一生成补码为1000
。8 和 -8 并操作后,还是 8 。
实际上,以 2 为幂次方的数字,都是最高位为 1 ,剩余位为 0 ,所以对应的负数,求完补码还是自己。
5.3.2 GenericEventExecutorChooser
GenericEventExecutorChooser 实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类。代码如下:
GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
/**
* 自增序列
*/
private final AtomicInteger idx = new AtomicInteger();
/**
* EventExecutor 数组
*/
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
实现比较简单,使用
idx
自增,并使用 EventExecutor 数组的大小来取余。
5.3.3 PowerOfTwoEventExecutorChooser
PowerOfTwoEventExecutorChooser 实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类。这是一个优化的实现,代码如下:
PowerOfTwoEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
/**
* 自增序列
*/
private final AtomicInteger idx = new AtomicInteger();
/**
* EventExecutor 数组
*/
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
实现比较巧妙,通过
idx
自增,并使用【EventExecutor 数组的大小 - 1】进行进行&
并操作。因为
-
( 二元操作符 ) 的计算优先级高于&
( 一元操作符 ) 。因为 EventExecutor 数组的大小是以 2 为幂次方的数字,那么减一后,除了最高位是 0 ,剩余位都为 1 ( 例如 8 减一后等于 7 ,而 7 的二进制为 0111 。),那么无论
idx
无论如何递增,再进行&
并操作,都不会超过 EventExecutor 数组的大小。并且,还能保证顺序递增。
5.4 newDefaultThreadFactory
#newDefaultThreadFactory()
方法,创建线程工厂对象。代码如下:
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
创建的对象为 DefaultThreadFactory ,并且使用类名作为
poolType
。
5.5 next
#next()
方法,选择下一个 EventExecutor 对象。代码如下:
@Override
public EventExecutor next() {
return chooser.next();
}
5.6 iterator
#iterator()
方法,获得 EventExecutor 数组的迭代器。代码如下:
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
为了避免调用方,获得迭代器后,对 EventExecutor 数组进行修改,所以返回是不可变的 EventExecutor 数组
readonlyChildren
的迭代器。
5.7 executorCount
#executorCount()
方法,获得 EventExecutor 数组的大小。代码如下:
public final int executorCount() {
return children.length;
}
5.8 newChild
#newChild(Executor executor, Object... args)
抽象方法,创建 EventExecutor 对象。代码如下:
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
子类实现该方法,创建其对应的 EventExecutor 实现类的对象。
5.9 关闭相关方法
如下是关闭相关的方法,比较简单,胖友自己研究:
#terminationFuture()
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
#shutdown()
#awaitTermination(long timeout, TimeUnit unit)
#isShuttingDown()
#isShutdown()
#isTerminated()
6. EventLoopGroup
io.netty.channel.EventExecutorGroup
,继承 EventExecutorGroup 接口,EventLoop 的分组接口。代码如下
// ========== 自定义接口 ==========
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
// ========== 实现自 EventExecutorGroup 接口 ==========
@Override
EventLoop next();
#next()
方法,选择下一个 EventLoop 对象。#register(...)
方法,注册 Channel 到 EventLoopGroup 中。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册。
7. MultithreadEventLoopGroup
io.netty.channel.MultithreadEventLoopGroup
,实现 EventLoopGroup 接口,继承 MultithreadEventExecutorGroup 抽象类,基于多线程的 EventLoop 的分组抽象类。
7.1 构造方法
/**
* 默认 EventLoop 线程数
*/
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
DEFAULT_EVENT_LOOP_THREADS
属性,EventLoopGroup 默认拥有的 EventLoop 数量。因为一个 EventLoop 对应一个线程,所以为 CPU 数量 * 2 。为什么会 * 2 呢?因为目前 CPU 基本都是超线程,一个 CPU 可对应 2 个线程。
在构造方法未传入
nThreads
方法参数时,使用DEFAULT_EVENT_LOOP_THREADS
。
7.2 newDefaultThreadFactory
newDefaultThreadFactory
#newDefaultThreadFactory()
方法,创建线程工厂对象。代码如下:
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
覆盖父类方法,增加了线程优先级为
Thread.MAX_PRIORITY
。
7.3 next
#next()
方法,选择下一个 EventLoop 对象。代码如下:
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
覆盖父类方法,将返回值转换成 EventLoop 类。
7.4 newChild
#newChild(Executor executor, Object... args)
抽象方法,创建 EventExecutor 对象。代码如下:
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
覆盖父类方法,返回值改为 EventLoop 类。
7.5 register
#register()
方法,注册 Channel 到 EventLoopGroup 中。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册。代码如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
Channel 注册的 EventLoop ,通过
#next()
方法来选择。
8. NioEventLoopGroup
io.netty.channel.nio.NioEventLoopGroup
,继承 MultithreadEventLoopGroup 抽象类,NioEventLoop 的分组实现类。
8.1 构造方法
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
构造方法比较多,主要是明确了父构造方法的
Object ... args
方法参数:第一个参数,
selectorProvider
,java.nio.channels.spi.SelectorProvider
,用于创建 Java NIO Selector 对象。第二个参数,
selectStrategyFactory
,io.netty.channel.SelectStrategyFactory
,选择策略工厂。详细解析,见后续文章。第三个参数,
rejectedExecutionHandler
,io.netty.channel.SelectStrategyFactory
,拒绝执行处理器。详细解析,见后续文章。
8.2 newChild
#newChild(Executor executor, Object... args)
方法,创建 NioEventLoop 对象。代码如下:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor,
(SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
通过
Object... args
方法参数,传入给 NioEventLoop 创建需要的参数。
8.3 setIoRatio
#setIoRatio(int ioRatio)
方法,设置所有 EventLoop 的 IO 任务占用执行时间的比例。代码如下:
/**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
8.4 rebuildSelectors
#rebuildSelectors()
方法,重建所有 EventLoop 的 Selector 对象。代码如下:
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
因为 JDK 有 epoll 100% CPU Bug 。实际上,NioEventLoop 当触发该 Bug 时,也会自动调用
NioEventLoop#rebuildSelector()
方法,进行重建 Selector 对象,以修复该问题。
还是比较简单的文章。如果有不清晰的地方,也可以阅读如下文章:
永顺 《Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)》 的 「NioEventLoopGroup 实例化过程」 小节。
Hypercube 《自顶向下深入分析Netty(四)—— EventLoop-1》
Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化
本文深入分析了 Netty 中 EventLoop 的初始化过程,详细介绍了各个相关类和接口的结构、方法,以及它们在 EventLoop 初始化过程中的作用。
Key Takeaways
EventLoop 是 Netty 中处理 I/O 操作的核心组件,它会处理注册在其上的 Channel 的所有 IO 操作。
EventLoopGroup 是 EventLoop 的集合,它包含多个 EventLoop 线程,用于并发处理多个 Channel。
SingleThreadEventExecutor 是基于单线程的 EventExecutor 抽象类,每个 EventExecutor 对应一个线程。
SingleThreadEventLoop 是基于单线程的 EventLoop 抽象类,它主要增加了 Channel 注册到 EventLoop 上的功能。
NioEventLoop 是 NIO EventLoop 的实现类,它处理注册到其中的 Channel 的就绪的 IO 事件,以及用户提交的任务。
EventLoop 初始化过程涉及多个类和接口,包括 EventExecutor、OrderedEventExecutor、EventLoop、AbstractEventExecutor、AbstractScheduledEventExecutor、SingleThreadEventExecutor、SingleThreadEventLoop 和 NioEventLoop。
EventLoop 在实现上,线程的初始化采用延迟启动的方式,只有在第一个任务时,线程才会被创建,从而节省资源。
Netty 是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,
EventLoop
和EventLoopGroup
是非常核心的组件,它们负责 I/O 操作以及事件的处理。1. EventLoopGroup
定义:
EventLoopGroup
是一组EventLoop
的容器,它提供了创建新连接或接受新连接的方法。当创建一个Channel
时,你通常需要提供一个EventLoopGroup
或者两个(一个用于服务器端接受新的连接,另一个用于处理这些连接)。类型:
BossGroup
:通常用于处理新的连接请求。
WorkerGroup
:处理已经被接受的连接上的 I/O 操作。2. EventLoop
定义:
EventLoop
实现了EventExecutor
接口,是实际执行 I/O 操作的地方,每个EventLoop
都有自己的线程,并且这个线程负责处理分配给它的所有任务。3. EventLoop 初始化
3.1 创建 EventLoopGroup
初始化
EventLoop
通常是通过创建EventLoopGroup
实现的。例如,创建一个NioEventLoopGroup
:EventLoopGroup group = new NioEventLoopGroup();
这行代码内部会创建多个
NioEventLoop
实例,并将它们放在一个内部列表中。3.2 NioEventLoop 细节
构造函数:
public NioEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider, int ioRatio, boolean useEpollDirectBuffer) { super(parent, threadFactory, ioRatio); // ... }
继承关系:
NioEventLoop
继承自MultithreadEventLoop
。
MultithreadEventLoop
继承自AbstractEventLoop
。
AbstractEventLoop
实现了EventLoop
接口。关键属性:
Selector
:用于多路复用 I/O 操作。
Thread
:与EventLoop
关联的线程。
TaskQueue
:用于存放非 I/O 相关的任务队列。
IoTaskQueue
:用于存放 I/O 相关的任务队列。3.3 初始化过程
初始化线程:
在
NioEventLoop
构造函数中,会通过传入的ThreadFactory
创建一个新的线程,并启动该线程。线程的
run()
方法实际上调用了NioEventLoop
的run()
方法。初始化 Selector:
NioEventLoop
会在构造函数中创建一个Selector
。
Selector
被用来监听多个通道的 I/O 事件。初始化任务队列:
NioEventLoop
包含了一个IoTaskQueue
和一个TaskQueue
。
IoTaskQueue
用于存放 I/O 任务。
TaskQueue
用于存放非 I/O 任务。3.4 运行循环
主循环:
NioEventLoop
的run()
方法包含了主循环,在这个循环里,EventLoop
会轮询Selector
的就绪操作符,并处理就绪的事件。同时也会处理任务队列中的任务。
4. 总结
EventLoop
的初始化主要是创建并配置好Selector
和相关线程,以及设置好任务队列,以便能够有效地处理 I/O 事件和执行任务。对于更深入的理解,建议阅读 Netty 的源代码以获得更详细的实现细节。
Netty中EventLoop的延迟启动机制是如何实现的,它 对性能和资源利用率有什么影响? 除了NIOEventLoop之外,Netty中还有哪些其他类型 的EventLoop实现? 如何使用Netty的EventLoop来处理异步操作和事件驱 动编程?
Netty 的 EventLoop
提供了一种非常灵活的方式来处理 I/O 操作和其他异步任务。下面我将逐一解答你的问题。
1. EventLoop 的延迟启动机制
实现原理
EventLoop
的延迟启动机制允许在创建 EventLoop
之后,稍后才真正启动它。这种机制可以通过以下方式实现:
构造函数:在创建
EventLoop
时并不立即启动线程,而是准备好线程和相关资源。启动方法:通过调用如
start()
或execute()
等方法来显式地启动EventLoop
。
影响
性能:延迟启动可以避免不必要的线程创建开销,特别是在不确定是否需要使用某个
EventLoop
的时候。资源利用率:只在需要时启动
EventLoop
可以节省系统资源,减少内存和 CPU 的消耗。
2. 其他类型的 EventLoop 实现
除了 NioEventLoop
之外,Netty 还支持多种不同的 EventLoop
实现,以适应不同的 I/O 模型:
EpollEventLoop:针对 Linux 平台,利用
epoll
代替select
/poll
提供更高性能的 I/O 多路复用。KQueueEventLoop:针对 BSD 系统(包括 macOS),使用
kqueue
进行 I/O 多路复用。DatagramChannelEventLoop:专门处理 UDP 通信。
EpollDatagramChannelEventLoop:针对 Linux 平台的 UDP 通信,使用
epoll
。OioEventLoop:阻塞 I/O 的实现,通常用于测试目的。
3. 使用 EventLoop 处理异步操作
基本用法
创建 EventLoopGroup:
EventLoopGroup group = new NioEventLoopGroup(); // 或者 EpollEventLoopGroup 等
注册 Channel:
try (Channel channel = new NioServerSocketChannel(group)) { // 配置 Channel channel.bind(new InetSocketAddress(8080)).sync(); }
提交任务:
group.execute(() -> { // 执行非 I/O 任务 System.out.println("Hello from EventLoop!"); });
调度任务:
group.schedule(() -> { // 定时执行的任务 System.out.println("Scheduled task executed!"); }, 5, TimeUnit.SECONDS);
示例代码
假设你需要使用 EventLoop
来执行一个定时任务和一个 I/O 操作:
public class Example {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
// 提交一个定时任务
group.schedule(() -> {
System.out.println("Scheduled task executed after 5 seconds.");
}, 5, TimeUnit.SECONDS);
// 提交一个非 I/O 任务
group.execute(() -> {
System.out.println("Executing non-I/O task.");
});
// 创建 Channel 并绑定到端口
try (Channel ch = new NioServerSocketChannel(group)) {
ch.bind(new InetSocketAddress(8080)).sync();
// 等待关闭
ch.closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
在这个示例中,我们首先创建了一个 NioEventLoopGroup
,然后向其中提交了一个定时任务和一个非 I/O 任务。最后,我们创建了一个 Channel
并将其绑定到了一个端口上。
4. 总结
延迟启动机制:提高了资源利用率和性能。
不同实现:Netty 支持多种
EventLoop
实现,可以根据操作系统选择最合适的实现。异步操作:通过提交任务到
EventLoop
,可以轻松实现异步编程模型。
希望这些信息能帮助你更好地理解 Netty 中 EventLoop
的工作原理及其在异步编程中的作用!
1. 概述
本文我们分享 EventLoop 的具体代码实现。因为 EventLoop 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 EventLoop 的初始化。
但是要将 EventLoop 拆出“初始化”部分的内容,笔者又觉得是件非常困难的事情。所以本文希望能达到如下的效果:
理解 EventLoop 有哪些属性
创建 EventLoop 的过程
Channel 注册到 EventLoop 的过程
EventLoop 的任务提交。
虽然任务的提交,比较接近任务的执行,但是考虑到胖友可以更容易的理解 EventLoop ,所以放在本文。
2. 类结构图
EventLoopGroup 的整体类结构如下图:
红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系。
因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。
下面,我们逐层看看每个接口和类的实现代码。
3. EventExecutor
io.netty.util.concurrent.EventExecutor
,继承 EventExecutorGroup 接口,事件执行器接口。代码如下:
// ========== 实现自 EventExecutorGroup 接口 ==========
/**
* 返回自己
*
* Returns a reference to itself.
*/
@Override
EventExecutor next();
// ========== 自定义接口 ==========
/**
* 所属 EventExecutorGroup
*
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
*/
EventExecutorGroup parent();
/**
* 当前线程是否在 EventLoop 线程中
*
* Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
*/
boolean inEventLoop();
/**
* 指定线程是否是 EventLoop 线程
*
* Return {@code true} if the given {@link Thread} is executed in the event loop,
* {@code false} otherwise.
*/
boolean inEventLoop(Thread thread);
/**
* 创建一个 Promise 对象
*
* Return a new {@link Promise}.
*/
<V> Promise<V> newPromise();
/**
* 创建一个 ProgressivePromise 对象
*
* Create a new {@link ProgressivePromise}.
*/
<V> ProgressivePromise<V> newProgressivePromise();
/**
* 创建成功结果的 Future 对象
*
* Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newSucceededFuture(V result);
/**
* 创建异常的 Future 对象
*
* Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);
接口定义的方法比较简单,已经添加中文注释,胖友自己看下。
4. OrderedEventExecutor
io.netty.util.concurrent.OrderedEventExecutor
,继承 EventExecutor 接口,有序的事件执行器接口。代码如下:
/**
* Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
*/
public interface OrderedEventExecutor extends EventExecutor {
}
没有定义任何方法,仅仅是一个标记接口,表示该执行器会有序 / 串行的方式执行。
5. EventLoop
io.netty.channel.EventLoop
,继承 OrderedEventExecutor 和 EventLoopGroup 接口,EventLoop 接口。代码如下:
/**
* Will handle all the I/O operations for a {@link Channel} once registered.
*
* One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
* implementation details and internals.
*
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
#parent()
接口方法,覆写方法的返回类型为 EventLoopGroup 。接口上的英文注释,意思如下:
EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。
通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。
6. AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutor
,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。
6.1 构造方法#parent()
接口方法,覆写方法的返回类型为 EventLoopGroup 。
接口上的英文注释,意思如下:
EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。
通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。
6. AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutor
,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。
6.1 构造方法
/**
* 所属 EventExecutorGroup
*/
private final EventExecutorGroup parent;
/**
* EventExecutor 数组。只包含自己,用于 {@link #iterator()}
*/
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
protected AbstractEventExecutor() {
this(null);
}
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
6.2 parent
#parent()
方法,获得所属 EventExecutorGroup 。代码如下:
@Override
public EventExecutorGroup parent() {
return parent;
}
6.3 next
#next()
方法,获得自己。代码如下:
@Override
public EventExecutor next() {
return this;
}
6.4 inEventLoop()
#inEventLoop()
方法,判断当前线程是否在 EventLoop 线程中。代码如下:
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
具体的
#inEventLoop(Thread thread)
方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。
6.5 iterator
#iterator()
方法,代码如下:
具体的 #inEventLoop(Thread thread) 方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。
6.5 iterator
#iterator() 方法,代码如下:
6.6 newPromise 和 newProgressivePromise
#newPromise()
和 #newProgressivePromise()
方法,分别创建 DefaultPromise 和 DefaultProgressivePromise 对象。代码如下:
@Override
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return new DefaultProgressivePromise<V>(this);
}
我们可以看到,创建的 Promise 对象,都会传入自身作为 EventExecutor 。关于 Promise 相关的,我们在后续文章详细解析。实在想了解,也可以看看 《Netty 源码笔记 —— 第四章 Future 和 Promise》 。
6.7 newSucceededFuture 和 newFailedFuture
#newSucceededFuture(V result)
和 #newFailedFuture(Throwable cause)
方法,分别创建成功结果和异常的 Future 对象。代码如下:
@Override
public <V> Future<V> newSucceededFuture(V result) {
return new SucceededFuture<V>(this, result);
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return new FailedFuture<V>(this, cause);
}
创建的 Future 对象,会传入自身作为 EventExecutor ,并传入
result
或cause
分别作为成功结果和异常。
6.8 newTaskFor
#newTaskFor(...)
方法,创建 PromiseTask 对象。代码如下:
@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new PromiseTask<T>(this, runnable, value);
}
@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new PromiseTask<T>(this, callable);
}
创建的 PromiseTask 对象,会传入自身作为 EventExecutor ,并传入 Runnable + Value 或 Callable 作为任务( Task )。
6.9 submit
#submit(...)
方法,提交任务。代码如下:
@Override
public Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
}
每个方法的实现上,是调用父类 AbstractExecutorService 的实现。
6.10 schedule
#schedule(...)
方法,都不支持,交给子类 AbstractScheduledEventExecutor 实现。代码如下:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
throw new UnsupportedOperationException();
}
6.11 safeExecute
#safeExecute(Runnable task)
静态方法,安全的执行任务。代码如下:
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
所谓“安全”指的是,当任务执行发生异常时,仅仅打印告警日志。
6.12 shutdown
#shutdown()
方法,关闭执行器。代码如下:
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法的实现,在子类中。
7. AbstractScheduledEventExecutor
io.netty.util.concurrent.AbstractScheduledEventExecutor
,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。
8. SingleThreadEventExecutor
io.netty.util.concurrent.SingleThreadEventExecutor
,实现 OrderedEventExecutor 接口,继承 AbstractScheduledEventExecutor 抽象类,基于单线程的 EventExecutor 抽象类,即一个 EventExecutor 对应一个线程。
8.1 构造方法
/**
* {@link #state} 字段的原子更新器
*/
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
/**
* {@link #thread} 字段的原子更新器
*/
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
/**
* 任务队列
*
* @see #newTaskQueue(int)
*/
private final Queue<Runnable> taskQueue;
/**
* 线程
*/
private volatile Thread thread;
/**
* 线程属性
*/
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
/**
* 执行器
*/
private final Executor executor;
/**
* 线程是否已经打断
*
* @see #interruptThread()
*/
private volatile boolean interrupted;
/**
* TODO 1006 EventLoop 优雅关闭
*/
private final Semaphore threadLock = new Semaphore(0);
/**
* TODO 1006 EventLoop 优雅关闭
*/
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/**
* 添加任务时,是否唤醒线程{@link #thread}
*/
private final boolean addTaskWakesUp;
/**
* 最大等待执行任务数量,即 {@link #taskQueue} 的队列大小
*/
private final int maxPendingTasks;
/**
* 拒绝执行处理器
*
* @see #reject()
* @see #reject(Runnable)
*/
private final RejectedExecutionHandler rejectedExecutionHandler;
/**
* 最后执行时间
*/
private long lastExecutionTime;
/**
* 状态
*/
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
/**
* TODO 优雅关闭
*/
private volatile long gracefulShutdownQuietPeriod;
/**
* 优雅关闭超时时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
*/
private volatile long gracefulShutdownTimeout;
/**
* 优雅关闭开始时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
*/
private long gracefulShutdownStartTime;
/**
* TODO 1006 EventLoop 优雅关闭
*/
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
属性比较多,我们耐心往下看。
taskQueue
属性,任务队列。addTaskWakesUp
属性,添加任务到taskQueue
队列时,是否唤醒thread
线程。详细解析,见 「8.11 execute」 。maxPendingTasks
属性,最大等待执行任务数量,即taskQueue
队列大小。rejectedExecutionHandler
属性,拒绝执行处理器。在taskQueue
队列超过最大任务数量时,怎么拒绝处理新提交的任务。
thread
属性,线程。在 SingleThreadEventExecutor 中,任务是提交到taskQueue
队列中,而执行在thread
线程中。threadProperties
属性,线程属性。详细解析,见 「8.15 threadProperties」 。executor
属性,执行器。通过它创建thread
线程。详细解析,见 「8.11 execute」 。interrupted
属性,线程是否打断。详细解析,详细解析,见 「8.14 interruptThread」 。lastExecutionTime
属性,最后执行时间。state
属性,线程状态。SingleThreadEventExecutor 在实现上,thread
的初始化采用延迟启动的方式,只有在第一个任务时,executor
才会执行并创建该线程,从而节省资源。目前thread
线程有 5 种状态,代码如下:
private static final int ST_NOT_STARTED = 1; // 未开始
private static final int ST_STARTED = 2; // 已开始
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭中
private static final int ST_SHUTDOWN = 4; // 已关闭
private static final int ST_TERMINATED = 5; // 已经终止
状态变更流程如下图:
构造方法,虽然比较多,但是很简单,胖友自己看下。
8.2 newTaskQueue
#newTaskQueue(int maxPendingTasks)
方法,创建任务队列。代码如下:
/**
* Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
* {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
* calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
* implementation that does not support blocking operations at all.
*/
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
方法上有一大段注释,简单的说,这个方法默认返回的是 LinkedBlockingQueue 阻塞队列。如果子类有更好的队列选择( 例如非阻塞队列 ),可以重写该方法。在下文,我们会看到它的子类 NioEventLoop ,就重写了这个方法。
8.3 inEventLoop
#inEventLoop(Thread thread)
方法,判断指定线程是否是 EventLoop 线程。代码如下:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
8.4 offerTask
#offerTask(Runnable task)
方法,添加任务到队列中。若添加失败,则返回 false
。代码如下:
final boolean offerTask(Runnable task) {
// 关闭时,拒绝任务
if (isShutdown()) {
reject();
}
// 添加任务到队列
return taskQueue.offer(task);
}
注意,即使对于 BlockingQueue 的
#offer(E e)
方法,也不是阻塞的!
8.5 addTask
#offerTask(Runnable task)
方法,在 #offerTask(Runnable task)
的方法的基础上,若添加任务到队列中失败,则进行拒绝任务。代码如下:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 添加任务到队列
if (!offerTask(task)) {
// 添加失败,则拒绝任务
reject(task);
}
}
调用
#reject(task)
方法,拒绝任务。详细解析,见 「8.6 reject」 。该方法是
void
,无返回值。
8.6 removeTask
#removeTask(Runnable task)
方法,移除指定任务。代码如下:
protected boolean removeTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
return taskQueue.remove(task);
}
8.7 peekTask
#peekTask()
方法,返回队头的任务,但是不移除。代码如下:
protected Runnable peekTask() {
assert inEventLoop(); // 仅允许在 EventLoop 线程中执行
return taskQueue.peek();
}
8.8 hasTasks
#hasTasks()
方法,队列中是否有任务。代码如下:
protected boolean hasTasks() {
assert inEventLoop(); // 仅允许在 EventLoop 线程中执行
return !taskQueue.isEmpty();
}
8.9 pendingTasks
#pendingTasks()
方法,获得队列中的任务数。代码如下:
public int pendingTasks() {
return taskQueue.size();
}
8.10 reject
#reject(Runnable task)
方法,拒绝任务。代码如下:
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
调用
RejectedExecutionHandler#rejected(Runnable task, SingleThreadEventExecutor executor)
方法,拒绝该任务。
#reject()
方法,拒绝任何任务,用于 SingleThreadEventExecutor 已关闭( #isShutdown()
方法返回的结果为 true
)的情况。代码如下:
protected static void reject() {
throw new RejectedExecutionException("event executor terminated");
}
8.10.1 RejectedExecutionHandler
io.netty.util.concurrent.RejectedExecutionHandler
,拒绝执行处理器接口。代码如下:
/**
* Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
* restrictions.
*/
void rejected(Runnable task, SingleThreadEventExecutor executor);
8.10.2 RejectedExecutionHandlers
io.netty.util.concurrent.RejectedExecutionHandlers
,RejectedExecutionHandler 实现类枚举,目前有 2 种实现类。
第一种
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
public static RejectedExecutionHandler reject() {
return REJECT;
}
通过
#reject()
方法,返回REJECT
实现类的对象。该实现在拒绝时,直接抛出 RejectedExecutionException 异常。默认情况下,使用这种实现。
第二种
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {
ObjectUtil.checkPositive(retries, "retries");
final long backOffNanos = unit.toNanos(backoffAmount);
return new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
if (!executor.inEventLoop()) { // 非 EventLoop 线程中。如果在 EventLoop 线程中,就无法执行任务,这就导致完全无法重试了。
// 循环多次尝试添加到队列中
for (int i = 0; i < retries; i++) {
// 唤醒执行器,进行任务执行。这样,就可能执行掉部分任务。
// Try to wake up the executor so it will empty its task queue.
executor.wakeup(false);
// 阻塞等待
LockSupport.parkNanos(backOffNanos);
// 添加任务
if (executor.offerTask(task)) {
return;
}
}
}
// Either we tried to add the task from within the EventLoop or we was not able to add it even with
// backoff.
// 多次尝试添加失败,抛出 RejectedExecutionException 异常
throw new RejectedExecutionException();
}
};
}
通过
#backoff(final int retries, long backoffAmount, TimeUnit unit)
方法,创建带多次尝试添加到任务队列的 RejectedExecutionHandler 实现类。代码已经添加中文注释,胖友自己理解下,比较简单的。
8.11 execute
#execute(Runnable task)
方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:
1: @Override
2: public void execute(Runnable task) {
3: if (task == null) {
4: throw new NullPointerException("task");
5: }
6:
7: // 获得当前是否在 EventLoop 的线程中
8: boolean inEventLoop = inEventLoop();
9: // 添加到任务队列
10: addTask(task);
11: if (!inEventLoop) {
12: // 创建线程
13: startThread();
14: // 若已经关闭,移除任务,并进行拒绝
15: if (isShutdown() && removeTask(task)) {
16: reject();
17: }
18: }
19:
20: // 唤醒线程
21: if (!addTaskWakesUp && wakesUpForTask(task)) {
22: wakeup(inEventLoop);
23: }
24:
第 8 行:调用
#inEventLoop()
方法,获得当前是否在 EventLoop 的线程中。第 10 行:调用
#addTask(Runnable task)
方法,添加任务到队列中。第 11 行:非 EventLoop 的线程
第 13 行:调用
#startThread()
方法,启动 EventLoop 独占的线程,即thread
属性。详细解析,见 「8.12 startThread」 。第 14 至 17 行:若已经关闭,则移除任务,并拒绝执行。
第 20 至 23 行:调用
#wakeup(boolean inEventLoop)
方法,唤醒线程。详细解析,见 「8.13 wakeup」 。等等,第 21 行的
!addTaskWakesUp
有点奇怪,不是说好的addTaskWakesUp
表示“添加任务时,是否唤醒线程”?!但是,怎么使用!
取反了。这样反倒变成了,“添加任务时,是否【不】唤醒线程”。具体的原因是为什么呢?笔者 Google、Github Netty Issue、和基佬讨论,都未找到解答。目前笔者的理解是:addTaskWakesUp
真正的意思是,“添加任务后,任务是否会自动导致线程唤醒”。为什么呢?对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到
taskQueue
队列中时,线程是无感知的,所以需要调用#wakeup(boolean inEventLoop)
方法,进行主动的唤醒。对于 Oio 使用的 ThreadPerChannelEventLoop ,它的线程执行是基于
taskQueue
队列监听( 阻塞拉取 )事件和任务,所以当任务添加到taskQueue
队列中时,线程是可感知的,相当于说,进行被动的唤醒。
调用 #wakesUpForTask(task)
方法,判断该任务是否需要唤醒线程。代码如下:
protected boolean wakesUpForTask(Runnable task) {
return true;
}
默认返回
true
。在 「9. SingleThreadEventLoop」 中,我们会看到对该方法的重写。
8.12 startThread
#startThread()
方法,启动 EventLoop 独占的线程,即 thread
属性。代码如下:
1: private void doStartThread() {
2: assert thread == null;
3: executor.execute(new Runnable() {
4:
5: @Override
6: public void run() {
7: // 记录当前线程
8: thread = Thread.currentThread();
9:
10: // 如果当前线程已经被标记打断,则进行打断操作。
11: if (interrupted) {
12: thread.interrupt();
13: }
14:
15: boolean success = false; // 是否执行成功
16:
17: // 更新最后执行时间
18: updateLastExecutionTime();
19: try {
20: // 执行任务
21: SingleThreadEventExecutor.this.run();
22: success = true; // 标记执行成功
23: } catch (Throwable t) {
24: logger.warn("Unexpected exception from an event executor: ", t);
25: } finally {
26: // TODO 1006 EventLoop 优雅关闭
27: for (;;) {
28: int oldState = state;
29: if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
30: SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
31: break;
32: }
33: }
34:
35: // TODO 1006 EventLoop 优雅关闭
36: // Check if confirmShutdown() was called at the end of the loop.
37: if (success && gracefulShutdownStartTime == 0) {
38: if (logger.isErrorEnabled()) {
39: logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
40: SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
41: "be called before run() implementation terminates.");
42: }
43: }
44:
45: // TODO 1006 EventLoop 优雅关闭
46: try {
47: // Run all remaining tasks and shutdown hooks.
48: for (;;) {
49: if (confirmShutdown()) {
50: break;
51: }
52: }
53: } finally {
54: try {
55: cleanup(); // 清理,释放资源
56: } finally {
57: STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
58: threadLock.release();
59: if (!taskQueue.isEmpty()) {
60: if (logger.isWarnEnabled()) {
61: logger.warn("An event executor terminated with " +
62: "non-empty task queue (" + taskQueue.size() + ')');
63: }
64: }
65:
66: terminationFuture.setSuccess(null);
67: }
68: }
69: }
70:
71: }
72: });
73: }
第 2 行:断言,保证
thread
为空。第 3 行 至 72 行:调用
Executor#execute(Runnable runnable)
方法,执行任务。下面,我们来详细解析。第 8 行:赋值当前的线程给
thread
属性。这就是,每个 SingleThreadEventExecutor 独占的线程的创建方式。第 10 至 13 行:如果当前线程已经被标记打断,则进行打断操作。为什么会有这样的逻辑呢?详细解析,见 「8.14 interruptThread」 。
第 18 行:调用
#updateLastExecutionTime()
方法,更新最后执行时间。代码如下:
/**
* Updates the internal timestamp that tells when a submitted task was executed most recently.
* {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
* usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or
* {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
英文注释,自己看。😈
第 21 行:调用
SingleThreadEventExecutor#run()
方法,执行任务。详细解析,见 8.X run 。第 25 至 69 行:TODO 1006 EventLoop 优雅关闭
第 55 行:调用
#cleanup()
方法,清理释放资源。详细解析,见 8.X cleanup 。