netty實現消息中心(一)思路整理


一、需求

需要實現直播間的以下功能:

  • 群發消息(文本、圖片、推薦商品)
  • 點對點私發消息(文本、圖片、推薦商品)
  • 單個用戶禁言
  • 全體用戶禁言
  • 撤回消息
  • 聊天記錄持久化

二、技術實現

服務端消息中心采用netty實現,
微站、小程序使用websocket與消息中心通信,
安卓端使用netty與消息中心通信。

服務器端每過一定時間會給客戶端推送一條ping消息,客戶端收到ping消息后回復pong消息,通過心跳驗證存活客戶端,定時斷開未回復pong消息的鏈接,剔除服務端連接會話信息。
客戶端因網絡等問題斷開鏈接后,客戶端需要實現定時重連機制,先設定斷開后每5秒嘗試一次重連,這個時間后續可能會修改。

服務端鏈接地址:ws://{ip}:{端口}/websocket?liveId=123&userId=00b084ea98e24e80a7f8be3c4b8a64d0
liveId為直播id,userId為用戶id

1.消息格式
消息為json格式字符串,有如下屬性:

字段名 類型 含義 客戶端是否必填
id string uuid,服務端生成 不填
liveId string 直播id 必填
code int 系統消息類型 必填
type int 業務消息類型 業務消息必填,心跳消息不填
msg string 消息內容 必填,心跳消息不填
sendUserId string 發送人用戶id 必填
sendUserName string 發送人用戶名 不填,服務器端返回
sendUserHeadImg string 發送人頭像 不填,服務器端返回
receiveUserId string[] 接收人用戶id數組 code為群發時無需填寫,私聊需要填寫
sendTime string 發送時間,服務端生成,格式:yyyy-MM-dd HH:mm:ss 不填
ext string 擴展信息 選填

消息code定義

code 含義
1 點對點
2 群發
3 ping消息,服務器端心跳發送到客戶端
4 pong消息,客戶端收到ping消息后回應pong消息

消息type定義

type 含義
1001 普通文本消息
1002 圖片消息
1003 推薦商品消息
1004 單個用戶禁言消息
1005 全體用戶禁言消息
1006 撤回用戶發言消息
1007 打賞消息

pong消息示例:
{
"code":4,
"liveId":"asfasda",
"sendUserId":"sfasdasdasd"
}

a.普通文本消息示例:
{
"id":"fasdasdasd",
"liveId":"fasdasdas",
"code":2,
"msg":"你好啊",
"sendUserId":"這是發送人的用戶id",
"sendUserName":"asfasdas",
"sendUserHeadImg":"fasdasdasdsad.jpg",
"receiveUserId":"這是接收人的用戶id",
"type":1001,
"sendTime":133548798798,
"ext":"{}"
}

b.帶表情的普通文本消息示例:
{
"id":"fasdasdasd",
"liveId":"fasdasdas",
"code":10001,
"msg":"你好啊[微笑]",
"sendUserId":"這是發送人的用戶id",
"sendUserName":"asfasdas",
"sendUserHeadImg":"fasdasdasdsad.jpg",
"receiveUserId":"這是接收人的用戶id",
"type":1,
"sendTime":133548798798,
"ext":"{}"
}
備注:[微笑]為微笑表情圖片的字符串標識,客戶端收到這條消息后需要把表情標識替換為圖片顯示到聊天窗口。

c.圖片消息示例:
先把圖片文件進行客戶端壓縮,盡量控制到1M以內,然后調用上傳圖片接口得到圖片相對路徑,如果上傳成功,組裝websocket消息把路徑放入msg:
{
"id":"fasdasd",
"liveId":"fasdasdas",
"code":2,
"msg":"/static/img/chat/2020-02-11/xxx.jpg",
"sendUserId":"這是發送人的用戶id",
"sendUserName":"asfasdas",
"sendUserHeadImg":"fasdasdasdsad.jpg",
"receiveUserId":"這是接收人的用戶id",
"type":1002,
"sendTime":133548798798,
"ext":"{}"
}

d.推薦商品消息示例:
推薦商品的msg字段為商品信息json
{
"id":"asfasdsdads",
"liveId":"fasdasdas",
"code":2,
"msg":"{"productId":"asdasfas","productType":1,"originPrice":100,"currentPrice":100,"productName":"sfasd", "coverImg":"asfasdasd.jpg"}",
"sendUserId":"這是發送人的用戶id",
"sendUserName":"asfasdas",
"sendUserHeadImg":"fasdasdasdsad.jpg",
"receiveUserId":"這是接收人的用戶id",
"type":1003,
"sendTime":133548798798
}

e.打賞消息示例:
{
"id":"fasdasd",
"liveId":"fasdasdas",
"code":2,
"msg":"/static/img/chat/2020-02-11/xxx.jpg",
"sendUserId":"這是發送人的用戶id",
"receiveUserId":"",
"type":1007,
"sendTime":133548798798,
"ext":"{"name":"被打賞用戶的用戶名", "price":100}"
}

f.單用戶禁言消息示例:
{
“id”:”fasdasd”,
“liveId”:”fasdasdas”,
“code”:2,
“msg”:”被禁言用戶的id”,
“sendUserId”:”這是發送人的用戶id”,
“receiveUserId”:”這是接收人的用戶id”,
“type”:1004,
“sendTime”:133548798798,
“ext”:"{'name':'被禁言的用戶名'}”
}

f.單用戶取消禁言消息示例:
{
“id”:”fasdasd”,
“liveId”:”fasdasdas”,
“code”:2,
“msg”:”被取消禁言用戶的id”,
“sendUserId”:”這是發送人的用戶id”,
“receiveUserId”:”這是接收人的用戶id”,
“type”:1010,
“sendTime”:133548798798,
“ext”:"{'name':'被禁言的用戶名'}”
}

f.撤回消息示例:
{
“id”:”fasdasd”,
“liveId”:”fasdasdas”,
“code”:2,
“msg”:”被撤銷的消息id”,
“sendUserId”:”這是發送人的用戶id”,
“receiveUserId”:”這是接收人的用戶id”,
“type”:1006,
“sendTime”:133548798798,
“ext”:””
}

2.消息交互流程
image.png

接收消息步驟:

a.當用戶打開直播介紹頁面時,向注冊中心發起建立連接請求,監聽消息中心推送過來的消息。

b.當消息中心推送過來的消息觸發了監聽事件函數,判斷type是普通文本消息、系統消息、推薦商品消息其中的某一種,然后執行對應的邏輯處理與展示。

發送消息步驟:

a.當用戶打開直播介紹頁面時,向注冊中心發起建立連接請求,監聽消息中心推送過來的消息。

b.如果是普通聊天文本消息或系統消息按約定好的消息格式組裝好消息json,如果是圖片消息則先調用圖片上傳接口得到url並把url組裝到消息json,調用websocket或者netty sdk向消息中心發送消息。

撤回消息步驟:

a.助教端、ibos點擊撤回時,組裝一條type為撤回、code為群發類型、msg字段為需要撤回的消息id的消息,發送到消息中心。

b.消息中心收到這條消息后,將mongodb中的消息狀態改為撤回,並廣播撤回消息到所有客戶端。

c.被廣播的客戶端收到這條消息后,按消息id隱藏對應的消息。

單個用戶禁言步驟

a.助教端、ibos點擊用戶禁言時,組裝一條type為禁言、code為點對點私聊類型、msg字段為需要禁言的用戶id的消息,發送到消息中心。

b.消息中心收到這條消息后,將用戶的禁言狀態持久化到數據庫,並在緩存中記錄直播和用戶的禁言關系。如果這個用戶的客戶端還保持會話連接,點對點推送這個被禁言用戶的客戶端。

c.被禁言用戶客戶端收到消息后,文本框禁用並提示用戶已被禁言。

備注:
當用戶刷新頁面時,會從數據庫中獲取到最新的禁言狀態並禁用/啟用文本框。極端情況用戶發出消息,消息經過消息中心時會查詢一下緩存中是否有直播用戶禁言關系,有的話該消息不予推送。
取消單個用戶禁言的交互步驟和以上步驟相同,只是消息的type為取消禁言。

全體用戶禁言步驟

a.助教端、ibos點擊全體用戶禁言時,將直播的禁言狀態持久化到數據庫,並在緩存中記錄直播的禁言狀態。組裝一條type為全體禁言、code為廣播的消息,發送到消息中心。

b.消息中心將消息廣播到觀看該直播的所有客戶端。

c.客戶端收到消息后,文本框禁用。

備注:
當用戶刷新頁面時,會從數據庫中獲取到最新的直播禁言狀態並禁用/啟用文本框。極端情況用戶發出消息,消息經過消息中心時會查詢一下緩存中是否有直播禁言狀態,有的話該消息不予推送。
取消全體用戶禁言的交互步驟和以上步驟相同,只是消息的type為取消全體禁言。

3.消息存儲

在消息中心所在服務器的本地內存加一個隊列作為緩沖區,經過消息中心的聊天記錄會追加到緩沖區。開啟異步任務定時檢查緩沖區是否達到閾值,達到緩沖區閾值后批量存儲到mongodb聊天記錄表。

mongodb表結構

字段名 含義
id 主鍵
zbId 直播id
msgId 消息id
sendUserId 發送人id
receiveUserId 接收人id,多個逗號分隔
sendTime 發送時間
code 消息發送類型
type 消息類型
msg 消息內容
ext 消息擴展內容
createTime 保存時間

4.鑒權

在消息中心對用戶的發言狀態做驗證。

5.擴展
如果需要支持熱部署和擴容,需要解決如下的坑:
1.消息中心的客戶端會話管理代碼需要改造成分布式會話管理,例如使用redis做存儲,但是連接數過大時網絡請求傳輸的數據量也會增大,不妥。

2.集群需要解決負載均衡問題,目前只能做客戶端負載均衡,無法像nginx轉發http請求一樣實現服務端負載均衡。

3.目前市面上沒有成熟標准的解決以上問題的方案,需要自己結合一些零散的思路做一些嘗試。

網上找到的消息服務集群思路:
如果從自己編程方面考慮socket集群,那么是有困難的。告訴你一個我曾使用過的架構模型。

1、HTTP服務做集群。

2、socket服務器啟用后直接訪問HTTP服務,主動告知有一個新的socket服務,socket服務狀態用中間緩存層保存,具體服務狀態可以使用HTTP心跳輪詢檢測,此部分為socket服務的主動發現、裝載服務、卸載服務。

3、客戶端請求HTTP服務,HTTP服務分析保存在其上的各個socket服務的存活狀態和負載情況,然后返回給客戶端最優的socket服務地址。

4、客戶端獲得最優負載的socket服務地址后連接對應的socket服務。

5、各服務之間的數據交換,可以添加一台socket服務作為socket服務的中轉站,這種方式不太可靠,強依賴於中轉服務的存活狀態。

6、各socket服務的數據必須能保證全局共享,用於客戶端之間數據的共通性,使用戶在感知上就像完全連接在一台socket服務之上。

下一篇為netty實現消息推送的代碼實現:netty實現消息中心(二)基於netty搭建一個聊天室


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM