BIO、NIO、AIO 和 Netty的初步認識


要實現網絡機器間的通訊,首先得來看看計算機系統網絡通信的基本原理,在底層層面去看,網絡通信需要做的就是將流從一台計算機傳輸到另外一台計算機,基於傳輸協議和網絡IO來實現,其中傳輸協議比較出名的有tcp、udp等等,tcp、udp都是在基於Socket概念上為某類應用場景而擴展出的傳輸協議,網絡IO,主要有bio、nio、aio三種方式。

1.什么是RPC

RPC全稱為remote procedure call,即遠程過程調用。借助RPC可以做到像本地調用一樣調用遠程服務,是一種進程間的通信方式。RPC並不是一個具體的技術,而是指整個網絡遠程調用過程。

在java中RPC框架比較多,常見的有Hessian、gRPC、Thrift、HSF (High Speed Service Framework)、Dubbo
等,其實對 於RPC框架而言,核心模塊就是通訊和序列化。

一個完整的RPC架構里面包含了四個核心的組件,分別是Client,Client Stub,Server以及Server Stub,這個Stub可以理解為存根。

  • 客戶端(Client),服務的調用方。
  • 客戶端存根(Client Stub),存放服務端的地址消息,再將客戶端的請求參數打包成網絡消息,然后通過網絡遠程發送給服務方。
  • 服務端(Server),真正的服務提供者。
  • 服務端存根(Server Stub),接收客戶端發送過來的消息,將消息解包,並調用本地的方法。

調用過程:

(1) 客戶端(client)以本地調用方式(即以接口的方式)調用服務;
(2) 客戶端存根(client stub)接收到調用后,負責將方法、參數等組裝成能夠進行網絡傳輸的消息體(將消息體對象序列化為二進制);
(3) 客戶端通過sockets將消息發送到服務端;
(4) 服務端存根( server stub)收到消息后進行解碼(將消息對象反序列化);
(5) 服務端存根( server stub)根據解碼結果調用本地的服務;
(6) 本地服務執行並將結果返回給服務端存根( server stub);
(7) 服務端存根( server stub)將返回結果打包成消息(將結果消息對象序列化);
(8) 服務端(server)通過sockets將消息發送到客戶端;
(9) 客戶端存根(client stub)接收到結果消息,並進行解碼(將結果消息發序列化);
(10) 客戶端(client)得到最終結果。

RPC的目標是要把2、3、4、7、8、9這些步驟都封裝起來。無論是何種類型的數據,最終都需要轉換成二進制流在網絡上進行傳輸,數據的發送方需要將對象轉換為二進制流,而數據的接收方則需要把二進制流再恢復為對象。

2.RMI

概述

Java RMI 指的是遠程方法調用 (Remote Method Invocation),是java原生支持的遠程調用 ,采用JRMP(JavaRemote Messageing protocol)作為通信協議,可以認為是純java版本的分布式遠程調用解決方案, RMI主要用於不同虛擬機之間的通信,這些虛擬機可以在不同的主機上、也可以在同一個主機上,這里的通信可以理解為一個虛擬機上的對象調用另一個虛擬機上對象的方法

客戶端:

  • 存根/樁(Stub):遠程對象在客戶端上的代理
  • 遠程引用層(Remote Reference Layer):解析並執行遠程引用協議
  • 傳輸層(Transport):發送調用、傳遞遠程方法參數、接收遠程方法執行結果

服務端:

  • 骨架(Skeleton):讀取客戶端傳遞的方法參數,調用服務器方的實際對象方法,並接收方法執行后的返回值
  • 遠程引用層(Remote Reference Layer):處理遠程引用后向骨架發送遠程方法調用
  • 傳輸層(Transport):監聽客戶端的入站連接,接收並轉發調用到遠程引用層

注冊表(Registry):

  • 以URL形式注冊遠程對象,並向客戶端回復對遠程對象的引用

調用過程:

  • 遠程調用過程

    1.客戶端從遠程服務器的注冊表中查詢並獲取遠程對象引用。
    2.樁對象與遠程對象具有相同的接口和方法列表,當客戶端調用遠程對象時,實際上是由相應的樁對象代理完成的。
    3,遠程引用層在將樁的本地引用轉換為服務器上對象的遠程引用后,再將調用傳遞給傳輸層(Transport),由傳輸層通過TCP協議發送調用;
    4.在服務器端,傳輸層監聽入站連接,它一旦接收到客戶端遠程調用后,就將這個引用轉發給其上層的遠程引用層;
    5.服務器端的遠程引用層將客戶端發送的遠程應用轉換為本地虛擬機的引用后,再將請求傳遞給骨架(Skeleton);
    6.骨架讀取參數,又將請求傳遞給服務器,最后由服務器進行實際的方法調用。

  • 結果返回過程

    1.如果遠程方法調用后有返回值,則服務器將這些結果又沿着“骨架->遠程引用層->傳輸層”向下傳遞;
    2.客戶端的傳輸層接收到返回值后,又沿着“傳輸層->遠程引用層->樁”向上傳遞,然后由樁來反序列化這些返回值,並將最終的結果傳遞給客戶端程序。

開發流程

服務端:

  • 定義Remote子接口,在其內部定義要發布的遠程方法,並且這些方法都要Throws RemoteException;
  • 定義實現遠程接口,並且繼承:UnicastRemoteObject
  • 啟動服務器:依次完成注冊表的啟動和遠程對象綁定

客戶端:

  • 通過符合JRMP規范的URL字符串在注冊表中獲取並強轉成Remote子接口對象
  • 調用這個Remote子接口對象中的某個方法就是為一次遠程方法調用行為

代碼實現

1.創建遠程接口IHelloService

/**
* 遠程服務對象接口必須繼承Remote接口;同時方法必須拋出RemoteExceptino異常
*/
public interface IHelloService extends Remote {

    //1.定義一個sayHello方法
    public  String sayHello(User user) throws RemoteException;
}

其中有一個引用對象User作為參數

/**
* 引用對象應該是可序列化對象,這樣才能在遠程調用的時候:
* 1. 序列化對象 2. 拷貝 3. 在網絡中傳輸
* 4. 服務端反序列化 5. 獲取參數進行方法調用; 
* 這種方式其實是將遠程對象引用傳遞的方式轉化為值傳遞的方式
*/
public class User implements Serializable {
    private String username;
    private int age;

    // 其余代碼省略 ...
}

2.實現遠程服務對象IHelloServiceImpl

/**
* 遠程服務對象實現類寫在服務端;必須繼承UnicastRemoteObject或其子類
**/
public class HelloServiceImpl extends UnicastRemoteObject implements IHelloService {

    /**
    * 手動實現父類的構造方法
	* 因為UnicastRemoteObject的構造方法拋出了RemoteException異常,
	* 因此這里默認的構造方法必須寫,必須聲明拋出RemoteException異常
    * @throws RemoteException
    */
    public HelloServiceImpl() throws RemoteException {
        super();
    }

    //我們自定義的sayHello
    public String sayHello(User user) throws RemoteException {
        System.out.println("this is server , say hello to "+user.getUsername());
        return "success";
    }
}

3.服務端程序RMIServer

public class RMIServer {
    public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException {
        // 1.創建一個遠程對象,同時也會創建stub對象、skeleton對象
        IHelloService service = new HelloServiceImpl();

        // 2.本地主機上的遠程對象注冊表Registry的實例,並指定端口為8888
        // 這一步必不可少(Java默認端口是1099),缺少注冊表創建,則無法綁定對象到遠程注冊表上
        LocateRegistry.createRegistry(8888);

        //3.對象的綁定
        //bind方法的參數1:   rmi://ip地址:端口/服務名(協議名rmi可省略)   參數2:綁定的對象
        Naming.bind("//127.0.0.1:8888/rmiserver",service); //將stub引用綁定到服務地址上
    }
}

4.客戶端程序RMIClient

public class RMIClient {
    public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
        //1.從注冊表中獲取遠程對象 , 強轉
        IHelloService service = (IHelloService) Naming.lookup("//127.0.0.1:8888/rmiserver");

        //2.准備參數
        User user = new User("laowang",18);

        //3.調用遠程方法sayHello
        String message = service.sayHello(user);
        System.out.println(message);
    }
}

客戶端上的IHelloService.java 和 User.java 與服務端代碼相同.

6.啟動服務端程序,再啟動客戶端進行遠程方法調用

3.BIO,NIO,AIO

同步和異步

同步(synchronize)、異步(asychronize)是指應用程序和內核的交互而言的.

  • 同步:指用戶進程觸發IO操作等待或者輪訓的方式查看IO操作是否就緒。
  • 異步:當一個異步進程調用發出之后,調用者不會立刻得到結果。而是在調用發出之后,被調用者通過狀態、通知來通知調用者,或者通過回調函數來處理這個調用。

使用異步IO時,Java將IO讀寫委托給OS處理,需要將數據緩沖區地址和大小傳給OS,OS需要支持異步IO操作

阻塞和非阻塞

阻塞和非阻塞是針對於進程訪問數據的時候,根據IO操作的就緒狀態來采取不同的方式.

簡單點說就是一種讀寫操作方法的實現方式. 阻塞方式下讀取和寫入將一直等待, 而非阻塞方式下,讀取和寫入方法會理解返回一個狀態值.

例子:

老張煮開水。 老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。
1 老張把水壺放到火上,站立着等水開。(同步阻塞)
2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)
3 老張把響水壺放到火上,立等水開。(異步阻塞)
4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞)

BIO

同步阻塞IO服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善。

服務端代碼:

public class IOServer {
    public static void main(String[] args) throws Exception {

        //首先創建了一個serverSocket
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress("127.0.0.1",8081));
        while (true){
            Socket socket = serverSocket.accept();  //同步阻塞
            new Thread(()->{
                try {
                    byte[] bytes = new byte[1024];
                    int len = socket.getInputStream().read(bytes);  //同步阻塞
                    System.out.println(new String(bytes,0,len));
                    socket.getOutputStream().write(bytes,0,len);
                    socket.getOutputStream().flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

客戶端代碼:

public class IOClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1",8081);
        socket.getOutputStream().write("hello".getBytes());
        socket.getOutputStream().flush();
        System.out.println("server send back data =====");
        byte[] bytes = new byte[1024];
        int len = socket.getInputStream().read(bytes);
        System.out.println(new String(bytes,0,len));
        socket.close();
    }
}

NIO

同步非阻塞IO (non-blocking IO / new io)是指JDK 1.4 及以上版本。服務器實現模式為一個請求一個通道,即客戶端發送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有IO請求時才啟動一個線程進行處理。

通道(Channels):NIO 新引入的最重要的抽象是通道的概念。Channel 數據連接的通道。 數據可以從Channel讀到Buffer中,也可以從Buffer 寫到Channel中 。
緩沖區(Buffers):通道channel可以向緩沖區Buffer中寫數據,也可以像buffer中存數據。
選擇器(Selector):使用選擇器,借助單一線程,就可對數量龐大的活動 I/O 通道實時監控和維護。

當一個連接創建后,不會需要對應一個線程,這個連接會被注冊到多路復用器,所以一個連接只需要一個線程即可,所有的連接需要一個線程就可以操作,該線程的多路復用器會輪訓,發現連接有請求時,才開啟一個線程處理。

NIO模型中selector的作用,一條連接來了之后,現在不創建一個while死循環去監聽是否有數據可讀了,而是直接把這條連接注冊到selector上,然后通過檢查這個selector,就可以批量監測出有數據可讀的連接,進而讀取數據。

JDK原生NIO實現服務端:

public class NIOServer extends Thread{

    //1.聲明多路復用器
    private Selector selector;
    //2.定義讀寫緩沖區
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    //3.定義構造方法初始化端口
    public NIOServer(int port) {
        init(port);
    }

    //4.main方法啟動線程
    public static void main(String[] args) {
        new Thread(new  NIOServer(8888)).start();
    }

    //5.初始化
    private void init(int port) {

        try {
            System.out.println("服務器正在啟動......");
            //1)開啟多路復用器
            this.selector = Selector.open();
            //2) 開啟服務通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //3)設置為非阻塞
            serverSocketChannel.configureBlocking(false);
            //4)綁定端口
            serverSocketChannel.bind(new InetSocketAddress(port));
            /**
             * SelectionKey.OP_ACCEPT   —— 接收連接繼續事件,表示服務器監聽到了客戶連接,服務器可以接收這個連接了
             * SelectionKey.OP_CONNECT  —— 連接就緒事件,表示客戶與服務器的連接已經建立成功
             * SelectionKey.OP_READ     —— 讀就緒事件,表示通道中已經有了可讀的數據,可以執行讀操作了(通道目前有數據,可以進行讀操作了)
             * SelectionKey.OP_WRITE    —— 寫就緒事件,表示已經可以向通道寫數據了(通道目前可以用於寫操作)
             */
            //5)注冊,標記服務連接狀態為ACCEPT狀態
            serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            System.out.println("服務器啟動完畢");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run(){
        while (true){
            try {
                //1.當有至少一個通道被選中,執行此方法
                this.selector.select();
                //2.獲取選中的通道編號集合
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                //3.遍歷keys
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    //4.當前key需要從動刀集合中移出,如果不移出,下次循環會執行對應的邏輯,造成業務錯亂
                    keys.remove();
                    //5.判斷通道是否有效
                    if (key.isValid()) {
                        try {
                            //6.判斷是否可以連接
                            if (key.isAcceptable()) {
                                accept(key);
                            }
                        } catch (CancelledKeyException e) {
                            //出現異常斷開連接
                            key.cancel();
                        }

                        try {
                            //7.判斷是否可讀
                            if (key.isReadable()) {
                                read(key);
                            }
                        } catch (CancelledKeyException e) {
                            //出現異常斷開連接
                            key.cancel();
                        }

                        try {
                            //8.判斷是否可寫
                            if (key.isWritable()) {
                                write(key);
                            }
                        } catch (CancelledKeyException e) {
                            //出現異常斷開連接
                            key.cancel();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) {
        try {
            //1.當前通道在init方法中注冊到了selector中的ServerSocketChannel
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            //2.阻塞方法, 客戶端發起后請求返回.
            SocketChannel channel = serverSocketChannel.accept();
            ///3.serverSocketChannel設置為非阻塞
            channel.configureBlocking(false);
            //4.設置對應客戶端的通道標記,設置次通道為可讀時使用
            channel.register(this.selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //使用通道讀取數據
    private void read(SelectionKey key) {
        try{
            //清空緩存
            this.readBuffer.clear();
            //獲取當前通道對象
            SocketChannel channel = (SocketChannel) key.channel();
            //將通道的數據(客戶發送的data)讀到緩存中.
            int readLen = channel.read(readBuffer);
            //如果通道中沒有數據
            if(readLen == -1 ){
                //關閉通道
                key.channel().close();
                //關閉連接
                key.cancel();
                return;
            }
            //Buffer中有游標,游標不會重置,需要我們調用flip重置. 否則讀取不一致
            this.readBuffer.flip();
            //創建有效字節長度數組
            byte[] bytes = new byte[readBuffer.remaining()];
            //讀取buffer中數據保存在字節數組
            readBuffer.get(bytes);
            System.out.println("收到了從客戶端 "+ channel.getRemoteAddress() + " :  "+ new String(bytes,"UTF-8"));
            //注冊通道,標記為寫操作
            channel.register(this.selector,SelectionKey.OP_WRITE);

        }catch (Exception e){

        }
    }

    //給通道中寫操作
    private void write(SelectionKey key) {
        //清空緩存
        this.readBuffer.clear();
        //獲取當前通道對象
        SocketChannel channel = (SocketChannel) key.channel();
        //錄入數據
        Scanner scanner = new Scanner(System.in);

        try {
            System.out.println("即將發送數據到客戶端..");
            String line = scanner.nextLine();
            //把錄入的數據寫到Buffer中
            writeBuffer.put(line.getBytes("UTF-8"));
            //重置緩存游標
            writeBuffer.flip();
            channel.write(writeBuffer);
            channel.register(this.selector,SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客戶端:

public class NIOClient {
    public static void main(String[] args) {
        //創建遠程地址
        InetSocketAddress address  = new InetSocketAddress("127.0.0.1",8888);
        SocketChannel channel = null;
        //定義緩存
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            //開啟通道
            channel = SocketChannel.open();
            //連接遠程遠程服務器
            channel.connect(address);
            Scanner sc = new Scanner(System.in);
            while (true){
                System.out.println("客戶端即將給 服務器發送數據..");
                String line = sc.nextLine();
                if(line.equals("exit")){
                    break;
                }
                //控制台輸入數據寫到緩存
                buffer.put(line.getBytes("UTF-8"));
                //重置buffer 游標
                buffer.flip();
                //數據發送到數據
                channel.write(buffer);
                //清空緩存數據
                buffer.clear();

                //讀取服務器返回的數據
                int readLen = channel.read(buffer);
                if(readLen == -1){
                    break;
                }
                //重置buffer游標
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                //讀取數據到字節數組
                buffer.get(bytes);
                System.out.println("收到了服務器發送的數據 : "+ new String(bytes,"UTF-8"));
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != channel){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

效果:

AIO

異步非阻塞IO。A代表asynchronize。

當有流可以讀時,操作系統會將可以讀的流傳入read方法的緩沖區,並通知應用程序,對於寫操作,OS將write方法的流寫入完畢是操作系統會主動通知應用程序。因此read和write都是異步 的,完成后會調用回調函數。

使用場景:連接數目多且連接比較長(重操作)的架構,比如相冊服務器。重點調用了OS參與並發操作,編程比較復雜。Java7開始支持。

4.Netty

Netty初認識

Netty 是由 JBOSS 提供一個異步的、 基於事件驅動的網絡編程框架

Netty 可以幫助你快速、 簡單的開發出一 個網絡應用, 相當於簡化和流程化了 NIO 的開發過程。 作為當前最流行的 NIO 框架, Netty 在互聯網領域、 大數據分布式計算領域、 游戲行業、 通信行業等獲得了廣泛的應用, 知名的 Elasticsearch 、 Dubbo 框架內部都采用了 Netty。

NIO缺點:

  • NIO 的類庫和 API 繁雜,使用麻煩。你需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等.
  • 可靠性不強,開發工作量和難度都非常大
  • NIO 的 Bug。例如 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。

Netty優點:

  • 對各種傳輸協議提供統一的 API
  • 高度可定制的線程模型——單線程、一個或多個線程池
  • 更好的吞吐量,更低的等待延遲
  • 更少的資源消耗
  • 最小化不必要的內存拷貝

線程模型

單線程模型:

線程池模型:

Netty模型:

Netty 抽象出兩組線程池, BossGroup 專門負責接收客 戶端連接, WorkerGroup 專門負責網絡讀寫操作。NioEventLoop 表示一個不斷循環執行處理 任務的線程, 每個 NioEventLoop 都有一個 selector, 用於監聽綁定在其上的 socket 網絡通道。 NioEventLoop 內部采用串行化設計, 從消息的讀取->解碼->處理->編碼->發送, 始終由 IO 線 程 NioEventLoop 負責。

Netty核心組件

EventLoopGroup 和其實現類 NioEventLoopGroup

EventLoopGroup 是一組 EventLoop 的抽象, Netty 為了更好的利用多核 CPU 資源, 一般 會有多個 EventLoop同時工作, 每個 EventLoop 維護着一個 Selector 實例。

public NioEventLoopGroup(), 構造方法

public Future<?> shutdownGracefully(), 斷開連接, 關閉線程

ServerBootstrap 和 Bootstrap

ServerBootstrap 是 Netty 中的服務器端啟動助手,通過它可以完成服務器端的各種配置; Bootstrap 是 Netty 中的客戶端啟動助手, 通過它可以完成客戶端的各種配置。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),該方法用於服務器端, 用來設置兩個 EventLoop

public B group(EventLoopGroup group) , 該方法用於客戶端, 用來設置一個 EventLoop

public B channel(Class<? extends C> channelClass), 該方法用來設置一個服務器端的通道實現

public B option(ChannelOption option, T value), 用來給 ServerChannel 添加配置

public ServerBootstrap childOption(ChannelOption childOption, T value), 用來給接收到的通道添加配置

public ServerBootstrap childHandler(ChannelHandler childHandler), 該方法用來設置業務處理類(自定
義的 handler)

public ChannelFuture bind(int inetPort) , 該方法用於服務器端, 用來設置占用的端口號

public ChannelFuture connect(String inetHost, int inetPort) 該方法用於客戶端, 用來連接服務器端

ChannelPipeline

ChannelPipeline 是一個 Handler 的集合, 它負責處理和攔截 inbound 或者 outbound 的事件和操作, 相當於一個貫穿 Netty 的鏈。

ChannelPipeline addFirst(ChannelHandler... handlers), 把一個業務處理類(handler) 添加到鏈中的第一個位置

ChannelPipeline addLast(ChannelHandler... handlers), 把一個業務處理類(handler) 添加到鏈中的最后一個位置

ChannelHandler 及其實現類

ChannelHandler 接口定義了許多事件處理的方法, 我們可以通過重寫這些方法去實現具 體的業務邏輯。經常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter, 然后通過重寫相應方法實現業務邏輯,一般都需要重寫下列方法:

public void channelActive(ChannelHandlerContext ctx), 通道就緒事件

public void channelRead(ChannelHandlerContext ctx, Object msg), 通道讀取數據事件

public void channelReadComplete(ChannelHandlerContext ctx) , 數據讀取完畢事件

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause), 通道發生異常事件

ChannelHandlerContext

這 是 事 件 處 理 器 上 下 文 對 象 , Pipeline 鏈 中 的 實 際 處 理 節 點 。 每 個 處 理 節 點ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler , 同 時ChannelHandlerContext 中也綁定了對應的 pipeline 和 Channel 的信息,方便對 ChannelHandler 進行調用。

ChannelFuture close(), 關閉通道

ChannelOutboundInvoker flush(), 刷新

ChannelFuture writeAndFlush(Object msg) , 將 數 據 寫 到 ChannelPipeline 中當前ChannelHandler 的下一個 ChannelHandler 開始處理(出站)

ChannelFuture

表示 Channel 中異步 I/O 操作的結果, 在 Netty 中所有的 I/O 操作都是異步的, I/O 的調 用會直接返回, 調用者並不能立刻獲得結果, 但是可以通過 ChannelFuture 來獲取 I/O 操作 的處理狀態。

Channel channel(), 返回當前正在進行 IO 操作的通道
ChannelFuture sync(), 等待異步操作執行完畢

案例實現

需求:使用netty客戶端給服務端發送數據,服務端接收消息打印。

引入依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>

服務端實現代碼:

// 接收客戶端請求,打印在控制台
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        //1.創建2個線程池對象,默認線程數為CPU核心數乘2
        //bossGroup 負責接收用戶連接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        //workGroup 負責處理用戶的io讀寫操作
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        //2.創建服務啟動輔助類:組裝一些必要的組件
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //3.設置啟動引導類
        //添加到組中,兩個線程池,第一個位置的線程池就負責接收,第二個參數就負責讀寫
        serverBootstrap.group(bossGroup,workGroup)
                //channel方法指定服務器監聽的通道類型:NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                //設置channel handler,每一個客戶端連接后,給定一個監聽器進行處理
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    //事件監聽Channel通道
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //獲取pipeLine傳輸通道
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        //綁定編碼,在通道上添加對通道的處理器, 該處理器可能還是一個監聽器
                        pipeline.addFirst(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //綁定我們的業務邏輯,監聽器隊列上添加我們自己的處理方式
                        pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                //獲取入棧信息,打印客戶端傳遞的數據
                                System.out.println(msg);
                            }
                        });
                    }
                });

        //4.啟動引導類綁定監聽端口
        ChannelFuture future = serverBootstrap.bind(9999).sync();

        //5.關閉通道,會阻塞等待直到服務器的channel關閉
        future.channel().closeFuture().sync();
    }
}

客戶端實現代碼:

//客戶端給服務器發送數據
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1.創建連接池對象
        NioEventLoopGroup group = new NioEventLoopGroup();

        //2.創建客戶端的啟動引導類 BootStrap
        Bootstrap bootstrap = new Bootstrap();

        //3.配置啟動引導類
        bootstrap.group(group)
                //設置通道為Nio,指定通道類型:NioSocketChannel
                .channel(NioSocketChannel.class)
                //設置Channel初始化監聽
                .handler(new ChannelInitializer<Channel>() {
                    //當前該方法監聽channel是否初始化
                    protected void initChannel(Channel channel) throws Exception {
                        //設置編碼
                        channel.pipeline().addLast(new StringEncoder());
                    }
                });

        //4.使用啟動引導類連接服務器 , 獲取一個channel
        Channel channel = bootstrap.connect("127.0.0.1", 9999).channel();

        //5.循環寫數據給服務器
        while (true) {
           //給服務器寫數據
            channel.writeAndFlush("hello server .. this is client ...");
            Thread.sleep(2000);
        }
    }
}

使用Netty之后,一方面Netty對NIO封裝得如此完美,寫出來的代碼非常優雅,另外一方面,使用Netty之后,網絡通信這塊的性能問題幾乎不用操心。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM