一、RPC是什么
remote procedure call:遠程過程調用
過程就是程序,像調用本地方法一樣調用遠程的過程
RPC采用Client-Server結構,通過request-response消息模式實現
RMI(remote method invocation)遠程方法調用是OOP領域中RPC的一種具體實現
WebService、RESTful接口調用都是RPC,僅消息組織方式及消息協議不同
與本地調用相比,速度相對較慢、可靠性減弱
為什么用RPC
- 服務化
- 可重用
- 系統間交互調用
術語
二、RPC的流程環節
1.客戶端處理過程中調用Client stub,傳遞參數
2.Client stub將參數編組為消息,然后通過系統調用向服務端發送消息
3.客戶端本地操作系統將消息從客戶端機器發送到服務端機器
4.服務端操作系統接收到數據包傳遞給Server stub
5.Server stub解組消息為參數
6.Server stub再調用服務端的過程,過程執行結果以反方向的相同的步驟響應給客戶端
需要處理的問題
1.Client stub、Server stub的開發
2.參數如何編組為消息,以及解組消息
3.消息如何發送
4.過程結果如何表示、異常情況如何處理
5.如何實現安全的訪問控制
三、RPC協議
RPC調用過程中需要將參數編組為消息進行發送,接收方需要解組消息為參數,過程處理結果同樣需要編組、解組。消息由哪些部分構成及消息的表示形式就構成了消息協議。
RPC調用過程中採用的消息協議稱為RPC協議。
常見RPC協議
四、手寫RPC框架
RPC框架是封裝好參數編組、消息解碼、底層網絡通信的RPC程序開發框架,帶來的便捷是使用者可以直接在其基礎上開發,只需專注於過程代碼的編寫
從使用者角度開始
2.1 客戶端
2.1.1 客戶端設計
客戶端生成過程接口的代理對象
設計客戶端代理工廠,用JDK動態代理即可生成接口的代理對象
ServiceInfoDiscoverer接口用於獲得服務信息,返回服務信息的列表。為了支持大並發,某個服務可能有多個提供者,並發量很大時需要用到集群
ServiceInfo,服務的名稱,服務協議
根據需要提供服務信息發現者的實現,動態發現可以使用ZooKeeper
消息協議獨立為一層,客戶端、服務端均需要
客戶端完整類圖
不同顏色代表不同層,入口是ClientStubProxyFactory
2.1.2 實現客戶端
package com.study.mike.rpc.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import com.study.mike.rpc.client.net.NetClient;
import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.common.protocol.Request;
import com.study.mike.rpc.common.protocol.Response;
import com.study.mike.rpc.discovery.ServiceInfo;
import com.study.mike.rpc.discovery.ServiceInfoDiscoverer;

/**
 * Entry point of the RPC client: creates JDK dynamic proxies ("client stubs")
 * for service interfaces.
 *
 * <p>Every call on a proxy is turned into a {@link Request}, marshalled with the
 * {@link MessageProtocol} advertised by the chosen provider, sent through the
 * configured {@link NetClient} and the unmarshalled {@link Response} is turned
 * back into a return value or a thrown exception.
 *
 * <p>The discoverer, the protocol map and the net client must be set before a
 * proxy is used. The proxy cache is a plain {@link HashMap} and is not
 * synchronized.
 */
public class ClientStubProxyFactory {

    private ServiceInfoDiscoverer sid;

    private Map<String, MessageProtocol> supportMessageProtocols;

    private NetClient netClient;

    /** Proxies already created, keyed by service interface. */
    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * Returns the proxy for the given service interface, creating and caching it
     * on first use.
     *
     * @param interf the service interface to proxy
     * @param <T> the interface type
     * @return a proxy implementing {@code interf}
     */
    @SuppressWarnings("unchecked") // the cache only ever maps an interface to its own proxy
    public <T> T getProxy(Class<T> interf) {
        T obj = (T) this.objectCache.get(interf);
        if (obj == null) {
            obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
                    new ClientStubInvocationHandler(interf));
            this.objectCache.put(interf, obj);
        }
        return obj;
    }

    public ServiceInfoDiscoverer getSid() {
        return sid;
    }

    public void setSid(ServiceInfoDiscoverer sid) {
        this.sid = sid;
    }

    public Map<String, MessageProtocol> getSupportMessageProtocols() {
        return supportMessageProtocols;
    }

    public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
        this.supportMessageProtocols = supportMessageProtocols;
    }

    public NetClient getNetClient() {
        return netClient;
    }

    public void setNetClient(NetClient netClient) {
        this.netClient = netClient;
    }

    /** Invocation handler that performs the remote call for one service interface. */
    private class ClientStubInvocationHandler implements InvocationHandler {

        private Class<?> interf;

        private Random random = new Random();

        public ClientStubInvocationHandler(Class<?> interf) {
            super();
            this.interf = interf;
        }

        /**
         * Executes the remote call for {@code method}.
         *
         * @throws Throwable the exception carried by the response, or a failure of
         *         discovery, marshalling or transport
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // hashCode/equals/toString of java.lang.Object are dispatched to the handler
            // with Object as declaring class; they must be answered locally, never remotely.
            if (method.getDeclaringClass() == Object.class) {
                return invokeObjectMethod(proxy, method, args);
            }

            // 1. Look up the providers of this service.
            String serviceName = this.interf.getName();
            List<ServiceInfo> sinfos = sid.getServiceInfo(serviceName);
            if (sinfos == null || sinfos.isEmpty()) {
                throw new Exception("遠程服務不存在!");
            }

            // Pick one provider at random (soft load balancing).
            ServiceInfo sinfo = sinfos.get(random.nextInt(sinfos.size()));

            // 2. Build the request.
            Request req = new Request();
            req.setServiceName(sinfo.getName());
            req.setMethod(method.getName());
            req.setPrameterTypes(method.getParameterTypes());
            req.setParameters(args);

            // 3. Marshal it with the protocol the provider advertises.
            MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
            if (protocol == null) {
                throw new IllegalStateException("Unsupported message protocol '" + sinfo.getProtocol()
                        + "' advertised by " + sinfo.getAddress() + " for service " + serviceName);
            }
            byte[] data = protocol.marshallingRequest(req);

            // 4. Send the request through the network layer.
            byte[] repData = netClient.sendRequest(data, sinfo);

            // 5. Unmarshal the response.
            Response rsp = protocol.unmarshallingResponse(repData);

            // 6. Hand the outcome back to the caller.
            if (rsp.getException() != null) {
                throw rsp.getException();
            }
            return rsp.getReturnValue();
        }

        /** Answers the java.lang.Object methods locally using identity semantics. */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
            case "toString":
                return proxy.getClass().toString();
            case "hashCode":
                return System.identityHashCode(proxy);
            case "equals":
                return proxy == args[0];
            default:
                throw new UnsupportedOperationException(method.toString());
            }
        }
    }
}
package com.study.mike.rpc.client.net;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.study.mike.rpc.discovery.ServiceInfo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Netty based {@link NetClient}: opens a new connection for every request, writes
 * the request bytes once the channel is active and blocks the caller until the
 * response has been read or the connection failed.
 *
 * <p>NOTE: no message framing is installed in the pipeline; the bytes delivered by
 * a single {@code channelRead} are treated as the response.
 */
public class NettyNetClient implements NetClient {

    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    /**
     * Sends {@code data} to the provider described by {@code sinfo} and waits for
     * the reply.
     *
     * @param data marshalled request
     * @param sinfo provider whose {@code address} has the form {@code host:port}
     * @return the bytes of the response
     * @throws Throwable if connecting fails, the channel reports an error, the
     *         connection is closed before a response arrives, or the waiting
     *         thread is interrupted
     */
    @Override
    public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable {
        String[] addInfoArray = sinfo.getAddress().split(":");

        final SendHandler sendHandler = new SendHandler(data);
        byte[] respData = null;

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(sendHandler);
                        }
                    });

            // Connect; the handler sends the request from channelActive.
            b.connect(addInfoArray[0], Integer.parseInt(addInfoArray[1])).sync();

            respData = sendHandler.rspData();
            logger.info("sendRequest get reply: " + respData);
        } finally {
            // Release the event loop threads (this also closes the channel).
            group.shutdownGracefully();
        }

        return respData;
    }

    /** Writes the request when the channel becomes active and captures the reply. */
    private static class SendHandler extends ChannelInboundHandlerAdapter {

        /** Opened exactly once: when a response was read or the exchange failed. */
        private final CountDownLatch cdl = new CountDownLatch(1);

        private final byte[] data;

        // Both fields are written on the event loop before countDown() and read by
        // the caller after await(), which provides the required visibility.
        private byte[] readMsg;

        private Throwable failure;

        public SendHandler(byte[] data) {
            this.data = data;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("連接服務端成功:" + ctx);
            ByteBuf reqBuf = Unpooled.buffer(data.length);
            reqBuf.writeBytes(data);
            logger.info("客戶端發送消息:" + reqBuf);
            ctx.writeAndFlush(reqBuf);
        }

        /**
         * Blocks until the exchange has finished.
         *
         * @return the response bytes
         * @throws Throwable the recorded channel failure, or InterruptedException
         *         (with the interrupt flag restored) if the wait was interrupted
         */
        public byte[] rspData() throws Throwable {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
            if (failure != null) {
                throw failure;
            }
            return readMsg;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("client read msg: " + msg);
            ByteBuf msgBuf = (ByteBuf) msg;
            try {
                byte[] resp = new byte[msgBuf.readableBytes()];
                msgBuf.readBytes(resp);
                readMsg = resp;
            } finally {
                // This handler is the last consumer of the buffer, so it must release it.
                msgBuf.release();
                cdl.countDown();
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // Wake the caller if the peer closed the connection without answering.
            fail(new IOException("connection closed before a response was received"));
            super.channelInactive(ctx);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            logger.error("發生異常:" + cause.getMessage(), cause);
            fail(cause);
            ctx.close();
        }

        /** Records the failure and releases the waiting caller, unless already done. */
        private void fail(Throwable cause) {
            if (cdl.getCount() > 0) {
                failure = cause;
                cdl.countDown();
            }
        }
    }
}
package com.study.mike.rpc.client.net;

import com.study.mike.rpc.discovery.ServiceInfo;

/**
 * Network layer abstraction used by the client stub to exchange raw message
 * bytes with a service provider.
 */
public interface NetClient {

    /**
     * Sends a marshalled request to the given provider and returns the raw reply.
     *
     * @param data bytes of the request message
     * @param sinfo the provider the request is sent to
     * @return bytes of the response message
     * @throws Throwable if the exchange fails
     */
    byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable;
}
package com.study.mike.rpc.discovery; public class ServiceInfo { private String name; private String protocol; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
package com.study.mike.rpc.discovery;

import java.util.List;

/**
 * Looks up the providers of a service by name.
 */
public interface ServiceInfoDiscoverer {

    /**
     * Returns the providers known for the given service.
     *
     * @param name the service name
     * @return one {@link ServiceInfo} per provider
     */
    List<ServiceInfo> getServiceInfo(String name);
}
package com.study.mike.rpc.discovery;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;

import com.alibaba.fastjson.JSON;
import com.study.mike.rpc.server.register.MyZkSerializer;
import com.study.mike.rpc.util.PropertiesUtils;

/**
 * {@link ServiceInfoDiscoverer} backed by ZooKeeper.
 *
 * <p>The providers of a service are the children of
 * {@code /Rpc-framework/<service name>/service}; each child name is a
 * URL-encoded JSON representation of a {@link ServiceInfo}.
 */
public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {

    ZkClient client;

    private String centerRootPath = "/Rpc-framework";

    /** Connects to the ZooKeeper address configured under {@code zk.address}. */
    public ZookeeperServiceInfoDiscoverer() {
        String addr = PropertiesUtils.getProperties("zk.address");
        client = new ZkClient(addr);
        client.setZkSerializer(new MyZkSerializer());
    }

    /**
     * Reads the currently registered providers of a service.
     *
     * @param name the service name
     * @return the registered providers; empty (never {@code null}) when the
     *         service node does not exist or has no children
     */
    @Override
    public List<ServiceInfo> getServiceInfo(String name) {
        String servicePath = centerRootPath + "/" + name + "/service";
        List<ServiceInfo> resources = new ArrayList<ServiceInfo>();

        // A service nobody registered has no node at all; report "no providers"
        // instead of letting getChildren fail on the missing path.
        if (!client.exists(servicePath)) {
            return resources;
        }

        List<String> children = client.getChildren(servicePath);
        if (children == null) {
            return resources;
        }

        for (String ch : children) {
            try {
                String deCh = URLDecoder.decode(ch, "UTF-8");
                ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
                resources.add(r);
            } catch (UnsupportedEncodingException e) {
                // UTF-8 is a mandatory charset, so this cannot happen on a conforming JVM.
                throw new IllegalStateException("UTF-8 is not supported", e);
            }
        }
        return resources;
    }
}
package com.study.mike.rpc.common.protocol;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * {@link MessageProtocol} that marshals requests and responses with Java native
 * serialization.
 *
 * <p>SECURITY NOTE: the unmarshalling methods deserialize whatever bytes they are
 * given without any {@code ObjectInputFilter}. Java native deserialization of
 * data from an untrusted peer can lead to arbitrary code execution; only use this
 * protocol between trusted endpoints.
 */
public class JavaSerializeMessageProtocol implements MessageProtocol {

    /** Serializes {@code obj} into a byte array. */
    private byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        // Closing the object stream flushes its internal buffer into bout before
        // the bytes are taken.
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(obj);
        }
        return bout.toByteArray();
    }

    /** Deserializes the single object contained in {@code data}. */
    private Object deserialize(byte[] data) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /**
     * @param req the request to marshal
     * @return the serialized request
     * @throws Exception if serialization fails
     */
    @Override
    public byte[] marshallingRequest(Request req) throws Exception {
        return this.serialize(req);
    }

    /**
     * @param data a serialized request
     * @return the request read from {@code data}
     * @throws Exception if the bytes cannot be deserialized
     */
    @Override
    public Request unmarshallingRequest(byte[] data) throws Exception {
        return (Request) this.deserialize(data);
    }

    /**
     * @param rsp the response to marshal
     * @return the serialized response
     * @throws Exception if serialization fails
     */
    @Override
    public byte[] marshallingResponse(Response rsp) throws Exception {
        return this.serialize(rsp);
    }

    /**
     * @param data a serialized response
     * @return the response read from {@code data}
     * @throws Exception if the bytes cannot be deserialized
     */
    @Override
    public Response unmarshallingResponse(byte[] data) throws Exception {
        return (Response) this.deserialize(data);
    }
}
package com.study.mike.rpc.demo.consumer;

import java.awt.Point;
import java.util.HashMap;
import java.util.Map;

import com.study.mike.rpc.client.ClientStubProxyFactory;
import com.study.mike.rpc.client.net.NettyNetClient;
import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol;
import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.demo.DemoService;
import com.study.mike.rpc.discovery.ZookeeperServiceInfoDiscoverer;

/**
 * Demo consumer: wires a {@link ClientStubProxyFactory} and calls
 * {@link DemoService} through the generated proxy.
 */
public class Consumer {

    /**
     * Configures the client stack, invokes two remote methods and prints their
     * results.
     */
    public static void main(String[] args) throws Exception {
        ClientStubProxyFactory factory = newProxyFactory();

        // Obtain the proxy of the remote service.
        DemoService remote = factory.getProxy(DemoService.class);

        // Call the remote methods and show what they returned.
        String greeting = remote.sayHello("world");
        System.out.println(greeting);

        System.out.println(remote.multiPoint(new Point(5, 10), 2));
    }

    /** Builds the factory with ZooKeeper discovery, the "javas" protocol and Netty transport. */
    private static ClientStubProxyFactory newProxyFactory() {
        ClientStubProxyFactory factory = new ClientStubProxyFactory();

        // Service discovery.
        factory.setSid(new ZookeeperServiceInfoDiscoverer());

        // Supported message protocols.
        Map<String, MessageProtocol> protocols = new HashMap<>();
        protocols.put("javas", new JavaSerializeMessageProtocol());
        factory.setSupportMessageProtocols(protocols);

        // Network layer implementation.
        factory.setNetClient(new NettyNetClient());
        return factory;
    }
}
2.2 服務端
2.2.1 設計服務端
2.2.2 實現服務端
package com.study.mike.rpc.demo.provider; import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol; import com.study.mike.rpc.demo.DemoService; import com.study.mike.rpc.server.NettyRpcServer; import com.study.mike.rpc.server.RequestHandler; import com.study.mike.rpc.server.RpcServer; import com.study.mike.rpc.server.register.ServiceObject; import com.study.mike.rpc.server.register.ServiceRegister; import com.study.mike.rpc.server.register.ZookeeperExportServiceRegister; import com.study.mike.rpc.util.PropertiesUtils; public class Provider { public static void main(String[] args) throws Exception { int port = Integer.parseInt(PropertiesUtils.getProperties("rpc.port")); String protocol = PropertiesUtils.getProperties("rpc.protocol"); // 服務注冊 ServiceRegister sr = new ZookeeperExportServiceRegister(); DemoService ds = new DemoServiceImpl(); ServiceObject so = new ServiceObject(DemoService.class.getName(), DemoService.class, ds); sr.register(so, protocol, port); RequestHandler reqHandler = new RequestHandler(new JavaSerializeMessageProtocol(), sr); RpcServer server = new NettyRpcServer(port, protocol, reqHandler); server.start(); System.in.read(); // 按任意鍵退出 server.stop(); } }
配置端口
app.properties
zk.address=127.0.0.1:2181
rpc.port=19000
rpc.protocol=javas
package com.study.mike.rpc.server.register;

import java.util.HashMap;
import java.util.Map;

/**
 * In-memory {@link ServiceRegister}: keeps the registered service objects in a
 * map keyed by service name.
 */
public class DefaultServiceRegister implements ServiceRegister {

    /** Registered services by name. */
    private Map<String, ServiceObject> serviceMap = new HashMap<>();

    /**
     * Stores the service object under its name. The protocol name and port are not
     * used by this implementation.
     *
     * @throws IllegalArgumentException if {@code so} is {@code null}
     */
    @Override
    public void register(ServiceObject so, String protocolName, int port) throws Exception {
        if (so != null) {
            serviceMap.put(so.getName(), so);
            return;
        }
        throw new IllegalArgumentException("參數不能為空");
    }

    /**
     * @param name the service name
     * @return the service object registered under {@code name}, or {@code null}
     *         if there is none
     */
    @Override
    public ServiceObject getServiceObject(String name) {
        ServiceObject registered = serviceMap.get(name);
        return registered;
    }
}
package com.study.mike.rpc.server.register;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;

import org.I0Itec.zkclient.ZkClient;

import com.alibaba.fastjson.JSON;
import com.study.mike.rpc.discovery.ServiceInfo;
import com.study.mike.rpc.util.PropertiesUtils;

/**
 * Service register that, in addition to the in-memory registration done by
 * {@link DefaultServiceRegister}, publishes the service in ZooKeeper.
 *
 * <p>Each provider is published as an ephemeral child of
 * {@code /Rpc-framework/<service name>/service}; the child name is the
 * URL-encoded JSON form of the provider's {@link ServiceInfo}.
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {

    private ZkClient client;

    private String centerRootPath = "/Rpc-framework";

    /** Connects to the ZooKeeper address configured under {@code zk.address}. */
    public ZookeeperExportServiceRegister() {
        String zkAddress = PropertiesUtils.getProperties("zk.address");
        client = new ZkClient(zkAddress);
        client.setZkSerializer(new MyZkSerializer());
    }

    /**
     * Registers the service locally and exports it to ZooKeeper using the local
     * host address and the given port.
     *
     * @param so the service to register
     * @param protocolName protocol key published with the service
     * @param port port published with the service
     */
    @Override
    public void register(ServiceObject so, String protocolName, int port) throws Exception {
        super.register(so, protocolName, port);

        ServiceInfo exported = new ServiceInfo();
        exported.setAddress(InetAddress.getLocalHost().getHostAddress() + ":" + port);
        exported.setName(so.getInterf().getName());
        exported.setProtocol(protocolName);

        this.exportService(exported);
    }

    /** Creates the ephemeral provider node, replacing an existing node of the same name. */
    private void exportService(ServiceInfo serviceResource) {
        String nodeName = JSON.toJSONString(serviceResource);
        try {
            nodeName = URLEncoder.encode(nodeName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        // Persistent parent node of all providers of this service.
        String parentPath = centerRootPath + "/" + serviceResource.getName() + "/service";
        boolean parentMissing = !client.exists(parentPath);
        if (parentMissing) {
            client.createPersistent(parentPath, true);
        }

        // Ephemeral node of this provider.
        String providerPath = parentPath + "/" + nodeName;
        if (client.exists(providerPath)) {
            client.delete(providerPath);
        }
        client.createEphemeral(providerPath);
    }
}
package com.study.mike.rpc.server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class NettyRpcServer extends RpcServer { private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class); private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { super(port, protocol, handler); } @Override public void start() { // 配置服務器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler()); } }); // 啟動服務 ChannelFuture f = b.bind(port).sync(); logger.info("完成服務端端口綁定與啟動"); channel = f.channel(); // 等待服務通道關閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 釋放線程組資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { this.channel.close(); } private class ChannelRequestHandler extends 
ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("激活"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("服務端收到消息:" + msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); logger.info("發送響應:" + msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); logger.error("發生異常:" + cause.getMessage()); ctx.close(); } } }
package com.study.mike.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.common.protocol.Request;
import com.study.mike.rpc.common.protocol.Response;
import com.study.mike.rpc.common.protocol.Status;
import com.study.mike.rpc.server.register.ServiceObject;
import com.study.mike.rpc.server.register.ServiceRegister;

/**
 * Server side dispatcher: unmarshals a request, invokes the target method on the
 * registered service object via reflection and marshals the outcome.
 */
public class RequestHandler {

    private MessageProtocol protocol;

    private ServiceRegister serviceRegister;

    public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
        super();
        this.protocol = protocol;
        this.serviceRegister = serviceRegister;
    }

    /**
     * Processes one request message.
     *
     * <p>The response has status {@code NOT_FOUND} when no service is registered
     * under the requested name, {@code SUCCESS} with the return value when the
     * call completes, and {@code ERROR} with an exception otherwise.
     *
     * @param data the marshalled request
     * @return the marshalled response
     * @throws Exception if the request cannot be unmarshalled or the response
     *         cannot be marshalled
     */
    public byte[] handleRequest(byte[] data) throws Exception {
        // 1. Unmarshal the message.
        Request req = this.protocol.unmarshallingRequest(data);

        // 2. Find the service object.
        ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());

        Response rsp = null;

        if (so == null) {
            rsp = new Response(Status.NOT_FOUND);
        } else {
            // 3. Invoke the requested method reflectively.
            try {
                Method m = so.getInterf().getMethod(req.getMethod(), req.getPrameterTypes());
                Object returnValue = m.invoke(so.getObj(), req.getParameters());
                rsp = new Response(Status.SUCCESS);
                rsp.setReturnValue(returnValue);
            } catch (InvocationTargetException e) {
                // The service method itself threw: report its exception rather than the
                // reflection wrapper, so the client sees what a local call would throw.
                // An Error (or a missing cause) is still reported through the wrapper.
                Throwable cause = e.getCause();
                rsp = new Response(Status.ERROR);
                rsp.setException(cause instanceof Exception ? (Exception) cause : e);
            } catch (NoSuchMethodException | SecurityException | IllegalAccessException
                    | IllegalArgumentException e) {
                rsp = new Response(Status.ERROR);
                rsp.setException(e);
            }
        }

        // 4. Marshal the response.
        return this.protocol.marshallingResponse(rsp);
    }

    public MessageProtocol getProtocol() {
        return protocol;
    }

    public void setProtocol(MessageProtocol protocol) {
        this.protocol = protocol;
    }

    public ServiceRegister getServiceRegister() {
        return serviceRegister;
    }

    public void setServiceRegister(ServiceRegister serviceRegister) {
        this.serviceRegister = serviceRegister;
    }
}