1.前言
之前自己寫了一些關於Zookeeper的基礎知識,Zookeeper作為一種協調分布式應用高性能的調度服務,實際的應用場景也非常的廣泛,這里主要通過幾個例子來具體的說明Zookeeper在特定場景下的使用方式(下面的這些功能估計consul和etcd也能實現,以后學到了再說吧)。
2.具體應用
2.1.一致性配置管理
我們在開發的時候,有時候需要獲取一些公共的配置,比如數據庫連接信息等,並且偶然可能需要更新配置。如果我們的服務器有N多台的話,那修改起來會特別的麻煩,並且還需要重新啟動。這里Zookeeper就可以很方便的實現類似的功能。
2.1.1.思路
將公共的配置存放在Zookeeper的節點中
應用程序可以連接到Zookeeper中並對Zookeeper中配置節點進行讀取或者修改(對於寫操作可以進行權限驗證設置),下面是具體的流程圖:
2.1.2.事例
數據庫配置信息一致性的維護
配置類:
public class CommonConfig implements Serializable{
// 數據庫連接配置
private String dbUrl;
private String username;
private String password;
private String driverClass;
public CommonConfig() {}
public CommonConfig(String dbUrl, String username, String password, String driverClass) {
super();
this.dbUrl = dbUrl;
this.username = username;
this.password = password;
this.driverClass = driverClass;
}
public String getDbUrl() {
return dbUrl;
}
public void setDbUrl(String dbUrl) {
this.dbUrl = dbUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDriverClass() {
return driverClass;
}
public void setDriverClass(String driverClass) {
this.driverClass = driverClass;
}
@Override
public String toString() {
return "CommonConfig:{dbUrl:" + this.dbUrl +
", username:" + this.username +
", password:" + this.password +
", driverClass:" + this.driverClass + "}";
}
}
配置管理中心
- 獲取本地配置信息
- 修改配置,並同步
同步配置信息到Zookeeper服務器
public class ZkConfigMng {
private String nodePath = "/commConfig";
private CommonConfig commonConfig;
private ZkClient zkClient;
public CommonConfig initConfig(CommonConfig commonConfig) {
if(commonConfig == null) {
this.commonConfig = new CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8",
"root", "root", "com.mysql.jdbc.Driver");
} else {
this.commonConfig = commonConfig;
}
return this.commonConfig;
}
/**
* 更新配置
*
* @param commonConfig
* @return
*/
public CommonConfig update(CommonConfig commonConfig) {
if(commonConfig != null) {
this.commonConfig = commonConfig;
}
syncConfigToZookeeper();
return this.commonConfig;
}
public void syncConfigToZookeeper() {
if(zkClient == null) {
zkClient = new ZkClient("127.0.0.1:2181");
}
if(!zkClient.exists(nodePath)) {
zkClient.createPersistent(nodePath);
}
zkClient.writeData(nodePath, commonConfig);
}
}
以上是提供者,下面我們需要一個客戶端獲取這些配置
public class ZkConfigClient implements Runnable {
private String nodePath = "/commConfig";
private CommonConfig commonConfig;
@Override
public void run() {
ZkClient zkClient = new ZkClient(new ZkConnection("127.0.0.1:2181", 5000));
while (!zkClient.exists(nodePath)) {
System.out.println("配置節點不存在!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 獲取節點
commonConfig = (CommonConfig)zkClient.readData(nodePath);
System.out.println(commonConfig.toString());
zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if(dataPath.equals(nodePath)) {
System.out.println("節點:" + dataPath + "被刪除了!");
}
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
if(dataPath.equals(nodePath)) {
System.out.println("節點:" + dataPath + ", 數據:" + data + " - 更新");
commonConfig = (CommonConfig) data;
}
}
});
}
}
下面啟動Main函數
配置管理服務啟動
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperApiDemoApplication.class, args);
ZkConfigMng zkConfigMng = new ZkConfigMng();
zkConfigMng.initConfig(null);
zkConfigMng.syncConfigToZookeeper();
TimeUnit.SECONDS.sleep(10);
// 修改值
zkConfigMng.update(new CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8",
"root", "wxh", "com.mysql.jdbc.Driver"));
}
}
客戶端啟動:
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperApiDemoApplication.class, args);
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 模擬多個客戶端獲取配置
executorService.submit(new ZkConfigClient());
executorService.submit(new ZkConfigClient());
executorService.submit(new ZkConfigClient());
}
}
2.2.分布式鎖
在我們日常的開發中,如果是單個進程中對共享資源的訪問,我們只需要用synchronized或者lock就能實現互斥操作。但是對於跨進程、跨主機、跨網絡的共享資源似乎就無能為力了。
另外,分布式系列面試題和答案全部整理好了,微信搜索Java技術棧,在后台發送:面試,可以在線閱讀。
2.1.1.思路
- 首先zookeeper中我們可以創建一個
/distributed_lock
持久化節點 - 然后再在
/distributed_lock
節點下創建自己的臨時順序節點,比如:/distributed_lock/task_00000000008
- 獲取所有的
/distributed_lock
下的所有子節點,並排序 - 判讀自己創建的節點是否最小值(第一位)
- 如果是,則獲取得到鎖,執行自己的業務邏輯,最后刪除這個臨時節點。
- 如果不是最小值,則需要監聽自己創建節點前一位節點的數據變化,並阻塞。
- 當前一位節點被刪除時,我們需要通過遞歸來判斷自己創建的節點是否在是最小的,如果是則執行5);如果不是則執行6)(就是遞歸循環的判斷)
下面是具體的流程圖:
2.1.3.事例
public class DistributedLock {
// 常亮
static class Constant {
private static final int SESSION_TIMEOUT = 10000;
private static final String CONNECTION_STRING = "127.0.0.1:2181";
private static final String LOCK_NODE = "/distributed_lock";
private static final String CHILDREN_NODE = "/task_";
}
private ZkClient zkClient;
public DistributedLock() {
// 連接到Zookeeper
zkClient = new ZkClient(new ZkConnection(Constant.CONNECTION_STRING));
if(!zkClient.exists(Constant.LOCK_NODE)) {
zkClient.create(Constant.LOCK_NODE, "分布式鎖節點", CreateMode.PERSISTENT);
}
}
public String getLock() {
try {
// 1。在Zookeeper指定節點下創建臨時順序節點
String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");
// 嘗試獲取鎖
acquireLock(lockName);
return lockName;
} catch(Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 獲取鎖
* @throws InterruptedException
*/
public Boolean acquireLock(String lockName) throws InterruptedException {
// 2.獲取lock節點下的所有子節點
List<String> childrenList = zkClient.getChildren(Constant.LOCK_NODE);
// 3.對子節點進行排序,獲取最小值
Collections.sort(childrenList, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 4.判斷當前創建的節點是否在第一位
int lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);
if(lockPostion < 0) {
// 不存在該節點
throw new ZkNodeExistsException("不存在的節點:" + lockName);
} else if (lockPostion == 0) {
// 獲取到鎖
System.out.println("獲取到鎖:" + lockName);
return true;
} else if (lockPostion > 0) {
// 未獲取到鎖,阻塞
System.out.println("...... 未獲取到鎖,阻塞等待 。。。。。。");
// 5.如果未獲取得到鎖,監聽當前創建的節點前一位的節點
final CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 6.前一個節點被刪除,當不保證輪到自己
System.out.println("。。。。。。前一個節點被刪除 。。。。。。");
acquireLock(lockName);
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
// 不用理會
}
};
try {
zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
latch.await();
} finally {
zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);
}
}
return false;
}
/**
* 釋放鎖(刪除節點)
*
* @param lockName
*/
public void releaseLock(String lockName) {
zkClient.delete(lockName);
}
public void closeZkClient() {
zkClient.close();
}
}
@SpringBootApplication
public class ZookeeperDemoApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ZookeeperDemoApplication.class, args);
DistributedLock lock = new DistributedLock();
String lockName = lock.getLock();
/**
* 執行我們的業務邏輯
*/
if(lockName != null) {
lock.releaseLock(lockName);
}
lock.closeZkClient();
}
}
2.3.分布式隊列
在日常使用中,特別是像生產者消費者模式中,經常會使用BlockingQueue來充當緩沖區的角色。但是在分布式系統中這種方式就不能使用BlockingQueue來實現了,但是Zookeeper可以實現。
2.1.1.思路
- 首先利用Zookeeper中臨時順序節點的特點
- 當生產者創建節點生產時,需要判斷父節點下臨時順序子節點的個數,如果達到了上限,則阻塞等待;如果沒有達到,就創建節點。
- 當消費者獲取節點時,如果父節點中不存在臨時順序子節點,則阻塞等待;如果有子節點,則獲取執行自己的業務,執行完畢后刪除該節點即可。
- 獲取時獲取最小值,保證FIFO特性。
2.1.2.事例
這個是一個消費者對一個生產者,如果是多個消費者對多個生產者,對代碼需要調整。
public interface AppConstant {
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/mailbox";
static String CHILD_NODE_PATH = "/mail_";
static int MAILBOX_SIZE = 10;
}
public class MailConsumer implements Runnable, AppConstant{
private ZkClient zkClient;
private Lock lock;
private Condition condition;
public MailConsumer() {
lock = new ReentrantLock();
condition = lock.newCondition();
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創建mailbox節點
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
@Override
public void run() {
IZkChildListener listener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
// 還是要判斷郵箱是否為空
if(currentChilds.size() > 0) {
// 喚醒等待的線程
try {
lock.lock();
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
};
// 監視子節點的改變,不用放用while循環中,監聽一次就行了,不需要重復綁定
zkClient.subscribeChildChanges(NODE_PATH, listener);
try {
//循環隨機發送郵件模擬真是情況
while(true) {
// 判斷是否可以發送郵件
checkMailReceive();
// 接受郵件
List<String> mailList = zkClient.getChildren(NODE_PATH);
// 如果mailsize==0,也沒有關系;可以直接循環獲取就行了
if(mailList.size() > 0) {
Collections.sort(mailList, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
}
});
// 模擬郵件處理(0-1S)
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
zkClient.delete(NODE_PATH + "/" + mailList.get(0));
System.out.println("mail has been received:" + NODE_PATH + "/" + mailList.get(0));
}
}
}catch (Exception e) {
e.printStackTrace();
} finally {
zkClient.unsubscribeChildChanges(NODE_PATH, listener);
}
}
private void checkMailReceive() {
try {
lock.lock();
// 判斷郵箱是為空
List<String> mailList = zkClient.getChildren(NODE_PATH);
System.out.println("mailbox size: " + mailList.size());
if(mailList.size() == 0) {
// 郵箱為空,阻塞消費者,直到郵箱有郵件
System.out.println("mailbox is empty, please wait 。。。");
condition.await();
// checkMailReceive();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class MailProducer implements Runnable, AppConstant{
private ZkClient zkClient;
private Lock lock;
private Condition condition;
/**
* 初始化狀態
*/
public MailProducer() {
lock = new ReentrantLock();
condition = lock.newCondition();
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創建mailbox節點
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
@Override
public void run() {
IZkChildListener listener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());
// 還是要判斷郵箱是否已滿
if(currentChilds.size() < MAILBOX_SIZE) {
// 喚醒等待的線程
try {
lock.lock();
condition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
};
// 監視子節點的改變,不用放用while循環中,監聽一次就行了,不需要重復綁定
zkClient.subscribeChildChanges(NODE_PATH, listener);
try {
//循環隨機發送郵件模擬真是情況
while(true) {
// 判斷是否可以發送郵件
checkMailSend();
// 發送郵件
String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");
System.out.println("your mail has been send:" + cretePath);
// 模擬隨機間隔的發送郵件(0-10S)
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
}
}catch (Exception e) {
e.printStackTrace();
} finally {
zkClient.unsubscribeChildChanges(NODE_PATH, listener);
}
}
private void checkMailSend() {
try {
lock.lock();
// 判斷郵箱是否已滿
List<String> mailList = zkClient.getChildren(NODE_PATH);
System.out.println("mailbox size: " + mailList.size());
if(mailList.size() >= MAILBOX_SIZE) {
// 郵箱已滿,阻塞生產者,直到郵箱有空間
System.out.println("mailbox is full, please wait 。。。");
condition.await();
checkMailSend();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
2.4.均衡負載
首先我們需要簡單的理解分布式和集群,通俗點說:分布式就是將一個系統拆分到多個獨立運行的應用中(有可能在同一台主機也有可能在不同的主機上),集群就是將單個獨立的應用復制多分放在不同的主機上來減輕服務器的壓力。
而Zookeeper不僅僅可以作為分布式集群的服務注冊調度中心(例如dubbo),也可以實現集群的負載均衡。
2.4.1.思路
首先我們要理解,如果是一個集群,那么他就會有多台主機。所以,他在Zookeeper中信息的存在應該是如下所示:
如上的結構,當服務調用方調用服務時,就可以根據特定的均衡負載算法來實現對服務的調用(調用前需要監聽/service/serviceXXX節點,以更新列表數據)
2.4.2.事例
/**
* 服務提供者
*
* @author Administrator
*
*/
public class ServiceProvider {
// 靜態常量
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/service";
static String SERIVCE_NAME = "/myService";
private ZkClient zkClient;
public ServiceProvider() {
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創建NODE_PATH節點
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
public void registryService(String localIp, Object obj) {
if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {
zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);
}
// 對自己的服務進行注冊
zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/" + localIp, obj);
System.out.println("注冊成功![" + localIp + "]");
}
}
/**
* 消費者,通過某種均衡負載算法選擇某一個提供者
*
* @author Administrator
*
*/
public class ServiceConsumer {
// 靜態常量
static String ZK_CONNECT_STR = "127.0.0.1:2181";
static String NODE_PATH = "/service";
static String SERIVCE_NAME = "/myService";
private List<String> serviceList = new ArrayList<String>();
private ZkClient zkClient;
public ServiceConsumer() {
zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));
System.out.println("sucess connected to zookeeper server!");
// 不存在就創建NODE_PATH節點
if(!zkClient.exists(NODE_PATH)) {
zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);
}
}
/**
* 訂閱服務
*/
public void subscribeSerivce() {
serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);
zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
serviceList = currentChilds;
}
});
}
/**
* 模擬調用服務
*/
public void consume() {
//負載均衡算法獲取某台機器調用服務
int index = new Random().nextInt(serviceList.size());
System.out.println("調用[" + NODE_PATH + SERIVCE_NAME + "]服務:" + serviceList.get(index));
}
}
3.總結
Zookeeper是一個功能非常強大的應用,除了上面幾種應用外,還有命名服務、分布式協調通知等也是常用的場景。
原文鏈接:https://blog.csdn.net/u013468915/article/details/80955110
版權聲明:本文為CSDN博主「永遠_不會懂」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.別在再滿屏的 if/ else 了,試試策略模式,真香!!
3.卧槽!Java 中的 xx ≠ null 是什么新語法?
4.Spring Boot 2.5 重磅發布,黑暗模式太炸了!
覺得不錯,別忘了隨手點贊+轉發哦!