JAVA NIO Socket通道


 

DatagramChannel和SocketChannel都實現定義讀寫功能,ServerSocketChannel不實現,只負責監聽傳入的連接,並建立新的SocketChannel,本身不傳輸數據。

Socket通道被實例化時都會創建一個對等的socket,通過此方式創建的socket都會有關聯的通道,通過getChannel()獲取。

繼承於 SelectableChannel,所以socket可以在非阻塞模式下運行:

 

Readiness Selection:就緒選擇,查詢通道的機制,該機制可以判斷通道是否准備好執行下一個目標操作(讀,寫...),其價值在於潛在的大量通道可以同時進行就緒檢查,真正的就緒選擇需要由操作系統來做,處理IO請求,並通知各個線程數據准備情況。

Selector選擇器:提供了這種抽象(抽象接口),是的Java代碼能夠以可移植的方式,請求底層操作系統提供這種服務。

Selector選擇器類:管理着一個被注冊的通道集合的信息和他們的狀態,通道和選擇器是一起被注冊的,並且使用選擇器來更新通道狀態。 

一個通道可以被注冊到多個選擇器上,但在每個選擇器上,只能注冊一次。

SelectionKey選擇鍵:封裝了通道和選擇器的注冊關系,選擇鍵被SelectableChannel.register()返回並提供標識這種注冊關系的標記。

通道在被注冊到選擇器之前必須設置為noblocking模式,正常狀態。

 

chanel.register(selector, keystate):通道注冊選擇器。

selector.select():阻塞操作,直到某一個channel的keystate發生。

selectionKey.cancel(),取消注冊關系。 

通道關閉,相關的注冊鍵會自動取消,選擇器關閉,則所有注冊到該選擇器的通道都將被注銷,並且相關的鍵會立刻失效。

selectionkey包含兩個以整數型式進行編碼的比特掩碼,一個用於指示那些通道和選擇器組合所關心的操作,另一個表示通道准備好要執行的操作。當前的interest集合可以通過調用見對象的interestOps()方法來獲取,且永遠不會被選擇器改變,但可以調用interestOps()方法,傳入一個新的比特碼來改變。

readyOpts()獲取相關通道的已就緒的操作,ready集合是interest集合的子集,表示從上次調用select()之后已經就緒的操作。如下:

if((key.readOps() & SelctionKey.OP_READ) != 0){

    buffer.clear();

    key.channel().read(buffer);

    do()....

}

附加參數:attach()

SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX, paramObj);

等價:

SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX);

key.attach(paramObj);

SelectionKey 多線程應用同步問題。

選擇器:

Selector上的已注冊鍵集合中,會存在失效鍵、null,keys()返回,不可修改。

已選擇鍵集合,selectedKeys()返回,已經准備好的鍵集合,可能為空。

核心:選擇過程,是對select(),poll(),epoll()等本地調用(native call)或者類似的操作系統的本地調用的包裝(抽象),期間,將執行以下過程:

  1. 已取消的鍵的集合將會被檢查,如果非空,則會被從其它兩個集合(已注冊、已選擇)移除,相關通道將會被注銷,鍵被清空。

  2. 已注冊鍵的集合的鍵的interest集合將會被檢查,就緒條件確定,底層操作系統對通道所關心的操作的就緒狀態進行檢查,如果沒有,則阻塞當前(超時值)。

    對於已經就緒的通道執行:

    a. 如果通道的鍵未在已選擇的鍵集合中,那么鍵的reay集合將會被清空,然后設置當前准備好的比特掩碼。

    b. 如果通道的鍵已在已選擇的鍵集合中,鍵的ready集合更新。不再就緒的狀態不會被清除。

  3. select返回的是從上一次select()調用之后進入就緒狀態的通道數量,之前的調用中已經就緒的,並且本次調用中仍然就緒的不會被計入。

使用內部已取消的鍵的集合來延遲注銷,防止線程在取消鍵時阻塞及與正在進行的選擇操作沖突的優化,

三種形式的select: select(), select(timeout),selectNow()(非阻塞,立刻返回當前狀況)。

調用 Selector 對象的 wakeup( )方法將使得選擇器上的第一個還沒有返回的選擇操作立即回。如果當前沒有在進行中的選擇,那么下一次對 select( )方法的一種形式的調用將立即返回。后續的選擇操作將正常進行。在選擇操作之間多次調用 wakeup( )方法與調用它一次沒有什么不同。有時這種延遲的喚醒行為並不是您想要的。您可能只想喚醒一個睡眠中的線程,而使得后續的

選擇繼續正常地進行。您可以通過在調用 wakeup( )方法后調用 selectNow( )方法來繞過這個問題。 

通常的做法是在選擇器上調用一次 select 操作(這將更新已選擇的鍵的集合),然后遍歷 selectKeys( )方法返回的鍵的集合。在按順序進行檢查每個鍵的過程中,相關的通道也根據鍵的就緒集合進行處理。然后鍵將從已選擇的鍵的集合中被移除(通過在 Iterator對象上調用 remove( )方法),然后檢查下一個鍵。完成后,通過再次調用 select( )方法重復這個循環。如下:

package org.windwant.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by windwant on 2016/10/27.
 */
public class SocketChannelOpt {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    private static ExecutorService read = Executors.newFixedThreadPool(5);
    private static ExecutorService write = Executors.newFixedThreadPool(5);

    public static void main(String[] args){
        ServerSocketChannel serverSocketChannel = null;
        ServerSocket serverSocket = null;
        Selector selector = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel
            serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket
            serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址
            serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式
            selector = Selector.open();//工廠方法創建Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。
            while (true){//循環檢查
                if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。
                while (it.hasNext()){
                    SelectionKey selectionKey = it.next();
                    //處理就緒狀態
                    if (selectionKey.isAcceptable()){
                        ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據
                        SocketChannel socketChannel = schannel.accept();//就緒后的操作,剛到達的socket句柄
                        if(null == socketChannel){
                            continue;
                        }
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據
                    }else if(selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

                        StringBuilder result = new StringBuilder();
                        while (socketChannel.read(byteBuffer) > 0){//確保讀完
                            byteBuffer.flip();
                            result.append(new String(byteBuffer.array()));
                            byteBuffer.clear();//每次清空 對應上面flip()
                        }

                        System.out.println("server receive: " + result.toString());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);

                    }else if(selectionKey.isWritable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        String sendStr = "server send data: " + Math.random();
                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
                        while (send.hasRemaining()){
                            socketChannel.write(send);
                        }
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println(sendStr);
                    }
                    it.remove();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

Selector多線程執行,同步需求。

一個線程監控通道的就緒狀態,一個線程池處理業務需求。線程池也可以擴展為不同的業務處理線程池,如日志、業務、心跳。

package org.windwant.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 線程處理讀取,寫出
 * Created by windwant on 2016/10/27.
 */
public class TSocketChannelOpt {

    private static final String HOST = "localhost";
    private static final int PORT = 8888;

    private static ExecutorService read = Executors.newFixedThreadPool(5);
    private static ExecutorService write = Executors.newFixedThreadPool(5);

    public static void main(String[] args){
        ServerSocketChannel serverSocketChannel = null;
        ServerSocket serverSocket = null;
        Selector selector = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel
            serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket
            serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址
            serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式
            selector = Selector.open();//工廠方法創建Selector
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。
            while (true){//循環檢查
                if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合
                    continue;
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。
                while (it.hasNext()){
                    SelectionKey selectionKey = it.next();
                    it.remove();
                    //處理就緒狀態
                    if (selectionKey.isAcceptable()){
                        ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據
                        SocketChannel socketChannel = schannel.accept();//就緒后的操作,剛到達的socket句柄
                        if(null == socketChannel){
                            continue;
                        }
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據
                    }else if(selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        read.execute(new MyReadRunnable(socketChannel));

//                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
//
//                        StringBuilder result = new StringBuilder();
//                        while (socketChannel.read(byteBuffer) > 0){//確保讀完
//                            byteBuffer.flip();
//                            result.append(new String(byteBuffer.array()));
//                            byteBuffer.clear();//每次清空 對應上面flip()
//                        }
//
//                        System.out.println("server receive: " + result.toString());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);

                    }else if(selectionKey.isWritable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        write.execute(new MyWriteRunnable(socketChannel));
//                        String sendStr = "server send data: " + Math.random();
//                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
//                        while (send.hasRemaining()){
//                            socketChannel.write(send);
//                        }
//                        System.out.println(sendStr);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class MyReadRunnable implements Runnable {

        private SocketChannel channel;

        public MyReadRunnable(SocketChannel channel){
            this.channel = channel;
        }

        @Override
        public synchronized void  run() {
            ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);

            StringBuilder result = new StringBuilder();
            try {
                while (channel.read(byteBuffer) > 0){//確保讀完
                    byteBuffer.flip();
                    result.append(new String(byteBuffer.array()));
                    byteBuffer.clear();//每次清空 對應上面flip()
                }
                System.out.println("server receive: " + result.toString());
            } catch (IOException e) {
                e.printStackTrace();
            }


        }
    }

    static class MyWriteRunnable implements Runnable {

        private SocketChannel channel;

        public MyWriteRunnable(SocketChannel channel){
            this.channel = channel;
        }

        @Override
        public void run() {
            String sendStr = "server send data: " + Math.random();
            ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
            try {
                while (send.hasRemaining()) {
                    channel.write(send);
                }
                System.out.println(sendStr);
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM