分布式RPC框架實現


現在大部分的互聯網公司都會采用微服務架構,但具體實現微服務架構的方式有所不同,主流上分為兩種,一種是基於Http協議的遠程調用,另外一種是基於RPC方式的調用。兩種方式都有自己的代表框架,前者是著名的Spring Cloud,后者則是有阿里巴巴開源的Dubbo,二者都被廣泛的采用。今天這篇文章,我們就一起來了解一下RPC,並且和大家一起動手實現一個簡單的RPC框架的Demo。

什么是RPC

RPC是一種遠程調用過程,是一種通過網絡遠程調用其他服務的協議。通俗的說就是,A通過打電話的方式讓B幫忙辦一件事,B辦完事后將結果告知A。 我們下面通過一張圖來大概了解一下在一個完整的RPC框架中存在的角色以及整個遠程調用的過程。

 

通過上面的圖可以看出來,在RPC框架中主要有以下4個角色:

  • registry - 注冊中心,當服務提供者啟動時會向注冊中心注冊,然后注冊中心會告知所有的消費者有新的服務提供者。
  • provider - 服務提供者,遠程調用過程中的被消費方。
  • consumer - 服務消費者,遠程調用過程中的消費方。
  • monitor - 監視器,它主要負責統計服務的消費和調用情況。

啟動服務提供者后,服務提供者會以異步的方式向注冊中心注冊。然后啟動服務消費者,它會訂閱注冊中心中服務提供者列表,當有服務提供者的信息發生改變時,注冊中心會通知所有的消費者。當消費者發起遠程調用時,會通過動態代理將需要請求的參數以及方法簽名等信息通過Netty發送給服務提供者,服務提供者收到調用的信息后調用對應的方法並將產生的結果返回給消費者,這樣就完成了一個完整的遠程調用。當然了這個過程中可能還會將調用信息異步發送給monitor用於監控和統計。

閱讀過上面的內容后,你應該對RPC框架有了一個大概的認識。為了更好更深入的了解RPC框架的原理,下面我們就一起來動手實現一個簡單的RPC框架吧。

框架核心部分

首先我們要實現的是整個RPC框架的核心部分,這部分的主要包含以下內容:

  1. RPC服務的注解的實現。
  2. 服務提供者初始化、注冊、以及響應遠程調用的實現。
  3. 服務消費者訂閱注冊中心、監聽服務提供者的變化的實現。
  4. 動態代理的實現。

整個核心部分將以一個Spring Boot Starter的形式實現,這樣我們可以很方便的在Spring Boot項目中使用它。

注解

我們需要使用一個注解來標識服務提供者所提供服務的實現類,方便在初始化的時候將其交由Spring管理,也只有這樣我們才可以在遠程調用發生時可以找到它們。

@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { Class<?> value(); }

value屬性用來標記這個服務的實現類對應的接口,RPC框架中服務提供者和消費者之間會共同引用一個服務接口的包,當我們需要遠程調用的時候實際上只需要調用接口中定義的方法即可。
除了一個標識服務實現類的注解之外,我們還需要一個標識服務消費者注入服務實現的注解@RpcConsumer,被其修飾的屬性在初始化的時候都會被我們設置上動態代理,這一點在后面會詳細講到,我們先來看下它的具體實現吧。

@Target({ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcConsumer { /** * 服務名稱 * @return */ String providerName(); }

服務提供者

服務提供者啟動的時候,我們RPC框架需要做以下幾件事情:

  1. 掃描服務提供者中所有提供服務的類(被@RpcService修飾的類),並將其交由BeanFactory管理。
  2. 啟動Netty服務端,用來收到消費者的調用消息,並且返回調用結果。
  3. 向注冊中心注冊,本例中使用的注冊中心是Zookeeper。

這部分我們定義了一個ProviderAutoConfiguration類來實現這幾個步驟,

@PostConstruct public void init() { logger.info("rpc server start scanning provider service..."); Map<String, Object> beanMap = this.applicationContext.getBeansWithAnnotation(RpcService.class); if (null != beanMap && !beanMap.isEmpty()) { beanMap.entrySet().forEach(one -> { initProviderBean(one.getKey(), one.getValue()); }); } logger.info("rpc server scan over..."); // 如果有服務的話才啟動netty server if (!beanMap.isEmpty()) { startNetty(rpcProperties.getPort()); } }

看上面的代碼,首先我們獲取到了所有被@RpcService注解修飾的實體,並且調用了initProviderBean方法逐一對其處理,然后我們啟動了Netty。那么我們需要在initProviderBean方法中做些什么呢?其實很簡單,就是逐一將其交由BeanFactory管理。

private void initProviderBean(String beanName, Object bean) { RpcService rpcService = this.applicationContext .findAnnotationOnBean(beanName, RpcService.class); BeanFactory.addBean(rpcService.value(), bean); }

將服務實現類交由Spring管理之后,我們還需要啟動Netty用來接收遠程調用信息,啟動Netty的代碼在這里我就不全部粘出來了,大家可以在源碼中查看。在Netty啟動成功之后,其實我們還執行了下面的代碼,用來向ZK注冊。

new RegistryServer(rpcProperties.getRegisterAddress(), rpcProperties.getTimeout(), rpcProperties.getServerName(), rpcProperties.getHost(), port) .register();

整個注冊的過程也非常容易理解,首先是創建了一個ZK連接,然后是判斷是否有/rpc的根節點,如果沒有的話就創建一個,最后就是在根節點下創建一個EPHEMERAL_SEQUENTIAL類型的節點,這種類型的節點在ZK重啟之后會自動清除,這樣可以保證注冊中心重啟后會自動清除服務提供者的信息。而在節點中會存儲服務提供者的名稱,IP地址以及端口號的信息,這樣RPC框架就可以根據這些信息順利的定位到服務提供者。

public void register() throws ZkConnectException { try { // 獲取zk連接 ZooKeeper zooKeeper = new ZooKeeper(addr, timeout, event -> { logger.info("registry zk connect success..."); }); if (zooKeeper.exists(Constants.ZK_ROOT_DIR, false) == null) { zooKeeper.create(Constants.ZK_ROOT_DIR, Constants.ZK_ROOT_DIR.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zooKeeper.create(Constants.ZK_ROOT_DIR + "/" + serverName, (serverName + ","+ host + ":" + port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); logger.info("provider register success {}", serverName); } catch (Exception e) { throw new ZkConnectException("register to zk exception," + e.getMessage(), e.getCaus()); } }

就這樣我們RPC框架與服務提供者相關的內容就完成了,接下來要完成的是服務消費者部分。

服務消費者

對於服務消費者,我們框架需要對它的處理就是,為所有的RPC服務(被@RpcConsumer修飾的屬性)設置上動態代理。具體的設置代碼如下所示(PS:這段代碼寫在ConsumerAutoConfiguration類中哦):

@Bean public BeanPostProcessor beanPostProcessor() { return new BeanPostProcessor() { @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Class<?> objClz = bean.getClass(); for (Field field : objClz.getDeclaredFields()) { RpcConsumer rpcConsumer = field.getAnnotation(RpcConsumer.class); if (null != rpcConsumer) { Class<?> type = field.getType(); field.setAccessible(true); try { field.set(bean, rpcProxy.create(type, rpcConsumer.providerName())); } catch (IllegalAccessException e) { e.printStackTrace(); } finally { field.setAccessible(false); } } } return bean; } }; }

BeanPostProcessor也稱為Bean后置處理器,它是Spring中定義的接口,在Spring容器的創建過程中(具體為Bean初始化前后)會回調BeanPostProcessor中定義的兩個方法。上面實現的postProcessBeforeInitialization是在Bean初始化之前調用的,還有一個postProcessAfterInitialization方法是在Bean初始化之后調用的。
如上面代碼所示,我們會在每一個帶有@RpcConsumer的實例初始化之前利用反射機制為其設置一個RpcProxy的代理,可以看到我們在創建這個動態代理的時候還需要服務提供者的名稱,這是因為在動態代理的實現里面需要使用服務提供者的名稱來查詢服務提供者的地址信息。那么這個動態代理的實現又是怎樣的呢?這就是我們下一步需要做的事情。

動態代理

在這個RPC框架里面動態代理主要實現的內容就是,當服務消費者調用服務提供者提供的接口時,將調用信息通過Netty發送給對應的服務調用者,然后由服務提供者完成相關的處理並且將處理結果返回給服務消費者。下面我們就一起來看一下RpcProxy的是如何實現這部分功能的。

@Component public class RpcProxy { @Autowired private ServiceDiscovery serviceDiscovery; public <T> T create(Class<?> interfaceClass, String providerName) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, (proxy, method, args) -> { // 通過netty向Rpc服務發送請求。 // 構建一個請求。 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()) .setClassName(method.getDeclaringClass().getName()) .setMethodName(method.getName()) .setParamTypes(method.getParameterTypes()) .setParams(args); // 獲取一個服務提供者。 ProviderInfo providerInfo = serviceDiscovery.discover(providerName); // 解析服務提供者的地址信息,數組第一個元素為ip地址,第二個元素為端口號。 String[] addrInfo = providerInfo.getAddr().split(":"); String host = addrInfo[0]; int port = Integer.parseInt(addrInfo[1]); RpcClient rpcClient = new RpcClient(host, port); // 使用Netty向服務提供者發送調用消息,並接收請求結果。 RpcResponse response = rpcClient.send(request); if (response.isError()) { throw response.getError(); } else { return response.getResult(); } }); } }

其實在代理里面首先我們會構造請求信息實體,然后會根據服務提供者的名稱獲取一個服務提供者的地址,最后再將請求信息發送給服務提供者並接收調用結果。獲取服務提供者的方法會在后面消費者和提供者的通用配置里面講解。我們在這里重點來看一下發送調用信息並接收調用結果的實現。

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> { ... 此處省略對象屬性信息,可查看源碼。 public RpcResponse send(RpcRequest request){ EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ... 此處省略Netty相關配置,可查看源碼。 // 連接服務器 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); channelFuture.channel().writeAndFlush(request).sync(); future = new CompletableFuture<>(); future.get(); if (response != null) { // 關閉netty連接。 channelFuture.channel().closeFuture().sync(); } return response; } catch (Exception e) { logger.error("client send msg error,", e); return null; } finally { workerGroup.shutdownGracefully(); } } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception { logger.info("client get request result,{}", rpcResponse); this.response = rpcResponse; future.complete(""); } }

通過上面的代碼可以看出向服務提供者發送消息是異步的,我們通過CompletableFutureget()方法阻塞當前線程,直到接收到調用結果(PS:我們在channelRead0方法中收到返回結果后會將其設置成完成狀態)。看到這里,你可能會問服務提供者收到調用請求信息后如何處理的呢?具體的處理邏輯我們寫在了ServerHandler這個類中,可以看出在channelRead0方法收到一條調用信息之后,調用handle方法來處理具體的調用過程,在handle方法中會使用反射機制找到所調用方法的具體實現,然后執行調用過程並獲取結果,最后再使用Netty將結果返回給消費者服務。

public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest request) throws Exception { logger.info("provider accept request,{}", request); // 返回的對象。 RpcResponse rpcResponse = new RpcResponse(); // 將請求id原路帶回 rpcResponse.setRequestId(request.getRequestId()); try { Object result = handle(request); rpcResponse.setResult(result); } catch (Exception e) { rpcResponse.setError(e); } channelHandlerContext.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE); } private Object handle(RpcRequest request) throws Exception { String className = request.getClassName(); Class<?> objClz = Class.forName(className); Object o = BeanFactory.getBean(objClz); // 獲取調用的方法名稱。 String methodName = request.getMethodName(); // 參數類型 Class<?>[] paramsTypes = request.getParamTypes(); // 具體參數。 Object[] params = request.getParams(); // 調用實現類的指定的方法並返回結果。 Method method = objClz.getMethod(methodName, paramsTypes); Object res = method.invoke(o, params); return res; } }

消費者和提供者的通用配置

除了ProviderAutoConfigurationConsumerAutoConfiguration兩個配置類,我們還定義了一個RpcAutoConfiguration類來配置一些其他的東西,如下所示。

public class RpcAutoConfiguration { ... @Bean @ConditionalOnMissingBean public ServiceDiscovery serviceDiscovery() { ServiceDiscovery serviceDiscovery = null; try { serviceDiscovery = new ServiceDiscovery(rpcProperties.getRegisterAddress()); } catch (ZkConnectException e) { logger.error("zk connect failed:", e); } return serviceDiscovery; } @Bean @ConditionalOnMissingBean public RpcProxy rpcProxy() { RpcProxy rpcProxy = new RpcProxy(); rpcProxy.setServiceDiscovery(serviceDiscovery()); return rpcProxy; } }

在這個配置類里面,主要初始化了一個ServiceDiscovery的對象以及一個RpcProxy的對象。其中RpcProxy是動態代理,在上面我們已經詳細了解過了。那么這里就來着重了解一下ServiceDiscovery是干啥的吧。
大家還記得我們在文章開始的時候貼出來的那張圖片嗎?在服務消費者初始化的時候會去訂閱服務提供者內容的變化,ServiceDiscovery的主要功能就是這個,其主要代碼如下所示(如果你需要完整的代碼,可以查看本文源碼)。

public class ServiceDiscovery { // 存儲服務提供者的信息。 private volatile List<ProviderInfo> dataList = new ArrayList<>(); public ServiceDiscovery(String registoryAddress) throws ZkConnectException { try { // 獲取zk連接。 ZooKeeper zooKeeper = new ZooKeeper(registoryAddress, 2000, new Watcher() { @Override public void process(WatchedEvent event) { logger.info("consumer connect zk success!"); } }); watchNode(zooKeeper); } catch (Exception e) { throw new ZkConnectException("connect to zk exception," + e.getMessage(), e.getCause()); } } /** * 監聽服務提供者的變化 */ public void watchNode(final ZooKeeper zk) { ... } /** * 獲取一個服務提供者 */ public ProviderInfo discover(String providerName) { .... } }

在這個類的構造方法里面,我們和ZK注冊中心建立了一個連接,並且在watchNode方法中監聽服務提供者節點的變化,當有服務提供者信息有變化時會去修改dataList里的內容,這樣可以保證在服務本地維持一份可用的服務提供者的信息。而在遠程調用發生的時候我們會通過discover方法(PS:前面有見到過哦)去dataList里面尋找一個可用的服務提供者來提供服務。

Starter的配置

我們還需要在resources目錄下新建一個META-INF目錄,然后在該目錄下新建一個spring.factories文件,里面的內容如下面代碼所示。它主要是用來指定在Spring Boot項目啟動的時候需要加載的其他配置。如果你有不明白的地方可以查詢一下Spring Boot自定義Stater的相關內容。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.itweknow.sbrpccorestarter.config.RpcAutoConfiguration,\ cn.itweknow.sbrpccorestarter.config.ProviderAutoConfiguration,\ cn.itweknow.sbrpccorestarter.config.ConsumerAutoConfiguration

到這一步我們框架的核心部分就完成了,它將會以一個Spring Boot Stater的形式提供給服務提供者和服務消費者使用,接下來我們就將分別定義一個服務提供者和一個消費者來測試我們自己實現的RPC框架。

創建服務提供者

在創建服務提供者之前,我們需要新建一個與服務消費者之間共享的服務接口。因為前面提到過,在服務消費者眼里的遠程調用實際上就是調用本地的接口方法而已。在這個項目里我們就創建了一個HelloRpcService.java的接口,如下所示:

public interface HelloRpcService { String sayHello(); }

在接口定義完成之后,我們就來創建我們的服務提供者,並且實現上面定義的HelloRpcService接口。在服務提供者服務里還需要依賴RPC框架的核心Starter以及服務接口包,我們需要在pom.xml中添加下面的依賴。

<dependency> <groupId>cn.itweknow</groupId> <artifactId>sb-rpc-core-starter</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> <dependency> <groupId>cn.itweknow</groupId> <artifactId>sb-rpc-api</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>

添加完依賴后,我們就來看下HelloRpcService的具體實現吧:

@RpcService(HelloRpcService.class) public class HelloRpcServiceImpl implements HelloRpcService { @Override public String sayHello() { return "Hello RPC!"; } }

其實現很簡單,主要是要需要在實現類上加上@RpcService注解,這樣在項目啟動的時候RPC框架才會掃描到它,並將其交給BeanFactory管理。接下來還需要配置的是一些RPC框架需要的配置項,包括服務名稱,ZK的地址以及Netty啟動的端口等信息。這些信息在框架是通過RpcProperties這個配置類來讀取的,有興趣的同學可以在源碼中找到它。

spring.rpc.host=localhost
# netty服務的端口號 spring.rpc.port=21810 # zk地址 spring.rpc.register-address=localhost:2181 spring.rpc.server-name=provider # 連接zk的超時時間 spring.rpc.timeout=2000

創建服務消費者

服務消費者同樣也需要RPC核心框架的Starter以及服務接口的依賴,和RPC框架的一些基礎配置項,和服務提供者類似,這里就不粘出來了。這里需要說明的一點是,為了方便測試,服務消費者是一個Web服務,所以它還添加了spring-boot-starter-web的依賴。下面我們就一起來看下服務消費者是如何調用遠程服務的吧。

@RestController @RequestMapping("/hello-rpc") public class HelloRpcController { @RpcConsumer(providerName = "provider") private HelloRpcService helloRpcService; @GetMapping("/hello") public String hello() { return helloRpcService.sayHello(); } }

我們在消費者服務中寫了一個hello的接口,在接口里面調用了HelloRpcService接口里的sayHello()方法,看過前面內容的同學應該知道,被@RpcConsumer修飾的helloRpcService屬性在初始化的時候會為其設置一個動態代理,當我們調用這個接口里面的方法時,會通過Netty向服務提供者發送調用信息,然后由服務提供者調用相應方法並返回結果。
到這一步,我們可以說完成了一個簡單的RPC框架以及其使用,下面我們就一起來驗證一下結果吧。

測試

在測試之前我們需要在自己本地電腦上安裝Zookeeper,具體的安裝方式非常簡單。可以參考這篇文章。
安裝好Zookeeper后,我們需要完成以下幾個步驟:

  1. 啟動Zookeeper。
  2. 啟動服務提供者。
  3. 啟動服務消費者。

第一次啟動服務消費者的過程中,你的控制台可以能會報一個找不到/rpc節點的錯誤,產生這個錯誤的原因是我們在第一次啟動的時候ZK里面並不存在/rpc這個節點,但是如果你仔細研究源碼的話,會發現當這個節點不存在的時候,我們會創建一個。所以直接忽略這個異常即可。完成以上幾步之后,我們只需要在瀏覽器中訪問http://127.0.0.1:8080/hello-rpc/hello,如果你看到了下面的結果,那么恭喜你,整個RPC框架完美的運行成功了。

 

結束語

本文的主要內容是和大家一起完成了一個Demo版的RPC框架,其主要目的是讓大家更深刻的理解RPC的原理以及其調用過程。當然由於文章篇幅的原因,很多代碼沒有直接在文中給出,您可以在Github上找到完整的實現。如果您有什么問題可以在Github上提交Issue或者發送郵件到我的郵箱(gancy.programmer@gmail.com),如果您覺得這篇文章寫的還行的話,希望您能給我個Star,這是對我最好的鼓勵。

 

詳見:https://www.cnblogs.com/endless-code/p/11235624.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM