我們使用curator建立連接,curator有session維護,重試機制,對遞歸創建節點和刪除節點有較好的支持:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("10.10.210.123:2181") .sessionTimeoutMs(50000000) .connectionTimeoutMs(50000000) .retryPolicy(retryPolicy) .build(); client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘記釋放調用close方法。
常規代碼可以先判斷是否存在后,創建消息通訊的根節點:
Stat s = client.checkExists().forPath("/msg_node_list"); if (s == null) { String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list"); }
針對我們的每一個netty服務,根據自己的服務id向msg_node_list注冊臨時節點:
client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");
而我們的監控中心,負責給客戶端分發具體的netty鏈接服務器(固定10個節點進行hash分發,服務端向kafka中選擇topic依然根據用戶id與10按位&計算獲得具體的partition,因為partiton和netty機器編號一致),則需要輪詢監聽每一個臨時子節點的狀態,並且通過watch監聽異常下線節點:
List<String> list = client.getChildren().forPath("/msg_node_list"); //遍歷list中的每一個臨時子節點后調用如下代碼,參數為list中取出的臨時節點名稱: client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); if (type.equals(Event.EventType.NodeDeleted)) { System.out.println(watchedEvent.getPath()); } } };
對於節點監聽處理,可以使用curator的高級策略:
1) NodeCache: 對一個節點進行監聽,監聽事件包括指定的路徑節點的增、刪、改的操作。
2) PathChildrenCache: 對指定的路徑節點的一級子目錄進行監聽,不對該節點的操作進行監聽,對其子目錄的節點進行增、刪、改的操作監聽
3) TreeCache: 可以將指定的路徑節點作為根節點(祖先節點),對其所有的子節點操作進行監聽,呈現樹形目錄的監聽,可以設置監聽深度,最大監聽深度為2147483647(int類型的最大值)。
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/msg_server_list", false);//監聽msg_server_list路徑下的所有節點 pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//異步初始化 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { System.out.println(pathChildrenCacheEvent.getData().getPath()); } } });
=========================================================================================================
如果客戶端連接的netty服務異常下線,則在次訪問監控服務器獲取可用的netty地址。
redis中維護着一個list列表,每個連接到netty服務器對應的客戶端id,保存在一個redis的set中(也可以使用redis的bit按位置存儲,然后用BITCOUNT統計總數)。
監控服務器在給客戶端分配netty服務器時,會從zk中拉取可用的netty服務列表,並根據redis中netty服務對應的BITCount取出最小值,返回給客戶端,如果客戶端發現分配的netty服務不可用,則帶着該netty服務的id繼續請求監控服務,監控服務會從zk中再次拉取netty服務列表,並將不可用的netty服務器id去除后,在根據服務器列表中對應的BITCount選出最小值並返回客戶端。
ps:進入正常流程后,消息通信通過redis的list實現,lpush插入待辦信息,rpop取出信息。
=================================================使用kafka重構========================================================
每個netty服務器,監控一個kafka partition。然后每個客戶端直接連接到netty以后,只要客戶端不異常下線,它的channel就會被保存在一個Map中,key為客戶端首次發送過來的id。
當從kafka中取出數據時會帶有id,根據id判斷channel是否還存在於map中,如果客戶端已經下線則會在對應是事件中將該channel從map刪除,如果存在則將消息解析出來以后通過channel.writeAndFlush推送給客戶端。
每個netty服務對應一個partition。所有的消息放入一個topic中,當客戶端被分配了具體的某個netty服務以后,服務器以同樣的hash算法向具體partition中推入數據。
如果netty服務消費kafka的指定partition后,沒有發現該channel連接上來則,將消息直接丟棄掉 “因為消息中心,只負責客戶端在線時的消息推送,客戶端下線后重新啟動將會從服務中獲取,此處與消息中心無關”*。
(1)服務端:只要kafka不宕機,服務端調用永遠沒有問題,消息發布端會將消息發送到所有的partition,保證客戶端故障切換時能夠消費到數據,監控中心會根據redis中保存的連接客戶端的多少做出排序,找出可用的並持有最少客戶端連接的netty服務分配給客戶端。
(2)客戶端:如果客戶端的網絡異常情況下,不發送運維短信通知,如果客戶端正常,還是連不上服務端的netty則發送短信通知運維,同時去監控中心重新獲取其它可用的netty服務,只要服務端netty恢復正常運行則馬上可以對客戶端提供服務。
關鍵代碼如下:
public Map<String, Channel> channelMap = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //如果該客戶端剛剛連接上來,則帶有連接標識。將其存入Map中,這里仍然只是示例,實際不應該寫在這個方法中,而是寫在channelActive方法中 ....... channelMap.put(msg, ctx.channel()); ....... } //這里給出獲取消息后的示例代碼,用queue來模擬kafka,當然僅供參考。 @Override public void run() { for (; ; ) { try {//模擬消息格式為,“用戶id,實際消息內容”,再聲明這里僅僅是演示,實際的消息格式需要嚴謹定義 String o = linkedBlockingQueue.take(); String msg[] = o.split("\\,"); Channel channel = channelMap.get(msg[0]); if (channel != null) { channel.writeAndFlush(msg[1]); } } catch (InterruptedException e) { e.printStackTrace(); } } }