websocket之三:Tomcat的WebSocket實現


Tomcat自7.0.5版本開始支持WebSocket,並且實現了Java WebSocket規范(JSR356 ),而在7.0.5版本之前(7.0.2版本之后)則采用自定義API,即WebSocketServlet。本節我們僅介紹Tomcat針對規范的實現。

根據JSR356的規定,Java WebSocket應用由一系列的WebSocket Endpoint組成。Endpoint是一個Java對象,代表WebSocket鏈接的一端,對於服務端,我們可以視為處理具體WebSocket消息的接口,就像Servlet之於HTTP請求一樣(不同之處在於Endpoint每個鏈接一個實例)。

我們可以通過兩種方式定義Endpoint,第一種是編程式,即繼承類javax.websocket.Endpoint並實現其方法。第二種是注解式,即定義一個POJO對象,為其添加Endpoint相關的注解。

Endpoint實例在WebSocket握手時創建,並在客戶端與服務端鏈接過程中有效,最后在鏈接關閉時結束。Endpoint接口明確定義了與其生命周期相關的方法,規范實現者確保在生命周期的各個階段調用實例的相關方法。

Endpoint的生命周期方法如下:

  • onOpen:當開啟一個新的會話時調用。這是客戶端與服務器握手成功后調用的方法。等同於注解@OnOpen。
  • onClose:當會話關閉時調用。等同於注解@OnClose。
  • onError:當鏈接過程中異常時調用。等同於注解@OnError。

當客戶端鏈接到一個Endpoint時,服務器端會為其創建一個唯一的會話(javax.websocket.Session)。會話在WebSocket握手之后創建,並在鏈接關閉時結束。當生命周期中觸發各個事件時,都會將當前會話傳給Endpoint。

我們通過為Session添加MessageHandler消息處理器來接收消息。當采用注解方式定義Endpoint時,我們還可以通過@OnMessage指定接收消息的方法。發送消息則由RemoteEndpoint完成,其實例由Session維護,根據使用情況,我們可以通過Session.getBasicRemote獲取同步消息發送的實例或者通過Session.getAsyncRemote獲取異步消息發送的實例。

WebSocket通過javax.websocket.WebSocketContainer接口維護應用中定義的所有Endpoint。它在每個Web應用中只有一個實例,類似於傳統Web應用中的ServletContext。

最后,WebSocket規范提供了一個接口javax.websocket.server.ServerApplicationConfig,通過它,我們可以為編程式的Endpoint創建配置(如指定請求地址),還可以過濾只有符合條件的Endpoint提供服務。該接口的實現同樣通過SCI機制加載。

介紹完WebSocket規范中的基本概念,我們看一下Tomcat的具體實現。接下來會涉及到Tomcat鏈接器(Cotyte)和Web應用加載的知識,如不清楚可以閱讀Tomcat官方文檔。

WebSocket加載

Tomcat提供了一個javax.servlet.ServletContainerInitializer的實現類org.apache.tomcat.websocket.server.WsSci。因此Tomcat的WebSocket加載是通過SCI機制完成的。WsSci可以處理的類型有三種:添加了注解@ServerEndpoint的類、Endpoint的子類以及ServerApplicationConfig的實現類。

Web應用啟動時,通過WsSci.onStartup方法完成WebSocket的初始化:

  • 構造WebSocketContainer實例,Tomcat提供的實現類為WsServerContainer。在WsServerContainer構造方法中,Tomcat除了初始化配置外,還會為ServletContext添加一個過濾器org.apache.tomcat.websocket.server.WsFilter,它用於判斷當前請求是否為WebSocket請求,以便完成握手。
  • 對於掃描到的Endpoint子類和添加了注解@ServerEndpoint的類,如果當前應用存在ServerApplicationConfig實現,則通過ServerApplicationConfig獲取Endpoint子類的配置(ServerEndpointConfig實例,包含了請求路徑等信息)和符合條件的注解類,將結果注冊到WebSocketContainer上,用於處理WebSocket請求。
  • 通過ServerApplicationConfig接口我們以編程的方式確定只有符合一定規則的Endpoint可以注冊到WebSocketContainer,而非所有。規范通過這種方式為我們提供了一種定制化機制。
  • 如果當前應用沒有定義ServerApplicationConfig的實現類,那么WsSci默認只將所有掃描到的注解式Endpoint注冊到WebSocketContainer。因此,如果采用可編程方式定義Endpoint,那么必須添加ServerApplicationConfig實現。

WebSocket請求處理

當服務器接收到來自客戶端的請求時,首先WsFilter會判斷該請求是否是一個WebSocket Upgrade請求(即包含Upgrade: websocket頭信息)。如果是,則根據請求路徑查找對應的Endpoint處理類,並進行協議Upgrade。

在協議Upgrade過程中,除了檢測WebSocket擴展、添加相關的轉換外,最主要的是添加WebSocket相關的響應頭信息、構造Endpoint實例、構造HTTP Upgrade處理類WsHttpUpgradeHandler。

將WsHttpUpgradeHandler傳遞給具體的Tomcat協議處理器(ProtocolHandler)進行Upgrade。接收到Upgrade的動作后,Tomcat的協議處理器(HTTP協議)不再使用原有的Processor處理請求,而是替換為專門的Upgrade Processor。

根據I/O的不同,Tomcat提供的Upgrade Processor實現如下:

  • org.apache.coyote.http11.upgrade.BioProcessor;
  • org.apache.coyote.http11.upgrade.NioProcessor;
  • org.apache.coyote.http11.upgrade.Nio2Processor;
  • org.apache.coyote.http11.upgrade.AprProcessor;

替換成功后,WsHttpUpgradeHandler會對Upgrade Processor進行初始化(按以下順序):

  • 創建WebSocket會話。
  • 為Upgrade Processor的輸出流添加寫監聽器。WebSocket向客戶端推送消息具體由org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer完成。
  • 構造WebSocket會話,執行當前Endpoint的onOpen方法。
  • 為Upgrade Processor的輸入流添加讀監聽器,完成消息讀取。WebSocket讀取客戶端消息具體由org.apache.tomcat.websocket.server.WsFrameServer完成。

通過這種方式,Tomcat實現了WebSocket請求處理與具體I/O方式的解耦。

基於編程的示例

首先,添加一個Endpoint子類,代碼如下:

package org.springframework.samples.websocket.demo3;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;

public class ChatEndpoint extends Endpoint {
    private static final Set<ChatEndpoint> connections = new CopyOnWriteArraySet<>();
    private Session session;

    private static class ChatMessageHandler implements MessageHandler.Partial<String> {
        private Session session;

        private ChatMessageHandler(Session session) {
            this.session = session;
        }

        @Override
        public void onMessage(String message, boolean last) {
            String msg = String.format("%s %s %s", session.getId(), "said:", message);
            broadcast(msg);
        }
    };

    @Override
    public void onOpen(Session session, EndpointConfig config) {
        this.session = session;
        connections.add(this);
        this.session.addMessageHandler(new ChatMessageHandler(session));
        String message = String.format("%s %s", session.getId(), "has joined.");
        broadcast(message);
    }

    @Override
    public void onClose(Session session, CloseReason closeReason) {
        connections.remove(this);
        String message = String.format("%s %s", session.getId(), "has disconnected.");
        broadcast(message);
    }

    @Override
    public void onError(Session session, Throwable throwable) {
    }

    private static void broadcast(String msg) {
        for (ChatEndpoint client : connections) {
            try {
                synchronized (client) {
                    client.session.getBasicRemote().sendText(msg);
                }
            } catch (IOException e) {
                connections.remove(client);
                try {
                    client.session.close();
                } catch (IOException e1) {
                }
                String message = String.format("%s %s", client.session.getId(), "has been disconnected.");
                broadcast(message);
            }
        }
    }
}

為了方便向客戶端推送消息,我們使用一個靜態集合作為鏈接池維護所有Endpoint實例。

在onOpen方法中,首先將當前Endpoint實例添加到鏈接池,然后為會話添加了一個消息處理器ChatMessageHandler,用於接收消息。當接收到客戶端消息后,我們將其推送到所有客戶端。最后向所有客戶端廣播一條上線通知。

在onClose方法中,將當前Endpoint從鏈接池中移除,向所有客戶端廣播一條下線通知。

然后定義ServerApplicationConfig實現,代碼如下:

package org.springframework.samples.websocket.demo3;

import java.util.HashSet;
import java.util.Set;

import javax.websocket.Endpoint;
import javax.websocket.server.ServerApplicationConfig;
import javax.websocket.server.ServerEndpointConfig;

public class ChatServerApplicationConfig implements ServerApplicationConfig {
    @Override
    public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> scanned) {
        return scanned;
    }

    @Override
    public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> scanned) {
        Set<ServerEndpointConfig> result = new HashSet<>();
        if (scanned.contains(ChatEndpoint.class)) {
            result.add(ServerEndpointConfig.Builder.create(ChatEndpoint.class, "/program/chat").build());
        }
        return result;
    }
}

在ChatServerApplicationConfig中為ChatEndpoint添加ServerEndpointConfig,其請求鏈接為“/program/chat”。

最后添加對應的HTML頁面,src\main\webapp\chat.html:

<?xml version="1.0" encoding="UTF-8"?>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <script type="application/javascript">
        var Chat = {};
        Chat.socket = null;
        Chat.connect = (function(host) {
            if ('WebSocket' in window) {
                Chat.socket = new WebSocket(host);
            } else if ('MozWebSocket' in window) {
                Chat.socket = new MozWebSocket(host);
            } else {
                Console.log('Error: WebSocket is not supported by this browser.');
                return;
            }
            Chat.socket.onopen = function () {
                Console.log('Info: WebSocket connection opened.');
                document.getElementById('chat').onkeydown = function(event) {
                    if (event.keyCode == 13) {
                        Chat.sendMessage();
                    }
                };
            };
            Chat.socket.onclose = function () {
                document.getElementById('chat').onkeydown = null;
                Console.log('Info: WebSocket closed.');
            };
            Chat.socket.onmessage = function (message) {
                Console.log(message.data);
            };
        });
        Chat.initialize = function() {
            if (window.location.protocol == 'http:') {
                Chat.connect('ws://' + window.location.host + '/spring-websocket-test/program/chat');
            } else {
                Chat.connect('wss://' + window.location.host + '/spring-websocket-test/program/chat');
            }
        };
        Chat.sendMessage = (function() {
            var message = document.getElementById('chat').value;
            if (message != '') {
                Chat.socket.send(message);
                document.getElementById('chat').value = '';
            }
        });
        var Console = {};
        Console.log = (function(message) {
            var console = document.getElementById('console');
            var p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.innerHTML = message;
            console.appendChild(p);
            while (console.childNodes.length > 25) {
                console.removeChild(console.firstChild);
            }
            console.scrollTop = console.scrollHeight;
        });
        Chat.initialize();
 </script>
</head>
<body>
<div>
    <p>
        <input type="text" placeholder="type and press enter to chat" id="chat" />
    </p>
    <div id="console-container">
        <div id="console"/>
    </div>
</div>
</body>
</html>

客戶端實現並不復雜,只是要注意瀏覽器的區別。在添加完所有配置后,可以將應用部署到Tomcat查看效果,與Comet類似,我們可以同時開啟兩個客戶端查看消息推送效果。

 

基於注解的示例

基於注解的定義要比編程式簡單一些,首先定義一個POJO對象,並添加相關注解:

package org.springframework.samples.websocket.demo3;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/anno/chat")
public class ChatAnnotation {
    private static final Set<ChatAnnotation> connections = new CopyOnWriteArraySet<>();
    private Session session;

    @OnOpen
    public void start(Session session) {
        this.session = session;
        connections.add(this);
        String message = String.format("%s %s", session.getId(), "has joined.");
        broadcast(message);
    }

    @OnClose
    public void end() {
        connections.remove(this);
        String message = String.format("%s %s", session.getId(), "has disconnected.");
        broadcast(message);
    }

    @OnMessage
    public void incoming(String message) {
        String msg = String.format("%s %s %s", session.getId(), "said:", message);
        broadcast(msg);
    }

    @OnError
    public void onError(Throwable t) throws Throwable {
    }

    private static void broadcast(String msg) {
        for (ChatAnnotation client : connections) {
            try {
                synchronized (client) {
                    client.session.getBasicRemote().sendText(msg);
                }
            } catch (IOException e) {
                connections.remove(client);
                try {
                    client.session.close();
                } catch (IOException e1) {
                }
                String message = String.format("%s %s", client.session.getId(), "has been disconnected.");
                broadcast(message);
            }
        }
    }
}

@ServerEndpoint注解聲明該類是一個Endpoint,並指定了請求的地址。

@OnOpen注解的方法在會話打開時調用,與ChatEndpoint類似,將當前實例添加到鏈接池。@OnClose注解的方法在會話關閉時調用。@OnError注解的方法在鏈接異常時調用。@OnMessage注解的方法用於接收消息。

使用注解方式定義Endpoint時,ServerApplicationConfig不是必須的,此時直接默認加載所有的@ServerEndpoin注解POJO。

我們可以直接將編程式示例中HTML頁面src\main\webapp\chatanno.html中的鏈接地址改為“/anno/chat”查看效果。

<?xml version="1.0" encoding="UTF-8"?>
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <script type="application/javascript">
        var Chat = {};
        Chat.socket = null;
        Chat.connect = (function(host) {
            if ('WebSocket' in window) {
                Chat.socket = new WebSocket(host);
            } else if ('MozWebSocket' in window) {
                Chat.socket = new MozWebSocket(host);
            } else {
                Console.log('Error: WebSocket is not supported by this browser.');
                return;
            }
            Chat.socket.onopen = function () {
                Console.log('Info: WebSocket connection opened.');
                document.getElementById('chat').onkeydown = function(event) {
                    if (event.keyCode == 13) {
                        Chat.sendMessage();
                    }
                };
            };
            Chat.socket.onclose = function () {
                document.getElementById('chat').onkeydown = null;
                Console.log('Info: WebSocket closed.');
            };
            Chat.socket.onmessage = function (message) {
                Console.log(message.data);
            };
        });
        Chat.initialize = function() {
            if (window.location.protocol == 'http:') {
                Chat.connect('ws://' + window.location.host + '/spring-websocket-test/anno/chat');
            } else {
                Chat.connect('wss://' + window.location.host + '/spring-websocket-test/anno/chat');
            }
        };
        Chat.sendMessage = (function() {
            var message = document.getElementById('chat').value;
            if (message != '') {
                Chat.socket.send(message);
                document.getElementById('chat').value = '';
            }
        });
        var Console = {};
        Console.log = (function(message) {
            var console = document.getElementById('console');
            var p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.innerHTML = message;
            console.appendChild(p);
            while (console.childNodes.length > 25) {
                console.removeChild(console.firstChild);
            }
            console.scrollTop = console.scrollHeight;
        });
        Chat.initialize();
 </script>
</head>
<body>
<div>
    <p>
        <input type="text" placeholder="type and press enter to chat" id="chat" />
    </p>
    <div id="console-container">
        <div id="console"/>
    </div>
</div>
</body>
</html>

結果:

 

 

 

  • tomcat是怎么加載ServerApplicationConfig的配置的,我想做嵌入式tomcat開發,請問您這邊清楚的嗎?
  • 回復xiaospace1028:通過org.apache.tomcat.websocket.server.WsSci類,這是一個ServletContainerInitializer,容器啟動時會自動加載這個類,執行onStartup方法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM