Java NIO 經典實例


服務端:

 

Loader.java

package net.chatroom.server;

public class Loader {

    public static void main(String[] args) {
        Deamon deamon = new Deamon(9999);
        new Thread(deamon).start();
    }

}

 

Util.java

package net.chatroom.server;

import java.nio.charset.Charset;
import java.util.HashSet;

/**
 * Constants and shared state used by the chat server and the chat client.
 *
 * <p>All fields are {@code final} so that the wire protocol cannot be
 * reassigned at runtime by accident.
 */
public class Util {

    /** Charset used to encode and decode every message. */
    public static final Charset charset = Charset.forName("UTF-8");

    /**
     * Separator between the sender's name and the message text. It is part of
     * the custom protocol, so client and server must agree on it.
     */
    public static final String USER_CONTENT_SPILIT = "#@#";

    /**
     * Nicknames that have been registered. The set itself is mutable and is
     * not synchronized.
     */
    public static final HashSet<String> users = new HashSet<String>();

    /** Reply sent to a client that asks for a nickname that is already taken. */
    public static final String USER_EXIST = "system message: user exist, please change a name";
}

 

Deamon.java

package net.chatroom.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

/**
 * Selector loop of the chat server.
 *
 * <p>The constructor opens a non-blocking {@link ServerSocketChannel} on the
 * given port and registers it for {@code OP_ACCEPT}. {@link #run()} then
 * dispatches accept and read events to {@code Dealer} until the run flag is
 * cleared, the thread is interrupted, or the selector fails.
 */
public class Deamon implements Runnable {

    /** Loop condition; volatile because {@link #setFlag} may be called from another thread. */
    private volatile boolean flag = true;

    private ServerSocketChannel serverChannel = null;
    private Selector selector = null;
    /**
     * Every client connection accepted so far.
     */
    private List<SocketChannel> clientChannels = null;

    /**
     * Sets the run flag. Passing {@code false} also wakes the selector so that
     * a blocked {@code select()} returns and the loop can observe the change.
     *
     * @param flag {@code false} to ask the server loop to stop
     */
    public void setFlag(boolean flag) {
        this.flag = flag;
        Selector current = this.selector;
        if (!flag && current != null && current.isOpen()) {
            current.wakeup();
        }
    }

    /**
     * Opens the listening channel and the selector.
     *
     * <p>An {@link IOException} is reported on the console and not rethrown;
     * whatever was opened is closed again and {@link #run()} will return
     * immediately.
     *
     * @param port TCP port to listen on
     */
    public Deamon(int port) {
        // Created first so the list is never null, even if start-up fails.
        this.clientChannels = new ArrayList<SocketChannel>();
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.socket().bind(new InetSocketAddress(port));
            selector = Selector.open();
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server is listening now...");
        } catch (IOException e) {
            e.printStackTrace();
            release();
        }
    }

    /**
     * Runs the select/dispatch loop and releases all channels when it ends.
     */
    @Override
    public void run() {
        if (selector == null || !selector.isOpen()
                || serverChannel == null || !serverChannel.isOpen()) {
            System.out.println("server is not initialised, nothing to run..");
            release();
            return;
        }
        try {
            while (this.flag) {
                try {
                    // Blocks until a channel is ready or wakeup() is called.
                    selector.select();
                } catch (IOException e) {
                    System.out.println("Error while select channel:" + e);
                    break;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key);
                }
                try {
                    // Throttle so that a persistently-ready key cannot spin the CPU.
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the status set.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            System.out.println("server to close..");
            release();
        }
    }

    /** Handles one selected key: accepts a new client or reads from an existing one. */
    private void dispatch(SelectionKey key) {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            SocketChannel accepted = Dealer.accept(selector, serverChannel);
            // Only usable connections are tracked; a null or closed channel
            // in the list would break later broadcasts.
            if (accepted != null && accepted.isOpen()) {
                this.clientChannels.add(accepted);
            }
        } else if (key.isReadable()) {
            try {
                Dealer.read(selector, key, clientChannels);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Closes every tracked client channel, the listening channel and the selector. */
    private void release() {
        for (SocketChannel client : clientChannels) {
            closeQuietly(client);
        }
        clientChannels.clear();
        closeQuietly(serverChannel);
        closeQuietly(selector);
    }

    /** Closes the resource if it is non-null, printing any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

 

Dealer.java

package net.chatroom.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.rmi.server.Skeleton;
import java.util.List;
import java.util.Scanner;

/**
 * Server-side event handlers: accepting clients, reading their messages and
 * broadcasting to the other clients.
 */
public class Dealer {

    /**
     * Accepts a pending connection, makes it non-blocking, registers it for
     * {@code OP_READ} and sends the name prompt.
     *
     * @param selector      selector the new channel is registered with
     * @param serverChannel listening channel to accept from
     * @return the accepted channel; {@code null} when no connection was
     *         pending; a channel that has already been closed when
     *         configuring it failed
     */
    public static SocketChannel accept(Selector selector,
            ServerSocketChannel serverChannel) {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            if (channel == null) {
                // A non-blocking accept returns null when nothing is pending.
                return null;
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

            channel.write(Util.charset.encode("Please input your name."));

        } catch (IOException | RuntimeException e) {
            System.out.println("Error while configure socket channel :" + e);
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return channel;
    }

    /**
     * Reads everything currently available from a client and acts on it.
     *
     * <p>The received text is split on {@code Util.USER_CONTENT_SPILIT}. One
     * part is treated as a nickname registration, more than one part as a chat
     * message from the nickname in the first part. When the client has closed
     * the connection, or reading fails, the channel is closed, removed from
     * {@code clientChannels} and the nickname registered on this connection is
     * released.
     *
     * @param selector       selector of the server loop (not used here)
     * @param selectionkey   key whose channel is readable
     * @param clientChannels all tracked client channels
     * @throws IOException if replying to or processing the message fails
     */
    public static void read(Selector selector, SelectionKey selectionkey,
            List<SocketChannel> clientChannels) throws IOException {
        SocketChannel socketClientChannel = (SocketChannel) selectionkey
                .channel();
        ByteBuffer buffer = ByteBuffer.allocate(6 * 1024);
        // Raw bytes are collected and decoded once, so a multi-byte character
        // split across two reads is not corrupted.
        java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
        String content;
        boolean endOfStream;
        try {
            int num;
            while ((num = socketClientChannel.read(buffer)) > 0) {
                buffer.flip();
                received.write(buffer.array(),
                        buffer.arrayOffset() + buffer.position(),
                        buffer.remaining());
                // Make the whole buffer writable again for the next read.
                buffer.clear();
            }
            // read() returns -1 once the peer has closed the connection.
            endOfStream = num < 0;
            content = new String(received.toByteArray(), Util.charset);

            System.out.println("Server is listening from client :"
                    + socketClientChannel.getRemoteAddress() + " data rev is: "
                    + content);
        } catch (IOException e) {
            // A failed read means the connection is unusable: drop it.
            closeClient(selectionkey, socketClientChannel, clientChannels);
            return;
        }

        try {
            if (content.length() > 0) {
                handleContent(content, selectionkey, socketClientChannel,
                        clientChannels);
            }
        } finally {
            if (endOfStream) {
                closeClient(selectionkey, socketClientChannel, clientChannels);
            }
        }
    }

    /** Registers a nickname or broadcasts a chat message, depending on the content. */
    private static void handleContent(String content, SelectionKey key,
            SocketChannel sender, List<SocketChannel> clientChannels)
            throws IOException {
        String[] arrayContent = content.split(Util.USER_CONTENT_SPILIT);
        if (arrayContent.length == 1) {
            // Registration request.
            String name = arrayContent[0];
            if (Util.users.contains(name)) {
                sender.write(Util.charset.encode(Util.USER_EXIST));
            } else {
                Util.users.add(name);
                // Remember the name on the key so it can be released on disconnect.
                key.attach(name);
                int onlineNum = clientChannels.size();
                String message = "welcome " + name
                        + " to chat room! Online numbers:" + onlineNum;
                BroadCast2(clientChannels, null, message);
            }
        } else if (arrayContent.length > 1) {
            // Chat message from a user.
            String name = arrayContent[0];
            String message = content.substring(name.length()
                    + Util.USER_CONTENT_SPILIT.length());
            message = name + " say: " + message;
            if (Util.users.contains(name)) {
                // Do not echo the message back to its sender.
                BroadCast2(clientChannels, sender, message);
            }
        }
    }

    /** Releases the nickname attached to the key, untracks the channel and closes it. */
    private static void closeClient(SelectionKey key, SocketChannel client,
            List<SocketChannel> clientChannels) {
        Object name = key.attachment();
        if (name instanceof String) {
            Util.users.remove(name);
        }
        clientChannels.remove(client);
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the {@link SocketChannel}s registered with the selector.
     *
     * @param selector selector to inspect
     * @return number of registered keys whose channel is a {@code SocketChannel}
     */
    public static int OnlineNum(Selector selector) {
        int res = 0;
        for (SelectionKey key : selector.keys()) {
            Channel targetchannel = key.channel();

            if (targetchannel instanceof SocketChannel) {
                res++;
            }
        }
        return res;
    }

    /**
     * Sends {@code content} to every {@link SocketChannel} registered with the
     * selector, except {@code except}.
     *
     * @param selector selector whose registered channels receive the message
     * @param except   channel to skip; may be {@code null} to skip none
     * @param content  text to send
     * @throws IOException if writing to a channel fails
     */
    public void BroadCast(Selector selector, SocketChannel except,
            String content) throws IOException {
        for (SelectionKey key : selector.keys()) {
            Channel targetchannel = key.channel();
            if (targetchannel instanceof SocketChannel
                    && targetchannel != except) {
                SocketChannel dest = (SocketChannel) targetchannel;
                dest.write(Util.charset.encode(content));
            }
        }
    }

    /**
     * Sends {@code content} to every open channel in the list, except
     * {@code except}.
     *
     * <p>Null and closed entries are skipped. A write failure on one channel
     * is reported on the console and does not stop delivery to the remaining
     * channels.
     *
     * @param socketChannels channels to send to
     * @param except         channel to skip; may be {@code null} to skip none
     * @param content        text to send
     * @throws IOException declared for compatibility with existing callers
     */
    public static void BroadCast2(List<SocketChannel> socketChannels,
            SocketChannel except, String content) throws IOException {
        ByteBuffer encoded = Util.charset.encode(content);
        for (SocketChannel socketChannel : socketChannels) {
            if (socketChannel == null || !socketChannel.isOpen()
                    || socketChannel.equals(except)) {
                continue;
            }
            try {
                // duplicate() gives each write its own position over the same bytes.
                socketChannel.write(encoded.duplicate());
            } catch (IOException e) {
                System.out.println("Error while broadcasting to a client :" + e);
            }
        }
    }

}

 

客戶端:

 

Loader.java

package net.chatroom.client;

import java.util.Scanner;

import net.chatroom.server.Util;

/**
 * Entry point of the chat-room client.
 */
public class Loader {

    /**
     * Connects to 127.0.0.1:9999, starts the receiving thread and then forwards
     * every non-empty line typed on standard input to the server. The first
     * line becomes the nickname; later lines are sent as
     * {@code nickname + separator + line}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Deamon connection = new Deamon("127.0.0.1", 9999);
        Thread receiver = new Thread(connection);
        receiver.start();

        String nickname = "";
        // The main thread reads the keyboard and writes to the server.
        Scanner keyboard = new Scanner(System.in);
        while (keyboard.hasNextLine()) {
            String typed = keyboard.nextLine();
            if (typed.isEmpty()) {
                // Empty messages are not sent.
                continue;
            }
            String payload;
            if (nickname.isEmpty()) {
                nickname = typed;
                payload = nickname + Util.USER_CONTENT_SPILIT;
            } else {
                payload = nickname + Util.USER_CONTENT_SPILIT + typed;
            }
            connection.chancelToWrite(Util.charset.encode(payload));
        }
    }
}

 

Deamon.java

package net.chatroom.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

import net.chatroom.server.Util;

/**
 * Client-side connection: owns the {@link SocketChannel} to the server, lets
 * other threads write to it and runs a selector loop that hands readable
 * events to {@code Dealer.read}.
 */
public class Deamon implements Runnable {
    /**
     * Selector that watches the state of the registered SocketChannel.
     */
    private Selector selector = null;

    /**
     * Channel used to send data to and receive data from the server.
     */
    private SocketChannel channel = null;

    /**
     * Run flag; the loop in {@link #run()} exits once it is false. Volatile
     * because {@link #setFlag} may be called from a different thread.
     */
    private volatile boolean flag = true;

    /**
     * Sets the run flag. Passing {@code false} also wakes the selector so that
     * a blocked {@code select()} returns and the loop can observe the change.
     *
     * @param flag {@code false} to ask the client loop to stop
     */
    public void setFlag(boolean flag) {
        this.flag = flag;
        Selector current = this.selector;
        if (!flag && current != null && current.isOpen()) {
            current.wakeup();
        }
    }

    /**
     * Connects to the server and registers the channel for read events.
     *
     * <p>Only {@code OP_READ} is registered: writes are issued directly by
     * {@link #chancelToWrite}, and a permanently registered {@code OP_WRITE}
     * would make every {@code select()} return at once. An {@link IOException}
     * is printed and not rethrown; the instance then stays unconnected.
     *
     * @param address server host name or IP address
     * @param port    server TCP port
     */
    public Deamon(String address, int port) {
        try {
            channel = SocketChannel.open(new InetSocketAddress(address, port));
            channel.configureBlocking(false);
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
            release();
        }

    }

    /**
     * Writes the whole buffer to the server.
     *
     * <p>Does nothing except print a notice when there is no open connection.
     *
     * @param buffer bytes to send, positioned for reading
     */
    public void chancelToWrite(ByteBuffer buffer) {
        if (channel == null || !channel.isOpen()) {
            System.out.println("not connected to server, message dropped.");
            return;
        }
        try {
            // A non-blocking write may accept only part of the buffer.
            while (buffer.hasRemaining()) {
                if (channel.write(buffer) == 0) {
                    Thread.yield();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the select loop until the run flag is cleared, the connection is
     * closed, the thread is interrupted or an I/O error occurs, then closes the
     * channel and the selector.
     */
    @Override
    public void run() {
        System.out.println("client run..");
        if (selector == null || channel == null) {
            // The constructor could not connect.
            release();
            return;
        }
        try {
            while (this.flag && channel.isOpen()) {
                try {
                    this.selector.select();
                } catch (IOException e) {
                    System.out.println("Error while select channel:" + e);
                    break;
                }

                Iterator<SelectionKey> keys = selector.selectedKeys()
                        .iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    // Remove the key from the selected set so it is not handled twice.
                    keys.remove();
                    if (key.isValid() && key.isReadable()) {
                        System.out.println("client isReadable..");
                        try {
                            Dealer.read((SocketChannel) key.channel());
                        } catch (IOException e) {
                            // A failed read means the connection is broken:
                            // closing it ends the loop instead of failing forever.
                            e.printStackTrace();
                            closeQuietly(channel);
                        }
                    }
                }

                try {
                    // Throttle so that a persistently-ready key cannot spin the CPU.
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Treat interruption as a request to stop; keep the status set.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            release();
        }
    }

    /** Closes the socket channel and the selector. */
    private void release() {
        closeQuietly(channel);
        closeQuietly(selector);
    }

    /** Closes the resource if it is non-null, printing any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

Dealer.java

package net.chatroom.client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import net.chatroom.server.Util;

public class Dealer {
    /**
     * 從SocketChannel中讀取信息
     * 
     * @param channel
     * @throws IOException 
     */
    public static void read(SocketChannel channel) throws IOException {

        /**
         * 初始化緩沖區
         */
        ByteBuffer buffer = ByteBuffer.allocateDirect(6 * 1024);
        /**
         * 讀到的字節數
         */
        int num = 0;
        String content = "";
        while ((num = channel.read(buffer)) > 0) {
            buffer.flip();
            content += Util.charset.decode(buffer);
        }
        //若系統發送通知名字已經存在,則需要換個昵稱
        if(Util.USER_EXIST.equals(content)) {
//            name = "";
            System.out.println("name has exists.");
        }
        System.out.println(content);
    }

    /**
     * 想SocketChannel中寫入數據
     * 
     * @param channel
     */
    public static void write(SocketChannel channel) {

//        /**
//         * 從消息隊列中獲取要發送的消息
//         */
//        String msg = MsgQueue.getInstance().get();
//        if (msg == null) {
//            /**
//             * 如果消息隊列中沒有要發送的消息,則返回。
//             */
//            return;
//        }
//        /**
//         * 初始化緩沖區
//         */
//        ByteBuffer buffer = ByteBuffer.allocateDirect(6 * 1024);
//
//        /**
//         * 把消息放到緩沖區中
//         */
//        buffer.put(msg.getBytes());
//
//        /**
//         * 重置緩沖區指針
//         */
//        buffer.flip();
//        try {
//            /**
//             * 把緩沖區中的數據寫到SocketChannel里
//             */
//            channel.write(buffer);
//        } catch (IOException e) {
//            e.printStackTrace();
//        }
    }
}

 


免責聲明!

本站轉載的文章僅供個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱 yoyou2525@163.com 刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM