1. 概述
本文我们分享 EventLoop 的具体代码实现。因为 EventLoop 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 EventLoop 的初始化。
但是要将 EventLoop 拆出“初始化”部分的内容,笔者又觉得是件非常困难的事情。所以本文希望能达到如下的效果:
- 理解 EventLoop 有哪些属性
- 创建 EventLoop 的过程
- Channel 注册到 EventLoop 的过程
- EventLoop 的任务提交。
- 虽然任务的提交,比较接近任务的执行,但是考虑到胖友可以更容易的理解 EventLoop ,所以放在本文。
2. 类结构图
EventLoopGroup 的整体类结构如下图:
- 红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系。
- 因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。
下面,我们逐层看看每个接口和类的实现代码。
3. EventExecutor
io.netty.util.concurrent.EventExecutor ,继承 EventExecutorGroup 接口,事件执行器接口。代码如下:
// ========== 实现自 EventExecutorGroup 接口 ========== |
- 接口定义的方法比较简单,已经添加中文注释,胖友自己看下。
4. OrderedEventExecutor
io.netty.util.concurrent.OrderedEventExecutor ,继承 EventExecutor 接口,有序的事件执行器接口。代码如下:
/** |
- 没有定义任何方法,仅仅是一个标记接口,表示该执行器会有序 / 串行的方式执行。
5. EventLoop
io.netty.channel.EventLoop ,继承 OrderedEventExecutor 和 EventLoopGroup 接口,EventLoop 接口。代码如下:
/** |
#parent()接口方法,覆写方法的返回类型为 EventLoopGroup 。- 接口上的英文注释,意思如下:
- EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。
- 通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。
6. AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutor ,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。
6.1 构造方法
/** |
6.2 parent
#parent() 方法,获得所属 EventExecutorGroup 。代码如下:
|
6.3 next
#next() 方法,获得自己。代码如下:
|
6.4 inEventLoop()
#inEventLoop() 方法,判断当前线程是否在 EventLoop 线程中。代码如下:
|
- 具体的
#inEventLoop(Thread thread)方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。
6.5 iterator
#iterator() 方法,代码如下:
|
6.6 newPromise 和 newProgressivePromise
#newPromise() 和 #newProgressivePromise() 方法,分别创建 DefaultPromise 和 DefaultProgressivePromise 对象。代码如下:
|
- 我们可以看到,创建的 Promise 对象,都会传入自身作为 EventExecutor 。关于 Promise 相关的,我们在后续文章详细解析。实在想了解,也可以看看 《Netty 源码笔记 —— 第四章 Future 和 Promise》 。
6.7 newSucceededFuture 和 newFailedFuture
#newSucceededFuture(V result) 和 #newFailedFuture(Throwable cause) 方法,分别创建成功结果和异常的 Future 对象。代码如下:
|
- 创建的 Future 对象,会传入自身作为 EventExecutor ,并传入
result或cause分别作为成功结果和异常。
6.8 newTaskFor
#newTaskFor(...) 方法,创建 PromiseTask 对象。代码如下:
|
- 创建的 PromiseTask 对象,会传入自身作为 EventExecutor ,并传入 Runnable + Value 或 Callable 作为任务( Task )。
6.9 submit
#submit(...) 方法,提交任务。代码如下:
|
- 每个方法的实现上,是调用父类 AbstractExecutorService 的实现。
6.10 schedule
#schedule(...) 方法,都不支持,交给子类 AbstractScheduledEventExecutor 实现。代码如下:
|
6.11 safeExecute
#safeExecute(Runnable task) 静态方法,安全的执行任务。代码如下:
protected static void safeExecute(Runnable task) { |
- 所谓“安全”指的是,当任务执行发生异常时,仅仅打印告警日志。
6.12 shutdown
#shutdown() 方法,关闭执行器。代码如下:
|
- 具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)和#shutdown()方法的实现,在子类中。
7. AbstractScheduledEventExecutor
io.netty.util.concurrent.AbstractScheduledEventExecutor ,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。
8. SingleThreadEventExecutor
io.netty.util.concurrent.SingleThreadEventExecutor ,实现 OrderedEventExecutor 接口,继承 AbstractScheduledEventExecutor 抽象类,基于单线程的 EventExecutor 抽象类,即一个 EventExecutor 对应一个线程。
8.1 构造方法
/** |
- 属性比较多,我们耐心往下看。
taskQueue属性,任务队列。addTaskWakesUp属性,添加任务到taskQueue队列时,是否唤醒thread线程。详细解析,见 「8.11 execute」 。maxPendingTasks属性,最大等待执行任务数量,即taskQueue队列大小。rejectedExecutionHandler属性,拒绝执行处理器。在taskQueue队列超过最大任务数量时,怎么拒绝处理新提交的任务。
thread属性,线程。在 SingleThreadEventExecutor 中,任务是提交到taskQueue队列中,而执行在thread线程中。threadProperties属性,线程属性。详细解析,见 「8.15 threadProperties」 。executor属性,执行器。通过它创建thread线程。详细解析,见 「8.11 execute」 。interrupted属性,线程是否打断。详细解析,详细解析,见 「8.14 interruptThread」 。lastExecutionTime属性,最后执行时间。state属性,线程状态。SingleThreadEventExecutor 在实现上,thread的初始化采用延迟启动的方式,只有在第一个任务时,executor才会执行并创建该线程,从而节省资源。目前thread线程有 5 种状态,代码如下:private static final int ST_NOT_STARTED = 1; // 未开始
private static final int ST_STARTED = 2; // 已开始
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭中
private static final int ST_SHUTDOWN = 4; // 已关闭
private static final int ST_TERMINATED = 5; // 已经终止
- 构造方法,虽然比较多,但是很简单,胖友自己看下。
8.2 newTaskQueue
#newTaskQueue(int maxPendingTasks) 方法,创建任务队列。代码如下:
/** |
- 方法上有一大段注释,简单的说,这个方法默认返回的是 LinkedBlockingQueue 阻塞队列。如果子类有更好的队列选择( 例如非阻塞队列 ),可以重写该方法。在下文,我们会看到它的子类 NioEventLoop ,就重写了这个方法。
8.3 inEventLoop
#inEventLoop(Thread thread) 方法,判断指定线程是否是 EventLoop 线程。代码如下:
|
8.4 offerTask
#offerTask(Runnable task) 方法,添加任务到队列中。若添加失败,则返回 false 。代码如下:
final boolean offerTask(Runnable task) { |
- 注意,即使对于 BlockingQueue 的
#offer(E e)方法,也不是阻塞的!
8.5 addTask
#offerTask(Runnable task) 方法,在 #offerTask(Runnable task) 的方法的基础上,若添加任务到队列中失败,则进行拒绝任务。代码如下:
protected void addTask(Runnable task) { |
- 调用
#reject(task)方法,拒绝任务。详细解析,见 「8.6 reject」 。 - 该方法是
void,无返回值。
8.6 removeTask
#removeTask(Runnable task) 方法,移除指定任务。代码如下:
protected boolean removeTask(Runnable task) { |
8.7 peekTask
#peekTask() 方法,返回队头的任务,但是不移除。代码如下:
protected Runnable peekTask() { |
8.8 hasTasks
#hasTasks() 方法,队列中是否有任务。代码如下:
protected boolean hasTasks() { |
8.9 pendingTasks
#pendingTasks() 方法,获得队列中的任务数。代码如下:
public int pendingTasks() { |
8.10 reject
#reject(Runnable task) 方法,拒绝任务。代码如下:
protected final void reject(Runnable task) { |
- 调用
RejectedExecutionHandler#rejected(Runnable task, SingleThreadEventExecutor executor)方法,拒绝该任务。
#reject() 方法,拒绝任何任务,用于 SingleThreadEventExecutor 已关闭( #isShutdown() 方法返回的结果为 true )的情况。代码如下:
protected static void reject() { |
8.10.1 RejectedExecutionHandler
io.netty.util.concurrent.RejectedExecutionHandler ,拒绝执行处理器接口。代码如下:
/** |
8.10.2 RejectedExecutionHandlers
io.netty.util.concurrent.RejectedExecutionHandlers ,RejectedExecutionHandler 实现类枚举,目前有 2 种实现类。
第一种
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { |
- 通过
#reject()方法,返回REJECT实现类的对象。该实现在拒绝时,直接抛出 RejectedExecutionException 异常。 - 默认情况下,使用这种实现。
第二种
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) { |
- 通过
#backoff(final int retries, long backoffAmount, TimeUnit unit)方法,创建带多次尝试添加到任务队列的 RejectedExecutionHandler 实现类。 - 代码已经添加中文注释,胖友自己理解下,比较简单的。
8.11 execute
#execute(Runnable task) 方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:
1: |
- 第 8 行:调用
#inEventLoop()方法,获得当前是否在 EventLoop 的线程中。 - 第 10 行:调用
#addTask(Runnable task)方法,添加任务到队列中。 - 第 11 行:非 EventLoop 的线程
- 第 13 行:调用
#startThread()方法,启动 EventLoop 独占的线程,即thread属性。详细解析,见 「8.12 startThread」 。 - 第 14 至 17 行:若已经关闭,则移除任务,并拒绝执行。
- 第 13 行:调用
第 20 至 23 行:调用
#wakeup(boolean inEventLoop)方法,唤醒线程。详细解析,见 「8.13 wakeup」 。- 等等,第 21 行的
!addTaskWakesUp有点奇怪,不是说好的addTaskWakesUp表示“添加任务时,是否唤醒线程”?!但是,怎么使用!取反了。这样反倒变成了,“添加任务时,是否【不】唤醒线程”。具体的原因是为什么呢?笔者 Google、Github Netty Issue、和基佬讨论,都未找到解答。目前笔者的理解是:addTaskWakesUp真正的意思是,“添加任务后,任务是否会自动导致线程唤醒”。为什么呢?- 对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到
taskQueue队列中时,线程是无感知的,所以需要调用#wakeup(boolean inEventLoop)方法,进行主动的唤醒。 - 对于 Oio 使用的 ThreadPerChannelEventLoop ,它的线程执行是基于
taskQueue队列监听( 阻塞拉取 )事件和任务,所以当任务添加到taskQueue队列中时,线程是可感知的,相当于说,进行被动的唤醒。 - 感谢闪电侠,证实我的理解是正确的。参见:
- 对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到
调用
#wakesUpForTask(task)方法,判断该任务是否需要唤醒线程。代码如下:protected boolean wakesUpForTask(Runnable task) {
return true;
}- 默认返回
true。在 「9. SingleThreadEventLoop」 中,我们会看到对该方法的重写。
- 默认返回
- 等等,第 21 行的
8.12 startThread
#startThread() 方法,启动 EventLoop 独占的线程,即 thread 属性。代码如下:
1: private void doStartThread() { |
- 第 2 行:断言,保证
thread为空。 - 第 3 行 至 72 行:调用
Executor#execute(Runnable runnable)方法,执行任务。下面,我们来详细解析。 - 第 8 行:赋值当前的线程给
thread属性。这就是,每个 SingleThreadEventExecutor 独占的线程的创建方式。 - 第 10 至 13 行:如果当前线程已经被标记打断,则进行打断操作。为什么会有这样的逻辑呢?详细解析,见 「8.14 interruptThread」 。
第 18 行:调用
#updateLastExecutionTime()方法,更新最后执行时间。代码如下:/**
* Updates the internal timestamp that tells when a submitted task was executed most recently.
* {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
* usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or
* {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}- 英文注释,自己看。😈
- 第 21 行:调用
SingleThreadEventExecutor#run()方法,执行任务。详细解析,见 8.X run 。 - 第 25 至 69 行:TODO 1006 EventLoop 优雅关闭
- 第 55 行:调用
#cleanup()方法,清理释放资源。详细解析,见 8.X cleanup 。
8.13 wakeup
#wakeup(boolean inEventLoop) 方法,唤醒线程。代码如下:
protected void wakeup(boolean inEventLoop) { |
<1>处的!inEventLoop代码段,判断不在 EventLoop 的线程中。因为,如果在 EventLoop 线程中,意味着线程就在执行中,不必要唤醒。<2>处,调用Queue#offer(E e)方法,添加任务到队列中。而添加的任务是WAKEUP_TASK,代码如下:private static final Runnable WAKEUP_TASK = new Runnable() {
public void run() {
// Do nothing.
}
};- 这是一个空的 Runnable 实现类。仅仅用于唤醒基于
taskQueue阻塞拉取的 EventLoop 实现类。 对于 NioEventLoop 会重写该方法,代码如下:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}- 通过 NIO Selector 唤醒。
- 这是一个空的 Runnable 实现类。仅仅用于唤醒基于
8.14 interruptThread
#interruptThread() 方法,打断 EventLoop 的线程。代码如下:
protected void interruptThread() { |
- 因为 EventLoop 的线程是延迟启动,所以可能
thread并未创建,此时通过interrupted标记打断。之后在#startThread()方法中,创建完线程后,再进行打断,也就是说,“延迟打断”。
8.15 threadProperties
#threadProperties() 方法,获得 EventLoop 的线程属性。代码如下:
1: public final ThreadProperties threadProperties() { |
- 第 2 至 3 行:获得 ThreadProperties 对象。若不存在,则进行创建 ThreadProperties 对象。
- 第 4 至 5 行:获得 EventLoop 的线程。因为线程是延迟启动的,所以会出现线程为空的情况。若线程为空,则需要进行创建。
- 第 15 行:调用 DefaultThreadProperties 对象。
- 第 16 至 19 行:CAS 修改
threadProperties属性。
- 第 22 行:返回
threadProperties。
8.15.1 ThreadProperties
io.netty.util.concurrent.ThreadProperties ,线程属性接口。代码如下:
Thread.State state(); |
8.15.2 DefaultThreadProperties
DefaultThreadProperties 实现 ThreadProperties 接口,默认线程属性实现类。代码如下:
DefaultThreadProperties 内嵌在 SingleThreadEventExecutor 中。
private static final class DefaultThreadProperties implements ThreadProperties { |
- 我们可以看到,每个实现方法,实际上就是对被包装的线程
t的方法的封装。 - 那为什么
#threadProperties()方法不直接返回thread呢?因为如果直接返回thread,调用方可以调用到该变量的其他方法,这个是我们不希望看到的。
8.16 run
#run() 方法,它是一个抽象方法,由子类实现,如何执行 taskQueue 队列中的任务。代码如下:
protected abstract void run(); |
SingleThreadEventExecutor 提供了很多执行任务的方法,方便子类在实现自定义运行任务的逻辑时:
- [x]
#runAllTasks() - [x]
#runAllTasks(long timeoutNanos) - [x]
#runAllTasksFrom(Queue<Runnable> taskQueue) - [x]
#afterRunningAllTasks() - [x]
#pollTask() - [x]
#pollTaskFrom(Queue<Runnable> taskQueue) #takeTask()#fetchFromScheduledTaskQueue()#delayNanos(long currentTimeNanos)
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 。
8.17 cleanup
#cleanup() 方法,清理释放资源。代码如下:
/** |
- 目前该方法为空的。在子类 NioEventLoop 中,我们会看到它覆写该方法,关闭 NIO Selector 对象。
8.18 invokeAll
#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务。代码如下:
|
调用
#throwIfInEventLoop(String method)方法,判断若在 EventLoop 的线程中调用该方法,抛出 RejectedExecutionException 异常。代码如下:private void throwIfInEventLoop(String method) {
if (inEventLoop()) {
throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
}
}调用父类 AbstractScheduledEventExecutor 的
#invokeAll(tasks, ...)方法,执行多个普通任务。在该方法内部,会调用#execute(Runnable task)方法,执行任务。调用栈如下图:invokeAll => execute 的流程
8.19 invokeAny
和
#invokeAll(...)方法,类似。
#invokeAll(...) 方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可。代码如下:
|
- 调用
#throwIfInEventLoop(String method)方法,判断若在 EventLoop 的线程中调用该方法,抛出 RejectedExecutionException 异常。 - 调用父类 AbstractScheduledEventExecutor 的
#invokeAny(tasks, ...)方法,执行多个普通任务,有一个执行完成即可。在该方法内部,会调用#execute(Runnable task)方法,执行任务。调用栈如下图:invokeAny => execute 的流程
8.20 shutdown
如下是优雅关闭,我们在 TODO 1006 EventLoop 优雅关闭
#addShutdownHook(final Runnable task)#removeShutdownHook(final Runnable task)#runShutdownHooks()#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown()#terminationFuture()#isShuttingDown()#isShutdown()#isTerminated()#confirmShutdown()#awaitTermination(long timeout, TimeUnit unit)
9. SingleThreadEventLoop
io.netty.channel.SingleThreadEventLoop ,实现 EventLoop 接口,继承 SingleThreadEventExecutor 抽象类,基于单线程的 EventLoop 抽象类,主要增加了 Channel 注册到 EventLoop 上。
9.1 构造方法
/** |
- 新增了一条
tailTasks队列,执行的顺序在taskQueue之后。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 。 - 构造方法比较简单,胖友自己看下就可以了。
9.2 parent
#parent() 方法,获得所属 EventLoopGroup 。代码如下:
|
- 覆盖父类方法,将返回值转换成 EventLoopGroup 类。
9.3 next
#next() 方法,获得自己。代码如下:
|
- 覆盖父类方法,将返回值转换成 EventLoop 类。
9.4 register
#register(Channel channel) 方法,注册 Channel 到 EventLoop 上。代码如下:
|
- 将 Channel 和 EventLoop 创建一个 DefaultChannelPromise 对象。通过这个 DefaultChannelPromise 对象,我们就能实现对异步注册过程的监听。
调用
#register(final ChannelPromise promise)方法,注册 Channel 到 EventLoop 上。代码如下:
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 注册 Channel 到 EventLoop 上
promise.channel().unsafe().register(this, promise);
// 返回 ChannelPromise 对象
return promise;
}- 在方法内部,我们就看到在 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的 「3.14.3 注册 Channel 到 EventLoopGroup」 章节,熟悉的内容,调用
AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)方法,注册 Channel 到 EventLoop 上。
- 在方法内部,我们就看到在 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的 「3.14.3 注册 Channel 到 EventLoopGroup」 章节,熟悉的内容,调用
9.5 hasTasks
#hasTasks() 方法,队列中是否有任务。代码如下:
|
- 基于两个队列来判断是否还有任务。
9.6 pendingTasks
#pendingTasks() 方法,获得队列中的任务数。代码如下:
|
- 计算两个队列的任务之和。
9.7 executeAfterEventLoopIteration
#executeAfterEventLoopIteration(Runnable task) 方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:
1: |
- 第 4 至 7 行:SingleThreadEventLoop 关闭时,拒绝任务。
- 第 10 行:调用
Queue#offer(E e)方法,添加任务到队列中。- 第 12 行:若添加失败,调用
#reject(Runnable task)方法,拒绝任务。
- 第 12 行:若添加失败,调用
- 第 15 至 18 行:唤醒线程。
- 第 16 行:SingleThreadEventLoop 重写了
#wakesUpForTask(Runnable task)方法。详细解析,见 「9.9 wakesUpForTask」 。
- 第 16 行:SingleThreadEventLoop 重写了
9.8 removeAfterEventLoopIterationTask
#removeAfterEventLoopIterationTask(Runnable task) 方法,移除指定任务。代码如下:
|
9.9 wakesUpForTask
#wakesUpForTask(task) 方法,判断该任务是否需要唤醒线程。代码如下:
|
- 当任务类型为 NonWakeupRunnable ,则不进行唤醒线程。
9.9.1 NonWakeupRunnable
NonWakeupRunnable 实现 Runnable 接口,用于标记不唤醒线程的任务。代码如下:
/** |
9.10 afterRunningAllTasks
#afterRunningAllTasks() 方法,在运行完所有任务后,执行 tailTasks 队列中的任务。代码如下:
protected void afterRunningAllTasks() { |
- 调用
#runAllTasksFrom(queue)方法,执行tailTasks队列中的所有任务。
10. NioEventLoop
io.netty.channel.nio.NioEventLoop ,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 。
666. 彩蛋
自顶向下的过了下 EventLoop 相关的类和方法。因为仅涉及 EventLoop 初始化相关的内容,所以对于 EventLoop 运行相关的内容,就不得不省略了。
那么,饥渴难耐的我们,《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 ,走起!
推荐阅读如下文章:
- 永顺 《Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)》 的 「NioEventLoop」 小节。
- Hypercube 《自顶向下深入分析Netty(四)—— EventLoop-2》