利用zookeeper實現發布訂閱模式


zookeeper應用

發布訂閱

zk實現的方式是推拉結合,Client想服務端注冊自己需要關注的節點,一旦節點的數據發生變更,那么Server會向對應的客戶端發送Watcher事件通知,客戶端接收到這個消息后,需要主動到服務端獲取最新的數據。

目前很多應用使用發布訂閱都不是用zk的這種方式,比較典型的純的推模式和拉模式,這個之前有記錄過Notify和MetaQ的比較,不是本篇的重點。本次主要是利用zookeeper來實現以下發布訂閱這種功能。

搭建了一個zk環境,手動創建了一個節點/publish,客戶端發布者代碼如下:

package com.wpr.zk.pulish;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * 利用zk來模擬發布訂閱模式
 * Created by peirong.wpr on 2017/4/5.
 */
public class Publish implements Watcher{
    private static CountDownLatch latch =  new CountDownLatch(1);
    private static Stat stat = new Stat();
    private static ZooKeeper zk =null;
    private final static Integer  SESSION_TIMEOUT = 5000;

    public static void main(String[] args) {
        try {
            String path  ="/publish";
             zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Publish());
            latch.await();
            System.out.println("zk connection");
            byte[]  temp = zk.getData(path,true,stat);
            System.out.println("init data :pulish node data"+new String(temp));
            int i=0;
            while(true){
                System.out.println( "publish new Data:"+i);
                zk.setData(path,String.valueOf(i).getBytes(),-1);
                Thread.sleep(5000L);
                i++;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            System.out.println("receive watched event:"+event);
            System.out.println(event.getState());
            latch.countDown();
        }
    }
}

訂閱者代碼如下:

package com.wpr.zk.pulish;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Created by peirong.wpr on 2017/4/5.
 */
public class Subscribe  implements Watcher {
    private static CountDownLatch latch =  new CountDownLatch(1);
    private static Stat stat = new Stat();
    private static ZooKeeper zk =null;
    private final static Integer  SESSION_TIMEOUT = 5000;

    public static void main(String[] args) {
        try {
            String path  ="/publish";
            zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Subscribe());
            latch.await();
            System.out.println("zk connection");
            byte[]  temp = zk.getData(path,true,stat);
            System.out.println("init data :pulish node data"+new String(temp));
            int i=0;
            while(true){
                Thread.sleep(Integer.MAX_VALUE);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected == event.getState()){
            if(Event.EventType.None == event.getType() && event.getPath() == null){
                latch.countDown();
            }else if(event.getType()  == Event.EventType.NodeDataChanged){
                //Clinet需要去拉取最新的數據信息
                try {
                    byte[] newByte = zk.getData(event.getPath(),true,stat);
                    System.out.println("path:"+event.getPath()+"\tdata has changed.\t new Data :"+ new String(newByte));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM