java實現RPC


一,服務提供者

工程為battercake-provider,項目結構圖如下圖所示

1.1 先創建一個“賣煎餅”微服務的接口和實現類

package com.jp.service;

public interface BatterCakeService {
    /**
     * 賣煎餅的服務
     */
    public String sellBatterCake(String name);
}
package com.jp.service;

import com.jp.service.BatterCakeService;

/**
 * 賣煎餅服務的實現類
 *
 */
public class BatterCakeServiceImpl implements BatterCakeService {

    public String sellBatterCake(String name) {
        return name+"煎餅,賣的特別好";
    }    
}

1.2 RPC框架調用部分

該部分有兩個關鍵部分:RPC服務提供器線程處理類

1)RPC服務提供器

  1. 需要發布的服務存儲在一個內存變量serviceList中。(該例就是把賣煎餅服務的實例對象傳入
  2. 啟動socket,server.accept()方法阻塞在那,監聽輸入
  3. 針對每一個請求,單獨啟動一個線程處理
 1 package com.jp.rpc;
 2 
 3 import java.net.ServerSocket;
 4 import java.net.Socket;
 5 import java.util.ArrayList;
 6 import java.util.Arrays;
 7 import java.util.List;
 8 /**
 9  * RPC服務提供器
10  * 1,將需要發布的服務存儲在一個內存變量serviceList中
11  * 2,啟動socket,server.accept()方法阻塞在那,監聽輸入
12  * 3,針對每一個請求,單獨啟動一個線程處理
13  */
14 public class RpcProvider {
15     
16     //存儲注冊的服務列表
17     private static List<Object> serviceList;
18     
19     /**
20      * 發布rpc服務
21      * @param object 提供(賣煎餅)服務的實例對象 22      * @param port 監聽的端口 23      * @throws Exception
24      */
25     public static void export(int port,Object... services) throws Exception {
26         serviceList=Arrays.asList(services);
27         ServerSocket server = new ServerSocket(port);
28         Socket client = null;
29         while (true) {
30             //阻塞等待輸入,每來一個請求就會產生一個socket對象
31             client = server.accept();
32             //每一個請求,啟動一個線程處理
33             new Thread(new ServerThread(client,serviceList)).start();
34         }
35     }
36 }

2)線程處理類

ServerThread(socke對象服務實例列表)線程處理類的代碼,ServerThread主要做以下幾個步驟

  1. 讀取客戶端發送的服務名
  2. 判斷服務是否發布
  3. 如果發布,則走反射邏輯,動態調用,返回結果
  4. 如果未發布,則返回提示通知
 1 package com.jp.rpc;
 2 
 3 import java.io.IOException;
 4 import java.io.ObjectInputStream;
 5 import java.io.ObjectOutputStream;
 6 import java.lang.reflect.Method;
 7 import java.net.Socket;
 8 import java.util.List;
 9 
10 public class ServerThread implements Runnable {
11 
12     private Socket client = null;
13 
14     private List<Object> serviceList = null;
15 
16     public ServerThread(Socket client, List<Object> service) {
17         this.client = client;
18         this.serviceList = service;
19     }
20 
21     //@Override
22     public void run() {
23         ObjectInputStream input = null;
24         ObjectOutputStream output = null;
25         try {
26             input = new ObjectInputStream(client.getInputStream());
27             output = new ObjectOutputStream(client.getOutputStream());
28             // 讀取客戶端要訪問那個service
29             Class serviceClass = (Class) input.readObject();
30             // 找到該服務類實例
31             Object obj = findService(serviceClass);
32             if (obj == null) {
33                 output.writeObject(serviceClass.getName() + "服務未發現");
34             } else {
35                 //利用反射調用該方法,返回結果
36                 //從請求中得到請求的方法名和方法參數;加上上面得到了服務對象實例;反射得到具體的方法實例;invoke執行
37                 try {
38                     String methodName = input.readUTF();
39                     Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
40                     Object[] arguments = (Object[]) input.readObject();
41                     
42                     Method method = obj.getClass().getMethod(methodName, parameterTypes);  
43                     Object result = method.invoke(obj, arguments);  
44                     output.writeObject(result); 
45                 } catch (Throwable t) {
46                     output.writeObject(t);
47                 }
48             }
49         } catch (Exception e) {
50             e.printStackTrace();
51         } finally {
52             try {
53                 client.close();
54                 input.close();
55                 output.close();
56             } catch (IOException e) {
57                 // TODO Auto-generated catch block
58                 e.printStackTrace();
59             }
60         }
61 
62     }
63 
64     //到服務列表中找服務實例
65     private Object findService(Class serviceClass) {
66         for (Object obj : serviceList) {
67             boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
68             if (isFather) {
69                 return obj;
70             }
71         }
72         return null;
73     }
74 
75 }

