用這張圖表示的就是一個基本的Netty框架
通過創建兩個線程池,一個負責接入, 一個負責處理
public class Start { public static void main(String[] args) { //初始化線程 NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); //1 //獲取服務類 ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); //綁定端口 bootstrap.bind(new InetSocketAddress(80)); System.out.println("start"); } }
NioSelectorRunnablePool 相當於一個線程池操作類
public class NioSelectorRunnablePool { /** * boss 線程數組 */ private final AtomicInteger bossIndex = new AtomicInteger(); private Boss[] bosses; /** * worker線程數組 */ private final AtomicInteger workerIndex = new AtomicInteger(); private Worker[] workers; public NioSelectorRunnablePool(Executor boss, Executor worker) { //初始化boss線程 即接入線程 initBoss(boss, 1); //根據當前核心數*2 初始化處理線程 initWorker(worker, Runtime.getRuntime().availableProcessors() * 2); } /** * 初始化boss線程組 * @param boss * @param count */ private void initBoss(Executor boss, int count) { this.bosses = new NioServerBoss[count]; for (int i = 0; i < bosses.length; i++) { //線程池數組 bosses[i] = new NioServerBoss(boss, "boss thread" + (i+1), this); } } /** * 初始化worker線程 * @param worker worker線程池 * @param count 線程數 */ private void initWorker(Executor worker, int count) { this.workers = new NioServerWorker[count]; for (int i = 0; i < bosses.length; i++) { workers[i] = new NioServerWorker(worker, "worker thread" + (i+1), this); } } /** * 獲取下一個boss線程 * @return */ public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } /** * 獲取下一個work線程 * @return */ public Worker nextWorkr() { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; } }
初始化兩個線程池 NioServerBoss 和NioServerWorker 兩個類都實現 各自的Boss 和 Worker 接口 繼承 了 AbstractNioSelector 抽象Selector

public interface Boss { /** * 加入一個新的ServerSocket * @param serverChannel */ public void registerAcceptChannelTask(ServerSocketChannel serverChannel); }

public interface Worker { /** * 加入一個新的客戶端會話 * @param channel */ public void registerNewChannelTask(SocketChannel channel); }
/** *@Description 抽象selector線程類 *@autor:mxz *2018-08-17 **/ public abstract class AbstractNioSelector implements Runnable{ /** * 線程池 */ private Executor executor; private String threadName; /** * 選擇器wakenUp狀態標記 */ protected final AtomicBoolean wakenUp = new AtomicBoolean(); /** * 線程管理對象,存儲線程池數組 */ private NioSelectorRunnablePool selectorRunnablePool; protected Selector selector; /** * 任務隊列 */ private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); public AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; //一個線程 加入一個selector openSelector(); } /** * 獲取線程管理對象 * @return */ protected NioSelectorRunnablePool getselectorRunnablePool() { return this.selectorRunnablePool; } /** * 獲取selector並啟動線程 */ private void openSelector() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("Failed to create a selector"); } //線程池執行該線程 executor.execute(this); } @Override public void run() { Thread.currentThread().setName(this.threadName); while (true) { try { wakenUp.set(false); //當注冊事件到達時,方法返回,否則該方法會一直阻塞 select(selector); //運行任務隊列中的任務 processTaskQueue(); process(selector); } catch (Exception e) { } } } /** * 注冊一個任務並激活selector 重新執行 * @param task */ protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakenUp.compareAndSet(false, true)) { //會首先喚醒Boss 接入總線線程 喚醒阻塞在selector上的線程, 去做其他事情,例如注冊channel改變interestOps的值 selector.wakeup(); } } else { taskQueue.remove(task); } } /** * */ //執行隊列中的任務 private void processTaskQueue() { for(;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } /** * selector抽象方法 * @param selector * @return * @throws IOException */ protected abstract int select(Selector selector) throws IOException; /** * selector的業務處理 * @param selector * @return * @throws IOException */ protected abstract void process(Selector selector) throws IOException; }
執行openSelector() 創建 selector execute 執行線程 執行各自的 select()
public class NioServerBoss extends AbstractNioSelector implements Boss { public NioServerBoss(Executor boss, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(boss, threadName, selectorRunnablePool); } @Override protected int select(Selector selector) throws IOException { return selector.select(); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (SelectionKey key : selectedKeys) { selectedKeys.remove(key); ServerSocketChannel server = (ServerSocketChannel) key.channel(); //新客戶端 SocketChannel channel = server.accept(); //設置為非阻塞 channel.configureBlocking(false); //獲取一個worker Worker nextworker = getselectorRunnablePool().nextWorkr(); //注冊新客戶端介入任務給另一個線程任務隊列加入新任務 nextworker.registerNewChannelTask(channel); System.out.println("新客戶連接"); } } @Override public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) { final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } }
public class NioServerWorker extends AbstractNioSelector implements Worker{ public NioServerWorker(Executor worker, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(worker, threadName, selectorRunnablePool); } @Override protected int select(Selector selector) throws IOException { return selector.select(); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); //移除 防止重復提交 iterator.remove(); //得到事件發生的socket通道 SocketChannel channel = (SocketChannel) key.channel(); //數據總長度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取數據 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { } //判斷是否連接已斷開 if (ret <= 0 || failure) { key.channel(); System.out.println("客戶端已斷開連接"); } else { System.out.println("收到數據:" + new String (buffer.array())); //回 寫數據 ByteBuffer outBuf = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuf);// 將消息發送到客戶端 } } }
此時都會selector.selector() 阻塞等待連接
此時再看 bootstrap.bind(new InetSocketAddress(80)); 會調用
public class ServerBootstrap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } public void bind(final InetSocketAddress localAddress) { try { //獲得一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); //設置通道為非阻塞 serverChannel.configureBlocking(false); //將該通道對應的serverSocket綁定到port serverChannel.socket().bind(localAddress); //獲取一個boss線程 Boss nextBoss = selectorRunnablePool.nextBoss(); //向當前boss 線 程注冊一個ServerSocket通道 nextBoss.registerAcceptChannelTask(serverChannel); } catch (Exception e) { // TODO: handle exception } } }
這個時候通過獲取下一個線程注入任務池(其實就一個) 這里可以看AbstractSelector 中的nextBoss 和nextWorker 方法 從線程數組循環拿出線程
這個時候會將當前通道在selector上注冊 OP_ACCEPT 的事件 並將這個任務添加到Taskqueue
/** * 注冊一個任務並激活selector 重新執行 * @param task */ protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakenUp.compareAndSet(false, true)) { //會首先喚醒Boss 接入總線線程 喚醒阻塞在selector上的線程, 去做其他事情,例如注冊channel改變interestOps的值 selector.wakeup(); } } else { taskQueue.remove(task); } }
這個時候會喚醒selector 不過喚醒的是boss的selector
喚醒后之前的阻塞會繼續往下執行
wakenUp.set(false); //當注冊事件到達時,方法返回,否則該方法會一直阻塞 select(selector); //運行任務隊列中的任務 processTaskQueue(); process(selector);
先執行任務 也就是 channel regiter 到 selector 上
執行 Boss中的 process 的方法
@Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (SelectionKey key : selectedKeys) { selectedKeys.remove(key); ServerSocketChannel server = (ServerSocketChannel) key.channel(); //新客戶端 SocketChannel channel = server.accept(); //設置為非阻塞 channel.configureBlocking(false); //獲取一個worker Worker nextworker = getselectorRunnablePool().nextWorkr(); //注冊新客戶端介入任務給另一個線程任務隊列加入新任務 nextworker.registerNewChannelTask(channel); System.out.println("新客戶連接"); } }
同理再去 走相同的路線 把獲取到的通道綁到 worker的selector上