一、思路
-
服務方啟動則將自己注冊到zk上,臨時節點,節點數據為IP和端口信息
-
客戶端添加監聽器,監聽節點變化,每次變化更新本地服務列表
-
服務端有問題,則自動摘除節點,依靠臨時節點實現
二、注冊方實現
1)添加節點監聽器
public class ServiceRegister { private static final String BASE_SERVICES = "/services"; private static final String SERVICE_NAME="/products"; public static void register(String address,int port) { try { //產品服務最終會注冊倒的地址 String path = BASE_SERVICES + SERVICE_NAME; ZooKeeper zooKeeper = new ZooKeeper("192.168.112.131:2181",5000, (watchedEvent)->{}); createNodeSafe( zooKeeper,BASE_SERVICES); createNodeSafe( zooKeeper ,path ); //拼接ip和端口 String server_path = address+":" +port; //注冊的類型,EPHEMERAL_SEQUENTIAL 零時,並且帶序號 zooKeeper.create(path+"/child",server_path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("產品服務注冊成功"); } catch (Exception e) { e.printStackTrace(); } } private static void createNodeSafe(ZooKeeper zooKeeper , String path) throws KeeperException, InterruptedException { Stat exists = zooKeeper.exists(BASE_SERVICES + SERVICE_NAME, false); //先判斷服務根路徑是否存在 if(exists == null) { zooKeeper.create(path,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } }
2)創建一個監聽器
public class InitListener implements ServletContextListener { @Override //容器初始化的時候會調用,重啟tomcat public void contextInitialized(ServletContextEvent sce) { try { Properties properties = new Properties(); properties.load(InitListener.class.getClassLoader().getResourceAsStream("application.properties")); //獲得IP String hostAddress = InetAddress.getLocalHost().getHostAddress(); //獲得端口 int port = Integer.valueOf(properties.getProperty("server.port")); ServiceRegister.register(hostAddress,port); } catch (Exception e) { e.printStackTrace(); } } @Override public void contextDestroyed(ServletContextEvent sce) { } }
3)注冊監聽器,容器啟動的時候加載
@Bean //啟動監聽起,當tomcat啟動的時候,會調用InitListener public ServletListenerRegistrationBean servletListenerRegistrationBean() { ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean(); servletListenerRegistrationBean.setListener(new InitListener()); return servletListenerRegistrationBean; }
三、使用方
1)服務啟動添加節點監聽器

public class InitListener implements ServletContextListener { private static final String BASE_SERVICES = "/services"; private static final String SERVICE_NAME="/products"; private static AtomicInteger errorTryTimes = new AtomicInteger(0); private static final String zkAddr ="192.168.112.131:2181" ; private static final Integer zkTimeout = 5000 ; private ZooKeeper zooKeeper; private void init() { try { //連接上zk 獲得列表信息 zooKeeper = new ZooKeeper(zkAddr ,zkTimeout,(watchedEvent)->{ //當有節點變更的時候通知倒orderService if(watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged && watchedEvent.getPath().equals(BASE_SERVICES+SERVICE_NAME)) { updateServerList(); } }); //第一次連接的時候要獲得列表 updateServerList(); } catch (IOException e) { e.printStackTrace(); } } private void updateServerList() { List<String> newServerList = new ArrayList<>(); try { List<String> children = zooKeeper.getChildren(BASE_SERVICES + SERVICE_NAME, true); for(String subNode:children) { byte[] data = zooKeeper.getData(BASE_SERVICES + SERVICE_NAME + "/" + subNode, false, null); String host = new String(data, "utf-8"); System.out.println("host:"+host); newServerList.add(host); } LoadBalance.SERVICE_LIST = newServerList; } catch (Exception e) { //可能異常導致本地列表沒有更新,則異常可以重試 if(errorTryTimes.incrementAndGet()<3){ init(); } //打印日志 e.printStackTrace(); } } @Override public void contextInitialized(ServletContextEvent sce) { init(); } @Override public void contextDestroyed(ServletContextEvent sce) { } }
注冊監聽器:
@Bean public ServletListenerRegistrationBean servletListenerRegistrationBean() { ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean(); servletListenerRegistrationBean.setListener(new InitListener()); return servletListenerRegistrationBean; }
2) 客戶端保存服務端的服務列表數據
public abstract class LoadBalance { //最好使用set,這樣如果有重復的直接去掉 public volatile static List<String> SERVICE_LIST; public abstract String choseServiceHost(); }
3)簡單的負載均衡,訪問可以通過隨機獲得的ip端口,然后拼接對應的參數訪問:http或者其他的都可以
public class RamdomLoadBalance extends LoadBalance { @Override public String choseServiceHost() { String result = ""; //SERVICE_LIST 產品 服務的列表 if(!CollectionUtils.isEmpty(SERVICE_LIST)) { // 192.168.30.3:8083 //192.168.30.3:8084 2 //index 0,1 //傳入一個種子 int index = new Random().nextInt(SERVICE_LIST.size()); result = SERVICE_LIST.get(index); } return result ; } }