netty 學習筆記三:大躍進,使用 netty 實現 IM 即時通訊系統


前言:

本文成文基於掘金小冊《Netty 入門與實戰:仿寫微信 IM 即時通訊系統》:https://juejin.im/book/5b4bc28bf265da0f60130116

掘金上有些小冊還是不錯,這篇 netty 也是我18年買了,19年再看才能大部分吸收的內容(之前就是菜啊,還能是什么)。推薦想學習 netty 漲知識的小伙伴購入,中級以上水平應該 3-5天即可消化里面的內容,初級水平的同學可能需要多看幾遍。

 

正文:

本節在《netty 學習筆記二》之上進行了一段大躍進,因此在本節你將會一股腦看到 netty自定義協議設計、數據載體 ByteBuf API、通信協議編解碼、pipeline 結構、ChannelHandler 生命周期 和 熱插拔效果、單聊和群聊實現、心跳與空閑檢測、netty IM 系統的性能優化等等內容。由於 netty 已經造了很多好用的輪子,如粘包拆包處理器、空閑檢測處理器、通用編解碼器等,我們只需要配置一些構造參數,基本上就可以足夠使用,不用再重復造輪子了。

代碼已經上傳到我的 github:https://github.com/christmad/code-share/tree/master/netty-group-chat

 

群聊最終效果——>>>

服務端:

客戶端-老王:

客戶端-隔壁老王:

客戶端-盤古:

 

 自定義協議設計:

 

一種通用的通信協議設計:
  1. 魔數(magic number)
    魔數作為第一個字段,通常情況下為固定的幾個字節,我們可以規定為 4個字節。值一般設定為不容易被猜到的。
    魔數可以認為是一種顯示的起始標志,在 java 的二進制文件中以魔數 0xcafebabe 作為開頭,有異曲同工之妙。
    在源源不斷的網絡包中,起始標志可以減少錯誤率,迅速找出正確的包。
    在編程中,magic number 也用來描述不使用變量名而直接使用數字的編程習慣,直接使用數字通常會引起歧義。
  2. 版本號
    通常是預留字段, IP 協議中也有一個 version 字段用來標識 IP 協議的版本是 IPv4 或 IPv6。
  3. 序列化算法
    序列化算法,是指如何把對象轉為二進制數據,以及把二進制數據轉為對象,此處是 java 對象。
    比如 java 自帶的序列化算法,json,hessian 等序列化方式。
    規定一個字節,可以表示 256 種算法,足夠用了。
  4. 指令
    比如 IM即時通信系統中客戶端登錄、聊天等指令。
    對於 IM系統 ,可以規定 1個字節,可以表示 256 種指令,完全夠用。
  5. 數據長度
    規定4個字節。
  6. 數據內容
    變長 N 字節,具體內容序列化后可以占不同的長度。

目前除了版本號外,這里設計的每一個字段在 ChannelHandler 里都提現出來了。魔數對應 IMProtocolSplitter(同時完成了服務端拆包工作);序列化算法對應 UltimatePacketCodecHandler;指令對應 IMHandler;數據對應了具體類型的 Packet。序列化算法采用了 alibaba 的 FASTJSON,JSON 也是前端大量在使用的一種序列化方式。

有了自定義協議設定,編碼時只要逐字段按照協議拼裝字節即可,通常我們的 java 對象使用 FASTJSON 序列化后會塞到“數據”字段里。解碼時比較關鍵的兩個字段是“指令”和“數據”,根據“指令”類型獲取對應的 Class 類型,然后獲取序列化過的 byte[],最后 FASTJSON API 使用這兩個參數進行反序列化。 

 

數據載體 ByteBuf API:

Bytebuf 數據結構如下圖:

API 這塊最終每個人都有自己不同的熟悉程度,不好談。只講兩點:

1. get/set 方法不會改變 讀寫指針,而 read/write 方法會改變讀寫指針。

2. 如果遇到內存緊張的問題,一定是沒有釋放內存。netty 某些 decoder 會自動釋放內存,但如果假設我這個項目中用的 MessageToMessageCodec 底層沒有幫我們管理內存而導致內存泄漏,我們就應該自己在程序中手動釋放內存。對應的方法是 ByteBuf#release(),它將會把 ByteBuf 引用計數減 1,減到 0 時表示能被回收。默認申請完一塊 ByteBuf 默認計數為 1。對應的增加計數的方法為 retain(),在 slice()、duplicate() 場景下會用到。

 

ConsoleCommand 與 UI:

由於本次實現的 netty IM系統 server端 和 client端都由 java 實現,而我們的 client端使用了控制台來實現。因此代碼中的每一種 ConsoleCommand 就對應了實際項目中的 UI 控件按鈕,如 createGroup、listGroupMembers 等“一級指令”就對應一個個 UI 上的按鈕。構造了一個 ConsoleCommandManager 方便聚合所有的“二級指令”,這個功用和 IMHandler 有點相似,都可以簡化多層 if else 代碼。

 

心跳與空閑檢測:

網絡應用程序普遍會遇到的一個問題:連接假死

連接假死的現象是:在某一端(服務端或者客戶端)看來,底層的 TCP 連接已經斷開了,但是應用程序並沒有捕獲到,因此會認為這條連接仍然是存在的,從 TCP 層面來說,只有收到四次握手數據包或者一個 RST 數據包,連接的狀態才表示已斷開。

連接假死會帶來以下兩大問題

1. 對於服務端來說,因為每條連接都會耗費 cpu 和內存資源,大量假死的連接會逐漸耗光服務器的資源,最終導致性能逐漸下降,程序崩潰。
2. 對於客戶端來說,連接假死會造成發送數據超時,影響用戶體驗。

通常,連接假死由以下幾個原因造成的:

a. 應用程序出現線程堵塞,無法進行數據的讀寫
b. 客戶端或者服務端網絡相關的設備出現故障,比如網卡,機房故障
c. 公網丟包。公網環境相對內網而言,非常容易出現丟包,網絡抖動等現象,如果在一段時間內用戶接入的網絡連續出現丟包現象,那么對客戶端來說數據一直發送不出去,而服務端也是一直收不到客戶端來的數據,連接就一直耗着

連接假死的應對策略就是空閑檢測

空閑檢測指的是每隔一段時間,檢測這段時間內是否有數據讀寫,簡化一下,我們的服務端只需要檢測一段時間內,是否收到過客戶端發來的數據即可,Netty 自帶的 IdleStateHandler 就可以實現這個功能
PS:這個問題上服務端和客戶端的策略是一樣的

服務端在一段時間內沒有收到客戶端的數據,這個現象產生的原因可以分為以下兩種:(從客戶端角度看也是類似的)

1. 連接假死。
2. 非假死狀態下確實沒有發送數據
只需要排查第二種情況。使用 Netty 自帶的 IdleStateHandler 就可以實現這個功能,見代碼 IMIdleStateHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/IMIdleStateHandler.java

Netty IM 即時通訊系統優化:

優化通常指在服務端優化,服務端單機可能會面對十幾萬甚至幾十萬連接,需要進行一些對象碎片管理、優化(縮短)調用鏈(netty 中叫做 縮短事件傳播路徑)、阻塞方法優化等。

1. 共享 handler

在 ServerBootstrap 的 childHandler() 方法中,ChannelInitializer 類的 initChannel 邏輯是:每次有新連接到來的時候,都會調用 ChannelInitializer 的 initChannel() 方法,然后把我們添加的 ChannelHandler 都 new 一次,插入到 channel pipeline 中。

仔細觀察這些 handler ,它們方法中是沒有成員變量的,也就是無狀態的,因此可以用單例模式來優化這些實例。在單機十幾萬甚至幾十萬連接的情況下,單例使得性能得到一定程度提升,創建的小對象也大大減少了。

然后重要的一點是,在 netty 中聲明一個 ChannelHandler 是共享的,需要使用注解 @ChannelHandler.Sharable 來告訴 netty 這個 handler 是可以被多個 channel 共享的。

在沒有單例優化前,你的 ChannelInitializer # initChannel() 方法可能是這樣的:

 1 serverBootstrap
 2                 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 3                     protected void initChannel(NioSocketChannel ch) {
 4                         ch.pipeline().addLast(new Spliter());
 5                         ch.pipeline().addLast(new PacketDecoder());
 6                         ch.pipeline().addLast(new LoginRequestHandler());
 7                         ch.pipeline().addLast(new AuthHandler());
 8                         ch.pipeline().addLast(new MessageRequestHandler());
 9                         ch.pipeline().addLast(new CreateGroupRequestHandler());
10                         ch.pipeline().addLast(new JoinGroupRequestHandler());
11                         ch.pipeline().addLast(new QuitGroupRequestHandler());
12                         ch.pipeline().addLast(new ListGroupMembersRequestHandler());
13                         ch.pipeline().addLast(new GroupMessageRequestHandler());
14                         ch.pipeline().addLast(new LogoutRequestHandler());
15                         ch.pipeline().addLast(new PacketEncoder());
16                     }
17                 });
initChannel

使用單例改造后,ChannelInitializer # initChannel() 方法是這樣的:

serverBootstrap
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new Spliter());
                ch.pipeline().addLast(new PacketDecoder());
                ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
                ch.pipeline().addLast(AuthHandler.INSTANCE);
                ch.pipeline().addLast(MessageRequestHandler.INSTANCE);
                ch.pipeline().addLast(CreateGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(JoinGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(QuitGroupRequestHandler.INSTANCE);
                ch.pipeline().addLast(ListGroupMembersRequestHandler.INSTANCE);
                ch.pipeline().addLast(GroupMessageRequestHandler.INSTANCE);
                ch.pipeline().addLast(LogoutRequestHandler.INSTANCE);
                ch.pipeline().addLast(new PacketEncoder());
            }
        });
initChannel 單例改造

另外,需要注意的是,Splitter 不能被共享。雖然看起來我們的 Splitter 方法內也沒有引用任何成員變量,但也許是因為每個連接都要維護自己的 ByteBuf,因此 Splitter 繼承了 拆包器-LengthFieldBasedFrameDecoder 之后由於父類的有狀態而導致 Splitter 也有狀態了。如果你不信,可以強行試試把 Splitter 改造成單例。最后你會發現,控制台會輸出一個錯誤。debug 后你會看到在 Splitter 某個父類中的構造器是這樣的:

    protected ByteToMessageDecoder() {
        ensureNotSharable();
    }

這已經在告訴你不能把 ByteToMessageDecoder 和 它的派生子類設為共享 handler。我的 netty 版本用的是 4.1.24.final,而在這之前的一些版本中 ensureNotSharable() 方法還並不是在 ChannelHandler 繼承體系中的一個方法,是用了某種 Util 工具來存放這個方法。不過重點是,我們知道運行起來效果是一樣的。

2. 壓縮 handler

 2.1 合並編解碼器

Netty 內部提供了一個類,叫做 MessageToMessageCodec,使用它可以讓我們的編解碼操作放到一個類里面去實現。並且這個 codec 也是可以共享的。詳情見代碼 UltimatePacketCodecHandler.java:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/handler/UltimatePacketCodecHandler.java

 1 @ChannelHandler.Sharable
 2 public class UltimatePacketCodecHandler extends MessageToMessageCodec<ByteBuf, Packet> {
 3     public static final UltimatePacketCodecHandler INSTANCE = new UltimatePacketCodecHandler();
 4 
 5     private UltimatePacketCodecHandler() {}
 6 
 7     @Override
 8     protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
 9         // 使用 channel 上的 ByteBuf alloc,方便 netty 管理內存
10         ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
11         out.add(PacketCodec.INSTANCE.encode(byteBuf, packet));
12     }
13 
14     @Override
15     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
16         out.add(PacketCodec.INSTANCE.decode(buf));
17     }
18 }

2.2 合並平行 handler——縮短事件傳播路徑

對我們的 IM 應用來說,每次從控制台(對應一個UI按鈕)只會傳一個指令到服務器,並且這個指令只會被某一個 handler 處理,因此這些指令 handler 有一個“平行”的概念。我們可以將這些平行的 handler 壓縮為一個 handler,如 IMRequestHandler 所示:

 1 @ChannelHandler.Sharable
 2 public class IMRequestHandler extends SimpleChannelInboundHandler<Packet> {
 3 
 4     public static final IMRequestHandler INSTANCE = new IMRequestHandler();
 5 
 6     private Map<Byte, SimpleChannelInboundHandler<? extends Packet>> channelMap;
 7 
 8     private IMRequestHandler() {
 9         channelMap = new HashMap<>();
10         // 將指令類型 和 request handler 做映射
11         channelMap.put(Command.MESSAGE_REQUEST, MessageRequestHandler.INSTANCE);
12         channelMap.put(Command.LOGIN_REQUEST, LoginRequestHandler.INSTANCE);
13         channelMap.put(Command.LOGOUT_REQUEST, LogoutRequestHandler.INSTANCE);
14         channelMap.put(Command.CREATE_GROUP_REQUEST, CreateGroupRequestHandler.INSTANCE);
15         channelMap.put(Command.JOIN_GROUP_REQUEST, JoinGroupRequestHandler.INSTANCE);
16         channelMap.put(Command.LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestHandler.INSTANCE);
17         channelMap.put(Command.QUIT_GROUP_REQUEST, QuitGroupRequestHandler.INSTANCE);
18         channelMap.put(Command.GROUP_MESSAGE_REQUEST, GroupMessageRequestHandler.INSTANCE);
19     }
20 
21     @Override
22     protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {
23         SimpleChannelInboundHandler<? extends Packet> simpleChannelInboundHandler = channelMap.get(msg.getCommand());
24         if (simpleChannelInboundHandler != null) {
25             // 只關心能處理的類型
26             simpleChannelInboundHandler.channelRead(ctx, msg);
27         }
28     }
29 }

 

再看看代碼中的 IMResponseHandler:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/client/handler/IMResponseHandler.java

其實客戶端沒有必要進行這種程度的優化,不過可以再次感受一下 netty 給我們帶來的編碼上的方便。

3. 減少阻塞主線程的操作

通常我們的應用會涉及數據庫或網絡操作,比如在 LoginRequestHandler 中,實際上在 valid() 或 checkUser() 方法中做的事情是把用戶名和密碼拿到數據庫或某個網絡中間件里面去進行比較,而例子中我只是粗暴驗證直接返回 true 並簡單的生成一個 userId 返回了。實際場景如下:

1 protected void channelRead0(ChannelHandlerContext ctx, T packet) {
2     // 1. balabala 一些邏輯
3     // 2. 數據庫或者網絡等一些耗時的操作
4     // 3. writeAndFlush()
5     // 4. balabala 其他的邏輯
6 }

對於第2個過程中的耗時操作,通常不會直接這樣寫。為什么?先來看看 netty 一條 NIO 線程的處理邏輯抽象:

1 List<Channel> channelList = 已有數據可讀的 channel
2 for (Channel channel in channelist) {
3    for (ChannelHandler handler in channel.pipeline()) {
4        handler.channelRead0(ctx, msg);
5    } 
6 }

當我們執行 NioEventLoopGroup worker = new NioEventLoopGroup(); 這行代碼時,netty 默認會啟動 2倍 CPU 核數的 NIO 線程,在單機大量連接(幾萬甚至十幾萬以上)情況下, 一條 NIO 線程管理着幾千條甚至上萬條連接。如果在某個連接上執行 channelRead0() 時發生阻塞,最終都會拖慢綁定在該 NIO 線程上的其他 channel 的執行速度。

這時我們應該把耗時操作扔到業務線程池中去處理,處理邏輯如 LoginRequestHandler.java 中代碼所示:https://github.com/christmad/code-share/blob/master/netty-group-chat/src/main/java/code/christ/netty/server/handler/LoginRequestHandler.java,偽代碼如下:

 1 ThreadPool threadPool = xxx;
 2 
 3 protected void channelRead0(ChannelHandlerContext ctx, T packet) {
 4     threadPool.submit(new Runnable() {
 5         // 1. balabala 一些邏輯
 6         // 2. 數據庫或者網絡等一些耗時的操作
 7         // 3. writeAndFlush()
 8         // 4. balabala 其他的邏輯
 9     });
10 }

 

最后,其他小細節就不在本篇里長篇大論了,以后應該會收集一個系列來專門記錄編程里的小技巧。很多功能也沒有在這個版本里一並實現,比如消息的存儲,需要加上數據庫。以及“模擬打開聊天窗口”時查看最近的一些消息等。參考 QQ 最近這幾年的變化,打開聊天窗口加載的消息數量變少了,如果有關注的話應該會對這個變化有印象,之前的一些版本中打開窗口就能看到之前聊過的十幾行消息,后面慢慢變成幾行,目前(2019-11-07)打開窗口只能看三行了。只要有時間,這些功能都是可以添加的。比如消息存儲和歷史消息這塊,先有消息存儲后,后續就可以做一個 7天內、3天內的、當天的N條 等不同級別的歷史消息緩存級別。后面有空我也會繼續持續完善這個 IM 系統的功能,畢竟這是我興趣的項目之一。

有緣下篇博客見,See ya~~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM