在一個服務器程序中,監聽器的作用類似於公司前台,起引導作用,因此監聽器花在每個新連接上的時間應該盡可能短,這樣才能保證最快響應。
回到編程本身來說:
1. 監聽器最好由單獨的線程運行
2. 監聽器在接到新的連接之后,處理連接的方法需要盡快返回
在Java Push Framework中,因為需要同時監聽普通客戶端和服務器監視服務的客戶端,所以定義兩種監聽器:Acceptor和MonitorAcceptor。
由於兩者的關於監聽部分的邏輯是相同的,因此首先定義了抽象類Listener來實現了監視器的功能,把處理socket的部分定義為抽象方法。
// 處理socket的抽象方法 protected abstract boolean handleAcceptedSocket( PushClientSocket clientSocket);
對於監聽功能的實現比較簡單,還是那三步:create,bind,accept。
private boolean doListening(InetSocketAddress serverAddr) { boolean ret = false; int socketBufferSize = getServerImpl().getServerOptions() .getSocketBufferSize(); int socketType = getServerImpl().getServerOptions() .getSocketType(); try { // Create serverSocket = SocketFactory.getDefault().createServerSocket( socketType, socketBufferSize); // Bind serverSocket.bind(serverAddr); Debug.debug("Start to listen " + serverAddr.getHostName() + ":" + serverAddr.getPort()); // Accept doAccept(); ret = true; } catch (IOException e) { e.printStackTrace(); if (serverSocket != null) { serverSocket.close(); serverSocket = null; } } return ret; }
考慮Java中現在有好幾種不同的socket:同步阻塞Socket,同步非阻塞Socket,以及JDK7新添加的異步Socket,如果直接使用Java的Socket類,不方便在不同類型的socket之間切換使用。所以我自定義了PushServerSocket和PushClientSocket兩個新接口:
// 對於服務器socket來說,只定義了必須的bind和accept, // 以及一個不會拋出異常的close。 public interface PushServerSocket { public void bind(InetSocketAddress serverAddr) throws IOException; public PushClientSocket accept() throws IOException; public void close(); }
// 客戶端socket接口的定義是C++的風格,因為原來的代碼是C++寫的,這么定義便於翻譯原來的C++代碼 public interface PushClientSocket { public String getIP(); public int getPort(); // 這里直接使用Selector其實是有問題的,注定了只能使用NIO的方式 // 后面會考慮修改 public SelectionKey registerSelector(Selector selector, int ops, Object attachment) throws IOException; public int send(byte[] buffer, int offset, int size) throws IOException; public int recv(byte[] buffer, int offset, int size) throws IOException; public boolean isOpen(); public boolean isConnected(); public void close(); }
兩者對應的NIO版本實現是PushServerSocketImpl和PushClientSocketImpl,代碼實現比較簡單,這里就不貼出來了。
回到Listener,來看doAccept:
private void doAccept() { // Start a new thread acceptorThread = new Thread(new Runnable() { public void run() { while (blnRunning) { try { PushClientSocket clientSocket = serverSocket.accept(); Debug.debug("New client from " + clientSocket.getIP()); // Start servicing the client connection if (!handleAcceptedSocket(clientSocket)) { clientSocket.close(); } } catch (IOException e) { e.printStackTrace(); return; } } } }); // Start the thread acceptorThread.start(); }
這里服務器socket的accept方法實現是阻塞的,這樣可以避免不停地輪詢,因此在用NIO實現accept時要不能調用configureBlocking設置成非阻塞模式。
后面停止監聽時直接調用服務器socket的close方法,accept方法會拋出異常從而跳出循環,結束監聽線程的運行。
結束監聽時不要忘記使用線程的join方法等待線程結束。
public void stopListening() { blnRunning = false; // Close server socket if (serverSocket != null) { serverSocket.close(); serverSocket = null; } // Wait the thread to terminate if (acceptorThread != null) { try { acceptorThread.join(); } catch (InterruptedException e) { //e.printStackTrace(); } acceptorThread = null; } }
Acceptor的實現相對復雜一些,需要記錄訪問的信息,做一些檢查,然后再交給ClientFactory處理:
protected boolean handleAcceptedSocket(PushClientSocket clientSocket) { // 記錄日志 ClientFactory clientFactoryImpl = serverImpl.getClientFactory(); ServerStats stats = serverImpl.getServerStats(); ServerOptions options = serverImpl.getServerOptions(); stats.addToCumul(ServerStats.Measures.VisitorsSYNs, 1); // 檢查是否達到最大允許訪問數 if (clientFactoryImpl.getClientCount() >= options.getMaxConnections()) { Debug.debug("Reach maximum clients allowed, deny it"); return false; } //檢查IP是否被允許 if (!clientFactoryImpl.isAddressAllowed(clientSocket.getIP())) { Debug.debug("IP refused: " + clientSocket.getIP()); return false; } // 處理socket return clientFactoryImpl.createPhysicalConnection(clientSocket, false, listenerOptions); }
MonitorAcceptor的實現比較簡單,直接交給ClientFactory處理就可以。
protected boolean handleAcceptedSocket(PushClientSocket clientSocket) { return serverImpl.getClientFactory().createPhysicalConnection( clientSocket, true, listenerOptions); }
關於ClientFactory的處理邏輯后面的文章里細講。
實現一個監聽器功能是很容易的,所以可以說的東西不多。