分布式架構從零開始========》【基於Java自身技術實現消息方式的系統間通信】


    基於Java自身包實現消息方式的系統間通信的方式有:TCP/IP+BIO,TCP/IP+NIO,UDP/IP+BIO,UDP/IP+NIO.下面就這4種類型一一做個詳細的介紹:

一.TCP/IP+BIO

    在java中可基於Socket,ServerSocket來實現TCP/IP+BIO的系統間通信。Socket主要用於實現建立連接以及網絡IO的操作,ServerSocket主要用於實現服務器端端口的監聽及Socket對象的獲取。基於Socket實現客戶端的代碼如下:

public class Client {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        String host="127.0.0.1";
        int port=9527;
        Socket socket=new Socket(host,port);
        BufferedReader in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out=new PrintWriter(socket.getOutputStream(),true);
        BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));
        boolean flag=true;
        while(flag){
            String command=systemIn.readLine();
            if(command==null || "quit".equalsIgnoreCase(command.trim())){
                flag=false;
                System.out.println("Client quit!");
                out.println("quit");
                out.close();
                in.close();
                socket.close();
                continue;
            }
            out.println(command);
            String response=in.readLine();
            System.out.println(response);
        }
    }
    
}

服務器端代碼如下:

public class Server {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        int port=9527;
        ServerSocket ss=new ServerSocket(port);
        System.out.println("Server listen on port: "+port);
        Socket socket=ss.accept();
        BufferedReader in=new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out=new PrintWriter(socket.getOutputStream(),true);
        while(true){
            String line=in.readLine();
            if(line==null){
                Thread.sleep(100);
                continue;
            }
            if("quit".equalsIgnoreCase(line.trim())){
                in.close();
                out.close();
                ss.close();
                System.out.println("Server has been shutdown!");
                System.exit(0);
            }
            else{
                System.out.println("Message from client: "+ line);
                out.println("Server response:"+line);
                Thread.sleep(100);
            }
        }
    }
    
}

  上面是基於Socket,ServerSocket實現的一個簡單的系統間通信的例子。而在實際的系統中,通常要面對的是客戶端同時要發送多個請求到服務器端,服務器端則同時要接受多個連接發送的請求,上面的代碼顯然是無法滿足的。

為了滿足客戶端能同時發送多個請求到服務器端,最簡單的方法就是生成多個Socket。但這里會產生兩個問題:一是生成太多的Socket會消耗過多的本地資源,在客戶端機器多,服務器端機器少的情況下,客戶端生成太多Socket會

導致服務器端須要支撐非常高的連接數;二是生成Socket(建立連接)通常是比較慢的,因此頻繁的創建會導致系統性能不足。鑒於這兩個問題,通常采用連接池的方法來維護Socket是比較好的,一方面限制了能創建的Socket的個數;

另一個方面由於將Socket放入了池中,避免了重復創建Socket帶來的性能下降的問題。數據庫連接池就是這種方式的典型代表,但連接池的方式會帶來另一個問題,連接池中的Socket的個數是有限的,但同時要用socket的請求可能會

很多,在這種情況下就會造成激烈的競爭和等待,還有一個需要注意的問題是合理控制等待響應的超時時間,如不設定會導致當服務器端處理變慢時,客戶端相關的請求都在做無限的等待,二客戶端的資源必然是有限的。因此這種情況

下很容易造成服務器端出問題時,客戶端掛掉的現象。超時時間具體設置為多少取決於客戶端能承受的請求量及服務器端的處理時間。既要保證性能,又要保證出錯率不會過高,對於直接基於TCP/IP+BIO的方式,可采用Socket.setSotimeout來設置等待響應的超時時間。

二.TCP/IP+NIO

    在java中可基於java.nio.channels中的Channel和Selector的相關類來實現TCP/IP+NIO方式的系統間通信。Channel有SocketChannel和ServerSocketChannnel兩種,SocketChannel用於建立連接,監聽事件及操作讀寫,

ServerSocketChannel用於監聽端口及監聽連接事件;程序通過Selector來獲取是否有要處理的事件。基於這兩個類實現客戶端代碼如下:

public class Client {

    public static void main(String[] args) throws Exception{
        int port=9527;
        SocketChannel channel=SocketChannel.open();
        channel.configureBlocking(false);
        SocketAddress target=new InetSocketAddress("127.0.0.1",port);
        channel.connect(target);
        Selector selector=Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
        BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));
        while(true){
            if(channel.isConnected()){
                String command=systemIn.readLine();
                channel.write(Charset.forName("UTF-8").encode(command));
                if(command==null || "quit".equalsIgnoreCase(command.trim())){
                    systemIn.close();
                    channel.close();
                    selector.close();
                    System.out.println("Client quit!");
                    System.exit(0);
                }
            }
            int nKeys=selector.select(1000);
            if(nKeys>0){
                for (SelectionKey key : selector.selectedKeys()) {
                    if(key.isConnectable()){
                        SocketChannel sc=(SocketChannel) key.channel();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        sc.finishConnect();
                    }
                    else if(key.isReadable()){
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        SocketChannel sc=(SocketChannel) key.channel();
                        int readBytes=0;
                        try{
                            int ret=0;
                            try{
                                while((ret=sc.read(buffer))>0){
                                    readBytes+=ret;
                                }
                            }
                            finally{
                                buffer.flip();
                            }
                            if(readBytes>0){
                                System.out.println(Charset.forName("UTF-8").decode(buffer).toString());
                                buffer = null;
                            }
                        }
                        finally{
                            if(buffer!=null){
                                buffer.clear();
                            }
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }
    
}

服務器端代碼如下:

public class Server {

    public static void main(String[] args) throws Exception{
        int port=9527;
        Selector selector=Selector.open();
        ServerSocketChannel ssc=ServerSocketChannel.open();
        ServerSocket serverSocket=ssc.socket();
        serverSocket.bind(new InetSocketAddress(port));
        System.out.println("Server listen on port: "+port);
        ssc.configureBlocking(false);
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while(true){
            int nKeys=selector.select(1000);
            if(nKeys>0){
                for (SelectionKey key : selector.selectedKeys()) {
                    if(key.isAcceptable()){
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel sc=server.accept();
                        if(sc==null){
                            continue;
                        }
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                    }
                    else if(key.isReadable()){
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        SocketChannel sc=(SocketChannel) key.channel();
                        int readBytes=0;
                        String message=null;
                        try{
                            int ret;
                            try{
                                while((ret=sc.read(buffer))>0){
                                    readBytes+=ret;
                                }
                            }
                            catch(Exception e){
                                readBytes=0;
                                // IGNORE
                            }
                            finally{
                                buffer.flip();
                            }
                            if(readBytes>0){
                                message=Charset.forName("UTF-8").decode(buffer).toString();
                                buffer = null;
                            }
                        }
                        finally{
                            if(buffer!=null){
                                buffer.clear();
                            }
                        }
                        if(readBytes>0){
                            System.out.println("Message from client: "+ message);
                            if("quit".equalsIgnoreCase(message.trim())){
                                sc.close();
                                selector.close();
                                System.out.println("Server has been shutdown!");
                                System.exit(0);
                            }
                            String outMessage="Server response:"+message;
                            sc.write(Charset.forName("UTF-8").encode(outMessage));
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }
    
}

三.UDP/IP+BIO

服務器端代碼如下:

public class Server {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        int port=9527;
        int aport=9528;
        DatagramSocket server=new DatagramSocket(port);
        DatagramSocket client=new DatagramSocket();
        InetAddress serverAddress=InetAddress.getByName("localhost");
        byte[] buffer=new byte[65507];
        DatagramPacket packet=new DatagramPacket(buffer,buffer.length);
        while(true){
            server.receive(packet);
            String line=new String(packet.getData(),0,packet.getLength(),"UTF-8");
            if("quit".equalsIgnoreCase(line.trim())){
                server.close();
                System.exit(0);
            }
            else{
                System.out.println("Message from client: "+ line);
                packet.setLength(buffer.length);
                String response="Server response"+line;
                byte[] datas=response.getBytes("UTF-8");
                DatagramPacket responsePacket=new DatagramPacket(datas,datas.length,serverAddress,aport);
                client.send(responsePacket);
                Thread.sleep(100);
            }
        }
    }

}

客戶端代碼如下:

public class Client {

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception{
        int port=9527;
        int aport=9528;
        DatagramSocket serverSocket=new DatagramSocket(aport);
        byte[] buffer=new byte[65507];
        DatagramPacket receivePacket=new DatagramPacket(buffer,buffer.length);
        DatagramSocket socket=new DatagramSocket();
        InetAddress server=InetAddress.getByName("localhost");
        BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));
        boolean flag=true;
        while(flag){
            String command=systemIn.readLine();
            byte[] datas=command.getBytes("UTF-8");
            DatagramPacket packet=new DatagramPacket(datas,datas.length,server,port);
            socket.send(packet);
            if(command==null || "quit".equalsIgnoreCase(command.trim())){
                flag=false;
                System.out.println("Client quit!");
                socket.close();
                continue;
            }
            serverSocket.receive(receivePacket);
            String receiveResponse=new String(receivePacket.getData(),0,receivePacket.getLength(),"UTF-8");
            System.out.println(receiveResponse);
        }
    }

}

四.UDP/IP+NIO

服務器端代碼如下:

public class Server {

    public static void main(String[] args) throws Exception{
        int rport=9527;
        int sport=9528;
        
        DatagramChannel sendChannel=DatagramChannel.open();
        sendChannel.configureBlocking(false);
        SocketAddress target=new InetSocketAddress("127.0.0.1",sport);
        sendChannel.connect(target);
        
        DatagramChannel receiveChannel=DatagramChannel.open();
        DatagramSocket serverSocket=receiveChannel.socket();
        serverSocket.bind(new InetSocketAddress(rport));
        System.out.println("Data receive listen on port: "+rport);
        receiveChannel.configureBlocking(false);
        Selector selector=Selector.open();
        receiveChannel.register(selector, SelectionKey.OP_READ);
        while(true){
            int nKeys=selector.select(1000);
            if(nKeys>0){
                for (SelectionKey key : selector.selectedKeys()) {
                    if(key.isReadable()){
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        DatagramChannel dc=(DatagramChannel) key.channel();
                        dc.receive(buffer);
                        buffer.flip();
                        String message=Charset.forName("UTF-8").decode(buffer).toString();
                        System.out.println("Message from client: "+ message);
                        if("quit".equalsIgnoreCase(message.trim())){
                            dc.close();
                            selector.close();
                            sendChannel.close();
                            System.out.println("Server has been shutdown!");
                            System.exit(0);
                        }
                        String outMessage="Server response��"+message;
                        sendChannel.write(Charset.forName("UTF-8").encode(outMessage));
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }
    
}

客戶端代碼如下:

public class Client {

    public static void main(String[] args) throws Exception{
        int rport=9528;
        int sport=9527;
        
        DatagramChannel receiveChannel=DatagramChannel.open();
        receiveChannel.configureBlocking(false);
        DatagramSocket socket=receiveChannel.socket();
        socket.bind(new InetSocketAddress(rport));
        Selector selector=Selector.open();
        receiveChannel.register(selector, SelectionKey.OP_READ);
        
        DatagramChannel sendChannel=DatagramChannel.open();
        sendChannel.configureBlocking(false);
        SocketAddress target=new InetSocketAddress("127.0.0.1",sport);
        sendChannel.connect(target);
        
        BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));
        
        while(true){
            String command=systemIn.readLine();
            sendChannel.write(Charset.forName("UTF-8").encode(command));
            if(command==null || "quit".equalsIgnoreCase(command.trim())){
                systemIn.close();
                sendChannel.close();
                selector.close();
                System.out.println("Client quit!");
                System.exit(0);
            }
            int nKeys=selector.select(1000);
            if(nKeys>0){
                for (SelectionKey key : selector.selectedKeys()) {
                    if(key.isReadable()){
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        DatagramChannel dc=(DatagramChannel) key.channel();
                        dc.receive(buffer);
                        buffer.flip();
                        System.out.println(Charset.forName("UTF-8").decode(buffer).toString());
                        buffer = null;
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM