瘋狂創客圈,一個Java 高並發研習社群 【博客園 總入口 】
瘋狂創客圈,傾力推出: 《Netty Zookeeper Redis 高並發實戰》一書, 面試必備 + 面試必備 + 面試必備
寫在前面
大家好,我是作者尼恩。目前和幾個小伙伴一起,組織了一個高並發的實戰社群【瘋狂創客圈】。正在開始高並發、億級流程的 IM 聊天程序 學習和實戰
順便說明下:
本文的內容只是一個初稿、初稿,本文的知識,在《Netty Zookeeper Redis 高並發實戰》一書時,進行大篇幅的完善和更新,並且進行的源碼的升級。 博客和書不一樣,書的內容更加系統化、全面化,更加層層升入、層次分明、更多次的錯誤排查,請大家以書的內容為准。
本文的最終內容, 具體請參考瘋狂創客圈 傾力編著,機械工業出版社出版的 《Netty Zookeeper Redis 高並發實戰》一書 。
1.1. 快速的能力提升,巨大的應用價值
1.1.1. 飛速提升能力,並且滿足實際開發要求
為什么要開始一個高並發IM的實戰呢?
首先,實戰完成一個分布式、高並發的IM系統,具有相當的技術挑戰性。這一點,對於從事傳統的企業級WEB開發的兄弟來說,相當於進入了一片全新的天地。企業級WEB,QPS峰值可能在1000以內,甚至在100以內,沒有多少技術挑戰性和含金量,屬於重復性的CRUD的體力活。
而一個分布式、高並發的IM系系統,面臨的QPS峰值可能在十萬、百萬、千萬,甚至上億級別。如果此縱深的層次化遞進的高並發需求,直接無極限的考驗着系統的性能。需要不斷的從通訊的協議、到系統的架構進行優化,對技術能力是一種非常極致的考驗和訓練。
其次,不同的QPS峰值規模的IM系統,所處的用戶需求環境是不一樣的。這就造成了不同用戶規模的IM系統,都具有一定的實際需求和市場需要,不一定需要所有的系統,都需要上億級的高並發。但是,作為一個頂級的架構師,就應該具備全棧式的架構能力。需要能適應不同的用戶規模的、差異化的技術場景,提供和架構出和對應的場景相互匹配的高並發IM系統。也就是說,IM系統綜合性相對較強,相關的技術需要覆蓋到滿足各種不同場景的網絡傳輸、分布式協調、分布式緩存、服務化架構等等。
來具體看看高並發IM的應用場景吧。
1.1.2. 越來越多、大量的應用場景
一切高實時性通訊、消息推送的場景,都需要高並發 IM 。
隨着移動互聯網、AI的飛速發展,高性能高並發IM(即時通訊),有着非常廣泛的應用場景。
典型的應用場景如下:私信、聊天、大規模推送、視頻會議、彈幕、抽獎、互動游戲、基於位置的應用(Uber、滴滴司機位置)、在線教育、智能家居等。
尤其是對於APP開發的小伙伴們來說,即時通訊,已經成為大多數APP標配。移動互聯網時代,推送(Push)服務成為App應用不可或缺的重要組成部分,推送服務可以提升用戶的活躍度和留存率。我們的手機每天接收到各種各樣的廣告和提示消息等大多數都是通過推送服務實現的。
隨着5G時代物聯網的發展,未來所有接入物聯網的智能設備,都將是IM系統的客戶端,這就意味着推送服務未來會面臨海量的設備和終端接入。為了支持這些千萬級、億級終端,一定是需要強悍的后台系統。
有這么多的應用場景,對於想成長為JAVA高手的小伙伴們,高並發IM 都繞不開一個話題。
對於想在后台有所成就的小伙伴們來說,高並發IM實戰,更是在終極BOSS PK之前的一場不可或缺的打怪練手。
總之,真刀真槍的完成一個高並發IM的實戰,既可以積累到非常全面的高並發經驗,又可以獲得更多的挑戰機會。
1.2. 高並發架構中的6大集群
1.2.1. 支撐億級流量的IM整體架構
整體的架構如下圖:
主要的集群介紹如下:
(1)Netty 服務集群
主要用來負責維持和客戶端的TCP連接
(2)連接器集群
負責 Netty Server 集群的管理,包括注冊、路由、負載均衡。集群IP注冊和節點ID分配。主要在基於Zookeeper集群提供底層服務,來完成。
(3)緩存集群
負責用戶、用戶綁定關系、用戶群組關系、用戶遠程會話等等數據的緩存。緩存臨時數據、加快讀速度。
(4)DB持久層集群
存在用戶、群組、離線消息等等
(5)消息隊列集群
用戶狀態廣播,群組消息廣播等等。
並沒有完全涉及全部的集群介紹。只是介紹其中的部分核心功能。 如果全部的功能感興趣,請關注瘋狂創客圈的億級流量實戰學習項目。
理論上,以上集群具備完全的擴展能力,進行合理的橫向擴展和局部的優化,支撐億級流量,沒有任何問題。
為什么這么說呢
單體的Netty服務器,遠遠不止支持10萬並發,在CPU 、內存還不錯的情況下,如果配置得當,甚至能撐到100萬級別。所以,通過合理的高並發架構,能夠讓系統動態擴展到成百上千的Netty節點,支撐億級流量,是沒有任何問題的。
單體的Netty服務器,如何支撐100萬高並發,請查詢瘋狂創客圈社群的文章《Netty 100萬級高並發服務器配置》
1.2.2. IM通訊協議介紹
IM通訊協議,屬於數據交換協議。IM系統的客戶端和服務器節點之間,需要按照同一種數據交換協議,進行數據的交換。
數據交換協議的功能,簡單的說:就是規定網絡中的字節流數據,如何與應用程序需要的結構化數據相互轉換。
數據交換協議主要的工作分為兩步:結構化數據到二進制數據的序列化和反序列化。
數據交換協議按序列化類型:分為文本協議和二進制協議。
常見的文本協議包括XML、JSON。文本協議序列化之后,可讀性好,便於調試,方便擴展。但文本協議的缺點在於解析效率一般,有很多的冗余數據,這一點主要體現在XML格式上。
常見的二進制協議包括PrototolBuff、Thrift,這些協議都自帶了數據壓縮,編解碼效率高,同時兼具擴展性。二進制協議的優勢很明顯,但是劣勢也非常的突出。和文本協議相反,序列化之后的二進制協議報文數據,基本上沒有什么可讀性,很顯然,這點不利於大家開發和調試。
因此,在協議的選擇上,對於並發度不高的IM系統,建議使用文本協議,比如JSON。對於並發度非常之高,QPS在千萬級、億級的通訊系統,盡量選擇二進制的協議。
據說,微信所使用的數據交換協議,就是 PrototolBuff二進制協議。
1.2.3. 長連接和短連接
什么是長連接呢?
客戶端client向server發起連接,server接受client連接,雙方建立連接。Client與server完成一次讀寫之后,它們之間的連接並不會主動關閉,后續的讀寫操作會繼續使用這個連接。
大家知道,TCP協議的連接過程是比較繁瑣的,建立連接是需要三次握手的,而釋放則需要4次握手,所以說每個連接的建立都是需要資源消耗和時間消耗的。
在高並發的IM系統中,客戶端和服務器之間,需要大量的發送通訊的消息,如果每次發送消息,都去建立連接,客戶端的和服務器的連接建立和斷開的開銷是非常巨大的。所以,IM消息的發送,肯定是需要長連接。
什么是短連接呢?
客戶端client向server發起連接,server接受client連接,在三次握手之后,雙方建立連接。Client與server完成一次讀寫,發送數據包並得到返回的結果之后,通過客戶端和服務端的四次握手進行關閉斷開。
短連接適用於數據請求頻度較低的場景。比如網站的瀏覽和普通的web請求。短連接的優點是:管理起來比較簡單,存在的連接都是有用的連接,不需要額外的控制手段。
在高並發的IM系統中,客戶端和服務器之間,除了消息的通訊外,還需要用戶的登錄與認證、好友的更新與獲取等等一些低頻的請求,這些都使用短連接來實現。
綜上所述,在這個高並發IM系統中,存在兩個類的服務器。一類短連接服務器和一個長連接服務器。
短連接服務器也叫Web服務服務器,主要是功能是實現用戶的登錄鑒權和拉取好友、群組、數據檔案等相對低頻的請求操作。
長連接服務器也叫IM即時通訊服務器,主要作用就是用來和客戶端建立並維持長連接,實現消息的傳遞和即時的轉發。並且,分布式網絡非常復雜,長連接管理是重中之重,需要考慮到連接保活、連接檢測、自動重連等方方面面的工作。
短連接Web服務器和長連接IM服務器之間,是相互配合的。在分布式集群的環境下,用戶首先通過短連接登錄Web服務器。Web服務器在完成用戶的賬號/密碼驗證,返回uid和token時,還需要通過一定策略,獲取目標IM服務器的IP地址和端口號列表,返回給客戶端。客戶端開始連接IM服務器,連接成功后,發送鑒權請求,鑒權成功則授權的長連接正式建立。
如果用戶規模龐大,無論是短連接Web服務器,還是長連接IM服務器,都需要進行橫向的擴展,都需要擴展到上十台、百台、甚至上千台機器。只有這樣,才能有良好性能,提高良好的用戶體驗。因此,需要引入一個新的角色,短連接網關(WebGate)。
WebGate短連接網關的職責,首先是代理大量的Web服務器,從而無感知的實現短連接的高並發。在客戶端登錄時和進行其他短連接時,不直接連接Web服務器,而是連接Web網關。圍繞Web網關和Web高並發的相關技術,目前非常成熟,可以使用SpringCloud 或者 Dubbo 等分布式Web技術,也很容易擴展。
除此之外,大量的IM服務器,又如何協同和管理呢?
基於Zookeeper或者其他的分布式協調中間件,可以非常方便、輕松的實現一個IM服務器集群的管理,包括而且不限於命名服務、服務注冊、服務發現、負載均衡等管理。
當用戶登錄成功的時候,WebGate短連接網關可以通過負載均衡技術,從Zookeeper集群中,找出一個可用的IM服務器的地址,返回給用戶,讓用戶來建立長連接。
1.2.4. 技術選型
(1)核心:
Netty4.x + spring4.x + zookeeper 3.x + redis 3.x + rocketMQ 3.x
(2)短連接服務:spring cloud
基於restful 短連接的分布式微服務架構, 完成用戶在線管理、單點登錄系統。
(3)長連接服務:Netty
Netty就不用太多介紹了。
(4)消息隊列:
rocketMQ 高速隊列。整流作用。
(5)底層數據庫:mysql+mongodb
mysql做業務還是很方便的,用來存儲結構化數據,如用戶數據。
mongodb 很重要,用來存儲非結構化離線消息。
(6)協議 Protobuf + JSON
Protobuf 是最高效的IM二進制協議,用於長連接。
JSON 是最緊湊的文本協議,用於短連接。
文本協議 Gson + fastjson。 Gson 谷歌的東西,fastjson 淘寶的東西,兩者互補,結合使用。
1.3. 基於Redis 設計分布式Session
什么是會話?
為了方便客戶端的開發,管理與服務器的連接,這里引入一個非常重要的中間角色——Session (會話)。有點兒像Web開發中的Tomcat的服務器 Session,但是又有很大的不同。
1.3.1. SessionLocal本地會話
客戶端的本地會話概念圖,如下圖所示:
客戶端會話有兩個很重的成員,一個是user,代表了擁有會話的用戶。一個是channel,代表了連接的通道。兩個成員的作用是:
(1)user成員 —— 通過它可以獲得當前的用戶信息
(2)channel成員 —— 通過它可以發送Netty消息
Session需要和 channel 相互綁定,為什么呢?原因有兩點:
(1)消息發送的時候, 需要從Session 寫入 Channel ,這相當於正向的綁定;
(2)收到消息的時候,消息是從Channel 過來的,所以可以直接找到 綁定的Session ,這相當於反向的綁定。
Session和 channel 相互綁定的代碼如下:
//正向綁定
ClientSession session = new (channel);
//反向綁定
channel.attr(ClientSession.SESSION).set(session);
正向綁定,是直接通過ClientSession構造函數完成。反向綁定是通過channel 自身的所具備的容器能力完成。Netty的Channel類型實現了AttributeMap接口 ,它相當於一個 Map容器。 反向的綁定,利用了channel 的這個特點。
總的來說,會話Session 左手用戶實例,右手服務器的channel連接通道,可以說是左擁右抱,是開發中經常使用到的類。
1.3.2. SessionDistrubuted分布式會話
在分布式環境下,本地的Session只能綁定本地的用戶和通道,夠不着其他Netty節點上的用戶和通道。
如何解決這個難題呢? 一個簡單的思路是:制作一個本地Session的副本,保存在分布式緩存Redis中。對於其他的Netty節點來說,可以取到這份Redis副本,從而進行消息的路由和轉發。
基於redis進行分布式的Session 緩存,與本地Session的內容不一樣,不需要保存用戶的具體實例,也不需要保存用戶的Netty Channel通道。只需要能夠根據它找到對於的Netty服務器節點即可。
我們將這個Session,命名為 SessionDistrubuted。代碼如下:
/**
\* create by 尼恩 @ 瘋狂創客圈
**/
@Data
public class SessionDistrubuted implements ServerSession {
//用戶ID
private String userId;
//Netty 服務器ID
private long nodeId;
//sessionId
private String sessionId;
public SessionDistrubuted(
String sessionId, String userId, long nodeId) {
this.sessionId = sessionId;
this.userId = userId;
this.nodeId = nodeId;
}
//...
}
如何判斷這個Session是否有效呢? 可以根據其nodeId,在本地路由器WorkerRouter中查找對應的消息轉發器,如果沒有找到,說明該Netty服務節點是不可以連接的。於是,該Session為無效。
判斷Session是否有效的代碼如下:
@Override
public boolean isValid() {
WorkerReSender sender = WorkerRouter.getInst()
.getRedirectSender(nodeId);
if (null == sender) {
return false;
}
return true;
}
只要該Session為有效。就可以通過它,轉發消息到目的nodeId對應的Netty 服務器。
代碼如下:
@Override
public void writeAndFlush(Object pkg) {
WorkerReSender sender = WorkerRouter
.getInst().getRedirectSender(nodeId);
sender.writeAndFlush(pkg);
}
在分布式環境下,結合本地Session和遠程Session,發送消息也就變得非常之簡單。如果在本地找到了目標的Session,就直接通過其Channel發送消息到客戶端。反之,就通過遠程Session,將消息轉發到客戶端所在的Netty服務器,由該服務器發送到目標客戶端。
1.4. 分布式的在線用戶統計
顧名思義,計數器是用來計數的。在分布式環境中,常規的計數器是不能使用的,在此介紹基本zookeeper實現的分布式計數器。利用ZooKeeper可以實現一個集群共享的計數器,只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。
1.4.1. Curator的分布式計數器
Curator有兩個計數器, 一個是用int來計數(SharedCount),一個用long來計數(DistributedAtomicLong)。
這里使用DistributedAtomicLong來實現高並發IM系統中的在線用戶統計。
代碼如下:
/**
* create by 尼恩 @ 瘋狂創客圈
**/
public class OnlineCounter {
private static final int QTY = 5;
private static final String PATH = "/im/OnlineCounter";
//Zk客戶端
private CuratorFramework client = null;
//單例模式
private static OnlineCounter singleInstance = null;
DistributedAtomicLong onlines = null;
public static OnlineCounter getInst() {
if (null == singleInstance) {
singleInstance = new OnlineCounter();
singleInstance.client = ZKclient.instance.getClient();
singleInstance.init();
}
return singleInstance;
}
private void init() {
//分布式計數器,失敗時重試10,每次間隔30毫秒
onlines = new DistributedAtomicLong( client,
PATH, new RetryNTimes(10, 30));
}
public boolean increment() {
boolean result = false;
AtomicValue<Long> val = null;
try {
val = onlines.increment();
result = val.succeeded();
System.out.println("old cnt: " + val.preValue()
+ " new cnt : " + val.postValue()
+ " result:" + val.succeeded());
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public boolean decrement() {
boolean result = false;
AtomicValue<Long> val = null;
try {
val = onlines.decrement();
result = val.succeeded();
System.out.println("old cnt: " + val.preValue()
+ " new cnt : " + val.postValue()
+ " result:" + val.succeeded());
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
}
1.4.2. 用戶上線和下線統計
當用戶上線的時候,使用increase方法,分布式的增加一次數量:
/**
\* 增加本地session
*/
public void addSession(String sessionId, SessionLocal s) {
localMap.put(sessionId, s);
String uid = s.getUser().getUid();
//增加用戶數
OnlineCounter.getInst().increment();
//...
}
當用戶下線的時候,使用decrease方法,分布式的減少一次數量:
/**
\* 刪除本地session
*/
public void removeLocalSession(String sessionId) {
if (!localMap.containsKey(sessionId)) {
return;
}
localMap.remove(sessionId);
//減少用戶數
OnlineCounter.getInst().decrement();
//...
}
1.5. 分布式IM命名服務
前面提到,一個高並發系統會有很多的節點組成,而且,節點的數量是不斷動態變化的。
在一個即時消息通訊系統中,從0到1到N,用戶量可能會越來越多,或者說由於某些活動影響,會不斷的出現流量洪峰。這時需要動態加入大量的節點。另外,由於機器或者網絡的原因,一些節點主動的離開的集群。如何為大量的動態節點命名呢?最好的辦法是使用分布式命名服務,按照一定的規則,為動態上線和下線的工作節點命名。
瘋狂創客圈的高並發IM實戰學習項目,基於Zookeeper構建分布式命名服務,為每一個IM工作服務器節點動態命名。
1.3.1. IM節點的POJO類
首先定義一個POJO類,保存IM worker節點的基礎信息如Netty 服務IP、Netty 服務端口,以及Netty的服務連接數。
具體如下:
/**
* create by 尼恩 @ 瘋狂創客圈
**/
@Data
public class ImNode implements Comparable<ImNode> {
//worker 的Id,由Zookeeper負責生成
private long id;
//Netty 服務 的連接數
private AtomicInteger balance;
//Netty 服務 IP
private String host;
//Netty 服務 端口
private String port;
//...
}
這個POJO類的IP、端口、balance負載,和每一個節點的Netty服務器相關。而id屬性,則由利用Zookeeper的中Znode子節點能順序編號的性質,由Zookeeper生成。
1.3.2. IM節點的ImWorker類
命名服務的思路是:所有的工作節點,都在Zookeeper的同一個的父節點下,創建順序節點。然后從返回的臨時路徑上,取得屬於自己的那個后綴的編號。
主要的代碼如下:
package com.crazymakercircle.imServer.distributed;
import com.crazymakercircle.imServer.server.ServerUtils;
import com.crazymakercircle.util.ObjectUtil;
import com.crazymakercircle.zk.ZKclient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* create by 尼恩 @ 瘋狂創客圈
**/
public class ImWorker {
//Zk curator 客戶端
private CuratorFramework client = null;
//保存當前Znode節點的路徑,創建后返回
private String pathRegistered = null;
private ImNode node = ImNode.getLocalInstance();
private static ImWorker singleInstance = null;
//取得單例
public static ImWorker getInst() {
if (null == singleInstance) {
singleInstance = new ImWorker();
singleInstance.client =
ZKclient.instance.getClient();
singleInstance.init();
}
return singleInstance;
}
private ImWorker() {
}
// 在zookeeper中創建臨時節點
public void init() {
createParentIfNeeded(ServerUtils.MANAGE_PATH);
// 創建一個 ZNode 節點
// 節點的 payload 為當前worker 實例
try {
byte[] payload = ObjectUtil.Object2JsonBytes(node);
pathRegistered = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(ServerUtils.pathPrefix, payload);
//為node 設置id
node.setId(getId());
} catch (Exception e) {
e.printStackTrace();
}
}
public long getId() {
String sid = null;
if (null == pathRegistered) {
throw new RuntimeException("節點注冊失敗");
}
int index = pathRegistered.lastIndexOf(ServerUtils.pathPrefix);
if (index >= 0) {
index += ServerUtils.pathPrefix.length();
sid = index <= pathRegistered.length() ? pathRegistered.substring(index) : null;
}
if (null == sid) {
throw new RuntimeException("節點ID生成失敗");
}
return Long.parseLong(sid);
}
public boolean incBalance() {
if (null == node) {
throw new RuntimeException("還沒有設置Node 節點");
}
// 增加負載:增加負載,並寫回zookeeper
while (true) {
try {
node.getBalance().getAndIncrement();
byte[] payload = ObjectUtil.Object2JsonBytes(this);
client.setData().forPath(pathRegistered, payload);
return true;
} catch (Exception e) {
return false;
}
}
}
public boolean decrBalance() {
if (null == node) {
throw new RuntimeException("還沒有設置Node 節點");
}
// 增加負載:增加負載,並寫回zookeeper
while (true) {
try {
int i = node.getBalance().decrementAndGet();
if (i < 0) {
node.getBalance().set(0);
}
byte[] payload = ObjectUtil.Object2JsonBytes(this);
client.setData().forPath(pathRegistered, payload);
return true;
} catch (Exception e) {
return false;
}
}
}
private void createParentIfNeeded(String managePath) {
try {
Stat stat = client.checkExists().forPath(managePath);
if (null == stat) {
client.create()
.creatingParentsIfNeeded()
.withProtection()
.withMode(CreateMode.PERSISTENT)
.forPath(managePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意,這里有三個Znode相關的路徑:
(1)MANAGE_PATH
(2)pathPrefix
(3)pathRegistered
第一個MANAGE_PATH是一個常量。為所有臨時工作Worker節點的父親節點的路徑,在創建Worker節點之前,首先要檢查一下,父親Znode節點是否存在,否則的話,先創建父親節點。父親節點的創建方式是:持久化節點,而不是臨時節點。
檢查和創建父親節點的代碼如下:
private void createParentIfNeeded(String managePath) {
try {
Stat stat = client.checkExists().forPath(managePath);
if (null == stat) {
client.create()
.creatingParentsIfNeeded()
.withProtection()
.withMode(CreateMode.PERSISTENT)
.forPath(managePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
第二路徑pathPrefix是所有臨時節點的前綴。例子的值“/im/Workers/”,是在工作路徑后,加上一個“/”分割符。也可是在工作路徑的后面,加上“/”分割符和其他的前綴字符,如:“/im/Workers/id-”,“/im/Workers/seq-”等等。
第三路徑pathRegistered是臨時節點的創建成功之后,返回的完整的路徑。比如:/im/Workers/0000000000,/im/Workers/0000000001 等等。后邊的編號是順序的。
創建節點成功后,截取后邊的數字,放在POJO對象中,供后邊使用:
//為node 設置id
node.setId(getId());
1.6. 即時通訊消息的路由和轉發
如果連接在不同的Netty工作站點的客戶端之間,需要相互進行消息的發送,那么,就需要在不同的Worker節點之間進行路由和轉發。
Worker節點路由是指,根據消息需要轉發的目標用戶,找到用戶的連接所在的Worker節點。由於節點和節點之間,都有可能需要相互轉發,所以,節點之間的關系是一種網狀結構。每一個節點,都需要具備路由的能力。
1.4.1. IM路由器WorkerRouter
為每一個Worker節點增加一個IM路由器類,叫做WorkerRouter 。為了能夠轉發到所有的節點,需要一是要訂閱到集群中所有的在線Netty服務器,並且保存起來,二是要其他的Netty服務器建立一個長連接,用於轉發消息。
WorkerRouter 核心代碼,節選如下:
/**
\* create by 尼恩 @ 瘋狂創客圈
**/
@Slf4j
public class WorkerRouter {
//Zk客戶端
private CuratorFramework client = null;
//單例模式
private static WorkerRouter singleInstance = null;
//監聽路徑
private static final String path =
"/im/Workers";
//節點的容器
private ConcurrentHashMap<Long, WorkerReSender> workerMap =
new ConcurrentHashMap<>();
public static WorkerRouter getInst() {
if (null == singleInstance) {
singleInstance = new WorkerRouter();
singleInstance.client = ZKclient.instance.getClient();
singleInstance.init();
}
return singleInstance;
}
private void init() {
try {
//訂閱節點的增加和刪除事件
TreeCache treeCache = new TreeCache(client, path);
TreeCacheListener l = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) throws Exception
{
ChildData data = event.getData();
if (data != null) {
switch (event.getType()) {
case NODE_REMOVED:
processNodeRemoved(data);
break;
case NODE_ADDED:
processNodeAdded(data);
break;
default:
break;
}
}
}
};
treeCache.getListenable().addListener(l);
treeCache.start();
} catch (Exception e) {
e.printStackTrace();
}
//...
}
在上一小節中,我們已經知道,一個節點上線時,首先要通過命名服務,加入到Netty 集群中。上面的代碼中,WorkerRouter 路由器使用curator的TreeCache 緩存,訂閱了節點的NODE_ADDED節點新增消息。當一個新的Netty節點加入是,通過processNodeAdded(data) 方法, 在本地保存一份節點的POJO信息,並且建立一個消息中轉的Netty客戶連接。
處理節點新增的方法 processNodeAdded(data)比較重要,代碼如下:
private void processNodeAdded(ChildData data) {
log.info("[TreeCache]節點更新端口, path={}, data={}",
data.getPath(), data.getData());
byte[] payload = data.getData();
String path = data.getPath();
ImNode imNode =
ObjectUtil.JsonBytes2Object(payload, ImNode.class);
long id = getId(path);
imNode.setId(id);
WorkerReSender reSender = workerMap.get(imNode.getId());
//重復收到注冊的事件
if (null != reSender && reSender.getNode().equals(imNode)) {
return;
}
//服務器重新上線
if (null != reSender) {
//關閉老的連接
reSender.stopConnecting();
}
//創建一個消息轉發器
reSender = new WorkerReSender(imNode);
//建立轉發的連接
reSender.doConnect();
workerMap.put(id, reSender);
}
router路由器有一個容器成員workerMap,用於封裝和保存所有的在線節點。當一個節點新增時,router取到新增的Znode路徑和負載。Znode路徑中有新節點的ID,Znode的payload負載中,有新節點的Netty服務的IP和端口,這個三個信息共同構成新節點的POJO信息 —— ImNode節點信息。 router在檢查完和確定本地不存在該節點的轉發器后,新增一個轉發器 WorkerReSender,將新節點的轉發器,保存在自己的容器中。
這里有一個問題,為什么在router路由器中,不簡單、直接、干脆的保存新節點的POJO信息呢?
因為router路由器的主要作用,除了路由節點,還需要方便的進行消息的轉發,所以,router路由器保存的是轉發器 WorkerReSender,而新增的遠程Netty節點的POJO信息,封裝在轉發器中。
1.4.2. IM轉發器WorkerReSender
IM轉發器,封裝了遠程節點的IP、端口、以及ID消息,具體是在ImNode類型的成員中。另外,IM轉發器還維持一個到遠程節點的長連接。也就是說,它是一個Netty的NIO客戶端,維護了一個到遠程節點的Netty Channel 通道成員,通過這個通道,將消息轉發給遠程的節點。
IM轉發器的核心代碼,如下:
package com.crazymakercircle.imServer.distributed;
import com.crazymakercircle.im.common.bean.User;
import com.crazymakercircle.im.common.codec.ProtobufEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* create by 尼恩 @ 瘋狂創客圈
**/
@Slf4j
@Data
public class WorkerReSender {
//連接遠程節點的Netty 通道
private Channel channel;
//連接遠程節點的POJO信息
private ImNode remoteNode;
/**
* 連接標記
*/
private boolean connectFlag = false;
GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) ->
{
log.info(": 分布式連接已經斷開……", remoteNode.toString());
channel = null;
connectFlag = false;
WorkerRouter.getInst().removeWorkerById(remoteNode);
};
private GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) ->
{
final EventLoop eventLoop
= f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("連接失敗!在10s之后准備嘗試重連!");
eventLoop.schedule(
() -> WorkerReSender.this.doConnect(),
10,
TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info("分布式IM節點連接成功:", remoteNode.toString());
channel = f.channel();
channel.closeFuture().addListener(closeListener);
}
};
private Bootstrap b;
private EventLoopGroup g;
public WorkerReSender(ImNode n) {
this.remoteNode = n;
/**
* 客戶端的是Bootstrap,服務端的則是 ServerBootstrap。
* 都是AbstractBootstrap的子類。
**/
b = new Bootstrap();
/**
* 通過nio方式來接收連接和處理連接
*/
g = new NioEventLoopGroup();
}
// 連接和重連
public void doConnect() {
// 服務器ip地址
String host = remoteNode.getHost();
// 服務器端口
int port = Integer.parseInt(remoteNode.getPort());
try {
if (b != null && b.group() == null) {
b.group(g);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.remoteAddress(host, port);
// 設置通道初始化
b.handler(
new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ProtobufEncoder());
}
}
);
log.info(new Date() + "開始連接分布式節點", remoteNode.toString());
ChannelFuture f = b.connect();
f.addListener(connectedListener);
// 阻塞
// f.channel().closeFuture().sync();
} else if (b.group() != null) {
log.info(new Date() + "再一次開始連接分布式節點", remoteNode.toString());
ChannelFuture f = b.connect();
f.addListener(connectedListener);
}
} catch (Exception e) {
log.info("客戶端連接失敗!" + e.getMessage());
}
}
public void stopConnecting() {
g.shutdownGracefully();
connectFlag = false;
}
public void writeAndFlush(Object pkg) {
if (connectFlag == false) {
log.error("分布式節點未連接:", remoteNode.toString());
return;
}
channel.writeAndFlush(pkg);
}
}
IM轉發器中,主體是與Netty相關的代碼,比較簡單。至少,IM轉發器比Netty服務器的代碼,簡單太多了。
轉發器有一個消息轉發的方法,直接通過Netty channel通道,將消息發送到遠程節點。
public void writeAndFlush(Object pkg) {
if (connectFlag == false) {
log.error("分布式節點未連接:", remoteNode.toString());
return;
}
channel.writeAndFlush(pkg);
}
1.7. Worker集群的負載均衡
理論上來說,負載均衡是一種手段,用來把對某種資源的訪問分攤給不同的服務器,從而減輕單點的壓力。
在高並發的IM系統中,負載均衡就是需要將IM長連接分攤到不同的Netty服務器,防止單個Netty服務器負載過大,而導致其不可用。
前面講到,當用戶登錄成功的時候,短連接網關WebGate需要返回給用戶一個可用的Netty服務器的地址,讓用戶來建立Netty長連接。而每台Netty工作服務器在啟動時,都會去zookeeper的“/im/Workers”節點下注冊臨時節點。
因此,短連接網關WebGate可以在用戶登錄成功之后,去“/im/Workers”節點下取得所有可用的Netty服務器列表,並通過一定的負載均衡算法計算得出一台Netty工作服務器,並且返回給客戶端。
1.5.1. ImLoadBalance 負載均衡器
短連接網關WebGate 獲得Netty服務器的地址,通過查詢Zookeeper集群來實現。定義一個負載均衡器,ImLoadBalance類 ,將計算最佳服務器的算法,放在負載均衡器中。
ImLoadBalance類 的核心代碼,如下:
package com.crazymakercircle.Balance;
import com.crazymakercircle.ObjectUtil;
import com.crazymakercircle.util.ImNode;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* create by 尼恩 @ 瘋狂創客圈
**/
@Data
@Slf4j
public class ImLoadBalance {
//Zk客戶端
private CuratorFramework client = null;
//工作節點的路徑
private String mangerPath = "/im/Workers";
public ImLoadBalance() {
}
public ImLoadBalance(CuratorFramework client, String mangerPath)
{
this.client = client;
this.mangerPath = mangerPath;
}
public static ImLoadBalance instance() {
}
public ImNode getBestWorker()
{
List<ImNode> workers =getWorkers();
ImNode best= balance(workers);
return best;
}
protected ImNode balance(List<ImNode> items) {
if (items.size() > 0) {
// 根據balance值由小到大排序
Collections.sort(items);
// 返回balance值最小的那個
return items.get(0);
} else {
return null;
}
}
///....
}
短連接網關WebGate 會調用getBestWorker()方法,取得最佳的IM服務器。而在這個方法中,有兩個很重要的方法。 一個是取得所有的IM服務器列表,注意是帶負載的。二個是通過負載信息,計算最小負載的服務器。
所有的IM服務器列表的代碼如下:
/**
* 從zookeeper中拿到所有IM節點
*/
protected List<ImNode> getWorkers() {
List<ImNode> workers = new ArrayList<ImNode>();
List<String> children = null;
try {
children = client.getChildren().forPath(mangerPath);
} catch (Exception e) {
e.printStackTrace();
return null;
}
for (String child : children) {
log.info("child:", child);
byte[] payload = null;
try {
payload = client.getData().forPath(child);
} catch (Exception e) {
e.printStackTrace();
}
if (null == payload) {
continue;
}
ImNode worker = ObjectUtil.
JsonBytes2Object(payload, ImNode.class);
workers.add(worker);
}
return workers;
}
代碼中,首先取得 "/im/Workers" 目錄下所有的臨時節點,使用的是curator的getChildren 獲取子節點方法。然后,通過getData方法,取得每一個子節點的二進制負載。最后,將負載信息轉成成 POJO ImNode 對象。
取到了工作節點的POJO 列表之后,通過一個簡單的算法,計算出balance值最小的ImNode對象。
取得最小負載的 balance 方法的代碼如下:
protected ImNode balance (List<ImNode> items) {
if (items.size() > 0) {
// 根據balance由小到大排序
Collections.sort(items);
// 返回balance值最小的那個
return items.get(0);
} else {
return null;
}
}
1.5.2. 與WebGate的整合
在用戶登錄的Http API 方法中,調用ImLoadBalance類的getBestWorker()方法,取得最佳的IM服務器信息,返回給登錄的客戶端。
核心代碼如下:
@EnableAutoConfiguration
@RestController
@RequestMapping(value = "/user",
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public class UserAction extends BaseController
{
@Resource
private UserService userService;
@RequestMapping(value = "/login/{username}/{password}")
public String loginAction(
@PathVariable("username") String username,
@PathVariable("password") String password)
{
User user = new User();
user.setUserName(username);
user.setPassWord(password);
User loginUser = userService.login(user);
ImNode best=ImLoadBalance.instance().getBestWorker();
LoginBack back =new LoginBack();
back.setImNode(best);
back.setUser(loginUser);
back.setToken(loginUser.getUserId().toString());
String r = super.getJsonResult(back);
return r;
}
//....
}
寫在最后
目前和幾個小伙伴一起,組織了一個高並發的實戰社群【瘋狂創客圈】,完成整個項目的完整的架構和開發實戰,歡迎參與。
瘋狂創客圈 億級流量 高並發IM 學習實戰
- Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
- Netty 源碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
- 瘋狂創客圈 【 博客園 總入口 】