SOFA 源碼分析 —— 服務引用過程


前言

在前面的 SOFA 源碼分析 —— 服務發布過程 文章中,我們分析了 SOFA 的服務發布過程,一個完整的 RPC 除了發布服務,當然還需要引用服務。 So,今天就一起來看看 SOFA 是如何引用服務的。實際上,基礎邏輯和我們之前用 Netty 寫的 RPC 小 demo 類似。有興趣可以看看這個 demo—— 自己用 Netty 實現一個簡單的 RPC

示例代碼

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setProtocol("bolt") // 指定協議
    .setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址

HelloService helloService = consumerConfig.refer();

while (true) {
    System.out.println(helloService.sayHello("world"));
    try {
        Thread.sleep(2000);
    } catch (Exception e) {
    }
}

同樣的,代碼例子來自 SOFA-RPC 源碼,位於com.alipay.sofa.rpc.quickstart.QuickStartClient

很簡單,創建一個消費者配置類,然后使用這個配置引用一個代理對象,調用
代理對象的方法,實際上就是調用了遠程服務。

我們就通過上面這個簡單的例子,看看 SOFA 是如何進行服務引用的。注意:我們今天的目的是看看主流程,有些細節可能暫時就一帶而過了,比如負載均衡,錯誤重試啥的,我們以后再詳細分析,實際上,Client 相對有 Server,還是要復雜一些的,因為它要考慮更多的情況。

好,開始吧!

源碼分析

首先看這個 ConsumerConfig 類,和前面的 ProviderConfig 類似,甚至於實現的接口和繼承的抽象類都是一樣的。

上面的例子設置了一些屬性,比如接口名稱,協議,直連地址。

關鍵點來了, refer 方法。

這個方法就是返回了一個代理對象,代理對象中包含了之后遠程調用中需要的所有信息,比如過濾器,負載均衡等等。

然后,調用動態代理的方法,進行遠程調用,如果是 JDK 的動態代理的話,就是一個實現了 InvocationHandler 接口的類。這個類的 invoke 方法會攔截並進行遠程調用,自然就是使用 Netty 的客戶端對服務端發起調用並得到數據啦。

先看看 refer 方法。

從 refer 方法開始源碼分析

該方法類套路和 provider 的套路類似,都是使用了一個 BootStrap 引導。即單一職責。

public T refer() {
    if (consumerBootstrap == null) {
        consumerBootstrap = Bootstraps.from(this);
    }
    return consumerBootstrap.refer();
}

ConsumerBootstrap 是個抽象類,SOFA 基於他進行擴展,目前有 2 個擴展點,bolt 和 rest。默認是 bolt。而 bolt 的實現則是 BoltConsumerBootstrap,目前來看 bolt 和 rest 並沒有什么區別,都是使用的一個父類 DefaultConsumerBootstrap。

所以,來看看 DefaultConsumerBootstrap 的 refer 方法。代碼我就不貼了,因為很長。基本上引用服務的邏輯全部在這里了,類似 Spring 的 refresh 方法。邏輯如下:

  1. 根據 ConsumerConfig 創建 key 和 appName。檢查參數合法性。對調用進行計數。
  2. 創建一個 Cluster 對象,這個對象非常重要,該對象管理着核心部分的信息。詳細的后面會說,而構造該對象的參數則是 BootStrap。
  3. 設置一些監聽器。
  4. 初始化 Cluster。其中包括設置路由,地址初始化,連接管理,過濾器鏈構造,重連線程等。
  5. 創建一個 proxyInvoker 執行對象,也就是初始調用對象,作用是注入到動態代理的攔截類中,以便動態代理從此處開始調用。構造參數也是 BootStrap。
  6. 最后,創建一個動態代理對象,目前動態代理有 2 個擴展點,分別是 JDK,javassist。默認是 JDK,但似乎 javassist 的性能會更好一點。如果是 JDK 的話,攔截器則是 JDKInvocationHandler 類,構造方法需要代理類和剛剛創建的 proxyInvoker 對象。proxyInvoker 的作用就是從這里開始鏈式調用。

其中,關鍵的對象是 Cluster。該對象需要重點關注。

Cluster 是個抽象類,也是個擴展點,實現了 Invoker, ProviderInfoListener, Initializable, Destroyable 等接口。而他目前的具體擴展點有 2 個: FailFastCluster(快速失敗), FailoverCluster(故障轉移和重試)。默認是后者。當然,還有一個抽象父類,AbstractCluster。

該類有個重要的方法, init 方法,初始化 Cluster,Cluster 可以理解成客戶端,封裝了集群模式、長連接管理、服務路由、負載均衡等抽象類。

init 方法代碼如下,不多。

public synchronized void init() {
    if (initialized) { // 已初始化
        return;
    }
    // 構造Router鏈
    routerChain = RouterChain.buildConsumerChain(consumerBootstrap);
    // 負載均衡策略 考慮是否可動態替換?
    loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap);
    // 地址管理器
    addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap);
    // 連接管理器
    connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);
    // 構造Filter鏈,最底層是調用過濾器
    this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig,
        new ConsumerInvoker(consumerBootstrap));
    // 啟動重連線程
    connectionHolder.init();
    // 得到服務端列表
    List<ProviderGroup> all = consumerBootstrap.subscribe();
    if (CommonUtils.isNotEmpty(all)) {
        // 初始化服務端連接(建立長連接)
        updateAllProviders(all);
    }
    // 啟動成功
    initialized = true;
    // 如果check=true表示強依賴
    if (consumerConfig.isCheck() && !isAvailable()) {
    }
}

可以看到,干的活很多。每一步都值得花很多時間去看,但看完所有不是我們今天的任務,我們今天關注一下調用,上面的代碼中,有一段構建 FilterChain 的代碼是值得我們今天注意的。

創建了一個 ConsumerInvoker 對象,作為最后一個過濾器調用,關於過濾器的設計,我們之前已經研究過了,不再贅述,詳情 SOFA 源碼分析 —— 過濾器設計

我們主要看看 ConsumerInvoker 類,該類是離 Netty 最近的過濾器。實際上,他也是擁有了一個 BootStrap,但,注意,擁有了 BootStrap ,相當於挾天子以令諸侯,啥都有了,在他的 invoke 方法中,會直接獲取 Boostrap 的 Cluster 向 Netty 發送數據。

代碼如下:

return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);

厲害吧。

那么,Cluster 是如何進行 sendMsg 的呢?如果是 bolt 類型的 Cluster 的話,就直接使用 bolt 的 RpcClient 進行調用了,而 RpcClient 則是使用的 Netty 的 Channel 的 writeAndFlush 方法發送數據。如果是同步調用的話,就阻塞等待數據。

總的流程就是這樣,具體細節留到以后慢慢分析。

下面看看拿到動態代理對象后,如何進行調用。

動態代理如何調用?

當我們調用的時候,肯定會被 JDKInvocationHandler 攔截。攔截方法則是 invoke 方法。方法很簡單,主要就是使用我們之前注入的 proxyInvoker 的 invoke 方法。我們之前說了,proxyInvoker 的作用其實就是一個鏈表的頭。而他主要了代理了真正的主角 Cluster,所以,你可以想到,他的 invoke 方法肯定是調用了 Cluster 的 invoke 方法。

Cluster 是真正的主角(注意:默認的 Cluster 是 FailoverCluster),那么他的調用肯定是一連串的過濾器。目前默認有兩個過濾器:ConsumerExceptionFilter, RpcReferenceContextFilter。最后一個過濾器則是我們之前說的,離 Netty 最近的過濾器 —— ConsumerInvoker。

ConsumerInvoker 會調用 Cluster 的 sendMsg 方法,Cluster 內部包含一個 ClientTransport ,這個 ClientTransport 就是個膠水類,融合 bolt 的 RpcClient。所以,你可以想到,當 ConsumerInvoker 調用 sendMsg 方法的時候,最后會調用 RpcClient 的 invokeXXX 方法,可能是異步,也可能是同步的,bolt 支持多種調用方式。

而 RpcClient 最后則是調用 Netty 的 Channel 的 writeAndFlush 方法向服務提供者發送數據。

取一段 RpcClietn 中同步(默認)執行的代碼看看:

protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,
                                     final int timeoutMillis) throws RemotingException,
                                                             InterruptedException {
    final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
    conn.addInvokeFuture(future);
    conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            if (!f.isSuccess()) {
                conn.removeInvokeFuture(request.getId());
                future.putResponse(commandFactory.createSendFailedResponse(
                    conn.getRemoteAddress(), f.cause()));
            }
        }
    });
    // 阻塞等待
    RemotingCommand response = future.waitResponse(timeoutMillis);
    return response;
}

通過 Netty 的 Channel 的 writeAndFlush 方法發送數據,並添加一個監聽器,如果失敗了,就向 future 中注入一個失敗的對象。

在異步執行后,主線程開始等待,內部使用 countDownLatch 進行阻塞。而 countDownLatch 的初始化參數為 1。什么時候喚醒 countDownLatch 呢?

在 putResponse 方法中,會喚醒 countDownLatch。

而 putResponse 方法則會被很多地方使用。比如在 bolt 的 RpcResponseProcessor 的 doProcess 方法中就會調用。而這個方法則是在 RpcHandler 的 channelRead 方法中間接調用。

所以,如果 writeAndFlush 失敗了,會 putResponse ,沒有失敗的話,正常執行,則會在 channelRead 方法后簡介調用 putResponse.

總結一下調用的邏輯吧,樓主畫了一張圖,大家可以看看,畫的不好,還請見諒。

image.png

紅色線條是調用鏈,從 JDKInvocationHandler 開始,直到 Netty。綠色部分是 Cluster,和 Client 的核心。大紅色部分是 bolt 和 Netty。

好了,關於 SOFA 的服務引用主流程我們今天就差不多介紹完了,當然,有很多精華還沒有研究到。我們以后會慢慢的去研究。

總結

看完了 SOFA 的服務發布和服務引用,相比較 SpringCloud 而言,真心覺得很輕量。上面的一幅圖基本就能展示大部分調用過程,這在 Spring 和 Tomcat 這種框架中,是不可想象的。而 bolt 的隔離也讓 RPC 框架有了更多的選擇,通過不同的擴展點實現,你可以使用不同的網絡通信框架。這時候,有必要上一張 SOFA 官方的圖了:

image.png

從上圖可以看出,我們今天比較熟悉的地方,比如 Cluster,包含了過濾器,負載均衡,路由,然后調用 remoting 的遠程模塊,也就是 bolt。 通過 sendMsg 方法。

而 Cluster 的外部模塊,我們今天就沒有仔細看了,這個肯定是要留到今后看的。比如地址管理,連接管理等等。

好啦,今天就到這里。如有不對之處,還請指正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM