接着之前的RPC實現:https://www.cnblogs.com/wuzhenzhao/p/9962250.html RPC框架的簡單實現,基於這個小程序,在我學習完Zookeeper之后如何將注冊中心與RPC調用結合起來。直接進入正題
我這邊用到的 curator 客戶端工具的依賴是:版本太高不兼容的話會報異常
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>
承接上文,我們之前在服務發布的時候是直接寫死一個地址發布的,現在需要結合注冊中心 ,那么我們勢必要在發布服務的時候在指定的 zookeeper服務上面注冊我們的節點,基於 zookeeper 臨時節點的特性,一旦服務 down機,那么這個節點也會消失,同時會有事件觸發,添加一個注冊服務:
服務端:
public interface IRegisterCenter { /** * 注冊服務名稱和服務地址 * @param serviceName * @param serviceAddress */
void register(String serviceName,String serviceAddress); }
實現:注冊服務的實現就是往zookeeper 的指點根節點下插入一個臨時節點,如果根節點不存在則先創建,由於 curator 做了很好的實現,這里先用他來做實現。
public class RegisterCenterImpl implements IRegisterCenter { //zk鏈接地址
public final static String CONNNECTION_STR = "192.168.1.101:2181"; //注冊根節點
public final static String ZK_REGISTER_PATH = "/registrys"; private CuratorFramework curatorFramework; {// 這段代碼無非是連接服務器,自己看着寫在哪里把
curatorFramework = CuratorFrameworkFactory.builder(). connectString(CONNNECTION_STR). // sessionTimeoutMs(4000). retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); curatorFramework.start(); } @Override public void register(String serviceName, String serviceAddress) { //注冊相應的服務
String servicePath = ZK_REGISTER_PATH + "/" + serviceName; try { //判斷 /registrys/product-service是否存在,不存在則創建
if (curatorFramework.checkExists().forPath(servicePath) == null) { curatorFramework.create().creatingParentsIfNeeded(). withMode(CreateMode.PERSISTENT).forPath(servicePath, "0".getBytes()); } // 組裝節點地址
String addressPath = servicePath + "/" + serviceAddress; String rsNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL). forPath(addressPath, "0".getBytes()); System.out.println("服務注冊成功:" + rsNode); } catch (Exception e) { e.printStackTrace(); } } }
有了注冊服務我們需要進入服務發布的類里面進行一些修改,由於服務的地址及端口都注冊到注冊中心上了,我們需要增加注冊中心屬性,便已獲取相關信息。
public class RpcServer { //創建一個線程池
private static final ExecutorService executorService= Executors.newCachedThreadPool(); private IRegisterCenter registerCenter; //注冊中心
private String serviceAddress; //服務發布地址 // 存放服務名稱和服務對象之間的關系
Map<String,Object> handlerMap=new HashMap<>(); public RpcServer(IRegisterCenter registerCenter, String serviceAddress) { this.registerCenter = registerCenter; this.serviceAddress = serviceAddress; } /** * 綁定服務名稱和服務對象 * @param services */
public void bind(Object... services){ for(Object service:services){// 這里為了獲取對應服務的類名,我們這里定義了一個注解來實現 代碼請看下面
RpcAnnotation annotation=service.getClass().getAnnotation(RpcAnnotation.class); String serviceName=annotation.value().getName(); handlerMap.put(serviceName,service);//綁定服務接口名稱對應的服務
} } public void publisher(){ ServerSocket serverSocket=null; try{ String[] addrs=serviceAddress.split(":");//這個時候服務的ip port 都是從這個注冊地址上獲取
serverSocket=new ServerSocket(Integer.parseInt(addrs[1])); //啟動一個服務監聽 // handlerMap 可能存放多個發布服務,我這里演示的是一個
for(String interfaceName:handlerMap.keySet()){ registerCenter.register(interfaceName,serviceAddress); System.out.println("注冊服務成功:"+interfaceName+"->"+serviceAddress); } while(true){ //循環監聽
Socket socket=serverSocket.accept(); //監聽服務 //通過線程池去處理請求
executorService.execute(new ProcessorHandler(socket,handlerMap)); } } catch (IOException e) { e.printStackTrace(); }finally { if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
我們可以采用注解的方式把這個服務注冊上去,注解類:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcAnnotation { /** * 對外發布的服務的接口地址 * @return */ Class<?> value(); // 暫時沒用 可以處理版本
String version() default ""; }
然后再對應需要發布的服務類實現上加上注解:
public interface HelloService { String sayHello(String msg); }
@RpcAnnotation(HelloService.class) public class HelloServiceImpl implements HelloService { @Override public String sayHello(String msg) { return " RPC Hello, " + msg; } }
上面監聽服務請求的處理也發生了改變,之前傳入的是單獨的service 現在需要把綁定的 handlerMap 帶過去。 接下來看 ProcessorHandler :
public class ProcessorHandler implements Runnable { private Socket socket; Map<String, Object> handlerMap;// 現在需要從map里獲取需要發布的綁定服務
public ProcessorHandler(Socket socket, Map<String, Object> handlerMap) { this.socket = socket; this.handlerMap = handlerMap; } @Override public void run() { //處理請求
ObjectInputStream inputStream = null; try { //獲取客戶端的輸入流
inputStream = new ObjectInputStream(socket.getInputStream()); //反序列化遠程傳輸的對象RpcRequest
RpcRequest request = (RpcRequest) inputStream.readObject(); Object result = invoke(request); //通過反射去調用本地的方法 //通過輸出流講結果輸出給客戶端
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(result); outputStream.flush(); inputStream.close(); outputStream.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } private Object invoke(RpcRequest request) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException { //以下均為反射操作,目的是通過反射調用服務
Object[] args = request.getParameters(); Class<?>[] types = new Class[args.length]; for (int i = 0; i < args.length; i++) { types[i] = args[i].getClass(); } //從handlerMap中,根據客戶端請求的地址,去拿到響應的服務,通過反射發起調用
Object service = handlerMap.get(request.getClassName()); Method method = service.getClass().getMethod(request.getMethodName(), types); return method.invoke(service, args); } }
然后發布服務:
public static void main(String[] args) throws IOException { HelloService helloService=new HelloServiceImpl(); IRegisterCenter registerCenter=new RegisterCenterImpl(); RpcServer rpcServer=new RpcServer(registerCenter,"127.0.0.1:8080"); rpcServer.bind(helloService); rpcServer.publisher(); System.in.read(); }
輸出:
服務注冊成功:/registrys/com.wuzz.demo.rpc.server.HelloService/127.0.0.1:8080 注冊服務成功:com.wuzz.demo.rpc.server.HelloService->127.0.0.1:8080
對應的zk服務器上也創建了對應的節點:
客戶端:
對於客戶端需要從遠程調用服務,現在是需要從注冊中心先去獲取該服務類鎖對應的再注冊中心注冊的服務地址及端口,再去發起請求,並且對指定到 路徑進行監聽,那么客戶端需要定義一個服務的發現服務:
public interface IServiceDiscovery { /** * 根據請求的服務地址,獲得對應的調用地址 * @param serviceName * @return */ String discover(String serviceName); }
實現:
public class ServiceDiscoveryImpl implements IServiceDiscovery { // 從指定節點下獲取的子節點列表
List<String> repos = new ArrayList<>(); // 服務器連接地址,就是服務端的 ZkConfig.CONNNECTION_STR
private String address; private CuratorFramework curatorFramework; //注冊根節點
public final static String ZK_REGISTER_PATH = "/registrys"; // 為了方便測試,這里直接再構造里面啟動連接
public ServiceDiscoveryImpl(String address) { this.address = address; curatorFramework = CuratorFrameworkFactory.builder(). connectString(address). sessionTimeoutMs(4000). retryPolicy(new ExponentialBackoffRetry(1000, 10)).build(); curatorFramework.start(); } @Override // 本質就是獲取指定服務節點下的子節點
public String discover(String serviceName) { String path = ZK_REGISTER_PATH + "/" + serviceName; try { repos = curatorFramework.getChildren().forPath(path); } catch (Exception e) { throw new RuntimeException("獲取子節點異常:" + e); } //動態發現服務節點的變化
registerWatcher(path); //簡單負載均衡機制
if (repos == null || repos.size() == 0) { return null; } if (repos.size() == 1) { return repos.get(0); } int len = repos.size(); Random random = new Random(); return repos.get(random.nextInt(len));//返回調用的服務地址
} // 這里是之前提到的用 curator 客戶端進行時間注冊的操作
private void registerWatcher(final String path) { PathChildrenCache childrenCache = new PathChildrenCache (curatorFramework, path, true); PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { repos = curatorFramework.getChildren().forPath(path); } }; childrenCache.getListenable().addListener(pathChildrenCacheListener); try { childrenCache.start(); } catch (Exception e) { throw new RuntimeException("注冊PatchChild Watcher 異常" + e); } } }
接下去由於客戶端是通過動態代理去獲取遠程對象,由於原來參數為 IP Port ,現在需要通過注冊中心去拿:
public class RpcClientProxy { // 服務發現
private IServiceDiscovery serviceDiscovery; public RpcClientProxy(IServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } /** * 創建客戶端的遠程代理。通過遠程代理進行訪問 * @param interfaceCls * @param <T> * @return */
public <T> T clientProxy(final Class<T> interfaceCls){ //使用到了動態代理。
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[]{interfaceCls},new RemoteInvocationHandler(serviceDiscovery)); } }
對應的RemoteInvocationHandler:
public class RemoteInvocationHandler implements InvocationHandler { private IServiceDiscovery serviceDiscovery; public RemoteInvocationHandler(IServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //組裝請求
RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); String serviceAddress = serviceDiscovery.discover(request.getClassName()); //根據接口名稱得到對應的服務地址 //通過tcp傳輸協議進行傳輸
TCPTransport tcpTransport = new TCPTransport(serviceAddress); //發送請求
return tcpTransport.send(request); } }
由於 IP Port 都是來自注冊中心的一個服務節點下的子節點的信息,而zookeeper服務器上存儲的是該服務的全路徑名稱,在其之下的直接點才是真的IP/PORT的信息,所以我們通過這個request.getClassName()獲取服務的地址,這里的傳輸方法也要進行修改:僅僅是修改了 IP/PORT的來源。
public class TCPTransport { private String serviceAddress; public TCPTransport(String serviceAddress) { this.serviceAddress=serviceAddress; } //創建一個socket連接
private Socket newSocket(){ System.out.println("創建一個新的連接"); Socket socket; try{ String[] arrs=serviceAddress.split(":"); socket=new Socket(arrs[0],Integer.parseInt(arrs[1])); return socket; }catch (Exception e){ throw new RuntimeException("連接建立失敗"); } } public Object send(RpcRequest request){ Socket socket=null; try { socket = newSocket(); //獲取輸出流,將客戶端需要調用的遠程方法參數request發送給
ObjectOutputStream outputStream=new ObjectOutputStream (socket.getOutputStream()); outputStream.writeObject(request); outputStream.flush(); //獲取輸入流,得到服務端的返回結果
ObjectInputStream inputStream=new ObjectInputStream (socket.getInputStream()); Object result=inputStream.readObject(); inputStream.close(); outputStream.close(); return result; }catch (Exception e ){ throw new RuntimeException("發起遠程調用異常:",e); }finally { if(socket!=null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
啟動客戶端:
public static void main(String[] args) throws InterruptedException { IServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl("192.168.1.101:2181"); RpcClientProxy rpcClientProxy = new RpcClientProxy(serviceDiscovery); HelloService hello = rpcClientProxy.clientProxy(HelloService.class); System.out.println(hello.sayHello("wuzz")); }
輸出:
創建一個新的連接
RPC Hello , wuzz
這樣就完成了注冊中心的簡單結合,這里還可以通過注解里的 version實現版本的管理,以及如果想要實現負載的效果,由於客戶端已經實現了負載的簡單實現,只需要啟動兩個服務端,向zk注冊,然后啟動客戶端去調用即可。