1. 首页
  2. Netty源码解析
  3. Netty源码分析之 EventLoop

Netty源码分析之 EventLoop

  • 发布于 2024-08-21
  • 7 次阅读

Netty 源码解析 —— EventLoop(一)之 Reactor 模型

本文深入解析了 Netty 中 EventLoopGroup 的代码实现,重点讲解了 MultithreadEventExecutorGroup 和 NioEventLoopGroup 的实现细节。

Key Takeaways

  • EventExecutorGroup 是 EventExecutor ( 事件执行器 )的分组接口,它提供了迭代出 EventExecutor 对象的方法。

  • MultithreadEventExecutorGroup 是基于多线程的 EventExecutor 分组抽象类,它使用线程池来管理多个 EventExecutor 对象。

  • NioEventLoopGroup 是 NioEventLoop 的分组实现类,它使用 Java NIO 的 Selector 对象来处理 I/O 事件。

  • NioEventLoopGroup 的构造方法接受多个参数,包括线程池、SelectorProvider、SelectStrategyFactory 和 RejectedExecutionHandler,用于创建 NioEventLoop 对象。

  • NioEventLoopGroup 提供了 setIoRatio() 方法,用于设置所有 EventLoop 的 IO 任务占用执行时间的比例。

  • NioEventLoopGroup 提供了 rebuildSelectors() 方法,用于重建所有 EventLoop 的 Selector 对象。

1. 概述

从本文开始,我们来分享 Netty 非常重要的一个组件 EventLoop 。在看 EventLoop 的具体实现之前,我们先来对 Reactor 模型做个简单的了解。

为什么要了解 Reactor 模型呢?因为 EventLoop 是 Netty 基于 Reactor 模型的思想进行实现。所以理解 Reactor 模型,对于我们理解 EventLoop 会有很大帮助。

我们来看看 Reactor 模型的核心思想

将关注的 I/O 事件注册到多路复用器上,一旦有 I/O 事件触发,将事件分发到事件处理器中,执行就绪 I/O 事件对应的处理函数中。模型中有三个重要的组件:

  • 多路复用器:由操作系统提供接口,Linux 提供的 I/O 复用接口有select、poll、epoll 。

  • 事件分离器:将多路复用器返回的就绪事件分发到事件处理器中。

  • 事件处理器:处理就绪事件处理函数。

初步一看,Java NIO 符合 Reactor 模型啊?因为 Reactor 有 3 种模型实现:

  1. 单 Reactor 单线程模型

  2. 单 Reactor 多线程模型

  3. 多 Reactor 多线程模型

😈 由于不擅长相对理论文章的内容编写,所以 「2.」「3.」「4.」 小节的内容,我决定一本正经的引用基友 wier 的 《【NIO 系列】—— 之Reactor 模型》

2. 单 Reactor 单线程模型

示例图如下:

示例代码主要表达大体逻辑,比较奔放。所以,胖友理解大体意思就好。

Reactor 示例代码如下:

/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
​
  private Reactor() throws Exception {
      SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
      // attach Acceptor 处理新连接
      sk.attach(new Acceptor());
  }
​
​  @Override
  public void run() {
      try {
          while (!Thread.interrupted()) {
              selector.select();
              Set selected = selector.selectedKeys();
              Iterator it = selected.iterator();
              while (it.hasNext()) {
                  it.remove();
                  //分发事件处理
                  dispatch((SelectionKey) (it.next()));
              }
          }
      } catch (IOException ex) {
          //do something
      }
  }
​
  void dispatch(SelectionKey k) {
      // 若是连接事件获取是acceptor
      // 若是IO读写事件获取是handler
      Runnable runnable = (Runnable) (k.attachment());
      if (runnable != null) {
          runnable.run();
      }
  }
​
}

示例的 Handler 的代码实现应该是漏了。胖友脑补一个实现 Runnable 接口的 Handler 类。😈

这是最基础的单 Reactor 单线程模型。

Reactor 线程,负责多路分离套接字。

  • 有新连接到来触发 OP_ACCEPT 事件之后, 交由 Acceptor 进行处理。

  • 有 IO 读写事件之后,交给 Handler 处理。

Acceptor 主要任务是构造 Handler 。

  • 在获取到 Client 相关的 SocketChannel 之后,绑定到相应的 Handler 上。

  • 对应的 SocketChannel 有读写事件之后,基于 Reactor 分发,Handler 就可以处理了。

注意,所有的 IO 事件都绑定到 Selector 上,由 Reactor 统一分发


该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多

3. 单 Reactor 多线程模型

示例图如下:

相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到 IO 的读写事件之后,交由线程池来处理,这样可以减小主 Reactor 的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

MultiThreadHandler 示例代码如下:

/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {
  public static final int READING = 0, WRITING = 1;
  int state;
  final SocketChannel socket;
  final SelectionKey sk;
​
  //多线程处理业务逻辑
  ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
​
​
  public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
      this.state = READING;
      this.socket = socket;
      sk = socket.register(selector, SelectionKey.OP_READ);
      sk.attach(this);
      socket.configureBlocking(false);
  }
​
  @Override
  public void run() {
      if (state == READING) {
          read();
      } else if (state == WRITING) {
          write();
      }
  }
​
  private void read() {
      //任务异步处理
      executorService.submit(() -> process());
​
      //下一步处理写事件
      sk.interestOps(SelectionKey.OP_WRITE);
      this.state = WRITING;
  }
​
  private void write() {
      //任务异步处理
      executorService.submit(() -> process());
​
      //下一步处理读事件
      sk.interestOps(SelectionKey.OP_READ);
      this.state = READING;
  }
​
  /**
    * task 业务处理
    */
  public void process() {
      //do IO ,task,queue something
  }
}
  • #read()#write() 方法中,提交 executorService 线程池,进行处理。

4. 多 Reactor 多线程模型

示例图如下:

第三种模型比起第二种模型,是将 Reactor 分成两部分:

  1. mainReactor 负责监听 ServerSocketChannel ,用来处理客户端新连接的建立,并将建立的客户端的 SocketChannel 指定注册给 subReactor 。

  2. subReactor 维护自己的 Selector ,基于 mainReactor 建立的客户端的 SocketChannel 多路分离 IO 读写事件,读写网络数据。对于业务处理的功能,另外扔给 worker 线程池来完成。

MultiWorkThreadAcceptor 示例代码如下:

/**
* 多work 连接事件Acceptor,处理连接事件
*/
class MultiWorkThreadAcceptor implements Runnable {
​
  // cpu线程数相同多work线程
  int workCount = Runtime.getRuntime().availableProcessors();
  SubReactor[] workThreadHandlers = new SubReactor[workCount];
  volatile int nextHandler = 0;
​
  public MultiWorkThreadAcceptor() {
      this.init();
  }
​
  public void init() {
      nextHandler = 0;
      for (int i = 0; i < workThreadHandlers.length; i++) {
          try {
              workThreadHandlers[i] = new SubReactor();
          } catch (Exception e) {
          }
      }
  }
​
  @Override
  public void run() {
      try {
          SocketChannel c = serverSocket.accept();
          if (c != null) {// 注册读写
              synchronized (c) {
                  // 顺序获取SubReactor,然后注册channel 
                  SubReactor work = workThreadHandlers[nextHandler];
                  work.registerChannel(c);
                  nextHandler++;
                  if (nextHandler >= workThreadHandlers.length) {
                      nextHandler = 0;
                  }
              }
          }
      } catch (Exception e) {
      }
  }
}

SubReactor 示例代码如下:

/**
* 多work线程处理读写业务逻辑
*/
class SubReactor implements Runnable {
  final Selector mySelector;
​
  //多线程处理业务逻辑
  int workCount =Runtime.getRuntime().availableProcessors();
  ExecutorService executorService = Executors.newFixedThreadPool(workCount);
​
​
  public SubReactor() throws Exception {
      // 每个SubReactor 一个selector 
      this.mySelector = SelectorProvider.provider().openSelector();
  }
​
  /**
    * 注册chanel
    *
    * @param sc
    * @throws Exception
    */
  public void registerChannel(SocketChannel sc) throws Exception {
      sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
  }
​
  @Override
  public void run() {
      while (true) {
          try {
          //每个SubReactor 自己做事件分派处理读写事件
              selector.select();
              Set<SelectionKey> keys = selector.selectedKeys();
              Iterator<SelectionKey> iterator = keys.iterator();
              while (iterator.hasNext()) {
                  SelectionKey key = iterator.next();
                  iterator.remove();
                  if (key.isReadable()) {
                      read();
                  } else if (key.isWritable()) {
                      write();
                  }
              }
​
          } catch (Exception e) {
​
          }
      }
  }
​
  private void read() {
      //任务异步处理
      executorService.submit(() -> process());
  }
​
  private void write() {
      //任务异步处理
      executorService.submit(() -> process());
  }
​
  /**
    * task 业务处理
    */
  public void process() {
      //do IO ,task,queue something
  }
}

从代码中,我们可以看到:

  1. mainReactor 主要用来处理网络 IO 连接建立操作,通常,mainReactor 只需要一个,因为它一个线程就可以处理。

  2. subReactor 主要和建立起来的客户端的 SocketChannel 做数据交互和事件业务处理操作。通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理。


此种模式中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大大的提升,支持的可并发客户端数量可达到上百万级别。

一般来说,是达到数十万级别。

关于此种模式的应用,目前有很多优秀的框架已经在应用,比如 Mina 和 Netty 等等。上述中去掉线程池的第三种形式的变种,也是 Netty NIO 的默认模式

5. Netty NIO 客户端

我们来看看 Netty NIO 客户端的示例代码中,和 EventLoop 相关的代码:

// 创建一个 EventLoopGroup 对象
EventLoopGroup group = new NioEventLoopGroup();
// 创建 Bootstrap 对象
Bootstrap b = new Bootstrap();
// 设置使用的 EventLoopGroup
b.group(group);
  • 对于 Netty NIO 客户端来说,仅创建一个 EventLoopGroup 。

  • 一个 EventLoop 可以对应一个 Reactor 。因为 EventLoopGroup 是 EventLoop 的分组,所以对等理解,EventLoopGroup 是一种 Reactor 的分组。

  • 一个 Bootstrap 的启动,只能发起对一个远程的地址。所以只会使用一个 NIO Selector ,也就是说仅使用一个 Reactor 。即使,我们在声明使用一个 EventLoopGroup ,该 EventLoopGroup 也只会分配一个 EventLoop 对 IO 事件进行处理。

  • 因为 Reactor 模型主要使用服务端的开发中,如果套用在 Netty NIO 客户端中,到底使用了哪一种模式呢?

    • 如果只有一个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 线程模型】。

    • 如果有多个业务线程使用 Netty NIO 客户端,那么可以认为是【单 Reactor 线程模型】。

  • 那么 Netty NIO 客户端是否能够使用【多 Reactor 多线程模型】呢?😈 创建多个 Netty NIO 客户端,连接同一个服务端。那么多个 Netty 客户端就可以认为符合多 Reactor 多线程模型了。

    • 一般情况下,我们不会这么干。

    • 当然,实际也有这样的示例。例如 Dubbo 或 Motan 这两个 RPC 框架,支持通过配置,同一个 Consumer 对同一个 Provider 实例同时建立多个客户端连接。

6. Netty NIO 服务端

我们来看看 Netty NIO 服务端的示例代码中,和 EventLoop 相关的代码

// 创建两个 EventLoopGroup 对象
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建 boss 线程组 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 worker 线程组 用于进行 SocketChannel 的数据读写
// 创建 ServerBootstrap 对象
ServerBootstrap b = new ServerBootstrap();
// 设置使用的 EventLoopGroup
b.group(bossGroup, workerGroup);
  • 对于 Netty NIO 服务端来说,创建两个 EventLoopGroup 。

    • bossGroup 对应 Reactor 模式的 mainReactor ,用于服务端接受客户端的连接。比较特殊的是,传入了方法参数 nThreads = 1 ,表示只使用一个 EventLoop ,即只使用一个 Reactor 。这个也符合我们上面提到的,“通常,mainReactor 只需要一个,因为它一个线程就可以处理”。

    • workerGroup 对应 Reactor 模式的 subReactor ,用于进行 SocketChannel 的数据读写。对于 EventLoopGroup ,如果未传递方法参数 nThreads ,表示使用 CPU 个数 Reactor 。这个也符合我们上面提到的,“通常,subReactor 的个数和 CPU 个数相等,每个 subReactor 独占一个线程来处理”。

  • 因为使用两个 EventLoopGroup ,所以符合【多 Reactor 多线程模型】的多 Reactor 的要求。实际在使用时,workerGroup 在读完数据时,具体的业务逻辑处理,我们会提交到专门的业务逻辑线程池,例如在 Dubbo 或 Motan 这两个 RPC 框架中。这样一来,就完全符合【多 Reactor 多线程模型】。

  • 那么可能有胖友可能和我有一样的疑问,bossGroup 如果配置多个线程,是否可以使用多个 mainReactor 呢?我们来分析一波,一个 Netty NIO 服务端同一时间,只能 bind 一个端口,那么只能使用一个 Selector 处理客户端连接事件。又因为,Selector 操作是非线程安全的,所以无法在多个 EventLoop ( 多个线程 )中,同时操作。所以这样就导致,即使 bossGroup 配置多个线程,实际能够使用的也就是一个线程。

  • 那么如果一定一定一定要多个 mainReactor 呢?创建多个 Netty NIO 服务端,并绑定多个端口。

如果 Reactor 模式讲解的不够清晰,或者想要更加深入的理解,推荐阅读如下文章:

Netty 源码解析 —— EventLoop(二)之 EventLoopGroup

1. 概述

在 《 Netty 源码分析 —— Netty 简介(二)之核心组件》 中,对 EventLoopGroup 和 EventLoop 做了定义,我们再来回顾下:

  • Channel 为Netty 网络操作抽象类,EventLoop 负责处理注册到其上的 Channel 处理 I/O 操作,两者配合参与 I/O 操作。

  • EventLoopGroup 是一个 EventLoop 的分组,它可以获取到一个或者多个 EventLoop 对象,因此它提供了迭代出 EventLoop 对象的方法。

在 《 Netty 源码分析 —— 启动》 中,我们特别熟悉的一段代码就是:

  • new NioEventLoopGroup() ,创建一个 EventLoopGroup 对象。

  • EventLoopGroup#register(Channel channel) ,将 Channel 注册到 EventLoopGroup 上。

那么,本文我们分享 EventLoopGroup 的具体代码实现,来一探究竟。

2. 类结构图

EventLoopGroup 的整体类结构如下图:

  • 红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系。

  • 因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。

下面,我们逐层看看每个接口和类的实现代码。

3. EventExecutorGroup

io.netty.util.concurrent.EventExecutorGroup ,实现 Iterable、ScheduledExecutorService 接口,EventExecutor ( 事件执行器 )的分组接口。代码如下:

// ========== 自定义接口 ==========

boolean isShuttingDown();

// 优雅关闭
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

Future<?> terminationFuture();

// 选择一个 EventExecutor 对象
EventExecutor next(); 

// ========== 实现自 Iterable 接口 ==========

@Override
Iterator<EventExecutor> iterator();

// ========== 实现自 ExecutorService 接口 ==========

@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);

@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();

// ========== 实现自 ScheduledExecutorService 接口 ==========

@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • 每个接口的方法的意思比较好理解,笔者就不一一赘述了。

  • 比较特殊的是,接口方法返回类型为 Future 不是 Java 原生的 java.util.concurrent.Future ,而是 Netty 自己实现的 Future 接口。详细解析,见后续文章。

  • EventExecutorGroup 自身不执行任务,而是将任务 #submit(...)#schedule(...) 给自己管理的 EventExecutor 的分组。至于提交给哪一个 EventExecutor ,一般是通过 #next() 方法,选择一个 EventExecutor 。

4. AbstractEventExecutorGroup

io.netty.util.concurrent.AbstractEventExecutorGroup ,实现 EventExecutorGroup 接口,EventExecutor ( 事件执行器 )的分组抽象类。

4.1 submit

#submit(...) 方法,提交一个普通任务到 EventExecutor 中。代码如下:

@Override
public Future<?> submit(Runnable task) {
    return next().submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
    return next().submit(task, result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
    return next().submit(task);
}
  • 提交的 EventExecutor ,通过 #next() 方法选择。

4.2 schedule

#schedule(...) 方法,提交一个定时任务到 EventExecutor 中。代码如下:

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    return next().schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    return next().schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
  • 提交的 EventExecutor ,通过 #next() 方法选择。

4.3 execute

#execute(...) 方法,在 EventExecutor 中执行一个普通任务。代码如下:

@Override
public void execute(Runnable command) {
    next().execute(command);
}
  • 执行的 EventExecutor ,通过 #next() 方法选择。

  • 看起来 #execute(...)#submit(...) 方法有几分相似,具体的差异,由 EventExecutor 的实现决定。

4.4 invokeAll

#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务。代码如下:

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    return next().invokeAll(tasks);
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    return next().invokeAll(tasks, timeout, unit);
}
  • 执行的 EventExecutor ,通过 #next() 方法选择。并且,多个任务使用同一个 EventExecutor 。

4.5 invokeAny

#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可。代码如下

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    return next().invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return next().invokeAny(tasks, timeout, unit);
}

  • 执行的 EventExecutor ,通过 #next() 方法选择。并且,多个任务使用同一个 EventExecutor 。

4.6 shutdown

#shutdown(...) 方法,关闭 EventExecutorGroup 。代码如下:

@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD /* 2 */, DEFAULT_SHUTDOWN_TIMEOUT /* 15 */, TimeUnit.SECONDS);
}

@Override
@Deprecated
public List<Runnable> shutdownNow() {
    shutdown();
    return Collections.emptyList();
}

@Override
@Deprecated
public abstract void shutdown();
  • 具体的 #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown() 方法,由子类实现。

5. MultithreadEventExecutorGroup

io.netty.util.concurrent.MultithreadEventExecutorGroup ,继承 AbstractEventExecutorGroup 抽象类,基于多线程的 EventExecutor ( 事件执行器 )的分组抽象类。

5.1 构造方法

/**
 * EventExecutor 数组
 */
private final EventExecutor[] children;
/**
 * 不可变( 只读 )的 EventExecutor 数组
 *
 * @see #MultithreadEventExecutorGroup(int, Executor, EventExecutorChooserFactory, Object...)
 */
private final Set<EventExecutor> readonlyChildren;
/**
 * 已终止的 EventExecutor 数量
 */
private final AtomicInteger terminatedChildren = new AtomicInteger();
/**
 * 用于终止 EventExecutor 的异步 Future
 */
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
/**
 * EventExecutor 选择器
 */
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

  1: protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
  2:     if (nThreads <= 0) {
  3:         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  4:     }
  5: 
  6:     // 创建执行器
  7:     if (executor == null) {
  8:         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  9:     }
 10: 
 11:     // 创建 EventExecutor 数组
 12:     children = new EventExecutor[nThreads];
 13: 
 14:     for (int i = 0; i < nThreads; i ++) {
 15:         boolean success = false; // 是否创建成功
 16:         try {
 17:             // 创建 EventExecutor 对象
 18:             children[i] = newChild(executor, args);
 19:             // 标记创建成功
 20:             success = true;
 21:         } catch (Exception e) {
 22:             // 创建失败,抛出 IllegalStateException 异常
 23:             // TODO: Think about if this is a good exception type
 24:             throw new IllegalStateException("failed to create a child event loop", e);
 25:         } finally {
 26:             // 创建失败,关闭所有已创建的 EventExecutor
 27:             if (!success) {
 28:                 // 关闭所有已创建的 EventExecutor
 29:                 for (int j = 0; j < i; j ++) {
 30:                     children[j].shutdownGracefully();
 31:                 }
 32:                 // 确保所有已创建的 EventExecutor 已关闭
 33:                 for (int j = 0; j < i; j ++) {
 34:                     EventExecutor e = children[j];
 35:                     try {
 36:                         while (!e.isTerminated()) {
 37:                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
 38:                         }
 39:                     } catch (InterruptedException interrupted) {
 40:                         // Let the caller handle the interruption.
 41:                         Thread.currentThread().interrupt();
 42:                         break;
 43:                     }
 44:                 }
 45:             }
 46:         }
 47:     }
 48: 
 49:     // 创建 EventExecutor 选择器
 50:     chooser = chooserFactory.newChooser(children);
 51: 
 52:     // 创建监听器,用于 EventExecutor 终止时的监听
 53:     final FutureListener<Object> terminationListener = new FutureListener<Object>() {
 54: 
 55:         @Override
 56:         public void operationComplete(Future<Object> future) throws Exception {
 57:             if (terminatedChildren.incrementAndGet() == children.length) { // 全部关闭
 58:                 terminationFuture.setSuccess(null); // 设置结果,并通知监听器们。
 59:             }
 60:         }
 61: 
 62:     };
 63:     // 设置监听器到每个 EventExecutor 上
 64:     for (EventExecutor e: children) {
 65:         e.terminationFuture().addListener(terminationListener);
 66:     }
 67: 
 68:     // 创建不可变( 只读 )的 EventExecutor 数组
 69:     Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
 70:     Collections.addAll(childrenSet, children);
 71:     readonlyChildren = Collections.unmodifiableSet(childrenSet);
 72: }
  • 方法参数 executor ,执行器。详细解析,见 「5.2 ThreadPerTaskExecutor」 。

    • 第 6 至 9 行:若 executor 为空,则创建执行器。

  • 第 12 行:创建 EventExecutor 数组。

    • 第 18 行:调用 #newChild(Executor executor, Object... args) 方法,创建 EventExecutor 对象,然后设置到数组中。

    • 第 21 至 24 行:创建失败,抛出 IllegalStateException 异常。

    • 第 25 至 45 行:创建失败,关闭所有已创建的 EventExecutor 。

  • 第 50 行:调用 EventExecutorChooserFactory#newChooser(EventExecutor[] executors) 方法,创建 EventExecutor 选择器。详细解析,见 「5.3 EventExecutorChooserFactory」 。

  • 第 52 至 62 行:创建监听器,用于 EventExecutor 终止时的监听。

    • 第 55 至 60 行:回调的具体逻辑是,当所有 EventExecutor 都终止完成时,通过调用 Future#setSuccess(V result) 方法,通知监听器们。至于为什么设置的值是 null ,因为监听器们不关注具体的结果。

    • 第 63 至 66 行:设置监听器到每个 EventExecutor 上。

  • 第 68 至 71 行:创建不可变( 只读 )的 EventExecutor 数组。

5.2 ThreadPerTaskExecutor

io.netty.util.concurrent.ThreadPerTaskExecutor ,实现 Executor 接口,每个任务一个线程的执行器实现类。代码如下:

public final class ThreadPerTaskExecutor implements Executor {

    /**
     * 线程工厂对象
     */
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    /**
     * 执行任务
     *
     * @param command 任务
     */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

}
  • threadFactory 属性,线程工厂对象。Netty 实现自定义的 ThreadFactory 类,为 io.netty.util.concurrent.DefaultThreadFactory 。关于 DefaultThreadFactory 比较简单,胖友可以自己看看。

  • #execute(Runnable command) 方法,通过 ThreadFactory#newThread(Runnable) 方法,创建一个 Thread ,然后调用 Thread#start() 方法,启动线程执行任务

5.3 EventExecutorChooserFactory

io.netty.util.concurrent.EventExecutorChooserFactory ,EventExecutorChooser 工厂接口。代码如下:

public interface EventExecutorChooserFactory {

    /**
     * 创建一个 EventExecutorChooser 对象
     *
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     *  EventExecutor 选择器接口
     *
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * 选择下一个 EventExecutor 对象
         *
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();

    }

}
  • #newChooser(EventExecutor[] executors) 方法,创建一个 EventExecutorChooser 对象。

  • EventExecutorChooser 接口,EventExecutor 选择器接口。

    • #next() 方法,选择下一个 EventExecutor 对象。

5.3.1 DefaultEventExecutorChooserFactory

io.netty.util.concurrent.DefaultEventExecutorChooserFactory ,实现 EventExecutorChooserFactory 接口,默认 EventExecutorChooser 工厂实现类。代码如下

/**
 * 单例
 */
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

private DefaultEventExecutorChooserFactory() { }

@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) { // 是否为 2 的幂次方
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
  • INSTANCE 静态属性,单例。

  • #newChooser(EventExecutor[] executors) 方法,调用 #isPowerOfTwo(int val) 方法,判断 EventExecutor 数组的大小是否为 2 的幂次方。

    • 若是,创建 PowerOfTwoEventExecutorChooser 对象。详细解析,见 「5.3.3 PowerOfTwoEventExecutorChooser」 。

    • 若否,创建 GenericEventExecutorChooser 对象。详细解析,见 「5.3.2 GenericEventExecutorChooser」 。

  • #isPowerOfTwo(int val) 方法,为什么 (val & -val) == val 可以判断数字是否为 2 的幂次方呢?

    • 我们以 8 来举个例子。

      • 8 的二进制为 1000

      • -8 的二进制使用补码表示。所以,先求反生成反码为 0111 ,然后加一生成补码为 1000

      • 8 和 -8 并操作后,还是 8 。

      • 实际上,以 2 为幂次方的数字,都是最高位为 1 ,剩余位为 0 ,所以对应的负数,求完补码还是自己。

5.3.2 GenericEventExecutorChooser

GenericEventExecutorChooser 实现 EventExecutorChooser 接口,通用的 EventExecutor 选择器实现类。代码如下:

GenericEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。

private static final class GenericEventExecutorChooser implements EventExecutorChooser {

    /**
     * 自增序列
     */
    private final AtomicInteger idx = new AtomicInteger();
    /**
     * EventExecutor 数组
     */
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

}
  • 实现比较简单,使用 idx 自增,并使用 EventExecutor 数组的大小来取余。

5.3.3 PowerOfTwoEventExecutorChooser

PowerOfTwoEventExecutorChooser 实现 EventExecutorChooser 接口,基于 EventExecutor 数组的大小为 2 的幂次方的 EventExecutor 选择器实现类。这是一个优化的实现,代码如下:

PowerOfTwoEventExecutorChooser 内嵌在 DefaultEventExecutorChooserFactory 类中。

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {

    /**
     * 自增序列
     */
    private final AtomicInteger idx = new AtomicInteger();
    /**
     * EventExecutor 数组
     */
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }

}
  • 实现比较巧妙,通过 idx 自增,并使用【EventExecutor 数组的大小 - 1】进行进行 & 并操作。

    • 因为 - ( 二元操作符 ) 的计算优先级高于 & ( 一元操作符 ) 。

    • 因为 EventExecutor 数组的大小是以 2 为幂次方的数字,那么减一后,除了最高位是 0 ,剩余位都为 1 ( 例如 8 减一后等于 7 ,而 7 的二进制为 0111 。),那么无论 idx 无论如何递增,再进行 & 并操作,都不会超过 EventExecutor 数组的大小。并且,还能保证顺序递增。

5.4 newDefaultThreadFactory

#newDefaultThreadFactory() 方法,创建线程工厂对象。代码如下:

protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}
  • 创建的对象为 DefaultThreadFactory ,并且使用类名作为 poolType

5.5 next

#next() 方法,选择下一个 EventExecutor 对象。代码如下:

@Override
public EventExecutor next() {
    return chooser.next();
}

5.6 iterator

#iterator() 方法,获得 EventExecutor 数组的迭代器。代码如下:

@Override
public Iterator<EventExecutor> iterator() {
    return readonlyChildren.iterator();
}
  • 为了避免调用方,获得迭代器后,对 EventExecutor 数组进行修改,所以返回是不可变的 EventExecutor 数组 readonlyChildren 的迭代器。

5.7 executorCount

#executorCount() 方法,获得 EventExecutor 数组的大小。代码如下:

public final int executorCount() {
    return children.length;
}

5.8 newChild

#newChild(Executor executor, Object... args) 抽象方法,创建 EventExecutor 对象。代码如下:

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
  • 子类实现该方法,创建其对应的 EventExecutor 实现类的对象。

5.9 关闭相关方法

如下是关闭相关的方法,比较简单,胖友自己研究:

  • #terminationFuture()

  • #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)

  • #shutdown()

  • #awaitTermination(long timeout, TimeUnit unit)

  • #isShuttingDown()

  • #isShutdown()

  • #isTerminated()

6. EventLoopGroup

io.netty.channel.EventExecutorGroup ,继承 EventExecutorGroup 接口,EventLoop 的分组接口。代码如下

// ========== 自定义接口 ==========

/**
 * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
 * will get notified once the registration was complete.
 */
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);

// ========== 实现自 EventExecutorGroup 接口 ==========

@Override
EventLoop next();
  • #next() 方法,选择下一个 EventLoop 对象。

  • #register(...) 方法,注册 Channel 到 EventLoopGroup 中。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册。

7. MultithreadEventLoopGroup

io.netty.channel.MultithreadEventLoopGroup ,实现 EventLoopGroup 接口,继承 MultithreadEventExecutorGroup 抽象类,基于多线程的 EventLoop 的分组抽象类。

7.1 构造方法

/**
 * 默认 EventLoop 线程数
 */
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
  • DEFAULT_EVENT_LOOP_THREADS 属性,EventLoopGroup 默认拥有的 EventLoop 数量。因为一个 EventLoop 对应一个线程,所以为 CPU 数量 * 2 。

    • 为什么会 * 2 呢?因为目前 CPU 基本都是超线程,一个 CPU 可对应 2 个线程

    • 在构造方法未传入 nThreads 方法参数时,使用 DEFAULT_EVENT_LOOP_THREADS

7.2 newDefaultThreadFactory

newDefaultThreadFactory

#newDefaultThreadFactory() 方法,创建线程工厂对象。代码如下:

@Override
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
  • 覆盖父类方法,增加了线程优先级为 Thread.MAX_PRIORITY

7.3 next

#next() 方法,选择下一个 EventLoop 对象。代码如下:

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}
  • 覆盖父类方法,将返回值转换成 EventLoop 类。

7.4 newChild

#newChild(Executor executor, Object... args) 抽象方法,创建 EventExecutor 对象。代码如下:

@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
  • 覆盖父类方法,返回值改为 EventLoop 类。

7.5 register

#register() 方法,注册 Channel 到 EventLoopGroup 中。实际上,EventLoopGroup 会分配一个 EventLoop 给该 Channel 注册。代码如下:

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

@Override
public ChannelFuture register(ChannelPromise promise) {
    return next().register(promise);
}

@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return next().register(channel, promise);
}
  • Channel 注册的 EventLoop ,通过 #next() 方法来选择。

8. NioEventLoopGroup

io.netty.channel.nio.NioEventLoopGroup ,继承 MultithreadEventLoopGroup 抽象类,NioEventLoop 的分组实现类。

8.1 构造方法

public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
    final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
            RejectedExecutionHandlers.reject());
}

public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
  • 构造方法比较多,主要是明确了父构造方法的 Object ... args 方法参数:

    • 第一个参数,selectorProviderjava.nio.channels.spi.SelectorProvider ,用于创建 Java NIO Selector 对象。

    • 第二个参数,selectStrategyFactoryio.netty.channel.SelectStrategyFactory ,选择策略工厂。详细解析,见后续文章。

    • 第三个参数,rejectedExecutionHandlerio.netty.channel.SelectStrategyFactory ,拒绝执行处理器。详细解析,见后续文章。

8.2 newChild

#newChild(Executor executor, Object... args) 方法,创建 NioEventLoop 对象。代码如下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor,
            (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
  • 通过 Object... args 方法参数,传入给 NioEventLoop 创建需要的参数。

8.3 setIoRatio

#setIoRatio(int ioRatio) 方法,设置所有 EventLoop 的 IO 任务占用执行时间的比例。代码如下:

/**
 * Sets the percentage of the desired amount of time spent for I/O in the child event loops.  The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 */
public void setIoRatio(int ioRatio) {
    for (EventExecutor e: this) {
        ((NioEventLoop) e).setIoRatio(ioRatio);
    }
}

8.4 rebuildSelectors

#rebuildSelectors() 方法,重建所有 EventLoop 的 Selector 对象。代码如下:

/**
 * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
 * around the  infamous epoll 100% CPU bug.
 */
public void rebuildSelectors() {
    for (EventExecutor e: this) {
        ((NioEventLoop) e).rebuildSelector();
    }
}
  • 因为 JDK 有 epoll 100% CPU Bug 。实际上,NioEventLoop 当触发该 Bug 时,也会自动调用 NioEventLoop#rebuildSelector() 方法,进行重建 Selector 对象,以修复该问题。

还是比较简单的文章。如果有不清晰的地方,也可以阅读如下文章:

Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化

本文深入分析了 Netty 中 EventLoop 的初始化过程,详细介绍了各个相关类和接口的结构、方法,以及它们在 EventLoop 初始化过程中的作用。

Key Takeaways

  • EventLoop 是 Netty 中处理 I/O 操作的核心组件,它会处理注册在其上的 Channel 的所有 IO 操作。

  • EventLoopGroup 是 EventLoop 的集合,它包含多个 EventLoop 线程,用于并发处理多个 Channel。

  • SingleThreadEventExecutor 是基于单线程的 EventExecutor 抽象类,每个 EventExecutor 对应一个线程。

  • SingleThreadEventLoop 是基于单线程的 EventLoop 抽象类,它主要增加了 Channel 注册到 EventLoop 上的功能。

  • NioEventLoop 是 NIO EventLoop 的实现类,它处理注册到其中的 Channel 的就绪的 IO 事件,以及用户提交的任务。

  • EventLoop 初始化过程涉及多个类和接口,包括 EventExecutor、OrderedEventExecutor、EventLoop、AbstractEventExecutor、AbstractScheduledEventExecutor、SingleThreadEventExecutor、SingleThreadEventLoop 和 NioEventLoop。

  • EventLoop 在实现上,线程的初始化采用延迟启动的方式,只有在第一个任务时,线程才会被创建,从而节省资源。

Netty 是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。在 Netty 中,EventLoopEventLoopGroup 是非常核心的组件,它们负责 I/O 操作以及事件的处理。

1. EventLoopGroup

  • 定义EventLoopGroup 是一组 EventLoop 的容器,它提供了创建新连接或接受新连接的方法。当创建一个 Channel 时,你通常需要提供一个 EventLoopGroup 或者两个(一个用于服务器端接受新的连接,另一个用于处理这些连接)。

  • 类型

    • BossGroup:通常用于处理新的连接请求。

    • WorkerGroup:处理已经被接受的连接上的 I/O 操作。

2. EventLoop

  • 定义EventLoop 实现了 EventExecutor 接口,是实际执行 I/O 操作的地方,每个 EventLoop 都有自己的线程,并且这个线程负责处理分配给它的所有任务。

3. EventLoop 初始化

3.1 创建 EventLoopGroup

初始化 EventLoop 通常是通过创建 EventLoopGroup 实现的。例如,创建一个 NioEventLoopGroup

EventLoopGroup group = new NioEventLoopGroup();

这行代码内部会创建多个 NioEventLoop 实例,并将它们放在一个内部列表中。

3.2 NioEventLoop 细节

  • 构造函数

    public NioEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
                        SelectorProvider selectorProvider, int ioRatio,
                        boolean useEpollDirectBuffer) {
        super(parent, threadFactory, ioRatio);
        // ...
    }
  • 继承关系

    • NioEventLoop 继承自 MultithreadEventLoop

    • MultithreadEventLoop 继承自 AbstractEventLoop

    • AbstractEventLoop 实现了 EventLoop 接口。

  • 关键属性

    • Selector:用于多路复用 I/O 操作。

    • Thread:与 EventLoop 关联的线程。

    • TaskQueue:用于存放非 I/O 相关的任务队列。

    • IoTaskQueue:用于存放 I/O 相关的任务队列。

3.3 初始化过程

  • 初始化线程

    • NioEventLoop 构造函数中,会通过传入的 ThreadFactory 创建一个新的线程,并启动该线程。

    • 线程的 run() 方法实际上调用了 NioEventLooprun() 方法。

  • 初始化 Selector

    • NioEventLoop 会在构造函数中创建一个 Selector

    • Selector 被用来监听多个通道的 I/O 事件。

  • 初始化任务队列

    • NioEventLoop 包含了一个 IoTaskQueue 和一个 TaskQueue

    • IoTaskQueue 用于存放 I/O 任务。

    • TaskQueue 用于存放非 I/O 任务。

3.4 运行循环

  • 主循环

    • NioEventLooprun() 方法包含了主循环,在这个循环里,EventLoop 会轮询 Selector 的就绪操作符,并处理就绪的事件。

    • 同时也会处理任务队列中的任务。

4. 总结

EventLoop 的初始化主要是创建并配置好 Selector 和相关线程,以及设置好任务队列,以便能够有效地处理 I/O 事件和执行任务。对于更深入的理解,建议阅读 Netty 的源代码以获得更详细的实现细节。

Netty中EventLoop的延迟启动机制是如何实现的,它 对性能和资源利用率有什么影响? 除了NIOEventLoop之外,Netty中还有哪些其他类型 的EventLoop实现? 如何使用Netty的EventLoop来处理异步操作和事件驱 动编程?

Netty 的 EventLoop 提供了一种非常灵活的方式来处理 I/O 操作和其他异步任务。下面我将逐一解答你的问题。

1. EventLoop 的延迟启动机制

实现原理

EventLoop 的延迟启动机制允许在创建 EventLoop 之后,稍后才真正启动它。这种机制可以通过以下方式实现:

  • 构造函数:在创建 EventLoop 时并不立即启动线程,而是准备好线程和相关资源。

  • 启动方法:通过调用如 start()execute() 等方法来显式地启动 EventLoop

影响

  • 性能:延迟启动可以避免不必要的线程创建开销,特别是在不确定是否需要使用某个 EventLoop 的时候。

  • 资源利用率:只在需要时启动 EventLoop 可以节省系统资源,减少内存和 CPU 的消耗。

2. 其他类型的 EventLoop 实现

除了 NioEventLoop 之外,Netty 还支持多种不同的 EventLoop 实现,以适应不同的 I/O 模型:

  • EpollEventLoop:针对 Linux 平台,利用 epoll 代替 select/poll 提供更高性能的 I/O 多路复用。

  • KQueueEventLoop:针对 BSD 系统(包括 macOS),使用 kqueue 进行 I/O 多路复用。

  • DatagramChannelEventLoop:专门处理 UDP 通信。

  • EpollDatagramChannelEventLoop:针对 Linux 平台的 UDP 通信,使用 epoll

  • OioEventLoop:阻塞 I/O 的实现,通常用于测试目的。

3. 使用 EventLoop 处理异步操作

基本用法

  1. 创建 EventLoopGroup

    EventLoopGroup group = new NioEventLoopGroup(); // 或者 EpollEventLoopGroup 等
  2. 注册 Channel

    try (Channel channel = new NioServerSocketChannel(group)) {
        // 配置 Channel
        channel.bind(new InetSocketAddress(8080)).sync();
    }
  3. 提交任务

    group.execute(() -> {
        // 执行非 I/O 任务
        System.out.println("Hello from EventLoop!");
    });
  4. 调度任务

    group.schedule(() -> {
        // 定时执行的任务
        System.out.println("Scheduled task executed!");
    }, 5, TimeUnit.SECONDS);

示例代码

假设你需要使用 EventLoop 来执行一个定时任务和一个 I/O 操作:

public class Example {
​
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
​
        // 提交一个定时任务
        group.schedule(() -> {
            System.out.println("Scheduled task executed after 5 seconds.");
        }, 5, TimeUnit.SECONDS);
​
        // 提交一个非 I/O 任务
        group.execute(() -> {
            System.out.println("Executing non-I/O task.");
        });
​
        // 创建 Channel 并绑定到端口
        try (Channel ch = new NioServerSocketChannel(group)) {
            ch.bind(new InetSocketAddress(8080)).sync();
            // 等待关闭
            ch.closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

在这个示例中,我们首先创建了一个 NioEventLoopGroup,然后向其中提交了一个定时任务和一个非 I/O 任务。最后,我们创建了一个 Channel 并将其绑定到了一个端口上。

4. 总结

  • 延迟启动机制:提高了资源利用率和性能。

  • 不同实现:Netty 支持多种 EventLoop 实现,可以根据操作系统选择最合适的实现。

  • 异步操作:通过提交任务到 EventLoop,可以轻松实现异步编程模型。

希望这些信息能帮助你更好地理解 Netty 中 EventLoop 的工作原理及其在异步编程中的作用!

1. 概述

本文我们分享 EventLoop 的具体代码实现。因为 EventLoop 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 EventLoop 的初始化。

但是要将 EventLoop 拆出“初始化”部分的内容,笔者又觉得是件非常困难的事情。所以本文希望能达到如下的效果:

  1. 理解 EventLoop 有哪些属性

  2. 创建 EventLoop 的过程

  3. Channel 注册到 EventLoop 的过程

  4. EventLoop 的任务提交。

    • 虽然任务的提交,比较接近任务的执行,但是考虑到胖友可以更容易的理解 EventLoop ,所以放在本文。

2. 类结构图

EventLoopGroup 的整体类结构如下图:

  • 红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系

  • 因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。

下面,我们逐层看看每个接口和类的实现代码。

3. EventExecutor

io.netty.util.concurrent.EventExecutor ,继承 EventExecutorGroup 接口,事件执行器接口。代码如下:

// ========== 实现自 EventExecutorGroup 接口 ==========

/**
 * 返回自己
 *
 * Returns a reference to itself.
 */
@Override
EventExecutor next();

// ========== 自定义接口 ==========

/**
 * 所属 EventExecutorGroup
 *
 * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
 */
EventExecutorGroup parent();

/**
 * 当前线程是否在 EventLoop 线程中
 *
 * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
 */
boolean inEventLoop();
/**
 * 指定线程是否是 EventLoop 线程
 *
 * Return {@code true} if the given {@link Thread} is executed in the event loop,
 * {@code false} otherwise.
 */
boolean inEventLoop(Thread thread);

/**
 * 创建一个 Promise 对象
 *
 * Return a new {@link Promise}.
 */
<V> Promise<V> newPromise();
/**
 * 创建一个 ProgressivePromise 对象
 *
 * Create a new {@link ProgressivePromise}.
 */
<V> ProgressivePromise<V> newProgressivePromise();
/**
 * 创建成功结果的 Future 对象
 *
 * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
 * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
 * every call of blocking methods will just return without blocking.
 */
<V> Future<V> newSucceededFuture(V result);
/**
 * 创建异常的 Future 对象
 *
 * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
 * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
 * every call of blocking methods will just return without blocking.
 */
<V> Future<V> newFailedFuture(Throwable cause);
  • 接口定义的方法比较简单,已经添加中文注释,胖友自己看下。

4. OrderedEventExecutor

io.netty.util.concurrent.OrderedEventExecutor ,继承 EventExecutor 接口,有序的事件执行器接口。代码如下:

/**
 * Marker interface for {@link EventExecutor}s that will process all submitted tasks in an ordered / serial fashion.
 */
public interface OrderedEventExecutor extends EventExecutor {
}
  • 没有定义任何方法,仅仅是一个标记接口,表示该执行器会有序 / 串行的方式执行。

5. EventLoop

io.netty.channel.EventLoop ,继承 OrderedEventExecutor 和 EventLoopGroup 接口,EventLoop 接口。代码如下:

/**
 * Will handle all the I/O operations for a {@link Channel} once registered.
 *
 * One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
 * implementation details and internals.
 *
 */
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {

    @Override
    EventLoopGroup parent();

}
  • #parent() 接口方法,覆写方法的返回类型为 EventLoopGroup 。

  • 接口上的英文注释,意思如下:

    • EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。

    • 通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。

6. AbstractEventExecutor

io.netty.util.concurrent.AbstractEventExecutor ,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。

6.1 构造方法#parent() 接口方法,覆写方法的返回类型为 EventLoopGroup 。

  • 接口上的英文注释,意思如下:

    • EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。

    • 通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。

6. AbstractEventExecutor

io.netty.util.concurrent.AbstractEventExecutor ,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。

6.1 构造方法

/**
 * 所属 EventExecutorGroup
 */
private final EventExecutorGroup parent;
/**
 * EventExecutor 数组。只包含自己,用于 {@link #iterator()}
 */
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

protected AbstractEventExecutor() {
    this(null);
}

protected AbstractEventExecutor(EventExecutorGroup parent) {
    this.parent = parent;
}

6.2 parent

#parent() 方法,获得所属 EventExecutorGroup 。代码如下:

@Override
public EventExecutorGroup parent() {
    return parent;
}

6.3 next

#next() 方法,获得自己。代码如下:

@Override
public EventExecutor next() {
    return this;
}

6.4 inEventLoop()

#inEventLoop() 方法,判断当前线程是否在 EventLoop 线程中。代码如下:

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}
  • 具体的 #inEventLoop(Thread thread) 方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。

6.5 iterator

#iterator() 方法,代码如下:

具体的 #inEventLoop(Thread thread) 方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。
6.5 iterator
#iterator() 方法,代码如下:

6.6 newPromise 和 newProgressivePromise

#newPromise()#newProgressivePromise() 方法,分别创建 DefaultPromise 和 DefaultProgressivePromise 对象。代码如下:

@Override
public <V> Promise<V> newPromise() {
    return new DefaultPromise<V>(this);
}

@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
    return new DefaultProgressivePromise<V>(this);
}

6.7 newSucceededFuture 和 newFailedFuture

#newSucceededFuture(V result)#newFailedFuture(Throwable cause) 方法,分别创建成功结果和异常的 Future 对象。代码如下:

@Override
public <V> Future<V> newSucceededFuture(V result) {
    return new SucceededFuture<V>(this, result);
}

@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
    return new FailedFuture<V>(this, cause);
}
  • 创建的 Future 对象,会传入自身作为 EventExecutor ,并传入 resultcause 分别作为成功结果和异常。

6.8 newTaskFor

#newTaskFor(...) 方法,创建 PromiseTask 对象。代码如下:

@Override
protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new PromiseTask<T>(this, runnable, value);
}

@Override
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new PromiseTask<T>(this, callable);
}
  • 创建的 PromiseTask 对象,会传入自身作为 EventExecutor ,并传入 Runnable + Value 或 Callable 作为任务( Task )。

6.9 submit

#submit(...) 方法,提交任务。代码如下:

@Override
public Future<?> submit(Runnable task) {
    return (Future<?>) super.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
    return (Future<T>) super.submit(task, result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
    return (Future<T>) super.submit(task);
}
  • 每个方法的实现上,是调用父类 AbstractExecutorService 的实现。

6.10 schedule

#schedule(...) 方法,都不支持,交给子类 AbstractScheduledEventExecutor 实现。代码如下:

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    throw new UnsupportedOperationException();
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
    throw new UnsupportedOperationException();
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    throw new UnsupportedOperationException();
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
    throw new UnsupportedOperationException();
}

6.11 safeExecute

#safeExecute(Runnable task) 静态方法,安全的执行任务。代码如下:

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}
  • 所谓“安全”指的是,当任务执行发生异常时,仅仅打印告警日志。

6.12 shutdown

#shutdown() 方法,关闭执行器。代码如下:

@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

@Override
@Deprecated
public List<Runnable> shutdownNow() {
    shutdown();
    return Collections.emptyList();
}
  • 具体的 #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown() 方法的实现,在子类中。

7. AbstractScheduledEventExecutor

io.netty.util.concurrent.AbstractScheduledEventExecutor ,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。

详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。

8. SingleThreadEventExecutor

io.netty.util.concurrent.SingleThreadEventExecutor ,实现 OrderedEventExecutor 接口,继承 AbstractScheduledEventExecutor 抽象类,基于单线程的 EventExecutor 抽象类,即一个 EventExecutor 对应一个线程

8.1 构造方法


/**
 * {@link #state} 字段的原子更新器
 */
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
/**
 * {@link #thread} 字段的原子更新器
 */
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");

/**
 * 任务队列
 *
 * @see #newTaskQueue(int)
 */
private final Queue<Runnable> taskQueue;
/**
 * 线程
 */
private volatile Thread thread;
/**
 * 线程属性
 */
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
/**
 * 执行器
 */
private final Executor executor;
/**
 * 线程是否已经打断
 *
 * @see #interruptThread()
 */
private volatile boolean interrupted;

/**
 * TODO 1006 EventLoop 优雅关闭
 */
private final Semaphore threadLock = new Semaphore(0);
/**
 * TODO 1006 EventLoop 优雅关闭
 */
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/**
 * 添加任务时,是否唤醒线程{@link #thread}
 */
private final boolean addTaskWakesUp;
/**
 * 最大等待执行任务数量,即 {@link #taskQueue} 的队列大小
 */
private final int maxPendingTasks;
/**
 * 拒绝执行处理器
 *
 * @see #reject()
 * @see #reject(Runnable)
 */
private final RejectedExecutionHandler rejectedExecutionHandler;

/**
 * 最后执行时间
 */
private long lastExecutionTime;

/**
 * 状态
 */
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

/**
 * TODO 优雅关闭
 */
private volatile long gracefulShutdownQuietPeriod;
/**
 * 优雅关闭超时时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
 */
private volatile long gracefulShutdownTimeout;
/**
 * 优雅关闭开始时间,单位:毫秒 TODO 1006 EventLoop 优雅关闭
 */
private long gracefulShutdownStartTime;

/**
 * TODO 1006 EventLoop 优雅关闭
 */
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory,
        boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
    this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
  • 属性比较多,我们耐心往下看。

  • taskQueue 属性,任务队列。

    • addTaskWakesUp 属性,添加任务到 taskQueue 队列时,是否唤醒 thread 线程。详细解析,见 「8.11 execute」 。

    • maxPendingTasks 属性,最大等待执行任务数量,即 taskQueue 队列大小。

    • rejectedExecutionHandler 属性,拒绝执行处理器。在 taskQueue 队列超过最大任务数量时,怎么拒绝处理新提交的任务。

  • thread 属性,线程。在 SingleThreadEventExecutor 中,任务是提交到 taskQueue 队列中,而执行在 thread 线程中。

    • threadProperties 属性,线程属性。详细解析,见 「8.15 threadProperties」 。

    • executor 属性,执行器。通过它创建 thread 线程。详细解析,见 「8.11 execute」 。

    • interrupted 属性,线程是否打断。详细解析,详细解析,见 「8.14 interruptThread」 。

    • lastExecutionTime 属性,最后执行时间。

    • state 属性,线程状态。SingleThreadEventExecutor 在实现上,thread 的初始化采用延迟启动的方式,只有在第一个任务时,executor 才会执行并创建该线程,从而节省资源。目前 thread 线程有 5 种状态,代码如下:

private static final int ST_NOT_STARTED = 1; // 未开始
private static final int ST_STARTED = 2; // 已开始
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭中
private static final int ST_SHUTDOWN = 4; // 已关闭
private static final int ST_TERMINATED = 5; // 已经终止

状态变更流程如下图:

  • 构造方法,虽然比较多,但是很简单,胖友自己看下。

8.2 newTaskQueue

#newTaskQueue(int maxPendingTasks) 方法,创建任务队列。代码如下:

/**
 * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
  • 方法上有一大段注释,简单的说,这个方法默认返回的是 LinkedBlockingQueue 阻塞队列。如果子类有更好的队列选择( 例如非阻塞队列 ),可以重写该方法。在下文,我们会看到它的子类 NioEventLoop ,就重写了这个方法。

8.3 inEventLoop

#inEventLoop(Thread thread) 方法,判断指定线程是否是 EventLoop 线程。代码如下:

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

8.4 offerTask

#offerTask(Runnable task) 方法,添加任务到队列中。若添加失败,则返回 false 。代码如下:

final boolean offerTask(Runnable task) {
    // 关闭时,拒绝任务
    if (isShutdown()) {
        reject();
    }
    // 添加任务到队列
    return taskQueue.offer(task);
}
  • 注意,即使对于 BlockingQueue 的 #offer(E e) 方法,也不是阻塞的

8.5 addTask

#offerTask(Runnable task) 方法,在 #offerTask(Runnable task) 的方法的基础上,若添加任务到队列中失败,则进行拒绝任务。代码如下:

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 添加任务到队列
    if (!offerTask(task)) {
        // 添加失败,则拒绝任务
        reject(task);
    }
}
  • 调用 #reject(task) 方法,拒绝任务。详细解析,见 「8.6 reject」 。

  • 该方法是 void ,无返回值。

8.6 removeTask

#removeTask(Runnable task) 方法,移除指定任务。代码如下:

protected boolean removeTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    return taskQueue.remove(task);
}

8.7 peekTask

#peekTask() 方法,返回队头的任务,但是不移除。代码如下:

protected Runnable peekTask() {
    assert inEventLoop(); // 仅允许在 EventLoop 线程中执行
    return taskQueue.peek();
}

8.8 hasTasks

#hasTasks() 方法,队列中是否有任务。代码如下:

protected boolean hasTasks() {
    assert inEventLoop(); // 仅允许在 EventLoop 线程中执行
    return !taskQueue.isEmpty();
}

8.9 pendingTasks

#pendingTasks() 方法,获得队列中的任务数。代码如下:

public int pendingTasks() {
    return taskQueue.size();
}

8.10 reject

#reject(Runnable task) 方法,拒绝任务。代码如下:

protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}
  • 调用 RejectedExecutionHandler#rejected(Runnable task, SingleThreadEventExecutor executor) 方法,拒绝该任务。

#reject() 方法,拒绝任何任务,用于 SingleThreadEventExecutor 已关闭( #isShutdown() 方法返回的结果为 true )的情况。代码如下:

protected static void reject() {
    throw new RejectedExecutionException("event executor terminated");
}

8.10.1 RejectedExecutionHandler

io.netty.util.concurrent.RejectedExecutionHandler ,拒绝执行处理器接口。代码如下:

/**
 * Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
 * restrictions.
 */
void rejected(Runnable task, SingleThreadEventExecutor executor);

8.10.2 RejectedExecutionHandlers

io.netty.util.concurrent.RejectedExecutionHandlers ,RejectedExecutionHandler 实现类枚举,目前有 2 种实现类。

第一种

private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {

    @Override
    public void rejected(Runnable task, SingleThreadEventExecutor executor) {
        throw new RejectedExecutionException();
    }

};

public static RejectedExecutionHandler reject() {
    return REJECT;
}
  • 通过 #reject() 方法,返回 REJECT 实现类的对象。该实现在拒绝时,直接抛出 RejectedExecutionException 异常。

  • 默认情况下,使用这种实现。

第二种

public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {
    ObjectUtil.checkPositive(retries, "retries");
    final long backOffNanos = unit.toNanos(backoffAmount);
    return new RejectedExecutionHandler() {
        @Override
        public void rejected(Runnable task, SingleThreadEventExecutor executor) {
            if (!executor.inEventLoop()) { // 非 EventLoop 线程中。如果在 EventLoop 线程中,就无法执行任务,这就导致完全无法重试了。
                // 循环多次尝试添加到队列中
                for (int i = 0; i < retries; i++) {
                    // 唤醒执行器,进行任务执行。这样,就可能执行掉部分任务。
                    // Try to wake up the executor so it will empty its task queue.
                    executor.wakeup(false);

                    // 阻塞等待
                    LockSupport.parkNanos(backOffNanos);
                    // 添加任务
                    if (executor.offerTask(task)) {
                        return;
                    }
                }
            }
            // Either we tried to add the task from within the EventLoop or we was not able to add it even with
            // backoff.
            // 多次尝试添加失败,抛出 RejectedExecutionException 异常
            throw new RejectedExecutionException();
        }
    };
}
  • 通过 #backoff(final int retries, long backoffAmount, TimeUnit unit) 方法,创建带多次尝试添加到任务队列的 RejectedExecutionHandler 实现类。

  • 代码已经添加中文注释,胖友自己理解下,比较简单的。

8.11 execute

#execute(Runnable task) 方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:

 1: @Override
 2: public void execute(Runnable task) {
 3:     if (task == null) {
 4:         throw new NullPointerException("task");
 5:     }
 6: 
 7:     // 获得当前是否在 EventLoop 的线程中
 8:     boolean inEventLoop = inEventLoop();
 9:     // 添加到任务队列
10:     addTask(task);
11:     if (!inEventLoop) {
12:         // 创建线程
13:         startThread();
14:         // 若已经关闭,移除任务,并进行拒绝
15:         if (isShutdown() && removeTask(task)) {
16:             reject();
17:         }
18:     }
19: 
20:     // 唤醒线程
21:     if (!addTaskWakesUp && wakesUpForTask(task)) {
22:         wakeup(inEventLoop);
23:     }
24: 
  • 第 8 行:调用 #inEventLoop() 方法,获得当前是否在 EventLoop 的线程中。

  • 第 10 行:调用 #addTask(Runnable task) 方法,添加任务到队列中。

  • 第 11 行:非 EventLoop 的线程

    • 第 13 行:调用 #startThread() 方法,启动 EventLoop 独占的线程,即 thread 属性。详细解析,见 「8.12 startThread」 。

    • 第 14 至 17 行:若已经关闭,则移除任务,并拒绝执行。

  • 第 20 至 23 行:调用 #wakeup(boolean inEventLoop) 方法,唤醒线程。详细解析,见 8.13 wakeup」 。

  • 等等,第 21 行的 !addTaskWakesUp 有点奇怪,不是说好的 addTaskWakesUp 表示“添加任务时,是否唤醒线程”?!但是,怎么使用 ! 取反了。这样反倒变成了,“添加任务时,是否【】唤醒线程”。具体的原因是为什么呢?笔者 Google、Github Netty Issue、和基佬讨论,都未找到解答。目前笔者的理解是:addTaskWakesUp 真正的意思是,“添加任务后,任务是否会自动导致线程唤醒”。为什么呢?

    • 对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到 taskQueue 队列中时,线程是无感知的,所以需要调用 #wakeup(boolean inEventLoop) 方法,进行主动的唤醒。

    • 对于 Oio 使用的 ThreadPerChannelEventLoop ,它的线程执行是基于 taskQueue 队列监听( 阻塞拉取 )事件和任务,所以当任务添加到 taskQueue 队列中时,线程是可感知的,相当于说,进行被动的唤醒。

调用 #wakesUpForTask(task) 方法,判断该任务是否需要唤醒线程。代码如下:

protected boolean wakesUpForTask(Runnable task) {
    return true;
}
  • 默认返回 true 。在 「9. SingleThreadEventLoop」 中,我们会看到对该方法的重写。

8.12 startThread

#startThread() 方法,启动 EventLoop 独占的线程,即 thread 属性。代码如下:

 1: private void doStartThread() {
 2:     assert thread == null;
 3:     executor.execute(new Runnable() {
 4: 
 5:         @Override
 6:         public void run() {
 7:             // 记录当前线程
 8:             thread = Thread.currentThread();
 9: 
10:             // 如果当前线程已经被标记打断,则进行打断操作。
11:             if (interrupted) {
12:                 thread.interrupt();
13:             }
14: 
15:             boolean success = false; // 是否执行成功
16: 
17:             // 更新最后执行时间
18:             updateLastExecutionTime();
19:             try {
20:                 // 执行任务
21:                 SingleThreadEventExecutor.this.run();
22:                 success = true; // 标记执行成功
23:             } catch (Throwable t) {
24:                 logger.warn("Unexpected exception from an event executor: ", t);
25:             } finally {
26:                 // TODO 1006 EventLoop 优雅关闭
27:                 for (;;) {
28:                     int oldState = state;
29:                     if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
30:                             SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
31:                         break;
32:                     }
33:                 }
34: 
35:                 // TODO 1006 EventLoop 优雅关闭
36:                 // Check if confirmShutdown() was called at the end of the loop.
37:                 if (success && gracefulShutdownStartTime == 0) {
38:                     if (logger.isErrorEnabled()) {
39:                         logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
40:                                 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
41:                                 "be called before run() implementation terminates.");
42:                     }
43:                 }
44: 
45:                 // TODO 1006 EventLoop 优雅关闭
46:                 try {
47:                     // Run all remaining tasks and shutdown hooks.
48:                     for (;;) {
49:                         if (confirmShutdown()) {
50:                             break;
51:                         }
52:                     }
53:                 } finally {
54:                     try {
55:                         cleanup(); // 清理,释放资源
56:                     } finally {
57:                         STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
58:                         threadLock.release();
59:                         if (!taskQueue.isEmpty()) {
60:                             if (logger.isWarnEnabled()) {
61:                                 logger.warn("An event executor terminated with " +
62:                                         "non-empty task queue (" + taskQueue.size() + ')');
63:                             }
64:                         }
65: 
66:                         terminationFuture.setSuccess(null);
67:                     }
68:                 }
69:             }
70:             
71:         }
72:     });
73: }
  • 第 2 行:断言,保证 thread 为空。

  • 第 3 行 至 72 行:调用 Executor#execute(Runnable runnable) 方法,执行任务。下面,我们来详细解析。

  • 第 8 行:赋值当前的线程给 thread 属性。这就是,每个 SingleThreadEventExecutor 独占的线程的创建方式。

  • 第 10 至 13 行:如果当前线程已经被标记打断,则进行打断操作。为什么会有这样的逻辑呢?详细解析,见 「8.14 interruptThread」 。

  • 第 18 行:调用 #updateLastExecutionTime() 方法,更新最后执行时间。代码如下:

/**
 * Updates the internal timestamp that tells when a submitted task was executed most recently.
 * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
 * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
 * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
 * checks.
 */
protected void updateLastExecutionTime() {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
}
  • 英文注释,自己看。😈

  • 第 21 行:调用 SingleThreadEventExecutor#run() 方法,执行任务。详细解析,见 8.X run 。

  • 第 25 至 69 行:TODO 1006 EventLoop 优雅关闭

  • 第 55 行:调用 #cleanup() 方法,清理释放资源。详细解析,见 8.X cleanup 。