《用Java寫一個通用的服務器程序》02 監聽器


在一個服務器程序中,監聽器的作用類似於公司前台,起引導作用,因此監聽器花在每個新連接上的時間應該盡可能短,這樣才能保證最快響應。

回到編程本身來說:

1. 監聽器最好由單獨的線程運行

2. 監聽器在接到新的連接之后,處理連接的方法需要盡快返回

 

在Java Push Framework中,因為需要同時監聽普通客戶端和服務器監視服務的客戶端,所以定義兩種監聽器:Acceptor和MonitorAcceptor。

由於兩者的關於監聽部分的邏輯是相同的,因此首先定義了抽象類Listener來實現了監視器的功能,把處理socket的部分定義為抽象方法。

// 處理socket的抽象方法
protected abstract boolean handleAcceptedSocket(
            PushClientSocket clientSocket);

 

對於監聽功能的實現比較簡單,還是那三步:create,bind,accept。

    private boolean doListening(InetSocketAddress serverAddr) {
        boolean ret = false;
        int socketBufferSize = getServerImpl().getServerOptions()
                    .getSocketBufferSize();
        int socketType = getServerImpl().getServerOptions()
                    .getSocketType();
        try {
            // Create
            serverSocket = SocketFactory.getDefault().createServerSocket(
                    socketType, socketBufferSize);
            
            // Bind
            serverSocket.bind(serverAddr);

            Debug.debug("Start to listen " + serverAddr.getHostName() + ":"
                    + serverAddr.getPort());
            
            // Accept
            doAccept();
            
            ret = true;
        } catch (IOException e) {
            e.printStackTrace();
            if (serverSocket != null) {
                serverSocket.close();
                serverSocket = null;
            }
        }
        
        return ret;
    }

考慮Java中現在有好幾種不同的socket:同步阻塞Socket,同步非阻塞Socket,以及JDK7新添加的異步Socket,如果直接使用Java的Socket類,不方便在不同類型的socket之間切換使用。所以我自定義了PushServerSocket和PushClientSocket兩個新接口:

// 對於服務器socket來說,只定義了必須的bind和accept,
// 以及一個不會拋出異常的close。
public interface PushServerSocket {

    public void bind(InetSocketAddress serverAddr) throws IOException;

    public PushClientSocket accept() throws IOException;

    public void close();
}
// 客戶端socket接口的定義是C++的風格,因為原來的代碼是C++寫的,這么定義便於翻譯原來的C++代碼
public interface PushClientSocket {

    public String getIP();
    public int getPort();
    
    // 這里直接使用Selector其實是有問題的,注定了只能使用NIO的方式
    // 后面會考慮修改
    public SelectionKey registerSelector(Selector selector, int ops, 
            Object attachment) throws IOException;
    
    public int send(byte[] buffer, int offset, int size) throws IOException;
    
    public int recv(byte[] buffer, int offset, int size) throws IOException;
    
    public boolean isOpen();
    
    public boolean isConnected();
    
    public void close();
}

兩者對應的NIO版本實現是PushServerSocketImpl和PushClientSocketImpl,代碼實現比較簡單,這里就不貼出來了。

回到Listener,來看doAccept:

    private void doAccept()
    {
        // Start a new thread
        acceptorThread = new Thread(new Runnable()  {
            public void run() {
                while (blnRunning) {
                    try {
                        PushClientSocket clientSocket = serverSocket.accept();
                        
                        Debug.debug("New client from " + clientSocket.getIP());

                        // Start servicing the client connection
                        if (!handleAcceptedSocket(clientSocket)) {
                            clientSocket.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        return;
                    }
                }
            }
            
        });
        
        // Start the thread
        acceptorThread.start();
    }

這里服務器socket的accept方法實現是阻塞的,這樣可以避免不停地輪詢,因此在用NIO實現accept時要不能調用configureBlocking設置成非阻塞模式。

后面停止監聽時直接調用服務器socket的close方法,accept方法會拋出異常從而跳出循環,結束監聽線程的運行。

結束監聽時不要忘記使用線程的join方法等待線程結束。

    public void stopListening() {
        blnRunning = false;

        // Close server socket
        if (serverSocket != null) {
            serverSocket.close();
            serverSocket = null;
        }
        
        // Wait the thread to terminate
        if (acceptorThread != null) {
            try {
                acceptorThread.join();
            } catch (InterruptedException e) {
                //e.printStackTrace();
            }

            acceptorThread = null;
        }
    }

Acceptor的實現相對復雜一些,需要記錄訪問的信息,做一些檢查,然后再交給ClientFactory處理:

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {
        // 記錄日志
        ClientFactory clientFactoryImpl = serverImpl.getClientFactory();
        ServerStats stats = serverImpl.getServerStats();
        ServerOptions options = serverImpl.getServerOptions();

        stats.addToCumul(ServerStats.Measures.VisitorsSYNs, 1);
        // 檢查是否達到最大允許訪問數
        if (clientFactoryImpl.getClientCount() >= options.getMaxConnections()) {
            Debug.debug("Reach maximum clients allowed, deny it");
            return false;
        }

        //檢查IP是否被允許
        if (!clientFactoryImpl.isAddressAllowed(clientSocket.getIP())) {
            Debug.debug("IP refused: " + clientSocket.getIP());
            return false;
        }
        // 處理socket
        return clientFactoryImpl.createPhysicalConnection(clientSocket, 
                false, listenerOptions);
    }

MonitorAcceptor的實現比較簡單,直接交給ClientFactory處理就可以。

    protected boolean handleAcceptedSocket(PushClientSocket clientSocket) {
        return serverImpl.getClientFactory().createPhysicalConnection(
                clientSocket, true, listenerOptions);
    }

關於ClientFactory的處理邏輯后面的文章里細講。

 

實現一個監聽器功能是很容易的,所以可以說的東西不多。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM