前言
前面我們zookeeper也安裝了,操作命令也學習了,現在來使用SpringBoot整合一下zookeeper。
整合
第一步設置配置文件(application.properties):
zookeeper.address=127.0.0.1:2181
zookeeper.timeout=40000
第二步將ZooKeeper加入Spring容器:
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.CountDownLatch; @Configuration public class ZookeeperConfig { @Value("${zookeeper.address}") String address; @Value("${zookeeper.timeout}") int timeout; @Bean public ZooKeeper getZookeeper(){ ZooKeeper zooKeeper = null; try { /** * CountDownLatch 用於標記線程是否執行完。 */ final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(address, timeout, (x) -> { if(Watcher.Event.KeeperState.SyncConnected == x.getState()){ countDownLatch.countDown(); } }); countDownLatch.await(); System.out.println("zookeeper連接成功!"); } catch (Exception e) { e.printStackTrace(); } return zooKeeper; } }
第三步測試監聽:
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class ZookeeperServer { @Autowired ZooKeeper zooKeeper; /** * 監聽其中的一個節點 * @throws KeeperException * @throws InterruptedException */ public void watchEvent() throws KeeperException, InterruptedException { Stat stat = new Stat(); zooKeeper.getData("/tao", (x)-> { System.out.println(x.getType()); }, stat); } }
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TestZookeeperController { @Autowired ZookeeperServer zookeeperServer; @RequestMapping("/zookeeper.do") public String event() throws Exception { zookeeperServer.watchEvent(); return "success"; } }
測試效果:
ok,到這里,我們整合Spring完成。
其他
上面的節點監聽是一次性的,不符合我們的要求
在ZookeeperServer中加入如下代碼,實現繼續監聽
public void addWatchEvent() throws KeeperException, InterruptedException { zooKeeper.addWatch("/tao",(x) -> { System.out.println("PERSISTENT_RECURSIVE"+x); }, AddWatchMode.PERSISTENT_RECURSIVE); }
運行結果:
AddWatchMode.PERSISTENT::監聽該節點的變化,包含孩子節點的創建和刪除,但是孩子節點修改則不會被監聽到。
AddWatchMode.PERSISTENT_RECURSIVE:監聽該節點的變化,包含孩子節點的創建和刪除和修改值。
創建順序節點
public void createNode() throws Exception { String a = zooKeeper.create("/my", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(a); }
刪除節點
public void deleteNode() throws Exception { zooKeeper.delete("/tao",-1); }
總結
整合Zookeeper分布式框架步驟
1.啟動zk的服務端
2.設置配置文件
3.通過java實現客戶端連接服務端
4.監聽節點變化等一些API的實現。