curator監聽zk臨時節點實現信息中心服務的監控


我們使用curator建立連接,curator有session維護,重試機制,對遞歸創建節點和刪除節點有較好的支持:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("10.10.210.123:2181")
                        .sessionTimeoutMs(50000000)
                        .connectionTimeoutMs(50000000)
                        .retryPolicy(retryPolicy)
                        .build();
client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘記釋放調用close方法。

常規代碼可以先判斷是否存在后,創建消息通訊的根節點:

 Stat s = client.checkExists().forPath("/msg_node_list");
            if (s == null) {
                String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list");
            }

針對我們的每一個netty服務,根據自己的服務id向msg_node_list注冊臨時節點:

client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");

而我們的監控中心,負責給客戶端分發具體的netty鏈接服務器(固定10個節點進行hash分發,服務端向kafka中選擇topic依然根據用戶id與10按位&計算獲得具體的partition,因為partiton和netty機器編號一致),則需要輪詢監聽每一個臨時子節點的狀態,並且通過watch監聽異常下線節點:

List<String> list = client.getChildren().forPath("/msg_node_list");
//遍歷list中的每一個臨時子節點后調用如下代碼,參數為list中取出的臨時節點名稱:
client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
 Watcher w = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Event.EventType type = watchedEvent.getType();
            if (type.equals(Event.EventType.NodeDeleted)) {
                System.out.println(watchedEvent.getPath());
            }
        }
    };

對於節點監聽處理,可以使用curator的高級策略:

   1) NodeCache: 對一個節點進行監聽,監聽事件包括指定的路徑節點的增、刪、改的操作。

   2) PathChildrenCache: 對指定的路徑節點的一級子目錄進行監聽,不對該節點的操作進行監聽,對其子目錄的節點進行增、刪、改的操作監聽

   3) TreeCache:  可以將指定的路徑節點作為根節點(祖先節點),對其所有的子節點操作進行監聽,呈現樹形目錄的監聽,可以設置監聽深度,最大監聽深度為2147483647(int類型的最大值)。

  PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/msg_server_list", false);//監聽msg_server_list路徑下的所有節點
            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//異步初始化
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                    if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                        System.out.println(pathChildrenCacheEvent.getData().getPath());
                    }
                }
            });

 

=========================================================================================================

如果客戶端連接的netty服務異常下線,則在次訪問監控服務器獲取可用的netty地址。

redis中維護着一個list列表,每個連接到netty服務器對應的客戶端id,保存在一個redis的set中(也可以使用redis的bit按位置存儲,然后用BITCOUNT統計總數

監控服務器在給客戶端分配netty服務器時,會從zk中拉取可用的netty服務列表,並根據redis中netty服務對應的BITCount取出最小值,返回給客戶端,如果客戶端發現分配的netty服務不可用,則帶着該netty服務的id繼續請求監控服務,監控服務會從zk中再次拉取netty服務列表,並將不可用的netty服務器id去除后,在根據服務器列表中對應的BITCount選出最小值並返回客戶端。

ps:進入正常流程后,消息通信通過redis的list實現,lpush插入待辦信息,rpop取出信息。

=================================================使用kafka重構========================================================

每個netty服務器,監控一個kafka partition。然后每個客戶端直接連接到netty以后,只要客戶端不異常下線,它的channel就會被保存在一個Map中,key為客戶端首次發送過來的id。

當從kafka中取出數據時會帶有id,根據id判斷channel是否還存在於map中,如果客戶端已經下線則會在對應是事件中將該channel從map刪除,如果存在則將消息解析出來以后通過channel.writeAndFlush推送給客戶端。

每個netty服務對應一個partition。所有的消息放入一個topic中,當客戶端被分配了具體的某個netty服務以后,服務器以同樣的hash算法向具體partition中推入數據

如果netty服務消費kafka的指定partition后,沒有發現該channel連接上來則,將消息直接丟棄掉  “因為消息中心,只負責客戶端在線時的消息推送,客戶端下線后重新啟動將會從服務中獲取,此處與消息中心無關”*

(1)服務端:只要kafka不宕機,服務端調用永遠沒有問題,消息發布端會將消息發送到所有的partition,保證客戶端故障切換時能夠消費到數據,監控中心會根據redis中保存的連接客戶端的多少做出排序,找出可用的並持有最少客戶端連接的netty服務分配給客戶端。

(2)客戶端:如果客戶端的網絡異常情況下,不發送運維短信通知,如果客戶端正常,還是連不上服務端的netty則發送短信通知運維,同時去監控中心重新獲取其它可用的netty服務,只要服務端netty恢復正常運行則馬上可以對客戶端提供服務。

關鍵代碼如下:

public Map<String, Channel> channelMap = new ConcurrentHashMap<>();

@Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //如果該客戶端剛剛連接上來,則帶有連接標識。將其存入Map中,這里仍然只是示例,實際不應該寫在這個方法中,而是寫在channelActive方法中
        .......
        channelMap.put(msg, ctx.channel());
        .......
    } 


 //這里給出獲取消息后的示例代碼,用queue來模擬kafka,當然僅供參考。
 @Override
    public void run() {
        for (; ; ) {
            try {//模擬消息格式為,“用戶id,實際消息內容”,再聲明這里僅僅是演示,實際的消息格式需要嚴謹定義
                String o = linkedBlockingQueue.take();
                String msg[] = o.split("\\,");
                Channel channel = channelMap.get(msg[0]);
                if (channel != null) {
                    channel.writeAndFlush(msg[1]);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM