com.netflix.loadbalancer.DynamicServerListLoadBalancer
中的 updateListOfServers() 方法用於從註冊中心刷新服務列表:
/**
 * Refreshes the load balancer's server list.
 *
 * <p>When {@code serverListImpl} is set, the list is fetched from it and, if a
 * {@code filter} is configured, passed through that filter. The resulting list
 * (or an empty list when {@code serverListImpl} is {@code null}) is handed to
 * {@code updateAllServerList}.
 */
@VisibleForTesting
public void updateListOfServers() {
    // No server-list source configured: publish an empty list.
    if (serverListImpl == null) {
        updateAllServerList(new ArrayList<T>());
        return;
    }

    List<T> candidates = serverListImpl.getUpdatedListOfServers();
    LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
            getIdentifier(), candidates);

    if (filter != null) {
        candidates = filter.getFilteredListOfServers(candidates);
        LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), candidates);
    }

    updateAllServerList(candidates);
}
該方法被封裝在 ServerListUpdater.UpdateAction 對象中:
/**
 * Update action whose {@code doUpdate()} simply delegates to
 * {@link #updateListOfServers()}.
 */
protected final ServerListUpdater.UpdateAction updateAction =
        new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                // Delegate to the enclosing load balancer's refresh method.
                updateListOfServers();
            }
        };
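UpdateAction 會作為參數傳入 PollingServerListUpdater 的 start() 方法,因此可以自定義一個繼承 PollingServerListUpdater 的 ServerListUpdater 類,在 start() 中保存該 UpdateAction,再將其注入容器以替換默認的 PollingServerListUpdater。默認的 Bean 定義如下(帶有 @ConditionalOnMissingBean,所以可以被自定義 Bean 取代):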
/**
 * Default {@link ServerListUpdater} bean: a {@link PollingServerListUpdater}
 * built from the client configuration. Declared with
 * {@code @ConditionalOnMissingBean}.
 *
 * @param config the client configuration passed to the updater
 * @return a new {@code PollingServerListUpdater}
 */
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
    return new PollingServerListUpdater(config);
}

/**
 * Default {@link ILoadBalancer} bean.
 *
 * <p>If {@code propertiesFactory} reports an {@code ILoadBalancer} as set for
 * this client name, that instance is returned; otherwise a
 * {@code ZoneAwareLoadBalancer} is created from the supplied collaborators,
 * including the {@code serverListUpdater}.
 *
 * @return the load balancer to use for this client
 */
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList,
                                        ServerListFilter<Server> serverListFilter,
                                        IRule rule,
                                        IPing ping,
                                        ServerListUpdater serverListUpdater) {
    // An explicitly configured load balancer takes precedence over the default.
    if (this.propertiesFactory.isSet(ILoadBalancer.class, this.name)) {
        return (ILoadBalancer) this.propertiesFactory.get(ILoadBalancer.class, config, this.name);
    }
    return new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater);
}
具體操作如下:
/**
 * {@link PollingServerListUpdater} that remembers the {@code UpdateAction}
 * passed to {@link #start(UpdateAction)} so other components can trigger a
 * server-list refresh on demand via {@link #getUpdateAction()}.
 *
 * <p>Registered under the bean name {@code ribbonServerListUpdater}.
 *
 * <p>NOTE(review): every call to {@code start} overwrites the stored action,
 * so if this single bean is started by more than one load balancer only the
 * most recent action is retained — confirm this matches the intended usage.
 */
@Component("ribbonServerListUpdater")
public class MyPollingServerListUpdater extends PollingServerListUpdater {

    /**
     * The action most recently passed to {@link #start(UpdateAction)}, or
     * {@code null} if {@code start} has not been called yet.
     *
     * <p>Declared {@code volatile} because it is written under the monitor in
     * {@code start} but read without any lock in {@link #getUpdateAction()},
     * which may be called from a different thread.
     */
    private volatile UpdateAction updateAction;

    /**
     * Stores the update action and then starts the regular polling updater.
     *
     * @param updateAction the action that refreshes the server list
     */
    @Override
    public synchronized void start(UpdateAction updateAction) {
        this.updateAction = updateAction;
        super.start(updateAction);
    }

    /**
     * Returns the stored update action.
     *
     * @return the last action passed to {@code start}, or {@code null} if the
     *         updater has not been started
     */
    public UpdateAction getUpdateAction() {
        return updateAction;
    }
}
定義Nacos監聽器並調用 updateAction.doUpdate() 方法
@Component public class ServerStatusListner { @Autowired private MyPollingServerListUpdater myListUpdater; @Value("${spring.cloud.nacos.discovery.namespace}") private String namespace; @Value("${spring.cloud.nacos.discovery.server-addr}") private String serverAddr; @PostConstruct public void init() throws Exception { //初始化監聽服務上下線 Properties properties = System.getProperties(); properties.put(PropertyKeyConst.NAMESPACE, namespace); properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr); NamingService naming = NamingFactory.createNamingService(properties); List<String> serviceNames = new ArrayList<>(); // 要監聽的服務名稱 serviceNames.add("SVC1");
serviceNames.add("SVC2");
serviceNames.stream().forEach(serviceName->{ try { naming.subscribe(serviceName, (event -> { //通知ribbon更新服務列表 ServerListUpdater.UpdateAction updateAction = myListUpdater.getUpdateAction(); if (updateAction != null){ updateAction.doUpdate(); } NamingEvent namingEvent = (NamingEvent) event; List<Instance> instances = namingEvent.getInstances(); String name = namingEvent.getServiceName(); if(instances != null && !instances.isEmpty()){ instances.stream().forEach(instance -> { System.out.println("服務"+name+":"+instance); }); }else { System.out.println("服務"+name+"列表為空"); } })); } catch (NacosException e) { e.printStackTrace(); } }); } }