spring websocket源碼分析續Handler的使用


1. handler的定義

spring websocket支持的消息有以下幾種:

對消息的處理就使用了Handler模式,抽象handler類AbstractWebSocketHandler.java

@Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if (message instanceof TextMessage) {
            handleTextMessage(session, (TextMessage) message);
        }
        else if (message instanceof BinaryMessage) {
            handleBinaryMessage(session, (BinaryMessage) message);
        }
        else if (message instanceof PongMessage) {
            handlePongMessage(session, (PongMessage) message);
        }
        else {
            throw new IllegalStateException("Unexpected WebSocket message type: " + message);
        }
    }

    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    }

    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
    }

    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
    }

具體實現handler類BinaryWebSocketHandler(為例,其它略)

public class BinaryWebSocketHandler extends AbstractWebSocketHandler {

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        try {
            session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Text messages not supported"));
        }
        catch (IOException e) {
            // ignore
        }
    }

}

2.handler的使用

StandardWebSocketClient和服務端握手時,調用

@Override
    protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
            HttpHeaders headers, final URI uri, List<String> protocols,
            List<WebSocketExtension> extensions, Map<String, Object> attributes) {

        int port = getPort(uri);
        InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);
        InetSocketAddress remoteAddress = new InetSocketAddress(uri.getHost(), port);

        final StandardWebSocketSession session = new StandardWebSocketSession(headers,
                attributes, localAddress, remoteAddress);

        final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create();
        configBuilder.configurator(new StandardWebSocketClientConfigurator(headers));
        configBuilder.preferredSubprotocols(protocols);
        configBuilder.extensions(adaptExtensions(extensions));
        final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

        Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
            @Override
            public WebSocketSession call() throws Exception {
                webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);
                return session;
            }
        };

        if (this.taskExecutor != null) {
            return this.taskExecutor.submitListenable(connectTask);
        }
        else {
            ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
            task.run();
            return task;
        }
    }

紅色部分調用一個適配器StandardWebSocketHandlerAdapter,它封裝了Handler的調用

@Override
    public void onOpen(final javax.websocket.Session session, EndpointConfig config) {

        this.wsSession.initializeNativeSession(session);

        if (this.handler.supportsPartialMessages()) {
            session.addMessageHandler(new MessageHandler.Partial<String>() {
                @Override
                public void onMessage(String message, boolean isLast) {
                    handleTextMessage(session, message, isLast);
                }
            });
            session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
                @Override
                public void onMessage(ByteBuffer message, boolean isLast) {
                    handleBinaryMessage(session, message, isLast);
                }
            });
        }
        else {
            session.addMessageHandler(new MessageHandler.Whole<String>() {
                @Override
                public void onMessage(String message) {
                    handleTextMessage(session, message, true);
                }
            });
            session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                @Override
                public void onMessage(ByteBuffer message) {
                    handleBinaryMessage(session, message, true);
                }
            });
        }

        session.addMessageHandler(new MessageHandler.Whole<javax.websocket.PongMessage>() {
            @Override
            public void onMessage(javax.websocket.PongMessage message) {
                handlePongMessage(session, message.getApplicationData());
            }
        });

        try {
            this.handler.afterConnectionEstablished(this.wsSession);
        }
        catch (Throwable t) {
            ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
            return;
        }
    }

具體實現

private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) {
        TextMessage textMessage = new TextMessage(payload, isLast); try { this.handler.handleMessage(this.wsSession, textMessage); } catch (Throwable t) { ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); } } private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) { BinaryMessage binaryMessage = new BinaryMessage(payload, isLast); try { this.handler.handleMessage(this.wsSession, binaryMessage); } catch (Throwable t) { ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); } } private void handlePongMessage(javax.websocket.Session session, ByteBuffer payload) { PongMessage pongMessage = new PongMessage(payload); try { this.handler.handleMessage(this.wsSession, pongMessage); } catch (Throwable t) { ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM