Java高並發網絡編程(五)Netty應用


推送系統

 

一、系統設計

 

 

 

 

 

 

二、拆包和粘包

粘包、拆包表現形式

現在假設客戶端向服務端連續發送了兩個數據包,用packet1和packet2來表示,那么服務端收到的數據可以分為三種,現列舉如下:

第一種情況,接收端正常收到兩個數據包,即沒有發生拆包和粘包的現象,此種情況不在本文的討論范圍內。normal

第二種情況,接收端只收到一個數據包,由於TCP是不會出現丟包的,所以這一個數據包中包含了發送端發送的兩個數據包的信息,這種現象即為粘包。這種情況由於接收端不知道這兩個數據包的界限,所以對於接收端來說很難處理。one

第三種情況,這種情況有兩種表現形式,如下圖。接收端收到了兩個數據包,但是這兩個數據包要么是不完整的,要么就是多出來一塊,這種情況即發生了拆包和粘包。這兩種情況如果不加特殊處理,對於接收端同樣是不好處理的。half_oneone_half

 

 

粘包、拆包發生原因

發生TCP粘包或拆包有很多原因,現列出常見的幾點,可能不全面,歡迎補充,

1、要發送的數據大於TCP發送緩沖區剩余空間大小,將會發生拆包即應用程序寫入數據的字節大小大於套接字發送緩沖區的大小。

 

2、進行MSS大小的TCP分段。MSS是最大報文段長度的縮寫。MSS是TCP報文段中的數據字段的最大長度。數據字段加上TCP首部才等於整個的TCP報文段。所以MSS並不是TCP報文段的最大長度,而是:MSS=TCP報文段長度-TCP首部長度,

待發送數據大於MSS(最大報文長度),TCP在傳輸前將進行拆包。

 

3、要發送的數據小於TCP發送緩沖區的大小,TCP將多次寫入緩沖區的數據一次發送出去,將會發生粘包。

 

4、接收數據端的應用層沒有及時讀取接收緩沖區中的數據,將發生粘包。

 

5、以太網的payload大於MTU進行IP分片。MTU指:一種通信協議的某一層上面所能通過的最大數據包大小。如果IP層有一個數據包要傳,而且數據的長度比鏈路層的MTU大,那么IP層就會進行分片,把數據包分成若干片,讓每一片都不超過MTU。注意,IP分片可以發生在原始發送端主機上,也可以發生在中間路由器上。

 

 TCP粘包和拆包的解決策略

1. 消息定長。例如100字節。發送端給每個數據包添加包首部,首部中應該至少包含數據包的長度,這樣接收端在接收到數據后,通過讀取包首部的長度字段,便知道每一個數據包的實際長度了。

2. 在包尾部增加回車或者空格符等特殊字符進行分割,典型的如FTP協議,發送端將每個數據包封裝為固定長度(不夠的可以通過補0填充),這樣接收端每次從接收緩沖區中讀取固定長度的數據就自然而然的把每個數據包拆分開來。

3. 將消息分為消息頭和消息尾。可以在數據包之間設置邊界,如添加特殊符號,這樣,接收端通過這個邊界就可以將不同的數據包拆分開。

 

4. 其它復雜的協議,如RTMP協議等。

 

來源:https://blog.csdn.net/lijieshare/article/details/84815187

 

public class XNettyServer {
    public static void main(String[] args) throws Exception {
        // 1、 線程定義
        // accept 處理連接的線程池
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 處理數據的線程池
        EventLoopGroup readGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptGroup, readGroup);
            // 2、 選擇TCP協議,NIO的實現方式
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 3、 職責鏈定義(請求收到后怎么處理)
                    ChannelPipeline pipeline = ch.pipeline();
                    // TODO 3.1 增加解碼器
                    // pipeline.addLast(new XDecoder());
                    // TODO 3.2 打印出內容 handdler
                    pipeline.addLast(new XHandller());
                }
            });
            // 4、 綁定端口
            System.out.println("啟動成功,端口 9999");
            b.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            readGroup.shutdownGracefully();
        }
    }
}

也會存在粘包和拆包的問題

 

 

 

自己編寫解析器

簡單地用長度做處理

// 編解碼一定是根據協議~如http
public class XDecoder extends ByteToMessageDecoder {
    static final int PACKET_SIZE = 220; // 每次請求數據大小是220,我們自己定義的協議

    // 用來臨時保留沒有處理過的請求報文,如只傳過來了110個字節,先存着
    ByteBuf tempMsg = Unpooled.buffer();

    // in輸入   --- 處理  --- out 輸出
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread()+"收到了一次數據包,長度是:" + in.readableBytes());
        // in 請求的數據
        // out 將粘在一起的報文拆分后的結果保留起來

        // 1、 合並報文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暫存有上一次余下的請求報文,則合並
        if (tmpMsgSize > 0) {
            message = Unpooled.buffer();
            message.writeBytes(tempMsg);
            message.writeBytes(in);
            System.out.println("合並:上一數據包余下的長度為:" + tmpMsgSize + ",合並后長度為:" + message.readableBytes());
        } else {
            message = in;
        }

        // 2、 拆分報文
        // 這個場景下,一個請求固定長度為3,可以根據長度來拆分
        // i+1 i+1 i+1 i+1 i+1
        // 不固定長度,需要應用層協議來約定 如何計算長度
        // 在應用層中,根據單個報文的長度及特殊標記,來將報文進行拆分或合並
        // dubbo rpc協議 = header(16) + body(不固定)
        // header最后四個字節來標識body
        // 長度 = 16 + body長度
        // 0xda, 0xbb 魔數


        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
            byte[] request = new byte[PACKET_SIZE];
            // 每次從總的消息中讀取220個字節的數據
            message.readBytes(request);

            // 將拆分后的結果放入out列表中,交由后面的業務邏輯去處理
            out.add(Unpooled.copiedBuffer(request));
        }

        // 3、多余的報文存起來
        // 第一個報文: i+  暫存
        // 第二個報文: 1 與第一次
        size = message.readableBytes();
        if (size != 0) {
            System.out.println("多余的數據長度:" + size);
            // 剩下來的數據放到tempMsg暫存 留到下次再進行合並
            tempMsg.clear();
            tempMsg.writeBytes(message.readBytes(size));
        }
    }
}

上面的處理不適用復雜的現實場景,Netty提供了大量的現成的編解碼工具,我們一般使用這些工具

 

三、使用websocket

websocket協議是基於TCP的一種新的網絡協議。

它的出現實現了瀏覽器與服務器雙全工(full-duplex)通信:允許服務器主動發送信息給客戶端。

半雙工:服務器不能主動響應瀏覽器,只能等待請求后再響應。

多客戶端多語言多服務器支持:瀏覽器、php、Java、ruby、nginx、python、Tomcat、erlang、.net等等

 

 

 

代碼示例

public final class WebSocketServer {

    static int PORT = 9000;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childHandler(new WebSocketServerInitializer())
                    .childOption(ChannelOption.SO_REUSEADDR, true);
            for (int i = 0; i < 100; i++) { // 綁定100個端口
                b.bind(++PORT).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if ("true".equals(System.getProperty("netease.debug")))
                            System.out.println("端口綁定完成:" + future.channel().localAddress());
                    }
                });
            }

            // 端口綁定完成,啟動消息隨機推送(測試)
            TestCenter.startTest();

            System.in.read();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

開啟端口的復用ChannelOption.SO_REUSEADDR,這是底層的TCP的參數,和我們代碼無關

 

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        //  職責鏈, 數據處理流程
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec()); // 轉為http請求
        pipeline.addLast(new HttpObjectAggregator(65536)); // 最大數據量
        pipeline.addLast(new WebSocketServerHandler()); // websocket握手,處理后續消息
        pipeline.addLast(new NewConnectHandler());
    }
}

 

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final String WEBSOCKET_PATH = "/websocket";

    private WebSocketServerHandshaker handshaker;

    public static final LongAdder counter = new LongAdder();

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        counter.add(1);
        if (msg instanceof FullHttpRequest) {
            // 處理websocket握手
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 處理websocket后續的消息
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Handle a bad request. //如果http解碼失敗 則返回http異常 並且判斷消息頭有沒有包含Upgrade字段(協議升級)
        if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // 構造握手響應返回
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // 版本不支持
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
            ctx.fireChannelRead(req.retain()); // 繼續傳播
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Check for closing frame 關閉
        if (frame instanceof CloseWebSocketFrame) {
            Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get();
            TestCenter.removeConnection(userId);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) { // ping/pong作為心跳
            System.out.println("ping: " + frame);
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            // TODO 處理具體的數據請求(... 雲課堂聊天室,推送給其他的用戶)
            //發送到客戶端websocket
            ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()
                    + ", 歡迎使用Netty WebSocket服務, 現在時刻:"
                    + new java.util.Date().toString()));

            return;
        }
        // 不處理二進制消息
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }

    private static void sendHttpResponse(
            ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Generate an error page if response getStatus code is not OK (200).
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}

 

// 新連接建立了
public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // 解析請求,判斷token,拿到用戶ID。
        Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();
        // String token = parameters.get("token").get(0);  不是所有人都能連接,比如需要登錄之后,發放一個推送的token
        String userId = parameters.get("userId").get(0);
        ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId
        TestCenter.saveConnection(userId, ctx.channel()); // 保存連接

        // 結束
    }
}

保存到TestCenter

// 正常情況是,后台系統通過接口請求,把數據丟到對應的MQ隊列,再由推送服務器讀取
public class TestCenter {
    // 此處假設一個用戶一台設備,否則用戶的通道應該是多個。
    // TODO 還應該有一個定時任務,用於檢測失效的連接(類似緩存中的LRU算法,長時間不使用,就拿出來檢測一下是否斷開了);
    static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>();

    // 保存信息
    public static void saveConnection(String userId, Channel channel) {
        userInfos.put(userId, channel);
    }

    // 退出的時候移除掉
    public static void removeConnection(Object userId) {
        if (userId != null) {
            userInfos.remove(userId.toString());
        }
    }

    final static byte[] JUST_TEST = new byte[1024];

    public static void startTest() {
        // 發一個tony吧
        System.arraycopy("tony".getBytes(), 0, JUST_TEST, 0, 4);
        final String sendmsg = System.getProperty("netease.server.test.sendmsg", "false");
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            try {
                // 壓力測試,在用戶中隨機抽取1/10進行發送
                if (userInfos.isEmpty()) {
                    return;
                }
                int size = userInfos.size();
                ConcurrentHashMap.KeySetView<String, Channel> keySetView = userInfos.keySet();
                String[] keys = keySetView.toArray(new String[]{});
                System.out.println(WebSocketServerHandler.counter.sum() + " : 當前用戶數量" + keys.length);
                if (Boolean.valueOf(sendmsg)) { // 是否開啟發送
                    for (int i = 0; i < (size > 10 ? size / 10 : size); i++) {
                        // 提交任務給它執行
                        String key = keys[new Random().nextInt(size)];
                        Channel channel = userInfos.get(key);
                        if (channel == null) {
                            continue;
                        }
                        if (!channel.isActive()) {
                            userInfos.remove(key);
                            continue;
                        }
                        channel.eventLoop().execute(() -> {
                            channel.writeAndFlush(new TextWebSocketFrame(new String(JUST_TEST))); // 推送1024字節
                        });

                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 1000L, 2000L, TimeUnit.MILLISECONDS);
    }
}

瀏覽器測試

<!-- saved from url=(0022)http://127.0.0.1:8080/ -->
<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
  window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
  // 隨機數
  var random = Math.floor(Math.random()*(10000 - 10 +1) + 10)
  socket = new WebSocket("ws://127.0.0.1:9001/websocket?userId=" + random);
  socket.onmessage = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = ta.value + '\n' + event.data
  };
  socket.onopen = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = "Web Socket opened!";
  };
  socket.onclose = function(event) {
    var ta = document.getElementById('responseText');
    ta.value = ta.value + "Web Socket closed"; 
  };
} else {
  alert("Your browser does not support Web Socket.");
}

function send(message) {
  if (!window.WebSocket) { return; }
  if (socket.readyState == WebSocket.OPEN) {
    socket.send(message);
  } else {
    alert("The socket is not open.");
  }
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"><input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)">
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>


</body></html>

 

瀏覽器扛不住巨量的請求,使用Java客戶端進行測試

public final class WebSocketClient {


    public static void main(String[] args) throws Exception {
        final String host = System.getProperty("netease.pushserver.host", "127.0.0.1");
        final String maxSize = System.getProperty("netease.client.port.maxSize", "100");
        final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000");
        int port = 9001;

        EventLoopGroup group = new NioEventLoopGroup();
        try {

            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_REUSEADDR, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpClientCodec());
                    p.addLast(new HttpObjectAggregator(8192));
                    p.addLast(WebSocketClientCompressionHandler.INSTANCE);
                    p.addLast("webSocketClientHandler", new WebSocketClientHandler());
                }
            });
            // tcp 建立連接
            for (int i = 0; i < 100; i++) { // 服務端有100個端口,發起對100個端口反復的連接
                for (int j = 0; j < 60000; j++) { // 每個端口6萬次連接
                    b.connect(host, port).sync().get();
                }
                port++;
            }
            System.in.read();
        } finally

        {
            group.shutdownGracefully();
        }
    }
}

 

// handler 處理多個事件~ 包括tcp連接建立之后的事件
// open websocket
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    static AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        if (handshaker == null) {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
            URI uri = null;
            try {
                uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet());
            } catch (Exception e) {
                e.printStackTrace();
            }
            handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        }
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                if ("true".equals(System.getProperty("netease.debug")))
                    System.out.println("WebSocket Client connected!");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                if ("true".equals(System.getProperty("netease.debug")))
                    System.out.println("WebSocket Client failed to connect");
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received message: " + textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received closing");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

 

網絡四元組:客戶端IP,服務端IP,客戶端端口,服務端端口,這四元組規定了一個連接

客戶端端口數量有限,服務器只有一個端口的情況下,同一個客戶端只能對它發送6萬多個連接,

服務器開啟多個接口,服務器每開啟一個端口,客戶端可增加6萬多連接

上面的測試環境,為了增加連接容納量,服務端和客戶端都開啟了端口復用

 

 

 

打包上傳服務器 服務端6G4核,客戶端6G2核

運行服務端程序

 

 運行客戶端程序

 

 客戶端的端口是操作系統分配好的,也可以自己指定分配區間

 

報錯

 

 文件描述符

 

 open files太小了,調參數

 

 

 

 允許100萬個文件描述符

重新登陸,生效

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM