前言
Netty系列索引:
IO相關:
上文提到,早期基於線程的網絡模型,處理高並發的能力非常差,隨着請求數量的增多,必須不斷新建線程,隨之帶來的問題就是服務器資源被占滿、上下文切換成本過高以及IO阻塞導致的CPU浪費。
而Netty則使用了經典Reactor模型,並重新進行了封裝,包括EventLoop、EventGroup等。
EventLoopGroup
EventLoopGroup是一個接口,繼承自線程池EventExecutorGroup,並允許注冊channel到自身所持有的EventLoop,同時支持按一定規則獲取下一個EventLoop。
EventLoopGroup的具體實現有很多,下面以DefaultEventLoopGroup為例,簡述一下我的理解
1.ScheduledExecutorService
JDK接口,一個延遲或定時任務的執行器,其實現類ScheduledThreadPoolExecutor主要是利用了延時隊列及設置下次執行時間來實現的,這里不再贅述(可以單獨開個專題0.0)
2.EventExecutorGroup
接口,Netty自定義的一個線程池,負責復用EventExecutor和執行任務
3.EventLoopGroup
核心接口,EventLoopGroup繼承自EventExecutorGroup,代表他是一個線程池。同時他具備將channel注冊到EventExecutorGroup的功能,代表他是一個能夠真正處理Channel的特殊線程池
4.MultithreadEventExecutorGroup(AbstractEventExecutorGroup)
抽象類,實現自EventExecutorGroup接口,提供了一個簡易線程池的實現,其只有一個抽象方法newChild(創建EventExecutor)供子類實現
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args)
4.1 nThreads
private final EventExecutor[] children;
該線程池通過數組存儲線程,入參nThreads指定數組大小,並循環調用newChild創建線程。當創建過程中有異常時,會自動調用已創建完成線程的shutdownGracefully方法,進行優雅關閉
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
4.2 EventExecutorChooserFactory
EventExecutorChooserFactory是一個工廠接口,負責創建EventExecutorChooser
其默認實現DefaultEventExecutorChooserFactory會判斷當前線程數是否2的n次冪,如果是則返回PowerOfTwoEventExecutorChooser,否則返回GenericEventExecutorChooser
4.3 EventExecutorChooser
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
EventExecutorChooser負責根據一定規則從線程池children數組中取得下一個線程
PowerOfTwoEventExecutorChooser:通過&運算,將超出executors.length的位置為0
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
GenericEventExecutorChooser:通過求余運算,獲取有效index
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
可以看出當線程數是2的n次冪時,Netty通過與運算優化了效率
5.MultithreadEventLoopGroup
抽象類,繼承自MultithreadEventExecutorGroup並實現了EventLoopGroup接口,代表此抽象類是一個可以注冊並處理channel的線程池
值得關注的是next方法,他把返回值的類型,進一步限定為EventLoop
public EventLoop next() {
return (EventLoop) super.next();
}
6.DefaultEventLoopGroup
MultithreadEventLoopGroup的一個默認實現
其核心就是實現了newChild方法返回一個EventLoop extends EventExecutor實例
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
7.總結
說白了EventLoopGroup核心方法,register負責將channel與線程池中某一線程綁定,next負責返回下一個線程供調用方執行任務
EventLoop
EventLoop直譯為事件循環,他的職責簡單來說就是綁定一個唯一的線程,去執行或調度被分配的任務。
可見一個EventLoop實例可以為多個channel服務,而為了最大化利用資源,Netty使用池化技術將EventLoop放入EventLoopGroup中管理。
EventLoop的具體實現有很多,先看下DefaultEventLoop的類圖,會發現他和DefaultEventLoopGroup的類圖很像,都繼承了EventLoopGroup接口,但其最大的不同是紅框所示,他還繼承了EventExecutor,下面主要講一下多出來的這部分到底是干了什么
1.EventExecutor
接口,定義了一個事件執行器,主要方法如下
/**
* 直接返回自身
*/
@Override
EventExecutor next();
/**
* 返回所屬線程池
*/
EventExecutorGroup parent();
/**
* 判斷當前線程是否是當前EventLoop綁定的線程
*/
boolean inEventLoop();
/**
* 判斷傳入線程是否是當前EventLoop綁定的線程
*/
boolean inEventLoop(Thread thread);
(還涉及一些Future異步編程的一些東西,太復雜了后續再填坑吧0.0)
2.AbstractScheduledEventExecutor (AbstractEventExecutor)
抽象類,簡單定義了一個支持延遲(定時)任務的執行器
//延時隊列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
//下一個任務id
long nextTaskId;
重要方法scheduled
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
//如果和執行器綁定的線程一致,直接放入延時隊列中
scheduleFromEventLoop(task);
} else {
//獲取任務最晚執行時間
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
//放入線程池執行
execute(task);
} else {
//與execute類似,但不保證任務會在執行非延遲任務或執行程序關閉之前運行,默認實現只是委托給execute(Runnable)
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
}
3.SingleThreadEventExecutor
抽象類,定義了一個單線程順序執行器
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
//添加到任務隊列
addTask(task);
if (!inEventLoop) {
//啟動線程
startThread();
//如果線程池已經關閉,調用拒絕方法
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
//喚醒線程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
4.SingleThreadEventLoop
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop
EventLoop的抽象基類,負責在單線程中執行所有被提交的任務,同時具有注冊和處理channle的能力
5.DefaultEventLoop
單線程任務執行器的默認實現,主要就是其實現的run方法
protected void run() {
//循環阻塞的獲取任務,知道被通知關閉
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
6.總結
通過以上分析,不難看出Netty首先定義了自己的線程池(EventExectorGroup)和執行器(EventExector),然后通過繼承的方式定義了線程池(EventLoopGroup)和執行器(EventLoop),從而添加了處理(注冊)channel的能力。
You gotta grab what you can when you can.
機不可失,時不我待。