添加依賴
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
注冊
定義一個參數model
/**
 * Connection parameters used when opening a ZooKeeper connection for service registration.
 */
public class ZookeeperRegisterConnectModel {

    /** Connection address; several addresses may be supplied, separated by commas. */
    private String connectString;

    /** Value handed to the ZooKeeper client as its session timeout. */
    private int sessionTimeout;

    /** Creates an empty model; populate it through the setters. */
    public ZookeeperRegisterConnectModel() {
    }

    /**
     * Creates a fully populated model.
     *
     * @param connectString  connection address (comma-separated when there are several)
     * @param sessionTimeout session timeout to use for the connection
     */
    public ZookeeperRegisterConnectModel(String connectString, int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
        this.connectString = connectString;
    }

    /** @return the configured connection address */
    public String getConnectString() {
        return this.connectString;
    }

    /** @param connectString connection address (comma-separated when there are several) */
    public void setConnectString(String connectString) {
        this.connectString = connectString;
    }

    /** @return the configured session timeout */
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    /** @param sessionTimeout session timeout to use for the connection */
    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}
服務注冊具體代碼:
public class ZookeeperRegisterNodeServiceImpl implements IRegisterNodeService, Closeable {
private ZooKeeper zk;
/**
* 實例連接zookeeper
*
* @param zookeeperConnectModel
* @throws IOException
*/
public ZookeeperRegisterNodeServiceImpl(ZookeeperRegisterConnectModel zookeeperConnectModel) throws IOException {
//創建一個與ZooKeeper服務器的連接
zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
/**
* 創建節點
*
* @param service 服務接口名稱
* @param address 服務發布的地址和端口
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String createNode(String service, String address) throws KeeperException, InterruptedException {
Stat stat = zk.exists("/" + service, false);
//不存在就創建根節點
if (stat == null) {
zk.create("/" + service, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//創建一個子節點
return zk.create("/" + service + "/" + address, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
@Override
public void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
發現
服務發現具體代碼:
/**
 * Service discovery on top of ZooKeeper: lists the children of a service node and hands
 * back one of them chosen at random.
 */
public class ZookeeperGetChildrenServiceImpl implements IGetChildrenService, Closeable {

    private final ZooKeeper zk;

    /**
     * Opens a ZooKeeper connection using the supplied parameters.
     *
     * @param zookeeperConnectModel connection address and session timeout
     * @throws IOException if the ZooKeeper client cannot be created
     */
    public ZookeeperGetChildrenServiceImpl(ZookeeperFindConnectModel zookeeperConnectModel) throws IOException {
        zk = new ZooKeeper(zookeeperConnectModel.getConnectString(), zookeeperConnectModel.getSessionTimeout(), new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Connection events are intentionally not handled.
            }
        });
    }

    /**
     * Fetches all child nodes under {@code /<path>} and returns one of them at random.
     *
     * @param path node name without the leading slash
     * @return the name of a randomly chosen child, or {@code null} when there are no children
     * @throws KeeperException      if ZooKeeper rejects the operation
     * @throws InterruptedException if the calling thread is interrupted
     */
    @Override
    public String getChildren(String path) throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren("/" + path, false);
        if (children == null || children.isEmpty()) {
            return null;
        }
        // Pick a single random index instead of shuffling the whole list.
        int pick = java.util.concurrent.ThreadLocalRandom.current().nextInt(children.size());
        return children.get(pick);
    }

    /**
     * Closes the ZooKeeper connection. If the thread is interrupted while closing, the
     * stack trace is printed and the interrupt flag is restored for the caller.
     */
    @Override
    public void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // close() cannot propagate the exception, so keep the interrupt visible.
            Thread.currentThread().interrupt();
        }
    }
}
測試
/**
 * Integration test: registers three addresses for a service, then looks the service up
 * concurrently from many threads and expects every lookup to return an address.
 */
public class RegisterFindTest {

    private static final String ZK_ADDRESS = "192.168.10.200:2181";
    private static final int LOOKUP_THREADS = 500;

    private String service = "zookeeperTest";

    // Set by worker threads when a lookup fails; read by the test thread after join().
    volatile boolean error = false;

    @Test
    public void findTest() throws InterruptedException, IOException, KeeperException {
        // Both connections are closed when the test ends, so sessions (and the ephemeral
        // registrations that belong to them) do not linger and break a quick re-run.
        try (ZookeeperRegisterNodeServiceImpl registerNodeService =
                     new ZookeeperRegisterNodeServiceImpl(new ZookeeperRegisterConnectModel(ZK_ADDRESS, 18000));
             ZookeeperGetChildrenServiceImpl getChildrenService =
                     new ZookeeperGetChildrenServiceImpl(new ZookeeperFindConnectModel(ZK_ADDRESS, 5000))) {
            registerTest(registerNodeService);

            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < LOOKUP_THREADS; i++) {
                Thread thread = new Thread() {
                    @Override
                    public void run() {
                        try {
                            String organization = getChildrenService.getChildren(service);
                            Assert.assertNotNull(service + "服務地址沒有找到", organization);
                        } catch (InterruptedException | KeeperException | AssertionError e) {
                            // An assertion thrown on a worker thread would not fail the test
                            // by itself, so record it for the test thread to check.
                            error = true;
                        }
                    }
                };
                thread.start();
                threads.add(thread);
            }

            // Let an interrupt fail the test instead of silently skipping the remaining joins.
            for (Thread thread : threads) {
                thread.join();
            }
            Assert.assertFalse(error);
        }
    }

    /** Registers three addresses for the service under test. */
    private void registerTest(IRegisterNodeService registerNodeService) throws IOException, KeeperException, InterruptedException {
        registerNodeService.createNode(service, "192.168.10.11:1111");
        registerNodeService.createNode(service, "192.168.10.22:2222");
        registerNodeService.createNode(service, "192.168.10.33:3333");
    }
}
其他基本操作
/**
 * Reads the data stored on a node and decodes it as UTF-8 text.
 *
 * @param path node path
 * @return the node data as a string, or an empty string when the node holds no data
 * @throws KeeperException      if ZooKeeper rejects the operation
 * @throws InterruptedException if the calling thread is interrupted
 */
public String getData(String path) throws KeeperException, InterruptedException {
    byte[] data = zookeeper.getData(path, false, null);
    if (data == null) {
        return "";
    }
    // Explicit charset: the platform default differs between machines sharing the same nodes.
    return new String(data, java.nio.charset.StandardCharsets.UTF_8);
}
/**
 * Stores text on a node, encoded as UTF-8, regardless of the node's current version.
 *
 * @param path node path
 * @param data text to store; {@code null} stores no data
 * @return the {@code Stat} returned by ZooKeeper for the update
 * @throws KeeperException      if ZooKeeper rejects the operation
 * @throws InterruptedException if the calling thread is interrupted
 */
public Stat setData(String path, String data) throws KeeperException, InterruptedException {
    // Same explicit charset as getData so text round-trips on every platform.
    byte[] payload = data == null ? null : data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // Version -1 matches any node version (per the ZooKeeper API).
    return zookeeper.setData(path, payload, -1);
}
/**
 * Deletes the node at the given path.
 *
 * @param path node path
 * @throws InterruptedException if the calling thread is interrupted
 * @throws KeeperException      if ZooKeeper rejects the operation
 */
public void deleteNode(String path) throws InterruptedException, KeeperException {
    // -1 is passed as the expected version (matches any version per the ZooKeeper API).
    final int anyVersion = -1;
    zookeeper.delete(path, anyVersion);
}
/**
 * Returns the creation time of a node, as reported by its {@code Stat}.
 *
 * @param path node path
 * @return the node's {@code ctime} value rendered as a string
 * @throws KeeperException      if ZooKeeper rejects the operation; a
 *                              {@code KeeperException.NoNodeException} when the node does not exist
 * @throws InterruptedException if the calling thread is interrupted
 */
public String getCTime(String path) throws KeeperException, InterruptedException {
    Stat stat = zookeeper.exists(path, false);
    if (stat == null) {
        // exists() returns null for a missing node; report that through the declared
        // exception type instead of failing with a NullPointerException.
        throw new KeeperException.NoNodeException(path);
    }
    return String.valueOf(stat.getCtime());
}
/**
 * Counts the children directly under a node.
 *
 * @param path node path
 * @return the number of child nodes
 * @throws KeeperException      if ZooKeeper rejects the operation
 * @throws InterruptedException if the calling thread is interrupted
 */
public Integer getChildrenNum(String path) throws KeeperException, InterruptedException {
    return Integer.valueOf(zookeeper.getChildren(path, false).size());
}
/**
 * Closes the ZooKeeper connection; does nothing when no connection is held.
 *
 * @throws InterruptedException if the calling thread is interrupted while closing
 */
public void closeConnection() throws InterruptedException {
    if (zookeeper == null) {
        return;
    }
    zookeeper.close();
}