Tomcat自7.0.5版本開始支持WebSocket,並且實現了Java WebSocket規范(JSR356 ),而在7.0.5版本之前(7.0.2版本之后)則采用自定義API,即WebSocketServlet。本節我們僅介紹Tomcat針對規范的實現。
根據JSR356的規定,Java WebSocket應用由一系列的WebSocket Endpoint組成。Endpoint是一個Java對象,代表WebSocket鏈接的一端,對於服務端,我們可以視為處理具體WebSocket消息的接口,就像Servlet之於HTTP請求一樣(不同之處在於Endpoint每個鏈接一個實例)。
我們可以通過兩種方式定義Endpoint,第一種是編程式,即繼承類javax.websocket.Endpoint並實現其方法。第二種是注解式,即定義一個POJO對象,為其添加Endpoint相關的注解。
Endpoint實例在WebSocket握手時創建,並在客戶端與服務端鏈接過程中有效,最后在鏈接關閉時結束。Endpoint接口明確定義了與其生命周期相關的方法,規范實現者確保在生命周期的各個階段調用實例的相關方法。
Endpoint的生命周期方法如下:
- onOpen:當開啟一個新的會話時調用。這是客戶端與服務器握手成功后調用的方法。等同於注解@OnOpen。
- onClose:當會話關閉時調用。等同於注解@OnClose。
- onError:當鏈接過程中異常時調用。等同於注解@OnError。
當客戶端鏈接到一個Endpoint時,服務器端會為其創建一個唯一的會話(javax.websocket.Session)。會話在WebSocket握手之后創建,並在鏈接關閉時結束。當生命周期中觸發各個事件時,都會將當前會話傳給Endpoint。
我們通過為Session添加MessageHandler消息處理器來接收消息。當采用注解方式定義Endpoint時,我們還可以通過@OnMessage指定接收消息的方法。發送消息則由RemoteEndpoint完成,其實例由Session維護,根據使用情況,我們可以通過Session.getBasicRemote獲取同步消息發送的實例或者通過Session.getAsyncRemote獲取異步消息發送的實例。
WebSocket通過javax.websocket.WebSocketContainer接口維護應用中定義的所有Endpoint。它在每個Web應用中只有一個實例,類似於傳統Web應用中的ServletContext。
最后,WebSocket規范提供了一個接口javax.websocket.server.ServerApplicationConfig,通過它,我們可以為編程式的Endpoint創建配置(如指定請求地址),還可以過濾只有符合條件的Endpoint提供服務。該接口的實現同樣通過SCI機制加載。
介紹完WebSocket規范中的基本概念,我們看一下Tomcat的具體實現。接下來會涉及到Tomcat鏈接器(Cotyte)和Web應用加載的知識,如不清楚可以閱讀Tomcat官方文檔。
WebSocket加載
Tomcat提供了一個javax.servlet.ServletContainerInitializer的實現類org.apache.tomcat.websocket.server.WsSci。因此Tomcat的WebSocket加載是通過SCI機制完成的。WsSci可以處理的類型有三種:添加了注解@ServerEndpoint的類、Endpoint的子類以及ServerApplicationConfig的實現類。
Web應用啟動時,通過WsSci.onStartup方法完成WebSocket的初始化:
- 構造WebSocketContainer實例,Tomcat提供的實現類為WsServerContainer。在WsServerContainer構造方法中,Tomcat除了初始化配置外,還會為ServletContext添加一個過濾器org.apache.tomcat.websocket.server.WsFilter,它用於判斷當前請求是否為WebSocket請求,以便完成握手。
- 對於掃描到的Endpoint子類和添加了注解@ServerEndpoint的類,如果當前應用存在ServerApplicationConfig實現,則通過ServerApplicationConfig獲取Endpoint子類的配置(ServerEndpointConfig實例,包含了請求路徑等信息)和符合條件的注解類,將結果注冊到WebSocketContainer上,用於處理WebSocket請求。
- 通過ServerApplicationConfig接口我們以編程的方式確定只有符合一定規則的Endpoint可以注冊到WebSocketContainer,而非所有。規范通過這種方式為我們提供了一種定制化機制。
- 如果當前應用沒有定義ServerApplicationConfig的實現類,那么WsSci默認只將所有掃描到的注解式Endpoint注冊到WebSocketContainer。因此,如果采用可編程方式定義Endpoint,那么必須添加ServerApplicationConfig實現。
WebSocket請求處理
當服務器接收到來自客戶端的請求時,首先WsFilter會判斷該請求是否是一個WebSocket Upgrade請求(即包含Upgrade: websocket頭信息)。如果是,則根據請求路徑查找對應的Endpoint處理類,並進行協議Upgrade。
在協議Upgrade過程中,除了檢測WebSocket擴展、添加相關的轉換外,最主要的是添加WebSocket相關的響應頭信息、構造Endpoint實例、構造HTTP Upgrade處理類WsHttpUpgradeHandler。
將WsHttpUpgradeHandler傳遞給具體的Tomcat協議處理器(ProtocolHandler)進行Upgrade。接收到Upgrade的動作后,Tomcat的協議處理器(HTTP協議)不再使用原有的Processor處理請求,而是替換為專門的Upgrade Processor。
根據I/O的不同,Tomcat提供的Upgrade Processor實現如下:
- org.apache.coyote.http11.upgrade.BioProcessor;
- org.apache.coyote.http11.upgrade.NioProcessor;
- org.apache.coyote.http11.upgrade.Nio2Processor;
- org.apache.coyote.http11.upgrade.AprProcessor;
替換成功后,WsHttpUpgradeHandler會對Upgrade Processor進行初始化(按以下順序):
- 創建WebSocket會話。
- 為Upgrade Processor的輸出流添加寫監聽器。WebSocket向客戶端推送消息具體由org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer完成。
- 構造WebSocket會話,執行當前Endpoint的onOpen方法。
- 為Upgrade Processor的輸入流添加讀監聽器,完成消息讀取。WebSocket讀取客戶端消息具體由org.apache.tomcat.websocket.server.WsFrameServer完成。
通過這種方式,Tomcat實現了WebSocket請求處理與具體I/O方式的解耦。
基於編程的示例
首先,添加一個Endpoint子類,代碼如下:
package org.springframework.samples.websocket.demo3; import java.io.IOException; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.CloseReason; import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; import javax.websocket.MessageHandler; import javax.websocket.Session; public class ChatEndpoint extends Endpoint { private static final Set<ChatEndpoint> connections = new CopyOnWriteArraySet<>(); private Session session; private static class ChatMessageHandler implements MessageHandler.Partial<String> { private Session session; private ChatMessageHandler(Session session) { this.session = session; } @Override public void onMessage(String message, boolean last) { String msg = String.format("%s %s %s", session.getId(), "said:", message); broadcast(msg); } }; @Override public void onOpen(Session session, EndpointConfig config) { this.session = session; connections.add(this); this.session.addMessageHandler(new ChatMessageHandler(session)); String message = String.format("%s %s", session.getId(), "has joined."); broadcast(message); } @Override public void onClose(Session session, CloseReason closeReason) { connections.remove(this); String message = String.format("%s %s", session.getId(), "has disconnected."); broadcast(message); } @Override public void onError(Session session, Throwable throwable) { } private static void broadcast(String msg) { for (ChatEndpoint client : connections) { try { synchronized (client) { client.session.getBasicRemote().sendText(msg); } } catch (IOException e) { connections.remove(client); try { client.session.close(); } catch (IOException e1) { } String message = String.format("%s %s", client.session.getId(), "has been disconnected."); broadcast(message); } } } }
為了方便向客戶端推送消息,我們使用一個靜態集合作為鏈接池維護所有Endpoint實例。
在onOpen方法中,首先將當前Endpoint實例添加到鏈接池,然后為會話添加了一個消息處理器ChatMessageHandler,用於接收消息。當接收到客戶端消息后,我們將其推送到所有客戶端。最后向所有客戶端廣播一條上線通知。
在onClose方法中,將當前Endpoint從鏈接池中移除,向所有客戶端廣播一條下線通知。
然后定義ServerApplicationConfig實現,代碼如下:
package org.springframework.samples.websocket.demo3; import java.util.HashSet; import java.util.Set; import javax.websocket.Endpoint; import javax.websocket.server.ServerApplicationConfig; import javax.websocket.server.ServerEndpointConfig; public class ChatServerApplicationConfig implements ServerApplicationConfig { @Override public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) { return scanned; } @Override public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> scanned) { Set<ServerEndpointConfig> result = new HashSet<>(); if (scanned.contains(ChatEndpoint.class)) { result.add(ServerEndpointConfig.Builder.create(ChatEndpoint.class, "/program/chat").build()); } return result; } }
在ChatServerApplicationConfig中為ChatEndpoint添加ServerEndpointConfig,其請求鏈接為“/program/chat”。
最后添加對應的HTML頁面,src\main\webapp\chat.html:
<?xml version="1.0" encoding="UTF-8"?> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <script type="application/javascript"> var Chat = {}; Chat.socket = null; Chat.connect = (function(host) { if ('WebSocket' in window) { Chat.socket = new WebSocket(host); } else if ('MozWebSocket' in window) { Chat.socket = new MozWebSocket(host); } else { Console.log('Error: WebSocket is not supported by this browser.'); return; } Chat.socket.onopen = function () { Console.log('Info: WebSocket connection opened.'); document.getElementById('chat').onkeydown = function(event) { if (event.keyCode == 13) { Chat.sendMessage(); } }; }; Chat.socket.onclose = function () { document.getElementById('chat').onkeydown = null; Console.log('Info: WebSocket closed.'); }; Chat.socket.onmessage = function (message) { Console.log(message.data); }; }); Chat.initialize = function() { if (window.location.protocol == 'http:') { Chat.connect('ws://' + window.location.host + '/spring-websocket-test/program/chat'); } else { Chat.connect('wss://' + window.location.host + '/spring-websocket-test/program/chat'); } }; Chat.sendMessage = (function() { var message = document.getElementById('chat').value; if (message != '') { Chat.socket.send(message); document.getElementById('chat').value = ''; } }); var Console = {}; Console.log = (function(message) { var console = document.getElementById('console'); var p = document.createElement('p'); p.style.wordWrap = 'break-word'; p.innerHTML = message; console.appendChild(p); while (console.childNodes.length > 25) { console.removeChild(console.firstChild); } console.scrollTop = console.scrollHeight; }); Chat.initialize(); </script> </head> <body> <div> <p> <input type="text" placeholder="type and press enter to chat" id="chat" /> </p> <div id="console-container"> <div id="console"/> </div> </div> </body> </html>
客戶端實現並不復雜,只是要注意瀏覽器的區別。在添加完所有配置后,可以將應用部署到Tomcat查看效果,與Comet類似,我們可以同時開啟兩個客戶端查看消息推送效果。
基於注解的示例
基於注解的定義要比編程式簡單一些,首先定義一個POJO對象,並添加相關注解:
package org.springframework.samples.websocket.demo3; import java.io.IOException; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; @ServerEndpoint(value = "/anno/chat") public class ChatAnnotation { private static final Set<ChatAnnotation> connections = new CopyOnWriteArraySet<>(); private Session session; @OnOpen public void start(Session session) { this.session = session; connections.add(this); String message = String.format("%s %s", session.getId(), "has joined."); broadcast(message); } @OnClose public void end() { connections.remove(this); String message = String.format("%s %s", session.getId(), "has disconnected."); broadcast(message); } @OnMessage public void incoming(String message) { String msg = String.format("%s %s %s", session.getId(), "said:", message); broadcast(msg); } @OnError public void onError(Throwable t) throws Throwable { } private static void broadcast(String msg) { for (ChatAnnotation client : connections) { try { synchronized (client) { client.session.getBasicRemote().sendText(msg); } } catch (IOException e) { connections.remove(client); try { client.session.close(); } catch (IOException e1) { } String message = String.format("%s %s", client.session.getId(), "has been disconnected."); broadcast(message); } } } }
@ServerEndpoint注解聲明該類是一個Endpoint,並指定了請求的地址。
@OnOpen注解的方法在會話打開時調用,與ChatEndpoint類似,將當前實例添加到鏈接池。@OnClose注解的方法在會話關閉時調用。@OnError注解的方法在鏈接異常時調用。@OnMessage注解的方法用於接收消息。
使用注解方式定義Endpoint時,ServerApplicationConfig不是必須的,此時直接默認加載所有的@ServerEndpoin注解POJO。
我們可以直接將編程式示例中HTML頁面src\main\webapp\chatanno.html中的鏈接地址改為“/anno/chat”查看效果。
<?xml version="1.0" encoding="UTF-8"?> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <script type="application/javascript"> var Chat = {}; Chat.socket = null; Chat.connect = (function(host) { if ('WebSocket' in window) { Chat.socket = new WebSocket(host); } else if ('MozWebSocket' in window) { Chat.socket = new MozWebSocket(host); } else { Console.log('Error: WebSocket is not supported by this browser.'); return; } Chat.socket.onopen = function () { Console.log('Info: WebSocket connection opened.'); document.getElementById('chat').onkeydown = function(event) { if (event.keyCode == 13) { Chat.sendMessage(); } }; }; Chat.socket.onclose = function () { document.getElementById('chat').onkeydown = null; Console.log('Info: WebSocket closed.'); }; Chat.socket.onmessage = function (message) { Console.log(message.data); }; }); Chat.initialize = function() { if (window.location.protocol == 'http:') { Chat.connect('ws://' + window.location.host + '/spring-websocket-test/anno/chat'); } else { Chat.connect('wss://' + window.location.host + '/spring-websocket-test/anno/chat'); } }; Chat.sendMessage = (function() { var message = document.getElementById('chat').value; if (message != '') { Chat.socket.send(message); document.getElementById('chat').value = ''; } }); var Console = {}; Console.log = (function(message) { var console = document.getElementById('console'); var p = document.createElement('p'); p.style.wordWrap = 'break-word'; p.innerHTML = message; console.appendChild(p); while (console.childNodes.length > 25) { console.removeChild(console.firstChild); } console.scrollTop = console.scrollHeight; }); Chat.initialize(); </script> </head> <body> <div> <p> <input type="text" placeholder="type and press enter to chat" id="chat" /> </p> <div id="console-container"> <div id="console"/> </div> </div> </body> </html>
結果:
-
-
tomcat是怎么加載ServerApplicationConfig的配置的,我想做嵌入式tomcat開發,請問您這邊清楚的嗎?
-
-
-
回復xiaospace1028:通過org.apache.tomcat.websocket.server.WsSci類,這是一個ServletContainerInitializer,容器啟動時會自動加載這個類,執行onStartup方法
-