Spring WebSocket source code analysis


What is WebSocket?

Excerpted from Wikipedia 【1】:

WebSocket is a protocol providing full-duplex communication channels over a single TCP connection. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API in Web IDL is being standardized by the W3C.

WebSocket is designed to be implemented in web browsers and web servers, but it can be used by any client or server application. The WebSocket Protocol is an independent TCP-based protocol. Its only relationship to HTTP is that its handshake is interpreted by HTTP servers as an Upgrade request. The WebSocket protocol makes more interaction between a browser and a website possible, facilitating the real-time data transfer from and to the server. This is made possible by providing a standardized way for the server to send content to the browser without being solicited by the client, and allowing for messages to be passed back and forth while keeping the connection open. In this way a two-way (bi-directional) ongoing conversation can take place between a browser and the server. The communications are done over TCP port number 80, which is of benefit for those environments which block non-web Internet connections using a firewall. Similar two-way browser-server communications have been achieved in non-standardized ways using stopgap technologies such as Comet.

The WebSocket protocol is currently supported in most major browsers including Google Chrome, Internet Explorer, Firefox, Safari and Opera. WebSocket also requires web applications on the server to support it.

Unlike HTTP, WebSocket provides full-duplex communication. Additionally, WebSocket enables streams of messages on top of TCP. TCP alone deals with streams of bytes with no inherent concept of a message. Before WebSocket, port 80 full-duplex communication was attainable using Comet channels; however, Comet implementation is nontrivial, and due to the TCP handshake and HTTP header overhead, it is inefficient for small messages. WebSocket protocol aims to solve these problems without compromising security assumptions of the web.

The WebSocket protocol specification defines ws and wss as two new uniform resource identifier (URI) schemes that are used for unencrypted and encrypted connections, respectively. Apart from the scheme name and fragment (# is not supported), the rest of the URI components are defined to use URI generic syntax.

Using the Google Chrome Developer Tools, developers can inspect the WebSocket handshake as well as the WebSocket frames.

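In short, WebSocket is a full-duplex protocol built on TCP, with a client side and a server side, and it is a different protocol from HTTP. Its purpose is to make interaction between a browser and a site easier, so that real-time data can be sent to and from the server.

What is SockJS?

SockJS is a JavaScript library that runs in the browser. If the browser does not support WebSocket, the library can emulate WebSocket support, providing a low-latency, full-duplex, cross-domain communication channel between the browser and the web server 【2】.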

WebSocket messages

The abstract interface WebSocketMessage: a message that can be handled or sent on a WebSocket connection.

public interface WebSocketMessage<T> {

    /**
     * Returns the message payload. This will never be {@code null}.
     */
    T getPayload();


    /**
     * Return the number of bytes contained in the message.
     */
    int getPayloadLength();

    /**
     * When partial message support is available and requested via
     * {@link org.springframework.web.socket.WebSocketHandler#supportsPartialMessages()},
     * this method returns {@code true} if the current message is the last part of the
     * complete WebSocket message sent by the client. Otherwise {@code false} is returned
     * if partial message support is either not available or not enabled.
     */
    boolean isLast();

}

Here, the payload is the actual content carried by the message.
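To make the three methods concrete, here is a minimal sketch of a text message. The interface is repeated locally so the example compiles without Spring. `SimpleTextMessage` and `MessageDemo` are hypothetical names made up for this illustration, not Spring classes, and the sketch assumes the payload length is counted in UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;

// Local copy of the interface shown above, so the sketch compiles on its own.
interface WebSocketMessage<T> {
    T getPayload();
    int getPayloadLength();
    boolean isLast();
}

// Hypothetical implementation, for illustration only.
final class SimpleTextMessage implements WebSocketMessage<String> {
    private final String payload;
    private final boolean last;

    SimpleTextMessage(String payload, boolean last) {
        if (payload == null) {
            throw new IllegalArgumentException("payload must not be null");
        }
        this.payload = payload;
        this.last = last;
    }

    @Override
    public String getPayload() {
        return payload;
    }

    // Assumption: the length is the number of UTF-8 encoded bytes.
    @Override
    public int getPayloadLength() {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    @Override
    public boolean isLast() {
        return last;
    }
}

class MessageDemo {
    public static void main(String[] args) {
        WebSocketMessage<String> msg = new SimpleTextMessage("hello", true);
        System.out.println(msg.getPayload() + " / " + msg.getPayloadLength()
                + " bytes / last=" + msg.isLast());
    }
}
```

Note that the byte count and the character count differ as soon as the text contains non-ASCII characters, which is why the length is computed from the encoded bytes.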

 

 

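WebSocket client

The abstract interface WebSocketClient defines the contract for initiating a WebSocket request. If an application needs to open a WebSocket connection to a preconfigured URL at startup, consider using WebSocketConnectionManager.

Given a URI, a WebSocketClient and a WebSocketHandler, WebSocketConnectionManager connects to a WebSocket server through its start() and stop() methods. If setAutoStartup(boolean) is set to true, the connection is opened automatically when the Spring ApplicationContext is refreshed.

ConnectionManagerSupport is the base class of WebSocketConnectionManager: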

    /**
     * Start the websocket connection. If already connected, the method has no impact.
     */
    @Override
    public final void start() {
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                startInternal();
            }
        }
    }

    protected void startInternal() {
        synchronized (lifecycleMonitor) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting " + this.getClass().getSimpleName());
            }
            this.isRunning = true;
            openConnection();
        }
    }

protected abstract void openConnection();
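The listing above is a template method: start() owns the locking and the "already running" check, and subclasses only supply openConnection(). Below is a minimal, self-contained sketch of that idea. `LifecycleSketch`, `CountingManager` and `LifecycleDemo` are hypothetical names, and the stop() side with closeConnection() is an assumption modeled on the start side rather than a copy of the Spring source.

```java
// Hypothetical, simplified stand-in for ConnectionManagerSupport (not the Spring class).
abstract class LifecycleSketch {
    private final Object lifecycleMonitor = new Object();
    private boolean running = false;

    // Opens the connection once; further calls while running do nothing.
    public final void start() {
        synchronized (lifecycleMonitor) {
            if (!running) {
                running = true;
                openConnection();
            }
        }
    }

    // Assumed mirror image of start(): closes once, then ignores further calls.
    public final void stop() {
        synchronized (lifecycleMonitor) {
            if (running) {
                running = false;
                closeConnection();
            }
        }
    }

    public final boolean isRunning() {
        synchronized (lifecycleMonitor) {
            return running;
        }
    }

    protected abstract void openConnection();

    protected abstract void closeConnection();
}

// Subclass that only counts calls, so the guard in start()/stop() is visible.
class CountingManager extends LifecycleSketch {
    int opened = 0;
    int closed = 0;

    @Override
    protected void openConnection() {
        opened++;
    }

    @Override
    protected void closeConnection() {
        closed++;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        CountingManager manager = new CountingManager();
        manager.start();
        manager.start(); // second call has no effect
        manager.stop();
        System.out.println("opened=" + manager.opened + ", closed=" + manager.closed);
    }
}
```

Making start() and stop() final keeps the locking in one place, so a subclass cannot accidentally skip the running check.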

Let's look at how StandardWebSocketClient works.

StandardWebSocketClient programmatically initiates a WebSocket request to a WebSocket server through the standard Java WebSocket API.

It has two private fields:

    private final WebSocketContainer webSocketContainer;

    private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

The WebSocketContainer is obtained through the standard Java WebSocket API (ContainerProvider):

 

    public StandardWebSocketClient() {
        this.webSocketContainer = ContainerProvider.getWebSocketContainer();
    }

 

SimpleAsyncTaskExecutor starts a new thread for each task and runs the task asynchronously.

/**
     * Executes the given task, within a concurrency throttle
     * if configured (through the superclass's settings).
     * @see #doExecute(Runnable)
     */
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
     * Executes the given task, within a concurrency throttle
     * if configured (through the superclass's settings).
     * <p>Executes urgent tasks (with 'immediate' timeout) directly,
     * bypassing the concurrency throttle (if active). All other
     * tasks are subject to throttling.
     * @see #TIMEOUT_IMMEDIATE
     * @see #doExecute(Runnable)
     */
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(task));
        }
        else {
            doExecute(task);
        }
    }
    /**
     * Template method for the actual execution of a task.
     * <p>The default implementation creates a new Thread and starts it.
     * @param task the Runnable to execute
     * @see #setThreadFactory
     * @see #createThread
     * @see java.lang.Thread#start()
     */
    protected void doExecute(Runnable task) {
        Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
        thread.start();
    }
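Stripped of the throttle and the thread factory, doExecute() comes down to "one new thread per task". The following sketch shows just that behaviour with the JDK's Executor interface. `ThreadPerTaskSketch` and `ThreadPerTaskDemo` are hypothetical names for illustration; this is not Spring's SimpleAsyncTaskExecutor and it has no concurrency limit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical minimal executor: a fresh thread per task, no pooling, no throttle.
class ThreadPerTaskSketch implements Executor {
    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new IllegalArgumentException("Runnable must not be null");
        }
        Thread thread = new Thread(task);
        thread.start();
    }
}

class ThreadPerTaskDemo {
    // Runs one task on the executor and returns the thread that executed it.
    static Thread runAndCapture(Executor executor) {
        AtomicReference<Thread> worker = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            worker.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return worker.get();
    }

    public static void main(String[] args) {
        Executor executor = new ThreadPerTaskSketch();
        Thread first = runAndCapture(executor);
        Thread second = runAndCapture(executor);
        System.out.println("task ran off the caller thread: " + (first != Thread.currentThread()));
        System.out.println("each task got its own thread: " + (first != second));
    }
}
```

Because no thread is ever reused, this style is simple but can be expensive under load, which is the reason the real class offers an optional concurrency throttle.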

Next, let's look at the handshake process of StandardWebSocketClient:

@Override
    protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
            HttpHeaders headers, final URI uri, List<String> protocols,
            List<WebSocketExtension> extensions, Map<String, Object> attributes) {

        int port = getPort(uri);
        InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);
        InetSocketAddress remoteAddress = new InetSocketAddress(uri.getHost(), port);

        final StandardWebSocketSession session = new StandardWebSocketSession(headers,
                attributes, localAddress, remoteAddress);

        final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create();
        configBuilder.configurator(new StandardWebSocketClientConfigurator(headers));
        configBuilder.preferredSubprotocols(protocols);
        configBuilder.extensions(adaptExtensions(extensions));
        final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

        Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
            @Override
            public WebSocketSession call() throws Exception {
                webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);
                return session;
            }
        };

        if (this.taskExecutor != null) {
            return this.taskExecutor.submitListenable(connectTask);
        }
        else {
            ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
            task.run();
            return task;
        }
    }
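The end of doHandshakeInternal() chooses between two paths: hand the connect task to the executor, or, when no executor is set, run it on the calling thread and return an already completed future. Here is a sketch of that choice using only java.util.concurrent. `ConnectSketch`, `submitOrRun` and `await` are hypothetical names, and a plain Future stands in for Spring's ListenableFuture.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class ConnectSketch {

    // Hypothetical helper: asynchronous if an executor is given, synchronous otherwise.
    static <T> Future<T> submitOrRun(ExecutorService executor, Callable<T> connectTask) {
        if (executor != null) {
            return executor.submit(connectTask);
        }
        FutureTask<T> task = new FutureTask<>(connectTask);
        task.run(); // runs on the calling thread, so the future is done on return
        return task;
    }

    // Hypothetical helper: waits for the result and wraps checked exceptions.
    static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("connect task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for the real connect call; it only reports which thread ran it.
        Callable<String> connectTask =
                () -> "connected on " + Thread.currentThread().getName();

        System.out.println(await(submitOrRun(null, connectTask)));

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(await(submitOrRun(executor, connectTask)));
        } finally {
            executor.shutdown();
        }
    }
}
```

Either way the caller receives a future, so code that consumes the handshake result does not need to know which path was taken.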

WebSocket server

Annotation: EnableWebSocketMessageBroker

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker {

}

Annotation: EnableWebSocket

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketConfiguration.class)
public @interface EnableWebSocket {
}
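Both annotations are empty markers; what they contribute is the configuration class named in @Import. How Spring itself processes @Import is not covered by the listings above, so the following is only a sketch of the general meta-annotation idea using plain reflection. Every type in it (`ImportSketch`, `EnableSketch`, `DelegatingConfigSketch`, `AppConfigSketch`, `ImportResolver`) is hypothetical.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for an @Import-style meta-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ImportSketch {
    Class<?> value();
}

// Hypothetical configuration class that the marker pulls in.
class DelegatingConfigSketch {
}

// Hypothetical "enable" marker: empty body, all information is in the meta-annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@ImportSketch(DelegatingConfigSketch.class)
@interface EnableSketch {
}

// Application class that opts in with the marker.
@EnableSketch
class AppConfigSketch {
}

class ImportResolver {
    // Looks at each annotation on 'type' and returns the class named by a
    // meta-annotated ImportSketch, or null if there is none.
    static Class<?> findImported(Class<?> type) {
        for (Annotation annotation : type.getAnnotations()) {
            ImportSketch imported =
                    annotation.annotationType().getAnnotation(ImportSketch.class);
            if (imported != null) {
                return imported.value();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Class<?> imported = findImported(AppConfigSketch.class);
        System.out.println("imported: " + imported.getSimpleName());
    }
}
```

The point of the indirection is that application code only names the marker, while the framework decides which configuration class sits behind it.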

Sample program

1. Client 【3】: index.html

<!DOCTYPE html>
<html>
<head>
    <title>Hello WebSocket</title>
    <script src="sockjs-0.3.4.js"></script>
    <script src="stomp.js"></script>
    <script type="text/javascript">
        var stompClient = null;

        function setConnected(connected) {
            document.getElementById('connect').disabled = connected;
            document.getElementById('disconnect').disabled = !connected;
            document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
            document.getElementById('response').innerHTML = '';
        }

        function connect() {
            var socket = new SockJS('/hello');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                stompClient.subscribe('/topic/greetings', function(greeting){
                    showGreeting(JSON.parse(greeting.body).content);
                });
            });
        }

        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log("Disconnected");
        }

        function sendName() {
            var name = document.getElementById('name').value;
            stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
        }

        function showGreeting(message) {
            var response = document.getElementById('response');
            var p = document.createElement('p');
            p.style.wordWrap = 'break-word';
            p.appendChild(document.createTextNode(message));
            response.appendChild(p);
        }
    </script>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div>
    <div>
        <button id="connect" onclick="connect();">Connect</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">Disconnect</button>
    </div>
    <div id="conversationDiv">
        <label>What is your name?</label><input type="text" id="name" />
        <button id="sendName" onclick="sendName();">Send</button>
        <p id="response"></p>
    </div>
</div>
</body>
</html>

2. Server 【3】: WebSocketConfig.java

package hello;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/hello").withSockJS();
    }

}

 

References:

【1】https://en.wikipedia.org/wiki/WebSocket

【2】http://www.oschina.net/p/sockjs/similar_projects?lang=0&sort=time&p=1

【3】https://spring.io/guides/gs/messaging-stomp-websocket/