1.3 發布服務

 1 package com.jp.start;
 2 
 3 import com.jp.rpc.RpcProvider;
 4 import com.jp.service.BatterCakeService;
 5 import com.jp.service.BatterCakeServiceImpl;
 6 
 7 public class RpcBootStrap {
 8     public static void main(String[] args) throws Exception {
 9         //實例化“賣煎餅”這個服務的實現類
10         BatterCakeService batterCakeService =new BatterCakeServiceImpl();
11         //發布賣煎餅的服務:注冊在20006端口,並把提供服務的實例傳入
12         RpcProvider.export(20006,batterCakeService);
13     }
14 }

二,服務消費者

消費者工程為battercake-consumer,項目結構圖如下圖所示

2.1 rpc調用部分

分為兩部分:代理類處理器(代理類工廠)和 service的代理類對象(即前面工廠生產返回的)

1)代理類處理器(代理類工廠)

負責生產代理類(傳入服務的名字(類?);ip;端口

 1 package com.jp.rpc;
 2 
 3 import java.lang.reflect.Proxy;
 4 
 5 /**
 6  * 用於生產服務代理類
 7  */
 8 public class RpcConsumer {
 9     public static <T> T getService(Class<T> clazz,String ip,int port) {
10         ProxyHandler proxyHandler =new ProxyHandler(ip,port);
11         return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
12     }
13 }

2)服務代理類的處理器該類就是代理類功能的具體實現者,其實就是封裝了調用遠程服務的過程(封裝請求數據發給遠端服務提供者,把提供者返回的結果返回

  1. 建立socket連接
  2. 封裝請求數據,發送給服務提供者
  3. 返回結果
 1 package com.jp.rpc;
 2 
 3 import java.io.ObjectInputStream;
 4 import java.io.ObjectOutputStream;
 5 import java.lang.reflect.InvocationHandler;
 6 import java.lang.reflect.Method;
 7 import java.net.Socket;
 8 
 9 public class ProxyHandler implements InvocationHandler {
10     
11     private String ip;
12     private int port;
13 
14     public ProxyHandler(String ip, int port) {
15         this.ip = ip;
16         this.port = port;
17     }
18 
19     //@Override
20     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
21         Socket socket = new Socket(this.ip, this.port);
22         ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
23         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
24         try {
25             output.writeObject(proxy.getClass().getInterfaces()[0]);
26             output.writeUTF(method.getName());
27             output.writeObject(method.getParameterTypes());
28             output.writeObject(args);
29             output.flush();
30             Object result = input.readObject();
31             if (result instanceof Throwable) {
32                 throw (Throwable) result;
33             }
34             return result;
35         } finally {
36             socket.shutdownOutput();
37         }
38     }
39 
40 }

2.2 接下來建立一個測試類RpcTest如下

(跑該測試類前,記得運行在battercake-provider端的RpcBootstrap類發布BatterCakeService服務)

 1 package com.jp.start;
 2 
 3 import com.jp.rpc.RpcConsumer;
 4 import com.jp.service.BatterCakeService;
 5 
 6 public class RpcTest {
 7     public static void main(String[] args) {
 8         //生成代理類,三個參數:被代理對象,ip,端口
 9         BatterCakeService batterCakeService = RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
10         //調用代理類的方法並獲得結果
11         String result = batterCakeService.sellBatterCake("雙蛋");
12         System.out.println(result);
13     }
14 }

輸出結果如下

 

 

https://blog.csdn.net/wangyunpeng0319/article/details/78651998

https://www.cnblogs.com/rjzheng/category/1205773.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM