java netty實現websocket記錄


netty實現socket服務器 處理websocket請求

最近有兩個都用到netty做服務端的項目,第一個是c直接發起socket建立連接的請求,第二個是react框架的app,用websocket協議發起連接請求,netty處理稍有不同,記錄一下。

netty高性能:https://www.infoq.cn/article/netty-high-performance

netty調優:https://blog.csdn.net/C_J33/article/details/80737053

#### 先來看第一個項目:

Springboot版本是1.5.10,點進去發現默認依賴沒有netty,加入netty依賴。

maven依賴:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
    
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
</dependency>


spring啟動完成后,新建線程指定端口啟動socket

public class SocketService {
    private int port;

    public SocketService(int port) {
        this.port = port;
    }

    public SocketService() {
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void startSocket() throws Exception{
        // 接受socket鏈接循環器
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 處理業務邏輯循環器
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap bs = new ServerBootstrap();
            bs.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                            // 回車換行作為消息分隔符,消息最大長度設置1024
                            .addLast(new LineBasedFrameDecoder(1024))
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder(CharsetUtil.UTF_8))
                            .addLast(new MyServerHandler());
                }
            })
                    // 請求處理線程滿時,臨時存放完成握手隊列的大小,默認50
                    .option(ChannelOption.SO_BACKLOG, 1024);
                    // 是否啟用心跳保活機制,若鏈接建立並2小時左右無數據傳輸,此機制才會被激活(tcp機制)。
                    //.childOption(ChannelOption.SO_KEEPALIVE, true);
            // 同步等待socket鏈接結果,用戶線程waite,直到連接完成被notify,繼續執行用戶代碼
            ChannelFuture future = bs.bind(port).sync();
            future.channel().closeFuture().sync();
        }finally {
            // 優雅的釋放資源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

 

> netty使用很簡單,只要加入相應處理handle即可。
>
> 傳輸層為了效率,tcp協議發送數據包時有可能合並發送,對接收方來說會產生粘包問題,需要在應用層解決拆包,收發數據時協商設計分割點,一般而言有四種分割收到包的方法:
>
> 1. 發送方在發送每段數據后拼接回車換行符,接收方讀到“\r\n”則認為是一個獨立的數據包。netty默認解析實現是LineBasedFrameDecoder,加入解碼handle即可。
> 2. 其他自定義分割符號,如“#”。netty實現handle是DelimiterBasedFrameDecoder.
> 3. 無論數據大小,每次發送固定長度,如1024字節,不夠的0補位,超出的截斷。缺點是比較生硬,數據小的時候浪費帶寬資源。netty實現的handle是FixedLengthFrameHandle.
> 4. 數據分為消息頭,消息體,消息頭定義消息體長度,接收端解析出長度后只讀取指定的長度。需要自己實現decoder。

_上述DecoderHandle全部繼承ByteToMessageDecoder,是netty封裝的解析二進制數據的處理類,只要將相應handle添加到pipeline中即可,解析完成后傳輸給自定義的邏輯處理類MyServerHandler。此項目中與c端約定傳輸json字符串格式數據,每段數據手動增加換行分割符。_ 

 

#### 第二個項目(netty與websocket)

springboot版本2.0.6.RELEASE,點進去發現默認依賴<netty.version>4.1.29.Final</netty.version>。

maven依賴:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>



SocketService與上一個項目相同,只是把匿名內部類單獨創建為ChildChannelInit類,具體實現為:

public class ChildChannelInit extends ChannelInitializer<SocketChannel> {
    private Logger logger = LoggerFactory.getLogger(ChildChannelInit.class);
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
//        logger.debug("有客戶端鏈接新建一條chennel ......");
        SSLEngine sslEngine = SslUtil.generateSSLContext().createSSLEngine();
        sslEngine.setUseClientMode(false); //服務器端模式
        sslEngine.setNeedClientAuth(false); //不需要驗證客戶端
        ch.pipeline().addLast("ssl", new SslHandler(sslEngine));

        ch.pipeline().addLast("http-codec", new HttpServerCodec());
//        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        // 把多個httpmessagge組裝成一個的默認實現
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        ch.pipeline().addLast("ping", new IdleStateHandler(20,0,0, TimeUnit.SECONDS));
        ch.pipeline().addLast("handler", new MyNettyHandler());
    }
}


上面是幾條是為了給socket加入ssl功能,SslUtil類的主要方法:

private static volatile SSLContext ssl_Context = null;
    public static SSLContext generateSSLContext() {
        if (null == ssl_Context){
            synchronized (SslUtil.class){
                if (null == ssl_Context){
                    try {
                        KeyStore ks = KeyStore.getInstance("JKS");
                        InputStream ksInputStream = new FileInputStream(APP_CONFIG.getKeyStorePath());
                        ks.load(ksInputStream, APP_CONFIG.getKeyStorePass().toCharArray());
                        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
                        kmf.init(ks, APP_CONFIG.getKeyStoreKeyPass().toCharArray());

                        SSLContext sslContext = SSLContext.getInstance("TLS");
                        sslContext.init(kmf.getKeyManagers(), null, null);
                        ssl_Context = sslContext;
                    }catch (Exception e){
                        logger.info("load ssl context failed, error:{}",e.getLocalizedMessage());
                    }
                }
            }
        }
        return ssl_Context;
    }



下面的Handle方法則是因為websocket協議是通過http協議握手,然后切換(升級)到socket協議,主要是用來處理http協議的編解碼添加的netty自定義實現的handle。

這里有個問題,也是本篇要記錄的初衷,在接收消息的handle中,后期測試發現,客戶端發來10000條數據,內容是json,每次解析出json中的cmd指令回復相應數據,總會少回6-7條,想到了是粘包導致的問題,但無論是加分割符的編解碼還是自定義二進制decoder,pipeline中都不會加載,也就沒有任何作用。

__后來查看netty源碼,發現在http發送握手后,netty會自動添加及調整websocket的編解碼。__

// handshake方法內的部分原碼,
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());


> websocketDecoder繼承WebSocketFrameDecoder,會處理編解碼,並把二進制數據轉換成binaryFrame或Textframe,其中frame有個isFinalFragment方法可以判斷是否是一條數據的最后一段,如果不是,會通過ContinuationWebSocketFrame消息類型發送剩下的數據,自己在代碼邏輯中可以拼接出完整的數據,避免了拆包不清的問題。
>
> 這里處理的是text消息類型,binary同理,用byte數組存就可以了。


自定義處理類:

@ChannelHandler.Sharable
public class MyNettyHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(MyNettyHandler.class);
    private WebSocketServerHandshaker handshaker;
    private String appendStr = "";
    private String currentUserId = "";
    String wsFactroyUri = "";
    
    /**客戶端鏈接建立,即為活躍*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("new connect active !! channelId:{}",ctx.channel().id().asShortText());
    }
    /**客戶端斷開鏈接,通道不活躍*/
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (currentUserId != null && NettyManager.channelGroupMap.containsKey(currentUserId)){
            NettyManager.channelGroupMap.get(currentUserId).remove(ctx.channel());
            logger.debug("client disconnect!! channelId:{}  map user size:{}  current user connCount:{}",ctx.channel().id().asShortText(),NettyManager.channelGroupMap.size(), NettyManager.channelGroupMap.get(currentUserId).size());
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("!!!!EXCEPTION:{}",cause.toString());
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1.第一次握手請求消息由HTTP協議承載,所以它是一個HTTP消息,執行handleHttpRequest方法來處理WebSocket握手請求。
        //2.客戶端通過socket提交請求消息給服務端,WebSocketServerHandler接收到的是已經解碼后的WebSocketFrame消息。
        if (msg instanceof FullHttpRequest){
            handleHttpRequest(ctx,(FullHttpRequest) msg);
        }else if (msg instanceof WebSocketFrame){
            handleSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)){
                logger.info("Can't get client msg or ping in idle time,channel will be closed, channelId:{}  ", ctx.channel().id().asLongText());
                ctx.channel().close();
            }else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws UnsupportedEncodingException{
        // 利用http協議完成握手后升級到webSocket
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        WebSocketServerHandshakerFactory handShakerFac = new WebSocketServerHandshakerFactory( wsFactroyUri, null, false);
        handshaker = handShakerFac.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 通過它構造握手響應消息返回給客戶端
            // 同時將WebSocket相關的編碼和解碼類動態添加到ChannelPipeline中,用於WebSocket消息的編解碼,
            // 添加WebSocketEncoder和WebSocketDecoder之后,服務端就可以自動對WebSocket消息進行編解碼了
            handshaker.handshake(ctx.channel(), req);
        }
    }
    private void handleSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
        // 判斷是否為關閉鏈接
        if (frame instanceof CloseWebSocketFrame){
            logger.info("get close socket command");
            handshaker.close(ctx.channel(),(CloseWebSocketFrame)frame.retain());
            return;
        }

        // 判斷是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            logger.info("get ping socket command");
            ctx.channel().write( new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        // 文本內容
        if (frame instanceof TextWebSocketFrame){
            String body = ((TextWebSocketFrame) frame).text();
            if (!frame.isFinalFragment()){
                appendStr += body;
            }else {
                handleMsg(ctx, body);
            }
        }else if (frame instanceof ContinuationWebSocketFrame){
            String halfBody = ((ContinuationWebSocketFrame) frame).text();
            appendStr += halfBody;
            if (frame.isFinalFragment()){
                handleMsg(ctx, appendStr);
                appendStr = "";
            }
        }
    }

    private void handleMsg(ChannelHandlerContext ctx, String body){
        JSONObject jsonObject ;
        try {
            jsonObject = new JSONObject(body);
        }catch (Exception e){
            logger.error("get json error :{}",body);
            return;
        }
        String cmd = (String) jsonObject.get("command");
        if (cmd.equals("auth")){
            handleAuthLogic(ctx, jsonObject);
        }else if (cmd.equals("client_ping")){
            handleClientPingLogic(ctx, jsonObject);
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回應答給客戶端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,關閉連接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private static boolean isKeepAlive(FullHttpRequest req) {
        return false;
    }

    /** 客戶端登錄 */
    private void handleAuthLogic(ChannelHandlerContext ctx, JSONObject jsonObject){
        logger.debug("json:{}",jsonObject.toString());
        String userId = (String)jsonObject.get("from");
        Long clientTime = (Long)jsonObject.get("timestamp");
        String uniqueId = (String) jsonObject.get("uniqueId");

        Long currentTime = System.currentTimeMillis();
        Long diff = currentTime - clientTime;

        AuthRes authRes = new AuthRes();
        authRes.setCommand("auth");
        authRes.setFrom("sys");
        authRes.setTo(userId);
        authRes.setDiff_time(diff);

        Service2Controller<UserProfile> s2c = NettyManager.USER_SERVICE.getUserById(userId);
        UserProfile userProfile = s2c.getData();
//        UserProfile userProfile = new UserProfile();

        boolean shouldClose = false;
        if (userProfile == null){
            authRes.setResult("failed");
            authRes.setResson("user_not_exist");
            shouldClose = true;
        }else {
            authRes.setResult("ok");
            authRes.setResson("success");
            currentUserId = userId;
            // 保存當前user的所有鏈接chennel
//            NettyManager.addConnectMap(userId, ctx.channel());
            if (!NettyManager.channelGroupMap.containsKey(userId)){
                NettyManager.channelGroupMap.put(userId, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
            }
            NettyManager.channelGroupMap.get(userId).add(ctx.channel());
            logger.info("connect map user size:{}  connect count:{}",NettyManager.channelGroupMap.size(),NettyManager.channelGroupMap.get(userId).size());

        }
        authRes.setTimestamp(currentTime);
        String resString = authRes.toString() + NettyManager.SEP;
        ctx.channel().writeAndFlush(new TextWebSocketFrame(resString));
        if (shouldClose){
            ctx.close();
        }
    }

    /** 客戶端ping */
    private void handleClientPingLogic(ChannelHandlerContext ctx, JSONObject jsonObject){
        Long clientTime = (Long)jsonObject.get("timestamp");
        Long currentTime = System.currentTimeMillis();
        long diffTime = Math.abs(currentTime - clientTime);

        if (diffTime < 30 * 1000){
            JsonObject object = new JsonObject();
            object.addProperty("command","client_ping_receive");
            String resString = object.toString() + NettyManager.SEP;
            ctx.channel().writeAndFlush(new TextWebSocketFrame(resString));
            logger.info("receive client ping command ,res:{}", resString);
        }
    }
}


代碼中還有維持存儲客戶端連接的邏輯,一並記錄,保存連接的容器結構是:
Map<String, ChannelGroup> channelGroupMap = new ConcurrentHashMap<>;
鍵為用戶ID,值為當前用戶的連接集合。在給某個用戶發送數據,在相應地方調用channelGroupMap.get("userId").writeAndFlush()方法即可。

 

 

 

 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

使用netty時,如果鏈接異常關閉會打印對應的log信息,下面是別人的博客地址,記錄一下。

tcp(socket)各類日志的打印場景

終止一個連接的正常方式是發送FIN。 在發送緩沖區中 所有排隊數據都已發送之后才發送FIN,正常情況下沒有任何數據丟失。

但我們有時也有可能發送一個RST報文段而不是F IN來中途關閉一個連接。這稱為異常關閉 。 

進程關閉socket的默認方式是正常關閉,如果需要異常關閉,利用 SO_LINGER選項來控制。

異常關閉一個連接對應用程序來說有兩個優點:

(1)丟棄任何待發的已經無意義的 數據,並立即發送RST報文段;

(2)RST的接收方利用關閉方式來 區分另一端執行的是異常關閉還是正常關閉。

值得注意的是RST報文段不會導致另一端產生任何響應,另一端根本不進行確認。收到RST的一方將終止該連接。程序行為如下:

阻塞模型下,內核無法主動通知應用層出錯,只有應用層主動調用read()或者write()這樣的IO系統調用時,內核才會利用出錯來通知應用層對端RST。

非阻塞模型下,select或者epoll會返回sockfd可讀,應用層對其進行讀取時,read()會報錯RST。 

游戲測試過程中發現某些socket錯誤經常出現,以下是測試游戲服務器時通常考慮的case. 
服務器端: 
1. 
Case:客戶端程序正常運行的情況下,拔掉網線,殺掉客戶端程序 
目的:模擬客戶端死機、系統突然重啟、網線松動或網絡不通等情況 
結論:這種情況下服務器程序沒有檢測到任何異常,並最后等待“超時”才斷開TCP連接

2. 
Case:客戶端程序發送很多數據包后正常關閉Socket並exit進程(或不退出進程) 
目的:模擬客戶端發送完消息后正常退出的情況 
結論:這種情況下服務器程序能夠成功接收完所有消息,並最后收到“對端關閉”(Recv返回零)消息

3. 
Case:客戶端程序發送很多數據包后不關閉Socket直接exit進程 
目的:模擬客戶端程序退出而忘記關閉Socket的情況(比如通過Windows窗口的關閉圖標退出進程,而沒有捕獲相應關閉事件做正常退出處理等) 
結論:這種情況下服務器程序能夠收到部分TCP消息,然后收到“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤

4. 
Case:客戶端程序發送很多數據包的過程中直接Kill進程 
目的:模擬客戶端程序崩潰或非正常方式結束進程(比如Linux下”kill -9″或Windows的任務管理器殺死進程)的情況 
結論:這種情況下服務器程序很快收到“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤

5. 
Case:客戶端程序發送很多數據包后正常關閉Socket並exit進程(或不退出進程) 
目的:模擬客戶端正常關閉Socket后,服務器端在檢查到TCP對端關閉前向客戶端發送消息的情況 
結論:這種情況下服務器程序接收和發送部分TCP消息后,在Send消息時產生“32: Broken pipe”(Linux下)或“10053: An established connection was aborted by the software in your host machine”(Windows下)錯誤

總結: 
當TCP連接的進程在忘記關閉Socket而退出、程序崩潰、或非正常方式結束進程的情況下(Windows客戶端),會導致TCP連接的對端進程產生“104: Connection reset by peer”(Linux下)或“10054: An existing connection was forcibly closed by the remote host”(Windows下)錯誤

當TCP連接的進程機器發生死機、系統突然重啟、網線松動或網絡不通等情況下,連接的對端進程可能檢測不到任何異常,並最后等待“超時”才斷開TCP連接

當TCP連接的進程正常關閉Socket時,對端進程在檢查到TCP關閉事件之前仍然向TCP發送消息,則在Send消息時會產生“32: Broken pipe”(Linux下)或“10053: An established connection was aborted by the software in your host machine”(Windows下)錯誤

客戶端 
1. 
服務器端已經close了Socket,客戶端再發送數據 
目的:測試在TCP對端進程已經關閉Socket時,本端進程還未檢測到連接關閉的情況下繼續向對端發送消息 
結論:第一包可以發送成功,但第二包發送失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine”(Windows下)或“32: Broken pipe,同時收到SIGPIPE信號”(Linux下)錯誤

2. 
服務器端發送數據到TCP后close了Socket,客戶端再發送一包數據,然后接收消息 
目的:測試在TCP對端進程發送數據后關閉Socket,本端進程還未檢測到連接關閉的情況下發送一包消息,並接着接收消息 
結論:客戶端能夠成功發送第一包數據(這會導致服務器端發送一個RST包 <已抓包驗證>),客戶端再去Recv時,對於Windows和Linux程序有如下不同的表現: 
Windows客戶端程序:Recv失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine” 
Linux客戶端程序:能正常接收完所有消息包,最后收到正常的對端關閉消息(這一點與Window下不一樣)

3. 
服務器端在TCP的接收緩沖區中還有未接收數據的情況下close了Socket,客戶端再收包 
目的:測試在TCP的接收緩沖區中還有未接收數據的情況下關閉Socket時,對端進程是否正常 
結論:這種情況服務器端就會向對端發送RST包,而不是正常的FIN包(已經抓包證明),這就會導致客戶端提前(RST包比正常數據包先被收到)收到“10054: An existing connection was forcibly closed by the remote host”(Windows下)或“104: Connection reset by peer”(Linux下)錯誤

總結: 
當TCP連接的對端進程已經關閉了Socket的情況下,本端進程再發送數據時,第一包可以發送成功(但會導致對端發送一個RST包過來): 
之后如果再繼續發送數據會失敗,錯誤碼為“10053: An established connection was aborted by the software in your host machine”(Windows下)或“32: Broken pipe,同時收到SIGPIPE信號”(Linux下)錯誤; 
之后如果接收數據,則Windows下會報10053的錯誤,而Linux下則收到正常關閉消息

TCP連接的本端接收緩沖區中還有未接收數據的情況下close了Socket,則本端TCP會向對端發送RST包,而不是正常的FIN包,這就會導致對端進程提前(RST包比正常數據包先被收到)收到“10054: An existing connection was forcibly closed by the remote host”(Windows下)或“104: Connection reset by peer”(Linux下)錯誤
--------------------- 
作者:九嶷山 
來源:CSDN 
原文:https://blog.csdn.net/larry_zeng1/article/details/78982370 
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM