Netty框架原理


用這張圖表示的就是一個基本的Netty框架

通過創建兩個線程池,一個負責接入, 一個負責處理

public class Start {
    public static void main(String[] args) {
            
        //初始化線程
        NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());   //1
        
        //獲取服務類
        ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);
        
        //綁定端口
        bootstrap.bind(new InetSocketAddress(80));
        
        System.out.println("start");
    }
}
NioSelectorRunnablePool  相當於一個線程池操作類
public class NioSelectorRunnablePool {
    
    /**
     * boss 線程數組
     */
    private final AtomicInteger bossIndex = new AtomicInteger();
    private Boss[] bosses;
    
    /**
     * worker線程數組
     */
    private final AtomicInteger workerIndex = new AtomicInteger();
    private Worker[] workers;

    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        //初始化boss線程  即接入線程 
        initBoss(boss, 1);
        //根據當前核心數*2 初始化處理線程
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    /**
     * 初始化boss線程組
     * @param boss
     * @param count
     */
    private void initBoss(Executor boss, int count) {
        this.bosses = new NioServerBoss[count];
        for (int i = 0; i < bosses.length; i++) {
            //線程池數組
            bosses[i] = new NioServerBoss(boss, "boss thread" + (i+1), this);
        }
    }

    /**
     * 初始化worker線程
     * @param worker worker線程池
     * @param count 線程數
     */
    private void initWorker(Executor worker, int count) {
        this.workers = new NioServerWorker[count];
        for (int i = 0; i < bosses.length; i++) {
            workers[i] = new NioServerWorker(worker, "worker thread" + (i+1), this);
        }        
    }
    
    /**
     * 獲取下一個boss線程
     * @return
     */
    public Boss nextBoss() {
        return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
    }
    
    /**
     * 獲取下一個work線程
     * @return
     */
    public Worker nextWorkr() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }
}
初始化兩個線程池 NioServerBoss 和NioServerWorker  兩個類都實現 各自的Boss  和 Worker 接口   繼承 了 AbstractNioSelector 抽象Selector
public interface Boss {
    
    /**
     * 加入一個新的ServerSocket
     * @param serverChannel
     */
    public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
Boos
public interface Worker {
    /**
     * 加入一個新的客戶端會話
     * @param channel
     */
    public void registerNewChannelTask(SocketChannel channel);

}
Worker
/**
*@Description 抽象selector線程類
*@autor:mxz
*2018-08-17
**/
public abstract class AbstractNioSelector implements Runnable{
    /**
     * 線程池
     */
    private Executor executor;
    private String threadName;
    
    /**
     * 選擇器wakenUp狀態標記
     */
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    
    /**
     * 線程管理對象,存儲線程池數組
     */
    private NioSelectorRunnablePool selectorRunnablePool;

    protected Selector selector;

    /**
     * 任務隊列
     */
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
    

    public AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        //一個線程 加入一個selector
        openSelector();
    }
    
    /**
     * 獲取線程管理對象
     * @return
     */
    protected NioSelectorRunnablePool  getselectorRunnablePool() {
        return this.selectorRunnablePool;
    }
    /**
     * 獲取selector並啟動線程
     */
    private void openSelector() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeException("Failed to create a selector");
        }
        //線程池執行該線程
        executor.execute(this);
    }

    @Override
    public void run() {
        Thread.currentThread().setName(this.threadName);
        
        while (true) {
            try {
                wakenUp.set(false);
                //當注冊事件到達時,方法返回,否則該方法會一直阻塞
                select(selector);
                
                //運行任務隊列中的任務
                processTaskQueue();
                
                process(selector);
            } catch (Exception e) {
                
            }
        }
    }
    /**
     * 注冊一個任務並激活selector 重新執行
     * @param task
     */
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        
        Selector selector = this.selector;
        
        if (selector != null) {
            if (wakenUp.compareAndSet(false, true)) {
                //會首先喚醒Boss  接入總線線程  喚醒阻塞在selector上的線程, 去做其他事情,例如注冊channel改變interestOps的值
                selector.wakeup();
            }
        } else {
            taskQueue.remove(task);
        }
    }
    
    /**
     *  
     */
    
    //執行隊列中的任務
    private void processTaskQueue() {
        for(;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }

    /**
     * selector抽象方法
     * @param selector
     * @return
     * @throws IOException
     */
    protected abstract int select(Selector selector) throws IOException;

    /**
     * selector的業務處理
     * @param selector
     * @return
     * @throws IOException
     */
    protected abstract void process(Selector selector) throws IOException;
}

執行openSelector() 創建 selector  execute 執行線程  執行各自的 select()

public class NioServerBoss extends AbstractNioSelector implements Boss {

    public NioServerBoss(Executor boss, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(boss, threadName, selectorRunnablePool);
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        
        for (SelectionKey key : selectedKeys) {
            selectedKeys.remove(key);
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            //新客戶端
            SocketChannel channel = server.accept();
            //設置為非阻塞
            channel.configureBlocking(false);
            //獲取一個worker
            Worker nextworker = getselectorRunnablePool().nextWorkr();
            //注冊新客戶端介入任務給另一個線程任務隊列加入新任務
            nextworker.registerNewChannelTask(channel);
            System.out.println("新客戶連接");
        }
    }

    @Override
    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}
public class NioServerWorker extends AbstractNioSelector implements Worker{

    public NioServerWorker(Executor worker, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(worker, threadName, selectorRunnablePool);
    }

    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            //移除 防止重復提交
            iterator.remove();
            
            //得到事件發生的socket通道
            SocketChannel channel = (SocketChannel) key.channel();

            //數據總長度
            int ret = 0;
            boolean failure = true;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //讀取數據
            try {
                ret = channel.read(buffer);
                failure = false;
            } catch (Exception e) {
            }
            //判斷是否連接已斷開
            if (ret <= 0 || failure) {
                key.channel();
                System.out.println("客戶端已斷開連接");
            } else {
                System.out.println("收到數據:" + new String (buffer.array()));
                
                //回 寫數據
                ByteBuffer outBuf = ByteBuffer.wrap("收到\n".getBytes());
                channel.write(outBuf);// 將消息發送到客戶端
            }
        }
    }

此時都會selector.selector() 阻塞等待連接

 

此時再看 bootstrap.bind(new InetSocketAddress(80));  會調用 

public class ServerBootstrap {

    private NioSelectorRunnablePool selectorRunnablePool;

    public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    public void bind(final InetSocketAddress localAddress) {
        try {
            //獲得一個ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            //設置通道為非阻塞
            serverChannel.configureBlocking(false);
            //將該通道對應的serverSocket綁定到port
            serverChannel.socket().bind(localAddress);
            
            //獲取一個boss線程
            Boss nextBoss = selectorRunnablePool.nextBoss();
            //向當前boss 線 程注冊一個ServerSocket通道
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

} 

這個時候通過獲取下一個線程注入任務池(其實就一個) 這里可以看AbstractSelector 中的nextBoss 和nextWorker 方法 從線程數組循環拿出線程

這個時候會將當前通道在selector上注冊 OP_ACCEPT  的事件 並將這個任務添加到Taskqueue

/**
     * 注冊一個任務並激活selector 重新執行
     * @param task
     */
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);
        
        Selector selector = this.selector;
        
        if (selector != null) {
            if (wakenUp.compareAndSet(false, true)) {
                //會首先喚醒Boss  接入總線線程  喚醒阻塞在selector上的線程, 去做其他事情,例如注冊channel改變interestOps的值
                selector.wakeup();
            }
        } else {
            taskQueue.remove(task);
        }
    }

這個時候會喚醒selector  不過喚醒的是boss的selector

喚醒后之前的阻塞會繼續往下執行

          wakenUp.set(false);
                //當注冊事件到達時,方法返回,否則該方法會一直阻塞
                select(selector);
                
                //運行任務隊列中的任務
                processTaskQueue();
                
                process(selector);

先執行任務  也就是 channel regiter 到 selector 上

執行 Boss中的 process 的方法

    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        
        for (SelectionKey key : selectedKeys) {
            selectedKeys.remove(key);
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            //新客戶端
            SocketChannel channel = server.accept();
            //設置為非阻塞
            channel.configureBlocking(false);
            //獲取一個worker
            Worker nextworker = getselectorRunnablePool().nextWorkr();
            //注冊新客戶端介入任務給另一個線程任務隊列加入新任務
            nextworker.registerNewChannelTask(channel);
            System.out.println("新客戶連接");
        }
    }

同理再去 走相同的路線 把獲取到的通道綁到 worker的selector上

 


 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM