該rpc框架是一個mini版的dubbo。學習rpc之前,建議先了解NIO,Netty和Dubbo等知識。請移步網絡編程
前言:(借用阿里大佬的一段話)
為什么要自己寫一個RPC框架,我覺得從個人成長上說,如果一個程序員能清楚的了解RPC框架所具備的要素,掌握RPC框架中涉及的服務注冊發現、負載均衡、序列化協議、RPC通信協議、Socket通信、異步調用、熔斷降級等技術,可以全方位的提升基本素質。雖然也有相關源碼,但是只看源碼容易眼高手低,動手寫一個才是自己真正掌握這門技術的最優路徑。
一.概述
什么是RPC?
- 遠程服務調用
- 官方:一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的思想
- 通俗一點:客戶端在不知道調用細節的情況下,調用存在於遠程計算機上的某個對象,就像調用本地應用程序中的對象一樣。
- 市面上常見的rpc框架:dobbo,springCloud,gRPC...
那為什么要有 RPC,HTTP 不好么?
- 因為 RPC 和 HTTP 就不是一個層級的東西,所以嚴格意義上這兩個沒有可比性,也不應該來作比較。
- HTTP 只是傳輸協議,協議只是規范了一定的交流格式
- RPC 對比的是本地過程調用,是用來作為分布式系統之間的通信,它可以用 HTTP 來傳輸,也可以基於 TCP 自定義協議傳輸。
- HTTP 協議比較冗余,所以 RPC 大多都是基於 TCP 自定義協議,定制化的才是最適合自己的。
項目總體結構
整體架構
接下來,分別解釋上述的過程
二.自定義注解
服務的提供者和消費者公用一個接口,@ServiceExpose是為了暴露服務,放在生產者的某個實現類上;@ServiceReference是為了引用服務,放在消費者的需要注入的屬性上。
- Target:指定被修飾的Annotation可以放置的位置(被修飾的目標)
-
@Target(ElementType.TYPE) //接口、類
-
@Target(ElementType.FIELD) //屬性
-
@Target(ElementType.METHOD) //方法
- @Target(ElementType.PARAMETER) //方法參數
-
@Target(ElementType.CONSTRUCTOR) //構造函數
-
@Target(ElementType.LOCAL_VARIABLE) //局部變量
-
@Target(ElementType.ANNOTATION_TYPE) //注解
-
@Target(ElementType.PACKAGE) //包
-
- Retention:定義注解的保留策略
-
@Retention(RetentionPolicy.SOURCE) //注解僅存在於源碼中,在class字節碼文件中不包含
-
@Retention(RetentionPolicy.CLASS) //默認的保留策略,注解會在class字節碼文件中存在,但運行時無法獲得
-
@Retention(RetentionPolicy.RUNTIME) //注解會在class字節碼文件中存在,在運行時可以通過反射獲取到
-
- Documented:指定被修飾的該Annotation可以被javadoc工具提取成文檔
- Inherited:指定被修飾的Annotation將具有繼承性
二.啟動配置
主要是加載一些rpc相關的配置類,使用SpringBoot自動裝配。可以使用SPI機制加入一些自定義的類,放到指定文件夾中。
三.rpc接口注入/rpc服務掃描
這里主要就是通過反射獲得對應注解的屬性/類,進行服務暴露/服務引用。 這里需要關注的是什么時候進行服務暴露/引用?如下:
- 客戶端:一般有倆種方案
-
- 餓漢式:餓漢式是通過實現 Spring 的
InitializingBean
接口中的afterPropertiesSet
方法,容器通過調用ReferenceBean
的afterPropertiesSet
方法時引入服務。(在Spring啟動時,給所有的屬性注入實現類,包含遠程和本地的實現類) - 懶漢式:只有當這個服務被注入到其他類中時啟動引入流程,也就是說用到了才會開始服務引入。
- 在應用的Spring IOC 容器刷新完畢(spring Context初始化)之后,掃描所有的Bean,將Bean中帶有@ServiceExpose/@ServiceReference注解的field獲取到,然后創建field類型的代理對象,創建完成后,將代理對象set給此field。后續就通過該代理對象創建服務端連接,並發起調用。(dubbo默認)
- 餓漢式:餓漢式是通過實現 Spring 的
- 服務端:與懶漢式一樣。
那么怎么知道Spring IOC刷新完成,這里就使用一個Spring提供的監聽器,當Spring IOC刷新完成,就會觸發監聽器。
四.服務注冊到ZK/從Zk獲得服務
Zookeeper采用節點樹的數據模型,類似linux文件系統,/,/node1,/node2 比較簡單。不懂Zookeeper請移步:Zookeeper原理
我們采用的是對每個服務名創建一個持久節點,服務注冊時實際上就是在zookeeper中該持久節點下創建了一個臨時節點,該臨時節點存儲了服務的IP、端口、序列化方式等。
客戶端獲取服務時通過獲取持久節點下的臨時節點列表,解析服務地址數據:
客戶端監聽服務變化:
五.生成代理類對象
這里使用JDK的動態代理,也可以使用cglib或者Javassist(dobbo使用)。
public class ClientProxyFactory { /** * 獲取代理對象,綁定 invoke 行為 * * @param clazz 接口 class 對象 * @param <T> 類型 * @return 代理對象 */public <T> T getProxyInstance(Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() { final Random random = new Random(); @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 第一步:通過服務發現機制選擇一個服務提供者暴露的服務 String serviceName = clazz.getName(); final List<ServiceInfo> serviceInfos = serviceDiscovery.listServices(serviceName); logger.info("Rpc server instance list: {}", serviceInfos); if (CollectionUtils.isEmpty(serviceInfos)) { throw new RpcException("No rpc servers found."); } // TODO: 這里模擬負載均衡,從多個服務提供者暴露的服務中隨機挑選一個,后期寫方法實現負載均衡 final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size())); // 第二步:構造 rpc 請求對象 final RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setServiceName(serviceName); rpcRequest.setMethod(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setParameters(args); // 第三步:編碼請求消息, TODO: 這里可以配置多種編碼方式 byte[] data = messageProtocol.marshallingReqMessage(rpcRequest); // 第四步:調用 rpc client 開始發送消息 byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo); // 第五步:解碼響應消息 final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse); // 第六步:解析返回結果進行處理 if (rpcResponse.getException() != null) { throw rpcResponse.getException(); } return rpcResponse.getRetValue(); } }); } }
六.負載均衡
本實現支持兩種主要負載均衡策略,隨機和輪詢,其中他們都支持帶權重的隨機和輪詢,其實也就是四種策略。
七.Netty通信
服務端和客戶端基本一樣,這里只展示服務端的代碼。代理對象在Spring啟動的時候就生成了,但是沒有調用,每一個調用(請求)都會生成一個Netty的連接。
public class NettyRpcServer extends RpcServer { @Override public void start() { // 創建兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 創建服務端的啟動對象 ServerBootstrap serverBootstrap = new ServerBootstrap() // 設置兩個線程組 .group(bossGroup, workerGroup) // 設置服務端通道實現類型 .channel(NioServerSocketChannel.class) // 服務端用於接收進來的連接,也就是boosGroup線程, 線程隊列大小 .option(ChannelOption.SO_BACKLOG, 100) .childOption(ChannelOption.SO_KEEPALIVE, true) // child 通道,worker 線程處理器 .childHandler(new ChannelInitializer<SocketChannel>() { // 給 pipeline 管道設置自定義的處理器 @Override public void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new NettyServerHandler()); } }); // 綁定端口號,同步啟動服務 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channel = channelFuture.channel(); // 對關閉通道進行監聽,變為同步 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { logger.error("server error.", e); } finally { // 釋放線程組資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
實現具體handler
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //當通道就緒就會觸發該方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //進行記錄 logger.info("channel active: {}", ctx); } //讀取數據實際(這里我們可以讀取客戶端發送的消息) @Override public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { //將數據讀到buffer中 final ByteBuf msgBuf = (ByteBuf) msg; final byte[] reqBytes = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(reqBytes); } //數據讀取完畢 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //使用反射獲找到目標方法進行返回 final byte[] respBytes = requestHandler.handleRequest(reqBytes); ctx.writeAndFlush(respBytes); } //處理異常, 一般是需要關閉通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
八.序列化協議
對計算機網絡稍微有一點了解的同學都知道,數據在網絡中傳輸是二進制的:01010101010101010,類似這種,只有二進制數據才能在網絡中傳輸。但是在編碼之前我們一般先進行序列化,目的是為了優化傳輸的數據量。因為有的數據太大,需要進行空間優化。
那么我們來區分一下序列化和編碼:我畫一張圖大家都全明白了
定義一個序列化協議,放入作為一個handler放入pipeline中。
Netty支持多種序列化,比如jdk,Json,ProtoBuf 等,這里使用ProtoBuf,其序列化后碼流小性能高,非常適合RPC調用。接下來看怎么使用ProtoBuf?
- 1.編寫需要序列化的類xxx.proto:ProtoBuf有自己的語法規則(自行百度)
- 2.通過官網提供的protoc.exe生成對應的Java代碼
- 3.前面通過工具生成的代碼(AnimalProto)已經幫我們封裝好了序列化和反序列化的方法,我們只需要調用對應方法即可
引入Protobuf的依賴
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.1</version> </dependency>
序列化:
/** * 調用對象構造好的Builder,完成屬性賦值和序列化操作 * @return */ public static byte[] protobufSerializer(){ AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder(); builder.setId(1L); builder.setName("小豬"); List<String> actions = new ArrayList<>(); actions.add("eat"); actions.add("run"); builder.addAllActions(actions); return builder.build().toByteArray(); }
反序列化:
/** * 通過調用parseFrom則完成反序列化 * @param bytes * @return * @throws InvalidProtocolBufferException */ public static Animal deserialize(byte[] bytes) throws Exception { AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes); Animal animal = new Animal(); animal.setId(pAnimal.getId()); animal.setName(pAnimal.getName()); animal.setActions(pAnimal.getActionsList()); return animal; }
測試:
public static void main(String[] args) throws Exception { byte[] bytes = serializer(); Animal animal = deserialize(bytes); System.out.println(animal); }
以下看到是能正常序列化和反序列化的:
九.通信協議
通信協議主要是解決網絡傳輸問題,比如TCP拆包粘包問題。
TCP問題:
- TCP拆包粘包主要就是把一些數據合並或者分割開進行發送,這時候有的數據就不完整,有的數據就多出一部分,就會造成問題。一般使用TCP協議都需要考慮拆包粘包問題
- tcp粘包和半包問題就是因為滑動窗口。 因為不管你的數據是多少長度,怎么分割每一條數據。但是tcp只按照我滑動窗口的長度發送。
- 本質是因為TCP是流式協議,消息無邊界。
解決方案:業界的主流協議的解決方案可以歸納如下
- 消息定長:例如每個報文的大小為固定長度100字節,如果不夠用空格補足。(定長解碼器)
在包尾加特殊結束符進行分割。(分隔符編碼器)
- 消息長度+消息:將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段。
- Netty自帶:
-
- 自定義編解碼器
這里只是列舉出來編碼過程,解碼是逆過程。(說白了,編碼就是找着固定的格式進行寫入,解碼就是照着固定的格式讀)
恭喜你,已經學會寫RPC框架了,想深入了解的朋友可以參照源碼。進行學習,升級。
該rpc最終打成一個Spring Boot starter,如果不會的請參照手寫一個Spring Boot starter
寄語:生命只有一次,你要活得暢快淋漓
參考文章:
https://mp.weixin.qq.com/s/yaIOCfEigkQMm2kt6I7Orw
https://mp.weixin.qq.com/s/ltos1nEgktec5pn47xAgMw