為了方便現場安裝完了etcd集群后確認集群是否好用,簡單寫了個測試類,網上搜的有點亂還有些不能運行,在這里再整理一個能夠直接運行的
1、我把etcd的API設成3版本了,調用使用的jetcd,功能挺多,這里只用了最簡單的數據增刪查操作,再Maven配置文件中增加依賴
<dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-core</artifactId> <version>0.3.0</version> </dependency>
2、直接貼代碼了,代碼很簡單沒什么好解釋的,要想用復雜功能還需要再去研究jetcd的各種API了
/** * */ package com.zyh.etcd; import static com.google.common.base.Charsets.UTF_8; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.Watch.Watcher; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.watch.WatchEvent; /** * etcd 操作工具,包括啟動監聽和操作etcd v3 版本協議,只測試功能,未添加log * * @version 1.0 * @author zhangyanhua * @date 2019年10月29日 下午4:30:57 */ public class EtcdUtil { // etcl客戶端鏈接 private static Client etcdClient = null; // 鏈接初始化 public static synchronized Client getEtclClient() { if (etcdClient == null) { //String[] urls = PropertiesUtil.getValue("etcd_node_url").split(","); String[] urls = "http://10.110.30.210:2379,http://10.110.30.212:2379,http://10.110.30.213:2379".split(","); etcdClient = Client.builder().endpoints(urls).build(); } return etcdClient; } /** * 新增或者修改指定的配置 * * @param key * @param value * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午4:41:06 */ public static void putEtcdValueByKey(String key, String value) throws Exception { Client client = EtcdUtil.getEtclClient(); client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)).get(); //System.out.println("put etcd key:value \"" + key + ":" + value + "\" success"); client.close(); etcdClient = null; } /** * 查詢指定的key名稱對應的value * * @param key * @return value值 * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午4:35:44 */ public static String getEtcdValueByKey(String key) throws Exception { Client client = EtcdUtil.getEtclClient(); GetResponse getResponse = client.getKVClient() .get(ByteSequence.from(key, UTF_8), GetOption.newBuilder().build()).get(); client.close(); etcdClient = null; // key does not exist if (getResponse.getKvs().isEmpty()) { return null; } return getResponse.getKvs().get(0).getValue().toString(UTF_8); } /** * 刪除指定的配置 * * @param key * @throws InterruptedException * @throws ExecutionException * @author zhangyanhua * @date 2019年10月29日 下午4:53:24 */ public static void deleteEtcdValueByKey(String key) throws InterruptedException, ExecutionException { Client client = EtcdUtil.getEtclClient(); client.getKVClient().delete(ByteSequence.from(key, UTF_8)).get(); //System.out.println("delete etcd key \"" + key + "\" success"); client.close(); etcdClient = null; } /** * 持續監控某個key變化的方法,執行后如果key有變化會被監控到,輸入結果如下 * watch type= "PUT", key= "zyh1", value= "zyh1-value" * watch type= "PUT", key= "zyh1", value= "zyh1-value111" * watch type= "DELETE", key= "zyh1", value= "" * * @param key * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午5:26:09 */ public static void watchEtcdKey(String key) throws Exception { Client client = EtcdUtil.getEtclClient(); // 最大事件數量 Integer maxEvents = Integer.MAX_VALUE; CountDownLatch latch = new CountDownLatch(maxEvents); Watcher watcher = null; try { ByteSequence watchKey = ByteSequence.from(key, UTF_8); WatchOption watchOpts = WatchOption.newBuilder().build(); watcher = client.getWatchClient().watch(watchKey, watchOpts, response -> { for (WatchEvent event : response.getEvents()) { System.out.println("watch type= \"" + event.getEventType().toString() + "\", key= \"" + Optional.ofNullable(event.getKeyValue().getKey()).map(bs -> bs.toString(UTF_8)).orElse("") + "\", value= \"" + Optional.ofNullable(event.getKeyValue().getValue()) .map(bs -> bs.toString(UTF_8)).orElse("") + "\""); } latch.countDown(); }); latch.await(); } catch (Exception e) { if (watcher != null) { watcher.close(); client.close(); etcdClient = null; } throw e; } } /** * TODO * * @param args * @throws Exception * @author zhangyanhua * @date 2019年10月29日 下午6:01:54 */ public static void main(String[] args) throws Exception { boolean success = true; String key = "zyh"; String value = "zyh-value"; String newValue = "zyh-value-new"; System.out.println("**** 測試方法開始 ****"); EtcdUtil.putEtcdValueByKey(key, value); String retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查詢key " + key + " 對應的值是 " + retValue); if (value.equals(retValue)) { System.out.println("數據插入成功。"); System.out.println("數據查詢成功。"); } else { success = false; System.out.println("數據插入或查詢失敗!"); } EtcdUtil.putEtcdValueByKey(key, newValue); retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查詢key " + key + " 對應的值是 " + retValue); if (newValue.equals(retValue)) { System.out.println("數據更新成功。"); } else { success = false; System.out.println("數據更新失敗!"); } EtcdUtil.deleteEtcdValueByKey(key); retValue = EtcdUtil.getEtcdValueByKey(key); // System.out.println("查詢key " + key + " 對應的值是 " + retValue); if (retValue == null) { System.out.println("數據刪除成功。"); } else { success = false; System.out.println("數據刪除失敗!"); } // EtcdUtil.watchEtcdKey(key); if (success) { System.out.println("**** 測試方法全部通過。 ****"); } else { System.out.println("**** 測試失敗! ****"); } } }
上面代碼運行結果如下
還有個監控數據的watchEtcdKey方法沒有執行,這個執行后代碼是一直運行的,持續監控想要監控的數據,當數據出現變化事可以獲取變化的事件,例如這樣
在后台操作一組數據
監控端可以檢測到數據變化