rpc簡易實現-zookeeper


  一、RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中,RPC跨越了傳輸層應用層。RPC使得開發包括網絡分布式多程序在內的應用程序更加容易。

  RPC采用客戶機/服務器模式。請求程序就是一個客戶機,而服務提供程序就是一個服務器。首先,客戶機調用進程發送一個有進程參數的調用信息到服務進程,然后等待應答信息。在服務器端,進程保持睡眠狀態直到調用信息到達為止。當一個調用信息到達,服務器獲得進程參數,計算結果,發送答復 信息,然后等待下一個調用信息,最后,客戶端調用進程接收答復信息,獲得進程結果,然后調用執行繼續進行。
  二、調用過程
  1.調用客戶端句柄;執行傳送參數
  2.調用本地系統 內核發送網絡消息
  3. 消息傳送到遠程 主機
  4.服務器句柄得到消息並取得參數
  5.執行遠程過程
  6.執行的過程將結果返回服務器句柄
  7.服務器句柄返回結果,調用遠程系統 內核
  8.消息傳回 本地主機
  9.客戶句柄由內核接收消息
  10.客戶接收句柄返回的數據
  三、調用過程,大概就是這樣子,這樣子來張圖簡單一點
  

 

  1、這張圖是dubbo官網的模型圖

  2、過程:服務——>注冊地址到zk

        客戶端——>通過zk獲取對應服務地址

        代理——>通過接口代理,實現服務調用

        調用方式——>這個不一定了,一般tcp方式比較直接

  四、簡易版rpc實現,注冊中心用的zk,可以使用其他注冊中心

  1)目錄結構

  

  2)依賴的jar

      <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>Zookeeper</artifactId>
            <version>3.4.0</version>
        </dependency>

  3)zookeeper的連接工具

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZookeeperUtils {

    private static ZooKeeper zooKeeper = null;

    public static ZooKeeper connect() throws IOException, InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        //連接zk
        zooKeeper = new ZooKeeper("localhost:2182", 60000, watchedEvent -> {
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });
        //無連接阻塞
        latch.await();
        return zooKeeper;
    }

}

  4)數據傳輸的實體

import java.io.Serializable;

/**
 * 請求需要的數據
 */
public class RpcRequest implements Serializable{
    //接口名稱,用於反射
    private String interfaceName;
    //調用方法
    private String method;
    //參數類型
    private Class<?>[] parameterTypes;
    //參數
    private Object[] params;

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }
}
import java.io.Serializable;

/**
 * 相應對象
 */
public class RpcResponse implements Serializable {

    //返回狀態,當然這里可以,加入對應的異常等(這里簡化)
    private String status;
    //返回的數據
    private Object data;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

  5)測試的服務以及實現

import com.pinnet.zookeeper.service.ITestService;

public class TestServiceImpl implements ITestService{

    public String test(String name) {
        System.out.println(name);
        return "return" + name;
    }
}

  6)提供方

import com.pinnet.zookeeper.bean.RpcBeanFactory;
import com.pinnet.zookeeper.data.RpcRequest;
import com.pinnet.zookeeper.data.RpcResponse;
import com.pinnet.zookeeper.service.ITestService;
import com.pinnet.zookeeper.service.impl.TestServiceImpl;
import com.pinnet.zookeeper.zookeeper.ZookeeperUtils;
import org.apache.zookeeper.*;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * rpc服務端
 */
public class Server {

    //服務器設定的目錄
    private String registryPath = "/registry";
    //接口,這里方便測試用
    private String serviceName = ITestService.class.getName();
    //地址目錄
    private static String addressName = "address";
    //本地地址
    private static String ip = "localhost";
    //監聽接口
    public static Integer port = 8000;

    //鏈接zk
    public ZooKeeper connect() throws Exception {
        ZooKeeper zooKeeper = ZookeeperUtils.connect();
        return zooKeeper;
    }

    //創建節點,也就是訪問的,目錄
    public void createNode(ZooKeeper zooKeeper) throws Exception {
        if (zooKeeper.exists(registryPath, false) == null) {
            //創建永久目錄,接口服務,可以創建永久目錄
            zooKeeper.create(registryPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        String servicePath = registryPath + "/" +serviceName;
        if (zooKeeper.exists(servicePath, false) == null) {
            //接口目錄
            zooKeeper.create(servicePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        String addressPath = servicePath + "/" +addressName;
        //地址目錄,這里ip就是本地的地址,用於tcp鏈接使用
        //這里創建的是臨時目錄,當zk服務斷連過后,自動刪除臨時節點
        zooKeeper.create(addressPath, (ip + ":"+ port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    //監聽過程
    private void accept() throws Exception {
        //當然這里也可以使用netty來進行監聽和其他過程
        //這里簡化
        ServerSocket serverSocket = new ServerSocket(port);
        while (true) {
            System.out.println("監聽中。。。。。。。");
            Socket socket = serverSocket.accept();
            resultData(socket);
        }
    }

    //執行並返回數據
    private void resultData(Socket socket) throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
        //讀取請求的參數
        RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
        //這里從容器中獲取bean,當然這里的bean可以自己緩存,獨立spring容器之外
        Object bean = RpcBeanFactory.getBean(rpcRequest.getInterfaceName());
        //方法調用
        Method method = bean.getClass().getMethod(rpcRequest.getMethod(), rpcRequest.getParameterTypes());
        Object data = method.invoke(bean, rpcRequest.getParams());
        //返回數據
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setStatus("success");
        rpcResponse.setData(data);
        objectOutputStream.writeObject(rpcResponse);
    }

    public static void main(String[] args) throws Exception {
        //模擬spring容器的加載過程
        RpcBeanFactory.putBean(ITestService.class.getName(), new TestServiceImpl());
        Server server = new Server();
        ZooKeeper zooKeeper = server.connect();
        //創建節點,用於地址訪問
        server.createNode(zooKeeper);
        //監聽,當然多線程更加理想,這里只顯示效果
        server.accept();
    }

}

  7)調用方

import com.pinnet.zookeeper.bean.RpcBeanFactory;
import com.pinnet.zookeeper.data.RpcRequest;
import com.pinnet.zookeeper.data.RpcResponse;
import com.pinnet.zookeeper.service.ITestService;
import com.pinnet.zookeeper.zookeeper.ZookeeperUtils;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * rpc客戶端
 */
public class Client {

    //緩存地址
    private Map<String, List<String>> adressMap = new ConcurrentHashMap<>();

    //鏈接zk
    public ZooKeeper connect() throws Exception {
        return ZookeeperUtils.connect();
    }

    /**
     * 這里主要是創建代理,利用代理接口實現來達到調用的目的
     * @param interfaceName
     * @return
     * @throws ClassNotFoundException
     */
    public Object createProxy(final String interfaceName) throws ClassNotFoundException {
        //使用線程實例化,主要考慮處理性
        final Class clazz = Thread.currentThread().getContextClassLoader().loadClass(interfaceName);
        //創建代理
        return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            //用重連計數
            private int num = 5;

            @Override
            public Object invoke(Object proxy, Method method, Object[] params) throws Throwable {
                //發送請求需要的請求參數
                RpcRequest rpcRequest = new RpcRequest();
                //接口名稱
                rpcRequest.setInterfaceName(interfaceName);
                //調用方法名
                rpcRequest.setMethod(method.getName());
                //對應參數類型
                rpcRequest.setParameterTypes(method.getParameterTypes());
                //對應參數
                rpcRequest.setParams(params);
                //返回響應結果
                return sendData(rpcRequest);
            }

            //tcp方式調用
            private Object sendData(RpcRequest rpcRequest) throws Exception {
                //當訪問地址存在時
                if (adressMap.containsKey(interfaceName)) {
                    List<String> adresses = adressMap.get(interfaceName);
                    if (adresses != null && !adresses.isEmpty()) {
                        //如果存在多個地址,使用可以調通的一個
                        for (String adress:adresses) {
                            //這個是注冊zk的時候設定的數據
                            String[] strs = adress.split(":");
                            //這里簡易版的實現,所以直接使用的socket.
                            //實際上可以采用netty框架編寫,保存channel訪問就可以了,也可以對數據進行加解密
                            Socket socket = new Socket();
                            try {
                                //連接可以訪問zk,如果連接失敗直接拋出異常,循環下一個地址
                                socket.connect(new InetSocketAddress(strs[0], Integer.valueOf(strs[1])));
                                ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                                objectOutputStream.writeObject(rpcRequest);
                                ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                                //這里是響應數據,也就是執行過后的數據。遠程執行結果
                                RpcResponse rpcResponse = (RpcResponse) objectInputStream.readObject();
                                return rpcResponse.getData();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        //設置重連機制,跳出循環
                        if (num == 0) {
                            throw new RuntimeException("server connect fail");
                        }
                        num--;
                        //如果多個地址還是不能訪問,則從zk上面更新地址
                        getAddress(interfaceName);
                        //如果多個地址還是不能訪問,則重新訪問
                        sendData(rpcRequest);
                    }
                    throw new RuntimeException("not found service");
                }else {
                    //如果沒有地址存儲時的訪問,主要是請求鏈接問題
                    if (num == 0) {
                        throw new RuntimeException("not found server");
                    }
                    num--;
                    getAddress(interfaceName);
                    return sendData(rpcRequest);
                }
            }
        });
    }

    //從zk獲取最新的地址
    private void getAddress(String interfaceName) throws Exception {
        //鏈接zk
        ZooKeeper zooKeeper = connect();
        //設定的地址目錄
        String interfacePath = "/registry/" + interfaceName;
        //獲取地址目錄
        List<String> addresses = zooKeeper.getChildren(interfacePath, false);
        if (addresses != null && !addresses.isEmpty()) {
            List<String> datas = new ArrayList<>();
            for (String address:addresses) {
                //獲取數據,也就是配置對應的訪問地址
                byte[] bytes = zooKeeper.getData(interfacePath + "/" + address, false, null);
                if (bytes.length > 0) {
                    //放入數組
                    datas.add(new String(bytes));
                }
            }
            //加入緩存
            adressMap.put(interfaceName, datas);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        Client client = new Client();
        //這一步是模擬spring容器放入bean的過程,實際用spring容器可自定義標簽實現
        RpcBeanFactory.putBean("testService", client.createProxy(ITestService.class.getName()));
        //獲取bean的過程,實際上實現是代理接口的實現
        ITestService testService = (ITestService) RpcBeanFactory.getBean("testService");
        //調用方法,也就是調用代理的過程
        String test = testService.test("test");
        System.out.println(test);
    }
}

  五、代碼部分就這么多,看一下測試結果

  服務端:

  

  客戶端:

  

  zookeeper的展示部分

  

  六、源碼分享

  rpc-zk代碼:https://github.com/lilin409546297/rpc-zk

  zkui代碼:https://github.com/lilin409546297/zkui(這個是別人做的,我放在github的)

  zookeeper下載地址:http://apache.org/dist/zookeeper/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM