HBase之Table.put客戶端流程


  首先,讓我們從HTable.put方法開始。由於這一節有很多方法只是簡單的參數傳遞,我就簡單略過,但是,關鍵的方法我還是會截圖講解,所以希望大家盡可能對照源碼進行流程分析。另外,在這一節,我單單介紹put操作在客戶端的流程,畢竟,這個內容已經很多了。至於具體服務端的流程,我會在后面的章節中介紹到,歡迎大家到時候閱讀。
  由於這一節的方法還是比較復雜的,我特地畫了一張思維導圖,大家可以先通過思維導圖來對本節的內容有一個大概的了解,置於具體的流程,我在下面將對照源碼的貼圖一一為大家講解(在這里聲明一點,我在這一節只介紹單個put操作的流程,至於put批處理,大家有興趣可以自己研究一下)。
  首先,讓我們來到HTable.put方法,如下圖所示:   這里我先講一下這一節的最后調用流程,也同時讓大家明確一下在本節我着重要講解的流程是哪塊。在上圖中我已經表示出來了,后面方法的調用最后調用到了上面新創建的ClientServiceCallable中覆寫的rpcCall方法,也就是調用到了ClientServiceCallable.doMutate。關於這個方法中具體與服務端的交互流程在本節我就略過,但是,在后面的內容中,我會談到類似的情況,如果大家感興趣的話,可以繼續后面的內容。   接下來讓我們回到本節的重點。首先是RpcRetryingCallerFactory.newCaller方法的調用,該方法使用RpcRetryingCallerFactory的成員參數創建了RpcRetryingCaller,用於后面對於RetryingCallable的調用(該方法在后面也會多次調用,在后面我就不貼圖了)。   接下來讓我們來到RpcRetryingCallerImpl.callWithRetries。這個方法是本節中最為重要的方法,在后面也會多次用到。方法雖然比較長,但大多是異常的情況的解決,在本節中我們就單單介紹callable.prepare與callable.call兩個方法。至於interceptor.intercept,由於在構造RpcRetryingCallerFactory時默認的interceptor類型為RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,在本節並不會有其它影響,所以我們暫時不需要關注。   上面的方法調用的callable具體類型為覆寫了rpcCall方法的ClientServiceCallable,下面讓我們來到ClientServiceCallable類的內部。ClientServiceCallable繼承自RegionServerCallable,因此,上面方法實際調用的是RegionServerCallable.prepare與RegionServerCallable.call。   首先讓我們來到RegionServerCallable.prepare方法。這里比較重要的方法我已經框選出來了。需要大家特別留意的是最后的setStubByServiceName,一是因為他比較重要,二是我在后面的內容才會介紹,大家到時候可能忘記了,所以在這里特別提醒一下大家。
  容易看到,首先調用了connection.getRegionLocator獲得一個新構建的HRegionLocator(這里就不截圖了,因為實在是沒有什么內容需要講),不過大家需要注意的是,這里的tableName是我們實際要查詢到tableName,而后面會用到META_TABLE_NAME,容易混淆,我在這里簡單提一下。接下來調用了HRegionLocator.getRegionLocation。   在調用HRegionLocator.getRegionLocation時,這里會有一系列簡單方法的調用,由於在上面的導圖中我並沒有畫出,在這里我就一一貼圖描述。
  一系列方法走下來,到這里就到了比較重要的方法。由於這個是長圖,沒有辦法框選除重點,我就在文字中一一介紹該方法中調用的比較好重要的方法。
  1.getCachedLocation,該方法簡介調用到了metaCache.getCachedLocation,但此時,由於我們是第一次調用該表的信息,並沒有放到緩存中,因此,這里返回的locations = null。
  2.然后我們來到RegionInfo.createRegionName,需要注意的是,其入參row就是我們put操作創建的rowKey,也就是我們常說的行鍵。另外,在metaStartKey中傳入的id為HConstants.NINES(NINES = "99999999999999"),而在metaStopKey中傳入的id為空字符串。
  3.接着構造了Scan。其中withStartRow與withStopRow中的inclusive入參都為true。將reversed設置為true,並且將catalog family設置為"info"(CATALOG_FAMILY_STR = "info")。大家可能注意到了,這里的info列族在我們的表中並不一定存在。到了這里,大家可能就猜到我在前面埋的伏筆了。沒錯,這里構建的Scan是為了后面的查詢后面的META_TABLE_NAME做准備。
  4.緊接着來到fro循環中,這里連着調用了兩次getCachedLocation,后面的那次調用加了鎖,類似我們在單例設計模式中流程,加鎖以確保對象不會重復。
  5.然后構建了ReversedClientScanner對象。(鑒於之前經驗,貼太多圖容易擾亂大家的思維,我在這里盡量用文字來介紹)。ReversedClientScanner是ClientScanner的子類,另外,大家需要注意的是,在構造ReversedClientScanner時傳入的tableName為TableName.META_TABLE_NAME。在ReversedClientScanner的構造過程中,雖然有一些需要注意的地方,不過,我還是放在后面來描述,以便大家能夠更好的理解整個流程。
  6.接下來調用了ReversedClientScanner.next,大家千萬不要小看這個方法,這個方法里面的一系列調用時非常復雜的,也是本節的另外一個重點,我將在后面詳細介紹。
  7.然后調用了MetaTableAccessor.getRegionLocations,其入參為ReversedClientScanner.next的返回值。這個方法的詳細流程也比較重要,同樣,我放到后面為大家講解。
  8.最后調用了cacheLocation,也就是將當前tableName放到緩存中。   上面,我將ConnectionImplementation.locateRegionInMeta方法中調用的各個流程都簡單介紹了一下,下面,我就選擇其中比較重要的方法來詳細描述。
  首先讓我們來到ReversedClientScanner.next。這個方法調用了ClientScanner.nextWithSyncCache,如下圖所示:   上圖框選的兩個方法都比較重要,讓我們首先介紹比較復雜的loadCache,如下圖所示。
  看到這個方法大家可能比較慌,沒有關系,我會在這里為大家一一介紹。
  1.首先調用了moveToNextRegion。該方法首先調用closeScanner(其間首先調用了成員變量callable.setClose方法,然后調用了ClientScanner.call方法,這個方法我在后面也會提到,最后將當前成員變量callable中的值置為null,簡而言之,將成員變量callable.setClose置為null)。
  然后構造了ScannerCallableWithReplicas並賦給成員變量callable。在構造ScannerCallableWithReplicas時需要注意的是其中創建了ReversedScannerCallable。也就是說ScannerCallableWithReplicas的成員變量currentScannerCallable為ReversedScannerCallable。順便提一下,ScannerCallableWithReplicas的成員變量scan為我們在上面構造的scan。
  2.接着調用了ClientScanner.call方法。這里的調用流程比較繁瑣。為了更清楚的解釋清楚loadCache方法,我們先跳過這里,假設其中已經有了返回值。
  3.然后調用了scanResultCache.addAndGet。簡單提示一下我們這里的scanResultCache類型為CompleteScanResultCache。
  4.然后將結果集中的內容遍歷放到成員變量cache中。這里我們可以回過頭來看看上面的圖。上面圖中我框選了cache.poll方法。也就是說cache.poll將在loadCache方法中放入的結果集取出來。
  上面我提到過很多次ClientScanner.call方法,但是都沒有詳細描述,下面我就特意來講解該方法。其實這個方法很簡單,只是調用了方法RpcRetryingCaller.callWithoutRetries。這里的caller是在ReversedClientScanner方法構造時創建的(上面只是提到說構造ReversedClientScanner有需要注意的地方,也就是這里,其截圖我在上面也已經貼出來了)。
  接下來讓我們來到RpcRetryingCallerImpl.callWithoutRetries。這里的入參callable我在上面的方法loadCache已經介紹過了。其類型為ScannerCallableWithReplicas。由於ScannerCallableWithReplicas.prepare方法為空實現,我在這里就不貼圖了,接下來將重點放在ScannerCallableWithReplicas.call。   讓我們來到ScannerCallableWithReplicas.call,如下圖所示。
  1.在ClientScanner.closeScanner方法調用時,會走上面的if判斷。由於currentScannerCallable.closed的值為true。
  2.由於默認的成員變量regionReplication,因此會調用RpcRetryingCallerWithReadReplicas.getRegionLocations。這個方法的調用與我們今天的主要流程並沒有什么太多的聯系,因此,在這里簡單略過。該方法我可能會放在后面的章節中講到。
  3.構造了ResultBoundedCompletionService。這個方法比較重要,在后面的流程中我會反復講到。
  4.調用了addCallsForCurrentReplica,將成員變量currentScannerCallable封裝到ScannerCallableWithReplicas.RetryingRPC中,並交由ResultBoundedCompletionService提交。
  5.接着調用cs.poll,獲取其提交的任務的返回值。
  后面我將詳細講解。   首先來到ScannerCallableWithReplicas.addCallsForCurrentReplica方法。容易看到,將成員變量currentScannerCallable封裝到RetryingRPC中。然后調用了ResultBoundedCompletionService.submit。這里着重提醒一下大家,這里的currentScannerCallable類型為ReversedScannerCallable。   接着讓我們來到ResultBoundedCompletionService.submit,如下圖所示。
  這里將傳入的RetryingRPC封裝到QueueingFuture,然后調用了executor.execute。由於QueueingFuture繼承自java.util.concurrent.RunnableFuture,也就是在調用executor.execute時,QueueingFuture.run方法會執行。   接下來讓我們來到QueueingFuture。在下圖中,我框選出了其中比較重要的方法。
  首先這里調用了RpcRetryingCallerImpl.callWithRetries方法(由於這個方法我在上面已經提到過了,因此在這里就不貼圖了)。重要的是其中的入參future類型為ScannerCallableWithReplicas.RetryingRPC。另外后面將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中。   讓我們來到ScannerCallableWithReplicas.RetryingRPC.prepare方法。如下圖所示。大家可能對這里的成員變量callable比較模糊了,大家可以往上翻到方法addCallsForCurrentReplica的描述,沒錯這里的callable就是ScannerCallableWithReplicas的成員變量currentScannerCallable。而ScannerCallableWithReplicas.currentScannerCallable正是在構造ScannerCallableWithReplicas時傳入的ReversedScannerCallable。   接下來讓我們來到ReversedScannerCallable.prepare。由於這是第一次調用prepare方法,因此其成員變量instantiated為false。這里簡單提一下,這里的getRow方法獲取的是我們調用put時的行鍵,也就是我們對於目標表的rowKey。由於這里的tableName為TableName.META_TABLE_NAME,其rowKey在后面並沒有用到。
  然后調用了ReversedScannerCallable.setStub方法。為成員變量stub的賦值。其值為getConnection().getClient(getLocation().getServerName())調用的返回值。   讓我們來到ConnectionImplementation.getClient方法。看過我博文《HBase之HRegionServer啟動(含與HMaster交互)》的同學看到這里可能就比較熟悉。 沒錯,這里正是通過ClientProtos.ClientService.newBlockingStub構造了協議ClientProtos.ClientService的客戶端stub。關於與服務端交互的流程,我在《HBase之HRegionServer啟動(含與HMaster交互)》中已經具體介紹了,大家感興趣的可以去看一下,我們這里來描述比較重要一個點。
  就是computeIfAbsentEx的最后一個入參IOExceptionSupplier。他類似於java中的Supplier(類似的方法調用我在后面講解方法MetaTableAccessor.getRegionLocations)。   在第一次調用時,我們的stubs中並沒有到該serverName的客戶端stub,因此調用了入參supplier的get方法。也就是我們上面看到的lambda表達式方法內容被調用。   到這里,ReversedScannerCallable.prepare方法就調用完成了。這個還有一個需要注意的點就是ReversedScannerCallable.prepare方法的最后將其成員變量instantiated置為true。
  接下來讓我們來到ScannerCallableWithReplicas.RetryingRPC.call方法(這里的callable類型為ReversedScannerCallable)。
  這里再次調用了RpcRetryingCallerImpl.callWithRetries,由於ReversedScannerCallable.prepare方法已經調用,並且其成員變量instantiated被置為true,所以上面描述的內容並不會再次調用(這里框選的內容作為后面的伏筆)。   也就是說,接下來應該調用的是ReversedScannerCallable.call。由於其並沒有call方法,因此,會一直調用到其父類RegionServerCallable.call。如下圖所示。這里的rpcController類型為HBaseRpcControllerImpl。接下來調用了rpcCall方法。由於ReversedScannerCallable中並沒有rpcCall方法的實現,而在其父類ScannerCallable有實現rpcCall。   接下來,讓我們來到ScannerCallable.rpcCall。由於默認的成員變量scannerId為-1,因此,會調用openScanner。由於openScanner方法僅僅是通過Client協議發送到服務端。關於rpc流程我在博客《hbase之RPC調用流程簡介》中已經介紹過了,感興趣的同學可以去看一下,那篇博文講的比較淺顯,我會在春節期間將那篇內容更新,大家可以關注我,到時候有更新大家也就收到通知了。
  然后調用了ResponseConverter.getResults,將服務端的返回的ScanResponse轉換為Result。   讓我們來到ResponseConverter.getResults。這個方法的主要作用是將CellScanner中Cell的或ScanResponse中的PB類型的results轉換為java類型的Result。至於該方法的詳細描述我要放到后面開設的第二章節,也就是HBase中客戶端協議各個操作中來講解,因為這里流程是比較復雜的,要結合上服務端的流程才能講述清楚。所以這里暫時略過。   到這里,一個完整的RpcRetryingCallerImpl.callWithRetries方法調用流程可以說是完結了。然后在ResultBoundedCompletionService.QueueingFuture.run方法的后面,將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中(雖然我在上面提到過,但這里還是重述一下,以便我們后面的理解)。
  而在我們本節描述的整體流程中,ScannerCallableWithReplicas.addCallsForCurrentReplica方法調用完結。
  接下來讓我們來到ResultBoundedCompletionService.poll,由於其間接調用了ResultBoundedCompletionService.pollForSpecificCompletedTask,如下圖所由於在QueueingFuture.run方法的最后,將自身添加到了completedTasks。因此,上面的方法獲取的正是剛剛添加的QueueingFuture。接着調用了ResultBoundedCompletionService.QueueingFuture.get方法。如下圖所示。也就是說,這里將result返回。這里result的類型我們需要注意一下,以便后面在類型上面的理解。由於這里QueueingFuture成員變量future的實際類型為ScannerCallableWithReplicas.RetryingRPC。大家可以往上翻到ScannerCallableWithReplicas.RetryingRPC.call,就可以發現,這里的result是從ResponseConverter.getResults獲得的Result數組與成員變量callable封裝后的Pair對象。接着,將r.getFirst(),也就是實際獲得的結果返回。
  到這里,大家可能以為要結束了,很遺憾,這里只是到了ClientScanner.call方法的返回。
  由於接下來的是兩個單獨的流程了。一個是MetaTableAccessor.getRegionLocations,另外一個是ConnectionImplementation.cacheLocation。至於這兩個流程之外的后續流程比較簡單,我就不一一敘述了,相信大家跟着源碼與我在前面的提示很容易就可以弄清楚了。而前面提到的那兩個單獨的流程我將放在后面的一節《HBase之Table.put客戶端流程(續)》中介紹。到時候歡迎大家閱讀。
  大家可以關注我的博客,或者發送郵件到我的郵箱15935152719@163.com來溝通交流大數據相關的知識。感謝大家的閱讀,如果覺得不錯,希望您可以點擊下面的推薦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM