一、RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分布式多程序在內的應用程序更加容易。
RPC采用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,然后等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達為止。當一個調用信息到達,服務器獲得進程參數,計算結果,發送答復
信息,然后等待下一個調用信息,最后,客戶端調用進程接收答復信息,獲得進程結果,然后調用執行繼續進行。
二、調用過程
1.調用客戶端句柄;執行傳送參數
2.調用本地系統
內核發送網絡消息
4.服務器句柄得到消息並取得參數
5.執行遠程過程
6.執行的過程將結果返回服務器句柄
7.服務器句柄返回結果,調用遠程系統
內核
8.消息傳回
本地主機
9.客戶句柄由內核接收消息
10.客戶接收句柄返回的數據
三、調用過程,大概就是這樣子,這樣子來張圖簡單一點

1、這張圖是dubbo官網的模型圖
2、過程:服務——>注冊地址到zk
客戶端——>通過zk獲取對應服務地址
代理——>通過接口代理,實現服務調用
調用方式——>這個不一定了,一般tcp方式比較直接
四、簡易版rpc實現,注冊中心用的zk,可以使用其他注冊中心
1)目錄結構
2)依賴的jar
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>Zookeeper</artifactId> <version>3.4.0</version> </dependency>
3)zookeeper的連接工具
import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class ZookeeperUtils { private static ZooKeeper zooKeeper = null; public static ZooKeeper connect() throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(1); //連接zk zooKeeper = new ZooKeeper("localhost:2182", 60000, watchedEvent -> { if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { latch.countDown(); } }); //無連接阻塞 latch.await(); return zooKeeper; } }
4)數據傳輸的實體
import java.io.Serializable; /** * 請求需要的數據 */ public class RpcRequest implements Serializable{ //接口名稱,用於反射 private String interfaceName; //調用方法 private String method; //參數類型 private Class<?>[] parameterTypes; //參數 private Object[] params; public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public Class<?>[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } }
import java.io.Serializable; /** * 相應對象 */ public class RpcResponse implements Serializable { //返回狀態,當然這里可以,加入對應的異常等(這里簡化) private String status; //返回的數據 private Object data; public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } }
5)測試的服務以及實現
import com.pinnet.zookeeper.service.ITestService; public class TestServiceImpl implements ITestService{ public String test(String name) { System.out.println(name); return "return" + name; } }
6)提供方
import com.pinnet.zookeeper.bean.RpcBeanFactory; import com.pinnet.zookeeper.data.RpcRequest; import com.pinnet.zookeeper.data.RpcResponse; import com.pinnet.zookeeper.service.ITestService; import com.pinnet.zookeeper.service.impl.TestServiceImpl; import com.pinnet.zookeeper.zookeeper.ZookeeperUtils; import org.apache.zookeeper.*; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.ServerSocket; import java.net.Socket; /** * rpc服務端 */ public class Server { //服務器設定的目錄 private String registryPath = "/registry"; //接口,這里方便測試用 private String serviceName = ITestService.class.getName(); //地址目錄 private static String addressName = "address"; //本地地址 private static String ip = "localhost"; //監聽接口 public static Integer port = 8000; //鏈接zk public ZooKeeper connect() throws Exception { ZooKeeper zooKeeper = ZookeeperUtils.connect(); return zooKeeper; } //創建節點,也就是訪問的,目錄 public void createNode(ZooKeeper zooKeeper) throws Exception { if (zooKeeper.exists(registryPath, false) == null) { //創建永久目錄,接口服務,可以創建永久目錄 zooKeeper.create(registryPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } String servicePath = registryPath + "/" +serviceName; if (zooKeeper.exists(servicePath, false) == null) { //接口目錄 zooKeeper.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } String addressPath = servicePath + "/" +addressName; //地址目錄,這里ip就是本地的地址,用於tcp鏈接使用 //這里創建的是臨時目錄,當zk服務斷連過后,自動刪除臨時節點 zooKeeper.create(addressPath, (ip + ":"+ port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } //監聽過程 private void accept() throws Exception { //當然這里也可以使用netty來進行監聽和其他過程 //這里簡化 ServerSocket serverSocket = new ServerSocket(port); while (true) { System.out.println("監聽中。。。。。。。"); Socket socket = serverSocket.accept(); resultData(socket); } } //執行並返回數據 private void resultData(Socket socket) throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException { ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); //讀取請求的參數 RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); //這里從容器中獲取bean,當然這里的bean可以自己緩存,獨立spring容器之外 Object bean = RpcBeanFactory.getBean(rpcRequest.getInterfaceName()); //方法調用 Method method = bean.getClass().getMethod(rpcRequest.getMethod(), rpcRequest.getParameterTypes()); Object data = method.invoke(bean, rpcRequest.getParams()); //返回數據 ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setStatus("success"); rpcResponse.setData(data); objectOutputStream.writeObject(rpcResponse); } public static void main(String[] args) throws Exception { //模擬spring容器的加載過程 RpcBeanFactory.putBean(ITestService.class.getName(), new TestServiceImpl()); Server server = new Server(); ZooKeeper zooKeeper = server.connect(); //創建節點,用於地址訪問 server.createNode(zooKeeper); //監聽,當然多線程更加理想,這里只顯示效果 server.accept(); } }
7)調用方
import com.pinnet.zookeeper.bean.RpcBeanFactory; import com.pinnet.zookeeper.data.RpcRequest; import com.pinnet.zookeeper.data.RpcResponse; import com.pinnet.zookeeper.service.ITestService; import com.pinnet.zookeeper.zookeeper.ZookeeperUtils; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * rpc客戶端 */ public class Client { //緩存地址 private Map<String, List<String>> adressMap = new ConcurrentHashMap<>(); //鏈接zk public ZooKeeper connect() throws Exception { return ZookeeperUtils.connect(); } /** * 這里主要是創建代理,利用代理接口實現來達到調用的目的 * @param interfaceName * @return * @throws ClassNotFoundException */ public Object createProxy(final String interfaceName) throws ClassNotFoundException { //使用線程實例化,主要考慮處理性 final Class clazz = Thread.currentThread().getContextClassLoader().loadClass(interfaceName); //創建代理 return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() { //用重連計數 private int num = 5; @Override public Object invoke(Object proxy, Method method, Object[] params) throws Throwable { //發送請求需要的請求參數 RpcRequest rpcRequest = new RpcRequest(); //接口名稱 rpcRequest.setInterfaceName(interfaceName); //調用方法名 rpcRequest.setMethod(method.getName()); //對應參數類型 rpcRequest.setParameterTypes(method.getParameterTypes()); //對應參數 rpcRequest.setParams(params); //返回響應結果 return sendData(rpcRequest); } //tcp方式調用 private Object sendData(RpcRequest rpcRequest) throws Exception { //當訪問地址存在時 if (adressMap.containsKey(interfaceName)) { List<String> adresses = adressMap.get(interfaceName); if (adresses != null && !adresses.isEmpty()) { //如果存在多個地址,使用可以調通的一個 for (String adress:adresses) { //這個是注冊zk的時候設定的數據 String[] strs = adress.split(":"); //這里簡易版的實現,所以直接使用的socket. //實際上可以采用netty框架編寫,保存channel訪問就可以了,也可以對數據進行加解密 Socket socket = new Socket(); try { //連接可以訪問zk,如果連接失敗直接拋出異常,循環下一個地址 socket.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1]))); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectOutputStream.writeObject(rpcRequest); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); //這里是響應數據,也就是執行過后的數據。遠程執行結果 RpcResponse rpcResponse = (RpcResponse) objectInputStream.readObject(); return rpcResponse.getData(); } catch (IOException e) { e.printStackTrace(); } } //設置重連機制,跳出循環 if (num == 0) { throw new RuntimeException("server connect fail"); } num--; //如果多個地址還是不能訪問,則從zk上面更新地址 getAddress(interfaceName); //如果多個地址還是不能訪問,則重新訪問 sendData(rpcRequest); } throw new RuntimeException("not found service"); }else { //如果沒有地址存儲時的訪問,主要是請求鏈接問題 if (num == 0) { throw new RuntimeException("not found server"); } num--; getAddress(interfaceName); return sendData(rpcRequest); } } }); } //從zk獲取最新的地址 private void getAddress(String interfaceName) throws Exception { //鏈接zk ZooKeeper zooKeeper = connect(); //設定的地址目錄 String interfacePath = "/registry/" + interfaceName; //獲取地址目錄 List<String> addresses = zooKeeper.getChildren(interfacePath, false); if (addresses != null && !addresses.isEmpty()) { List<String> datas = new ArrayList<>(); for (String address:addresses) { //獲取數據,也就是配置對應的訪問地址 byte[] bytes = zooKeeper.getData(interfacePath + "/" + address, false, null); if (bytes.length > 0) { //放入數組 datas.add(new String(bytes)); } } //加入緩存 adressMap.put(interfaceName, datas); } } public static void main(String[] args) throws ClassNotFoundException, InterruptedException { Client client = new Client(); //這一步是模擬spring容器放入bean的過程,實際用spring容器可自定義標簽實現 RpcBeanFactory.putBean("testService", client.createProxy(ITestService.class.getName())); //獲取bean的過程,實際上實現是代理接口的實現 ITestService testService = (ITestService) RpcBeanFactory.getBean("testService"); //調用方法,也就是調用代理的過程 String test = testService.test("test"); System.out.println(test); } }
五、代碼部分就這么多,看一下測試結果
服務端:
客戶端:
zookeeper的展示部分
六、源碼分享
rpc-zk代碼:https://github.com/lilin409546297/rpc-zk
zkui代碼:https://github.com/lilin409546297/zkui(這個是別人做的,我放在github的)
zookeeper下載地址:http://apache.org/dist/zookeeper/