zk 10之:Curator之三:服務的注冊及發現


Service Discovery 

  我們通常在調用服務的時候,需要知道服務的地址,端口,或者其他一些信息,通常情況下,我們是把他們寫到程序里面,但是隨着服務越來越多,維護起來也越來越費勁,更重要的是,由於地址都是在程序中配置的,我們根本不知道遠程的服務是否可用,當我們增加或者刪除服務,我們又需要到配置文件中配置么? 這時候,Zookeeper幫大忙了,我們可以把我們的服務注冊到Zookeeper中,創建一個臨時節點(當連接斷開之后,節點將被刪除),存放我們的服務信息(url,ip,port等信息),把這些臨時節點都存放在以serviceName命名的節點下面,這樣我們要獲取某個服務的地址,只需要到Zookeeper中找到這個path,然后就可以讀取到里面存放的服務信息,這時候我們就可以根據這些信息調用我們的服務。這樣,通過Zookeeper我們就做到了動態的添加和刪除服務,做到了一旦一個服務時效,就會自動從Zookeeper中移除,基本上Curator中的Service Discovery就是做的這點事。
  下面我們用兩張圖片來比較一下,一般情況下的服務調用,和使用 Dynamic Service Registry 的區別

    

 

  使用zookeeper做服務注冊之后:

 

關於Apache curator的service discovery的一些介紹可以參考官方文檔:http://curator.apache.org/curator-x-discovery/index.html

 

Service Discovery 的使用

  一般而言,分為 Service Registry 和 Service Discovery,對應服務端和客戶端。也就是由服務提供者,講自身的信息注冊到Zookeeper,然后,客戶端通過到Zookeeper中查找服務信息,然后根據信息就行調用(見上圖)。說了這么多,上代碼了。

  首先我們定義個payload,我們這一在里面存儲一些服務信息。這個信息會被保存在Zookeeper,這里只是舉個例子,你還可以寫入更多你想要的信息。

一般情況下,會在我們服務啟動的時候就將服務信息注冊,比如我們是web項目的話可以寫一個Servlet Listener進行注冊,這里為了方便,寫一個Main方法進行測試,如果我們把我們的信息存儲在payload中的話,UriSpec是可以不定義的。

package com.sf.zkclient.discovery;

import org.codehaus.jackson.map.annotate.JsonRootName;

@JsonRootName("details")
public class InstanceDetails {

    private String id;

    private String listenAddress;

    private int listenPort;

    private String interfaceName;

    public InstanceDetails(String id, String listenAddress, int listenPort, String interfaceName) {
        this.id = id;
        this.listenAddress = listenAddress;
        this.listenPort = listenPort;
        this.interfaceName = interfaceName;
    }

    public InstanceDetails() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getListenAddress() {
        return listenAddress;
    }

    public void setListenAddress(String listenAddress) {
        this.listenAddress = listenAddress;
    }

    public int getListenPort() {
        return listenPort;
    }

    public void setListenPort(int listenPort) {
        this.listenPort = listenPort;
    }

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    @Override
    public String toString() {
        return "InstanceDetails{" + "id='" + id + '\'' + ", listenAddress='" + listenAddress + '\'' + ", listenPort="
                + listenPort + ", interfaceName='" + interfaceName + '\'' + '}';
    }
}
package com.sf.zkclient.discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.io.IOException;

public class ServiceRegistry {

    private ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private final CuratorFramework client;

    public ServiceRegistry(CuratorFramework client, String basePath) throws Exception {
        this.client = client;
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).serializer(serializer)
                .basePath(basePath).build();
        serviceDiscovery.start();
    }

    public void registerService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.registerService(serviceInstance);
    }

    public void unregisterService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.unregisterService(serviceInstance);

    }

    public void updateService(ServiceInstance<InstanceDetails> serviceInstance) throws Exception {
        serviceDiscovery.updateService(serviceInstance);

    }

    public void close() throws IOException {
        serviceDiscovery.close();
    }
}
package com.sf.zkclient.discovery;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;

import java.io.Closeable;
import java.util.List;
import java.util.Map;

public class ServiceDiscoverer {
    private ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
    private List<Closeable> closeableList = Lists.newArrayList();
    private Object lock = new Object();

    public ServiceDiscoverer(CuratorFramework client, String basePath) throws Exception {
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(
                InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(basePath)
                .serializer(serializer).build();

        serviceDiscovery.start();
    }

    public ServiceInstance<InstanceDetails> getInstanceByName(String serviceName) throws Exception {
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
        if (provider == null) {
            synchronized (lock) {
                provider = providers.get(serviceName);
                if (provider == null) {
                    provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName)
                            .providerStrategy(new RandomStrategy<InstanceDetails>()).build();
                    provider.start();
                    closeableList.add(provider);
                    providers.put(serviceName, provider);
                }
            }
        }

        return provider.getInstance();
    }

    public synchronized void close() {
        for (Closeable closeable : closeableList) {
            CloseableUtils.closeQuietly(closeable);
        }
    }

}
package com.sf.zkclient.discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;


import java.util.UUID;

/**
 * User: hupeng
 * Date: 14-9-16
 * Time: 下午8:05
 */
public class ServerApp {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ServiceRegistry serviceRegistrar = new ServiceRegistry(client,"services");
        ServiceInstance<InstanceDetails> instance1 = ServiceInstance.<InstanceDetails>builder()
                .name("service1")
                .port(12345)
                .address("192.168.1.100")   //address不寫的話,會取本地ip
                .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service1"))
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();
        ServiceInstance<InstanceDetails> instance2 = ServiceInstance.<InstanceDetails>builder()
                .name("service2")
                .port(12345)
                .address("192.168.1.100")
                .payload(new InstanceDetails(UUID.randomUUID().toString(),"192.168.1.100",12345,"Test.Service2"))
                .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
                .build();
        serviceRegistrar.registerService(instance1);
        serviceRegistrar.registerService(instance2);


        Thread.sleep(Integer.MAX_VALUE);
    }
}
package com.sf.zkclient.discovery;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceInstance;

public class ClientApp {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ServiceDiscoverer serviceDiscoverer = new ServiceDiscoverer(client,"services");

        ServiceInstance<InstanceDetails> instance1 = serviceDiscoverer.getInstanceByName("service1");

        System.out.println(instance1.buildUriSpec());
        System.out.println(instance1.getPayload());

        ServiceInstance<InstanceDetails> instance2 = serviceDiscoverer.getInstanceByName("service1");

        System.out.println(instance2.buildUriSpec());
        System.out.println(instance2.getPayload());

        serviceDiscoverer.close();
        CloseableUtils.closeQuietly(client);
    }
}

結果:

17/02/25 16:59:06 INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
17/02/25 16:59:06 WARN zookeeper.ClientCnxnSocket: Connected to an old server; r-o mode will be unavailable
17/02/25 16:59:06 INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x15a739961cd0011, negotiated timeout = 40000
17/02/25 16:59:06 INFO state.ConnectionStateManager: State change: CONNECTED
http://192.168.1.100:12345
InstanceDetails{id='a197a1b9-a6c6-495e-b500-1160e02bd0a6', listenAddress='192.168.1.100', listenPort=12345, interfaceName='Test.Service1'}
http://192.168.1.100:12345
InstanceDetails{id='a197a1b9-a6c6-495e-b500-1160e02bd0a6', listenAddress='192.168.1.100', listenPort=12345, interfaceName='Test.Service1'}
17/02/25 16:59:07 INFO imps.CuratorFrameworkImpl: backgroundOperationsLoop exiting
17/02/25 16:59:07 INFO zookeeper.ZooKeeper: Session: 0x15a739961cd0011 closed

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM