Curator的介紹
Curator就是Zookeeper的一個客戶端工具(不知道Zookeeper的同學可以到http://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/學習下),封裝ZooKeeper client與ZooKeeper server之間的連接處理以及zookeeper的常用操作,提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 集群領導選舉機制)的抽象封裝。當然還有他看起來非常舒服的Fluent風格的API。 Curator主要從以下幾個方面降低了zk使用的復雜性:
- 重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標准的重試策略(比如指數補償).
- 連接狀態監控: Curator初始化之后會一直的對zk連接進行監聽, 一旦發現連接狀態發生變化, 將作出相應的處理.
- zk客戶端實例管理:Curator對zk客戶端到server集群連接進行管理. 並在需要的情況, 重建zk實例, 保證與zk集群的可靠連接
- 各種使用場景支持:Curator實現zk支持的大部分使用場景支持(甚至包括zk自身不支持的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.
Curator通過以上的處理, 讓用戶專注於自身的業務本身, 而無需花費更多的精力在zk本身.這里我們介紹的是Curator的Service Discovery模塊
Service Discovery
我們通常在調用服務的時候,需要知道服務的地址,端口,或者其他一些信息,通常情況下,我們是把他們寫到程序里面,但是隨着服務越來越多,維護起來也越來越費勁,更重要的是,由於地址都是在程序中配置的,我們根本不知道遠程的服務是否可用,當我們增加或者刪除服務,我們又需要到配置文件中配置么? 這時候,Zookeeper幫大忙了,我們可以把我們的服務注冊到Zookeeper中,創建一個臨時節點(當連接斷開之后,節點將被刪除),存放我們的服務信息(url,ip,port等信息),把這些臨時節點都存放在以serviceName命名的節點下面,這樣我們要獲取某個服務的地址,只需要到Zookeeper中找到這個path,然后就可以讀取到里面存放的服務信息,這時候我們就可以根據這些信息調用我們的服務。這樣,通過Zookeeper我們就做到了動態的添加和刪除服務,做到了一旦一個服務時效,就會自動從Zookeeper中移除,基本上Curator中的Service Discovery就是做的這點事。
下面我們用兩張圖片來比較一下,一般情況下的服務調用,和使用 Dynamic Service Registry 的區別
使用zookeeper做服務注冊之后:
關於Apache curator的service discovery的一些介紹可以參考官方文檔:http://curator.apache.org/curator-x-discovery/index.html
Service Discovery 的使用
一般而言,分為 Service Registry 和 Service Discovery,對應服務端和客戶端。也就是由服務提供者,講自身的信息注冊到Zookeeper,然后,客戶端通過到Zookeeper中查找服務信息,然后根據信息就行調用(見上圖)。說了這么多,上代碼了。
首先我們定義個payload,我們這一在里面存儲一些服務信息。這個信息會被保存在Zookeeper,這里只是舉個例子,你還可以寫入更多你想要的信息。
package discovery; import org.codehaus.jackson.map.annotate.JsonRootName; /** * Created by hupeng on 2014/9/16. */ @JsonRootName("details") public class InstanceDetails { private String id; private String listenAddress; private int listenPort; private String interfaceName; public InstanceDetails(String id, String listenAddress, int listenPort,String interfaceName) { this.id = id; this.listenAddress = listenAddress; this.listenPort = listenPort; this.interfaceName = interfaceName; } public InstanceDetails() { } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getListenAddress() { return listenAddress; } public void setListenAddress(String listenAddress) { this.listenAddress = listenAddress; } public int getListenPort() { return listenPort; } public void setListenPort(int listenPort) { this.listenPort = listenPort; } public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } @Override public String toString() { return "InstanceDetails{" + "id='" + id + '\'' + ", listenAddress='" + listenAddress + '\'' + ", listenPort=" + listenPort + ", interfaceName='" + interfaceName + '\'' + '}'; } }
我們先寫服務注冊,也就是服務端那邊做的事情。
package discovery; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import java.io.IOException; /** * Created by hupeng on 2014/9/16. */ public class ServiceRegistrar{ private ServiceDiscovery<InstanceDetails> serviceDiscovery; private final CuratorFramework client; public ServiceRegistrar(CuratorFramework client,String basePath) throws Exception { this.client = client; JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class) .client(client) .serializer(serializer) .basePath(basePath) .build(); serviceDiscovery.start(); } public void registerService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception { serviceDiscovery.registerService(serviceInstance); } public void unregisterService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception { serviceDiscovery.unregisterService(serviceInstance); } public void updateService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception { serviceDiscovery.updateService(serviceInstance); } public void close() throws IOException { serviceDiscovery.close(); } }
一般情況下,會在我們服務啟動的時候就將服務信息注冊,比如我們是web項目的話可以寫一個Servlet Listener進行注冊,這里為了方便,寫一個Main方法進行測試,如果我們把我們的信息存儲在payload中的話,UriSpec是可以不定義的。
package discovery; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.UriSpec; import java.util.UUID; /** * User: hupeng * Date: 14-9-16 * Time: 下午8:05 */ public class ServerApp { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); ServiceRegistrar serviceRegistrar = new ServiceRegistrar(client,"services"); ServiceInstance<InstanceDetails> instance1 = ServiceInstance.<InstanceDetails>builder() .name("service1") .port(12345) .address("192.168.1.100") //address不寫的話,會取本地ip .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service1")) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .build(); ServiceInstance<InstanceDetails> instance2 = ServiceInstance.<InstanceDetails>builder() .name("service2") .port(12345) .address("192.168.1.100") .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service2")) .uriSpec(new UriSpec("{scheme}://{address}:{port}")) .build(); serviceRegistrar.registerService(instance1); serviceRegistrar.registerService(instance2); Thread.sleep(Integer.MAX_VALUE); } }
再來寫Service discovery
package discovery; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.strategies.RandomStrategy; import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; /** * Created by hupeng on 2014/9/16. */ public class ServiceDiscoverer { private ServiceDiscovery<InstanceDetails> serviceDiscovery; private Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap(); private List<Closeable> closeableList = Lists.newArrayList(); private Object lock = new Object(); public ServiceDiscoverer(CuratorFramework client ,String basePath) throws Exception { JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class); serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class) .client(client) .basePath(basePath) .serializer(serializer) .build(); serviceDiscovery.start(); } public ServiceInstance<InstanceDetails> getInstanceByName(String serviceName) throws Exception { ServiceProvider<InstanceDetails> provider = providers.get(serviceName); if (provider == null) { synchronized (lock) { provider = providers.get(serviceName); if (provider == null) { provider = serviceDiscovery.serviceProviderBuilder(). serviceName(serviceName). providerStrategy(new RandomStrategy<InstanceDetails>()) .build(); provider.start(); closeableList.add(provider); providers.put(serviceName, provider); } } } return provider.getInstance(); } public synchronized void close(){ for (Closeable closeable : closeableList) { CloseableUtils.closeQuietly(closeable); } } }
客戶端測試程序:
package discovery; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.discovery.ServiceInstance; /** * User: hupeng * Date: 14-9-16 * Time: 下午8:16 */ public class ClientApp { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); ServiceDiscoverer serviceDiscoverer = new ServiceDiscoverer(client,"services"); ServiceInstance<InstanceDetails> instance1 = serviceDiscoverer.getInstanceByName("service1"); System.out.println(instance1.buildUriSpec()); System.out.println(instance1.getPayload()); ServiceInstance<InstanceDetails> instance2 = serviceDiscoverer.getInstanceByName("service1"); System.out.println(instance2.buildUriSpec()); System.out.println(instance2.getPayload()); serviceDiscoverer.close(); CloseableUtils.closeQuietly(client); } }
好了,代碼就到這里,如果有什么問題的話,請指正。