HSF源碼閱讀


HSF各組成之間的關系

 

1 服務提供者注冊與發布

<bean id="hsfTestService"
        class="com.test.service.impl.HsfTestServiceImpl" />
    <bean class="com.taobao.hsf.app.spring.util.HSFSpringProviderBean"
        init-method="init">
        <property name="serviceName" value="hsfTestService" />
        <property name="target" ref="hsfTestService" />
        <property name="serviceInterface">
            <value>com.test.service.HsfTestService
            </value>
        </property>
        <property name="serviceVersion">
            <value>${hsf.common.provider.version}</value>
        </property>
    </bean>

首先服務發布初始化bean,HSFSpringProviderBean實現了Spring的3個接口,將HSF的publish和Spring容器的生命周期綁定在一起。

1)InitializingBean,實現afterPropertiesSet接口,在init方法之前調用,執行服務發布的初始化信息

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    } 

2)ApplicationContextAware,在該方法會在Spring容器加載Bean之前執行,這里面最關鍵的就是設定了isInSpringContainer=true。它對后面的初始化有什么用呢?一般我們在配置HSFSpringProviderBean都會指定它的init-method,也就是這個HSFSpringProviderBean加載完成后執行的一個初始化方法,這個初始化方法中就是判斷isInSpringContainer的值,如果為true,則不會在這里執行publish操作。

3)ApplicationListener,這個方法會在所有的Bean初始化完成以后被Spring回調,這就保證了當所有的Bean初始化完成(包括各種設值注入和init方法執行)后,判斷是事件ContextRefreshedEvent來執行publish方法,Spring銷毀時,判斷ContextClosedEvent事件,執行服務的關閉。

首先看,init方法,

public void init() throws Exception {
        // 避免被初始化多次
        if (!providerBean.getInited().compareAndSet(false, true)) {
            return;
        }
        LoggerInit.initHSFLog();

        SpasInit.initSpas();
        providerBean.checkConfig();
        publishIfNotInSpringContainer();
    }

    private void publishIfNotInSpringContainer() {
        if (!isInSpringContainer) {
            LOGGER.warn("[SpringProviderBean]不是在Spring容器中創建, 不推薦使用");
            providerBean.publish();
        }

從代碼中很明顯的看到服務發布providerBean.publish(),先來看大致類圖,類圖中有些不是很關鍵的先省略了:

整個服務發布的流程,歸納如下:

  • 服務初始化,首先需要有一個提供服務的service實現類和接口;
  • 初始化HSFSpringProviderBean,從配置文件獲取服務名稱、接口、實現類、版本等等;
  • providerBean是HSFApiProviderBean在HSFSpringProviderBean中的變量,HSFSpringProviderBean會將從配置文件獲取的服務名稱、接口、實現類、版本等等賦值給providerBean;
  • providerBean中有個服務實體類ServiceMetadata,providerBean會將服務發布的所有信息放在這里,如接口、實現類、版本等等,在整個發布過程中,ServiceMetadata是所有對象之間的傳輸對象;
  • 這里先來解釋一下為什么有HSFSpringProviderBean和HSFApiProviderBean,其實兩個可以合並成一個,但是為什么要分開呢?我的理解是對於不同環境的不同實現,比如現在用的是spring環境,那就需要有個spring適配類HSFSpringProviderBean來獲取配置信息,假如是其他環境那么就會有另一個適配類,最終把信息統一轉成給HSFApiProviderBean,HSFApiProviderBean是來具體操作實現;
  • 當執行providerBean.publish()時,會調用ProcessService的publish方法,具體實現類是ProcessComponent;
  • 發布的具體流程就是ProcessComponent里:
    • 第一步,調用rpcProtocolService來注冊發布RPC服務,首先啟動一個hsf server服務,其實就是一個netty的服務端,通過與配置中心連接,發布自己的服務。處理RPC請求就是建立一個netty鏈接,解析請求參數返回處理結果。然后在server本地發布一個線程池,每一個服務都會申請一個線程池,當請求過來時從線程池獲取executor進行執行並返回,最后將provider封裝為ProviderServiceModel,注冊到HSF Server上;
    • 第二步,檢查單元化發布,就unitService在發布前檢查是中心發布還是單元發布,對ServiceMetadata設置不同的發布路由;
    • 第三步,通過metadataService將ServiceMetadata發布到ConfigServer上,供服務的消費方能從configserver獲取到服務提供方的信息;
    • 第四步,通過metadataInfoStoreService將ServiceMetadata保存到redis供服務治理或者其他用途。

2 服務消費者的訂閱和被推送

<bean id="hsfTestService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean" init-method="init">
    <property name="interfaceName" value="com.test.service.hsfTestService"/>
    <property name="version" value="1.0.0.daily"/>
</bean>

服務的訂閱流程:

  • 服務初始化,首先需要引入服務接口相關的pom,然后寫配置文件;
  • 將需要被調用的服務注冊成spring bean,即上面配置文件中的內容。
    • 這里用到了動態代理,通過類圖我們可以看到HSFSpringConsumerBean實現了FactoryBean;
    • FactoryBean:是一個Java Bean,但是它是一個能生產對象的工廠Bean,通過getObject方法返回具體的bean,在spring bean實例化bean的過程中會去判斷是不是FactoryBean,如果不是就返回bean,否則返回FactoryBean生產的bean,具體同學們可以去看AbstractBeanFactory的doGetBean方法,里面會調用getObjectForBeanInstance方法,這個方法里有具體實現;
    • HSFSpringConsumerBean實現了FactoryBean,那么getObject方法具體返回了什么呢?怎么返回的呢?

 

@Override
public Object getObject() throws Exception {
    return consumerBean.getObject();
}

 

    • 從代碼看得出是調用了consumerBean(HSFApiConsumerBean)的getObject方法返回的,那么我們再來看getObject方法
public Object getObject() throws Exception {
    return metadata.getTarget();
}
    • 這個方法返回的是metadata(ServiceMetadata)的target,那么target是怎么獲取的呢?下面重點說明;
  • HSFSpringConsumerBean的init方法調用了consumerBean(HSFApiConsumerBean)的init方法,我們來看consumerBean里init方法的某一段代碼:
ProcessService processService = HSFServiceContainer.getInstance(ProcessService.class);
try {
    metadata.setTarget(processService.consume(metadata));
    LOGGER.warn("成功生成對接口為[" + metadata.getInterfaceName() + "]版本為[" + metadata.getVersion() + "]的HSF服務調用的代理!");
} catch (Exception e) {
    LOGGER.error("", "生成對接口為[" + metadata.getInterfaceName() + "]版本為[" + metadata.getVersion()
            + "]的HSF服務調用的代理失敗", e);
    // since 2007,一旦初始化異常就拋出
    throw e;
}
int waitTime = metadata.getMaxWaitTimeForCsAddress();
if (waitTime > 0) {
    try {
        metadata.getCsAddressCountDownLatch().await(waitTime, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // ignore
    }
}
  • 這一段代碼包含了動態代理對象的具體生成和服務訂閱以及服務信息接收;
  • 先說了一下代碼邏輯,服務的訂閱和服務信息的接收(被推送)在processService中執行,動態代理對象在processService中生成,下面的wait是用來等目標服務信息的推送(當收到訂閱的目標具體服務實現,接下來的調用過程才能走通);
  • 看來processService是一個很重要的組件,這邊通過**processService.consume(metadata)**這樣的方法調用實現了那么多步驟,target也在這里面生成,說一下這個方法內的邏輯:
    • 首先去緩存中找是否之前target有生成,有就返回;
    • 初始化尋址服務,調用hookService.preConsume;
    • 生成對目標服務的尋址對象后,然后通過java Proxy生成代理對象;
    • 完成HSF消費端的本地初始化后,然后從HSF治理中心訂閱服務信息(返回的可調用地址);
      • 訂閱路由規則和機房規則;
      • 訂閱服務,首先根據訂閱信息生成一個訂閱登記表;其次,向訂閱中心訂閱服務;最后,注冊訂閱數據監聽器,並立即執行地址刷新任務;
      • 訂閱notify,支持消費端回調;
    • 保存客戶端metadata到redis,返回target。

 

3 服務消費者發起調用

服務信息已經注冊發布,客戶端也獲取到了服務的調用地址,接下去就是調用就行,調用就是真正的rpc請求了,hsf的rpc是通過netty實現的。

 

當服務消費方發起調用時,對代理對象的調用,都會觸發到HSFServiceProxy的invoke方法,該方法是服務調用的入口方法,invoke會調用trueInvoke方法:

  • trueInvoke方法首先獲取一個線程池的大小,如果沒有,將同步分配一個線程,並調用rpc的invoke方法
  • 在trueInvoke里調用RPCProtocolTemplateService,在這里封裝HSFRequest,地址路由的獲取、檢查,監測信息的埋點,日志的處理等,然后執行具體的invoke0調用;
  • invoke0方法調用流程
    • 組裝HSFRequest對象,封裝目標方法名稱、方法參數、方法參數淚下等信息
    • 根據需要獲取目標方法的URL,調用地址。不同情況選擇不同的調用地址;
    • 如果是廣播模式,那么對應每一個地址都會發送一次調用
    • 檢查是否是本地調用;獲取超時時間;根據調用類型獲取不同的RPC調用,HSF主要包含3種RPC調用,CallbackInvokeComponent(回調的方式)、FutureInvokeComponent(future方式)、SyncInvokeComponent(同步調用)
    • 對於SyncInvokeComponent調用方式,
      • 創建一個Netty客戶端鏈接
      • 向客戶端提交一個HSFRequest請求,得到一個future對象
      • 通過future.get獲取返回值,並通過客戶端傳入的timeout控制超時時間,get方法在超時或者獲取結果之前是阻塞的,所以是同步調用方式
      • 將獲取到的結果放在HSFResponse對象中,完成調用並返回。如果有超時或者異常,則返回異常;

4 服務提供方處理請求

provider端啟動一個NettyServer用於接收請求,那么一次請求的處理就是從NettyServer收到請求開始的。在NettyServer的start代碼中,注冊了一個serverHandler,用來處理客戶端的請求。

  • 服務端會啟動nettyServer,具體由NettyServerHandler來處理所有rpc請求;
  • NettyServerHandler中定義了HandleRequest方法,用來處理所有請求,當一個請求到來,handleRequest完成了以下幾件事,1)根據請求類型找到對應的ServerHandler,可能是HSF Dubbo等
  • 如果沒有分配線程池,則由serverHandler來同步處理請求;
  • 如果分配了線程池,則封裝一個HandlerRunnable對象,來異步處理請求;
  • 無論同步還是異步,都是由ServerHandler的handleRequest來處理請求,這個方法將請求對象和連接封裝成一個ServerOutput對象,交由RpcRequestProcessor來處理
    • 解析請求參數
    • 在本地查找對應的ProviderServiceModel
    • 根據方法名和參數類型查找ProviderServiceModel
    • 反射的方式調用該方法,得到返回值並封裝在HSFReponse中
    • 發送Response對象

 

5 服務消費者獲取結果

在調用SyncInvokeComponent的invoke方法中,使用future.get獲取調用結果,getResponseObject方法負責從返回對象中解析結果。

 

過程后續慢慢補充。。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM