BossGroup和WorkerGroup都是NioEventLoopGroup,BossGroup用來處理nio的Accept,Worker處理nio的Read和Write事件
1 構造NioEventLoopGroup
/**
 * Creates a group with {@code nThreads} event loops and no caller-supplied executor:
 * a {@code null} {@link Executor} is passed on to the next overload.
 *
 * @param nThreads number of threads requested for this group
 */
public NioEventLoopGroup(int nThreads) {
    // The cast selects the (int, Executor) overload explicitly for the null argument.
    this(nThreads, (Executor) null);
}

/**
 * Creates a group using the given executor and the selector provider returned by
 * {@link SelectorProvider#provider()}.
 *
 * @param nThreads number of threads requested for this group
 * @param executor executor handed down the constructor chain; may be {@code null}
 */
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

/**
 * Creates a group using the given executor and selector provider, together with
 * {@code DefaultSelectStrategyFactory.INSTANCE} as the select-strategy factory.
 *
 * @param nThreads         number of threads requested for this group
 * @param executor         executor handed down the constructor chain; may be {@code null}
 * @param selectorProvider provider later used to open one selector per event loop
 */
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

/**
 * Most specific overload shown here. Forwards everything to the superclass and appends
 * {@code RejectedExecutionHandlers.reject()} as the rejected-execution handler.
 * The three trailing arguments (selector provider, select-strategy factory, rejection handler)
 * reach the superclass as its varargs {@code args}, in that order.
 *
 * @param nThreads              number of threads requested for this group
 * @param executor              executor handed down the constructor chain; may be {@code null}
 * @param selectorProvider      provider later used to open one selector per event loop
 * @param selectStrategyFactory factory for the per-event-loop select strategy
 */
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
SelectorProvider是Java NIO中用來創建Selector(即IO多路復用器)的提供者,其具體實現與OS相關;對於Linux來說,底層利用的就是epoll
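DefaultSelectStrategyFactory是默認的SelectStrategyFactory,後面newChild創建每個EventLoop時,會調用它的newSelectStrategy()得到該EventLoop使用的SelectStrategy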
2 構造NioEventLoopGroup的父類MultithreadEventLoopGroup
/**
 * Hands construction over to the parent class, substituting the default thread count
 * when the caller passes {@code 0}.
 *
 * @param nThreads requested thread count; {@code 0} means "use DEFAULT_EVENT_LOOP_THREADS"
 * @param executor executor forwarded unchanged to the parent constructor
 * @param args     extra arguments forwarded unchanged to the parent constructor
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads != 0 ? nThreads : DEFAULT_EVENT_LOOP_THREADS, executor, args);
}
當傳入的nThreads為0時使用DEFAULT_EVENT_LOOP_THREADS,其默認值為cpu核數 * 2(可通過系統屬性io.netty.eventLoopThreads覆蓋)
3 構造MultithreadEventLoopGroup的父類MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());① } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args);② success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children);③ final FutureListener<Object> terminationListener = new FutureListener<Object>() {④ @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
① executor--用來創建線程,默認實現ThreadPerTaskExecutor
/**
 * {@link Executor} that starts a brand-new thread, obtained from the given
 * {@link ThreadFactory}, for every submitted command.
 */
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    /**
     * @param threadFactory factory used to create a thread per command
     * @throws NullPointerException if {@code threadFactory} is {@code null}
     */
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    /** Creates a new thread for {@code command} and starts it immediately. */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

// --- MultithreadEventExecutorGroup ---

/**
 * Creates the default thread factory for this group.
 * {@code getClass()} yields the runtime class of the group (the concrete subclass being
 * constructed), not necessarily MultithreadEventExecutorGroup itself.
 */
protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass());
}

// --- DefaultThreadFactory ---

/**
 * @param poolName    name used to build the thread-name prefix
 * @param daemon      daemon flag applied to created threads
 * @param priority    priority applied to created threads
 * @param threadGroup thread group recorded for this factory
 * @throws NullPointerException     if {@code poolName} or {@code threadGroup} is {@code null}
 * @throws IllegalArgumentException if {@code priority} is outside
 *                                  [{@code Thread.MIN_PRIORITY}, {@code Thread.MAX_PRIORITY}]
 */
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
    if (poolName == null) {
        throw new NullPointerException("poolName");
    }
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        throw new IllegalArgumentException(
                "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
    }

    // Thread names are "<poolName>-<poolId>-<threadId>"; the thread id is appended in newThread.
    prefix = poolName + '-' + poolId.incrementAndGet() + '-';
    this.daemon = daemon;
    this.priority = priority;
    this.threadGroup = ObjectUtil.checkNotNull(threadGroup, "threadGroup");
}

/** Same as the four-argument constructor, using the calling thread's thread group. */
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
    this(poolName, daemon, priority, Thread.currentThread().getThreadGroup());
}

/**
 * Creates a thread for {@code r} (wrapped in a {@code DefaultRunnableDecorator}) named
 * {@code prefix + nextId}, then aligns its daemon flag and priority with this factory's settings.
 * Failures while applying those settings are deliberately ignored.
 */
@Override
public Thread newThread(Runnable r) {
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
    try {
        // Only touch the settings when they differ from what the new thread already has.
        if (t.isDaemon() != daemon) {
            t.setDaemon(daemon);
        }

        if (t.getPriority() != priority) {
            t.setPriority(priority);
        }
    } catch (Exception ignored) {
        // Doesn't matter even if failed to set.
    }
    return t;
}
② newChild--創建EventLoop
// MultithreadEventExecutorGroup children[i] = newChild(executor, args); // NioEventLoopGroup @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
③ chooser--用來從children中選出下一個EventLoop:根據EventLoop個數是否為2的冪,采用位運算(power of two)方式或是取模方式,兩者都是按順序輪詢(round-robin),而不是隨機選擇
④ 對每個EventLoop的terminationFuture添加同一個監聽器:每當一個EventLoop終止時回調一次,將terminatedChildren加一;當全部children都已終止時,把整個Group的terminationFuture設為成功,從而通知等待Group終止的監聽者
4 構造EventLoop--打開一個selector
// NioEventLoopGroup protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); } NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; selector = openSelector(); selectStrategy = strategy; } private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Class<?> selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); // Ensure the current selector implementation is what we can instrument. if (!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); selectedKeys = selectedKeySet; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable t) { selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t); } return selector; }
構造NioEventLoop的父類SingleThreadEventLoop
/**
 * Passes every argument straight through to the superclass constructor; no additional
 * state is set up in the code shown here.
 *
 * @param parent                   group that owns this event loop
 * @param executor                 executor forwarded to the superclass
 * @param addTaskWakesUp           flag forwarded to the superclass
 * @param maxPendingTasks          pending-task limit forwarded to the superclass
 * @param rejectedExecutionHandler handler forwarded to the superclass
 */
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
}
構造SingleThreadEventLoop的父類SingleThreadEventExecutor
/**
 * Stores the executor's configuration and creates its task queue.
 *
 * @param parent          group that owns this executor
 * @param executor        executor this instance will use; must not be {@code null}
 * @param addTaskWakesUp  stored as-is in the field of the same name
 * @param maxPendingTasks requested pending-task limit; values below 16 are raised to 16
 * @param rejectedHandler handler stored for rejected tasks; must not be {@code null}
 * @throws NullPointerException if {@code executor} or {@code rejectedHandler} is {@code null}
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // Queue holding the submitted tasks (a LinkedBlockingQueue, per the original author's note).
    taskQueue = newTaskQueue();
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}