引言
本文利用java自帶的socket編程實現了一個簡單的rpc調用框架,由兩個工程組成分別名為battercake-provider(服務提供者)、battercake-consumer(服務調用者)。
設計思路如下:
1、在battercake-provider中,寫一個服務叫BatterCakeService
2、在battercake-provider中,啟動RpcProvider,發布該服務
3、在battercake-consumer中,啟動測試類RpcTest
4、在battercake-consumer中,利用jdk動態代理,獲得BatterCakeService的動態代理類BatterCakeService$Proxy0
5、在battercake-consumer中,動態代理類BatterCakeService$Proxy0,與battercake-provider建立socket連接,battercake-provider針對每一個連接,都會啟動一個ServerThread處理請求,代理類則發送服務參數等相關信息
6、在battercake-consumer中,接收battercake-provider的ServerThread請求返回的結果。
上述過程時序圖如下所示

接下來上代碼!!
服務提供者
本部分的工程為battercake-provider,項目結構圖如下圖所示

先上使用的部分的代碼
先創建一個微服務,接口如下
package com.rjzheng.service;
public interface BatterCakeService {
/**
* 賣煎餅的服務
* @param name
* @return
*/
public String sellBatterCake(String name);
}
實現類如下
package com.rjzheng.service.impl;
import com.rjzheng.service.BatterCakeService;
public class BatterCakeServiceImpl implements BatterCakeService {
@Override
public String sellBatterCake(String name) {
// TODO Auto-generated method stub
return name+"煎餅,賣的特別好";
}
}
接下來就是發布服務
package com.rjzheng.start;
import com.rjzheng.rpc.RpcProvider;
import com.rjzheng.service.BatterCakeService;
import com.rjzheng.service.impl.BatterCakeServiceImpl;
public class RpcBootStrap {
public static void main(String[] args) throws Exception {
BatterCakeService batterCakeService =new BatterCakeServiceImpl();
//發布賣煎餅的服務,注冊在20006端口
RpcProvider.export(20006,batterCakeService);
}
}
接下來是rpc框架調用部分的代碼,RpcProvider,該部分代碼可以總結為兩步
- 將需要發布的服務存儲在一個內存變量serviceList中
- 啟動socket,server.accept()方法阻塞在那,監聽輸入
- 針對每一個請求,單獨啟動一個線程處理
package com.rjzheng.rpc;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* RPC服務提供器
* @author zhengrongjun
*
*/
public class RpcProvider {
//存儲注冊的服務列表
private static List<Object> serviceList;
/**
* 發布rpc服務
* @param object
* @param port
* @throws Exception
*/
public static void export(int port,Object... services) throws Exception {
serviceList=Arrays.asList(services);
ServerSocket server = new ServerSocket(port);
Socket client = null;
while (true) {
//阻塞等待輸入
client = server.accept();
//每一個請求,啟動一個線程處理
new Thread(new ServerThread(client,serviceList)).start();
}
}
}
接下來ServerThread線程處理類的代碼,ServerThread主要做以下幾個步驟
- 讀取客戶端發送的服務名
- 判斷服務是否發布
- 如果發布,則走反射邏輯,動態調用,返回結果
- 如果未發布,則返回提示通知
package com.rjzheng.rpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.List;
public class ServerThread implements Runnable {
private Socket client = null;
private List<Object> serviceList = null;
public ServerThread(Socket client, List<Object> service) {
this.client = client;
this.serviceList = service;
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
input = new ObjectInputStream(client.getInputStream());
output = new ObjectOutputStream(client.getOutputStream());
// 讀取客戶端要訪問那個service
Class serviceClass = (Class) input.readObject();
// 找到該服務類
Object obj = findService(serviceClass);
if (obj == null) {
output.writeObject(serviceClass.getName() + "服務未發現");
} else {
//利用反射調用該方法,返回結果
try {
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Method method = obj.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(obj, arguments);
output.writeObject(result);
} catch (Throwable t) {
output.writeObject(t);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
client.close();
input.close();
output.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private Object findService(Class serviceClass) {
// TODO Auto-generated method stub
for (Object obj : serviceList) {
boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
if (isFather) {
return obj;
}
}
return null;
}
}
服務消費者
本部分的工程為battercake-consumer,項目結構圖如下圖所示

先上rpc框架調用部分的代碼RpcConsumer,步驟分兩步
- 封裝一個代理類處理器
- 返回service的代理類對象
package com.rjzheng.rpc;
import java.lang.reflect.Proxy;
public class RpcConsumer {
public static <T> T getService(Class<T> clazz,String ip,int port) {
ProxyHandler proxyHandler =new ProxyHandler(ip,port);
return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
}
}
接下來上代理類處理器的代碼,代理類處理步驟分以下幾步
- 建立socket連接
- 封裝請求數據,發送給服務提供者
- 返回結果
package com.rjzheng.rpc;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;
import com.rjzheng.service.BatterCakeService;
public class ProxyHandler implements InvocationHandler {
private String ip;
private int port;
public ProxyHandler(String ip, int port) {
// TODO Auto-generated constructor stub
this.ip = ip;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// TODO Auto-generated method stub
Socket socket = new Socket(this.ip, this.port);
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
output.writeObject(proxy.getClass().getInterfaces()[0]);
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
output.flush();
Object result = input.readObject();
if(result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
socket.shutdownOutput();
}
}
}
接下來建立一個測試類RpcTest如下(跑該測試類前,記得運行在battercake-provider端的RpcBootstrap類發布BatterCakeService服務)
package com.rjzheng.start;
import com.rjzheng.rpc.RpcConsumer;
import com.rjzheng.service.BatterCakeService;
public class RpcTest {
public static void main(String[] args) {
BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
String result=batterCakeService.sellBatterCake("雙蛋");
System.out.println(result);
}
}
輸出結果如下
雙蛋煎餅,賣的特別好
至此,我們就實現了一個簡易的rpc服務調用框架
