Flume-NG(1.5版本)中SpillableMemoryChannel源碼級分析


  SpillableMemoryChannel是1.5版本新增的一個channel。這個channel優先將evnet放在內存中,一旦內存達到設定的容量就使用file channel寫入磁盤。然后讀的時候會按照順序讀取:會通過一個DrainOrderQueue來保證不管是內存中的還是溢出(本文的“溢出”指的是內存channel已滿,需要使用file channel存儲數據)文件中的順序。這個Channel是memory channel和file channel的一個折中,雖然在內存中的數據仍然可能因為進程的突然中斷而丟失,但是相對於memory channel而言一旦sink處理速度跟不上不至於丟失數據(后者一旦滿了爆發異常會丟失后續的數據),提高了數據的可靠性;相對於file channel而言自然是大大提高了速度,但是可靠性較file channel有所降低。

  我們來看一下SpillableMemoryChannel的繼承結構:SpillableMemoryChannel extends FileChannel,原來SpillableMemoryChannel是file的子類,天熱具有file channel的特性。但是它的BasicTransactionSemantics是自己實現的。接下來我們來分析分析這個channel,這個channel可以看成是兩個channel。相關內容傳送門:Flume-NG源碼閱讀之FileChannel 和 flume-ng源碼閱讀memory-channel(原創) 。

   一、首先來看configure(Context context)方法,這個方法是對這個channel進行配置。一些主要參數介紹:
  (1)Semaphore totalStored,這兩個channel【內存channel(並不是flume內置的memory channel,這里是新實現的一個,本文中的“內存channel”若無說明就是新實現的這個)和溢出而使用的file channel】中event數量的總和的信號量,初始為0;
  (2)ArrayDeque<Event> memQueue,這就是這里的內存channel,使用可以改變大小的數組雙端隊列ArrayDeque,存儲event數據;
  (3)int memoryCapacity(對應參數名 "memoryCapacity"),內存channel中存儲的event的最大數量;
  (4)Semaphore memQueRemaining,內存channel剩余的可存儲event的數量的信號量,初始大小為memoryCapacity;
  (5)int overflowTimeout(對應參數名 "overflowTimeout"),溢出超時時間,指的是內存channel滿了之后,切換到file channel的等待時間,默認是3s;
  (6)double overflowDeactivationThreshold(對應參數名 "overflowDeactivationThreshold"),指的是停止溢出的閾值------內存channel剩余內存(這里指可再存儲的event數量),默認5%;
  (7)volatile int byteCapacityBufferPercentage(對應參數名 "byteCapacityBufferPercentage"),用來限制內存channel使用物理內存量,默認20;
  (8)volatile double avgEventSize()(對應參數名 "avgEventSize"),指定每個event的大小,用來計算內存channel可以使用的slot總數量,會把event量化為slot,而不是字節,默認500;
  (9)volatile int byteCapacity(對應參數名 "byteCapacity"),slot數量,默認是JVM可使用的最大物理內存(可通過配置 "byteCapacity"參數來控制物理內存使用 )的80% * (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize得來;
  (10)Semaphore bytesRemaining,內存channel中剩余可使用的slot數量信號量,初始大小是byteCapacity;
  (11)volatile int lastByteCapacity,動態加載配置文件時才會有用,記錄上一次的ByteCapacity,用於修改bytesRemaining信號量的大小;
  (12)int overflowCapacity(對應參數名 "overflowCapacity"),用於設置file channel的容量,默認是1億;
  此外,boolean overflowDisabled用來是否禁用溢出,只要overflowCapacity不小於1就不會禁用;boolean overflowActivated表示是否可以使用溢出,默認是false;還會對對file channel的 "keep-alive"設置為0;最后會通過super.configure(context)來對file channel進行配置。對於file channel的配置信息可以和SpillableMemoryChannel的配置信息在一起配置。
  二、start()方法,首先會super.start()啟動file channel,獲取file中溢出的數據量overFlowCount,重置totalStored和 DrainOrderQueue對象drainOrder,內存channel的start是不會有數據的。
  三、需要講一下DrainOrderQueue drainOrder = new DrainOrderQueue()。我們知道SpillableMemoryChannel其實是由兩個channel組成,分別是內存channel和file channel,因此數據也會分布在內存和磁盤文件之中,那我們take時,是什么機制呢?換句話說就是什么時候讀內存中的數據,什么時候讀磁盤上文件的數據?take的順序怎么樣呢?我們希望take的順序和put的順序一樣,先put的應該先take,所以我們應該給所有的put(包括內存和文件)進行“編號”使得可以有序的take,還要注意的就是需要標示這個take是應該從內存還是file中去讀。為此設計了DrainOrderQueue類,來使得有序的put和take。
  這個類設計的狠精巧,是保證take和put正常合理操作的關鍵。在講之前先大概說一下原理:這個類的關鍵屬性是ArrayDeque<MutableInteger> queue,這也是一個ArrayDeque,ArrayDeque特性是數組可變且大小不受限制,可在頭尾操作,此類很可能在用作堆棧時快於 Stack,在用作隊列時快於 LinkedList,但是不是線程安全的不支持多線程並發操作;put操作總是對queue中的最后(尾)一個元素操作,take操作總是對queue中第一個(頭)操作;put時,如果是內存channel,在queue增加的就是正數,如果是溢出操作增加的就是負數,內存和溢出分別對應queue中不同的元素(可以分類去讀);take時,如果從內存中取數據,就會使得queue第一個元素的值不斷縮小(正數)至0,然后刪除這個元素,如果是從溢出文件中取數據則會使得queue中第一個元素不斷增大(負數)至0,然后刪除這個元素;這樣就會形成流,使得put不斷追加數據到流中,take不斷從流中取數據,這個流就是有序的,且流中元素其實就是內存中的evnet個數和溢出文件中event的個數。
  好了,DrainOrderQueue詳細代碼如下:
 1   public static class DrainOrderQueue {
 2     public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000);
 3 
 4     public int totalPuts = 0;  // for debugging only
 5     private long overflowCounter = 0; // # of items in overflow channel
 6 
 7     public  String dump() {
 8       StringBuilder sb = new StringBuilder();
 9 
10       sb.append("  [ ");
11       for (MutableInteger i : queue) {
12         sb.append(i.intValue());
13         sb.append(" ");
14       }
15       sb.append("]");
16       return  sb.toString();
17     }
18 
19     public void putPrimary(Integer eventCount) {
20       totalPuts += eventCount;
21       if (  (queue.peekLast() == null) || queue.getLast().intValue() < 0) {    //獲取,但不移除此雙端隊列的最后一個元素;如果此雙端隊列為空,則返回 null
22         queue.addLast(new MutableInteger(eventCount));
23       } else {
24         queue.getLast().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。
25       }
26     }
27 
28     public void putFirstPrimary(Integer eventCount) {
29       if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {    //獲取,但不移除此雙端隊列的第一個元素;如果此雙端隊列為空,則返回 null。
30         queue.addFirst(new MutableInteger(eventCount));
31       } else {
32         queue.getFirst().add(eventCount);//獲取,但不移除此雙端隊列的第一個元素。
33       }
34     }
35 
36     public void putOverflow(Integer eventCount) {
37       totalPuts += eventCount;
38       if ( (queue.peekLast() == null) ||  queue.getLast().intValue() > 0) {
39         queue.addLast(new MutableInteger(-eventCount));
40       } else {
41         queue.getLast().add(-eventCount);
42       }
43       overflowCounter += eventCount;
44     }
45 
46     public void putFirstOverflow(Integer eventCount) {
47       if ( (queue.peekFirst() == null) ||  queue.getFirst().intValue() > 0) {
48         queue.addFirst(new MutableInteger(-eventCount));
49       }  else {
50         queue.getFirst().add(-eventCount);
51       }
52       overflowCounter += eventCount;
53     }
54 
55     public int front() {
56       return queue.getFirst().intValue();
57     }
58 
59     public boolean isEmpty() {
60       return queue.isEmpty();
61     }
62 
63     public void takePrimary(int takeCount) {
64       MutableInteger headValue = queue.getFirst();
65 
66       // this condition is optimization to avoid redundant conversions of
67       // int -> Integer -> string in hot path
68       if (headValue.intValue() < takeCount)  {
69         throw new IllegalStateException("Cannot take " + takeCount +
70                 " from " + headValue.intValue() + " in DrainOrder Queue");
71       }
72 
73       headValue.add(-takeCount);
74       if (headValue.intValue() == 0) {
75         queue.removeFirst();
76       }
77     }
78 
79     public void takeOverflow(int takeCount) {
80       MutableInteger headValue = queue.getFirst();
81       if(headValue.intValue() > -takeCount) {
82         throw new IllegalStateException("Cannot take " + takeCount + " from "
83                 + headValue.intValue() + " in DrainOrder Queue head " );
84       }
85 
86       headValue.add(takeCount);
87       if (headValue.intValue() == 0) {
88         queue.removeFirst();    //獲取並移除此雙端隊列第一個元素。
89       }
90       overflowCounter -= takeCount;
91     }
92 
93   }
View Code

  我們一個方法一個方法的來剖析這個類:

  (1)dump(),這個方法比較簡單就是獲得queue中所有元素的數據量;

  (2)putPrimary(Integer eventCount),這個方法用在put操作的commit時,在commitPutsToPrimary()方法中被調用,表示向內存提交數據。這個方法會嘗試獲取queue中最后一個元素,如果為空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值這個事務的event數量加入queue;否則表示當前是的元素表征的是內存中的event數量,直接累加即可。

  (3)putFirstPrimary(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中的數據放回內存memQueue的頭。這個方法會嘗試獲取queue中第一個元素,如果為空(說明沒數據)或者元素數值小於0(說明這個元素是面向溢出文件的),就新建一個元素賦值takeList的event數量加入queue;否則表示當前是的元素表征的是內存中的event數量,直接累加即可。

  (4)putOverflow(Integer eventCount),這個方法發生在put操作的commit時,在commitPutsToOverflow_core方法和start()方法中,后者是設置初始量,前者表示內存channel已滿要溢出到file channel。這個方法會嘗試獲取queue中最后一個元素,如果為空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的event數量加入queue,這里賦值為負數;否則表示當前是的元素表征的是溢出文件中的event數量,直接累加負數即可。

  (5)putFirstOverflow(Integer eventCount),在doRollback()回滾的時候被調用,表示將takeList中event的數量放回溢出文件。這個方法會嘗試獲取queue中第一個元素,如果為空(說明沒數據)或者元素數值大於0(表示這個元素是面向內存的),就新建一個元素賦值這個事務的 event數量加入queue,這里賦值為負數;否則表示當前是的元素表征的是溢出到文件中的event數量,直接累加負數即可。

  (6)front(),返回queue中第一個元素的值

  (7)takePrimary(int takeCount),這個方法在doTake()中被調用,表示take發生之后,要將內存中的event數量減takeCount(這個值一般都是1,即每次取一個)。這個方法會獲取第一個元素的值(表示內存channel中有多少event),如果這個值比takeCount小,說明內存中沒有足夠的數量,這種情況不應該發生,報錯;否則將這個元素的值減去takeCount,表示已取出takeCount個。最后如果這個元素的值為0,則從queue中刪除這個元素。注意這里雖然是可以取takeCount個,但是源碼調用這個參數都是一次取1個而已。

  (8)takeOverflow(int takeCount),這個方法在doTake()中被調用,表示take發生之后,要將溢出文件中的event數量加上takeCount(這個值一般都是1,即每次取一個)。這個 方法會獲取第一個元素的值(表示溢出文件中有多少event),如果這個值比takeCount的負值大,說明文件中沒有足夠的數量,這種情況不應該發生,報錯;否則將這個元素的值加上takeCount,表示已取出takeCount個。最后如果這個元素的值為0,則從queue中刪除這個元素。注意這里雖然是可以取 takeCount個,但是源碼調用這個參數都是一次取1個而已。

  四、這個channel的BasicTransactionSemantics:SpillableMemoryTransaction,這是每個channel的必須實現的可靠性保證。這個類也有一些屬性:
  (1)BasicTransactionSemantics overflowTakeTx = null,這個是file channel的事務FileBackedTransaction,表示take操作從溢出文件中獲取event;
  (2)BasicTransactionSemantics overflowPutTx = null,這個是file channel的事務FileBackedTransaction,表示put操作溢出到磁盤文件;
  (3)boolean useOverflow = false,是否使用溢出;
  (4)boolean putCalled = false,put操作,初次put的時候會置為true;
  (5)boolean takeCalled = false,take操作,初次take的時候會置為true;
  (6)int largestTakeTxSize = 5000,不是常量,可以再分配;
  (7)int largestPutTxSize = 5000,不是常量,可以再分配;
  (8)Integer overflowPutCount = 0,這次事務溢出的event的數量;
  (9)int putListByteCount = 0,這次事務putList所有event占用字節總和;
  (10)int takeListByteCount = 0,這次事務takeList所有event占用字節總和;
  (11)int takeCount = 0,這次事務take操作的個數;
  (12)ArrayDeque<Event> takeList,從memQueue拿出來的event暫存之所;
  (13)ArrayDeque<Event> putList,放入memQueue之前event的暫存之所; 
  按照國際慣例必須實現的4個方法:
  A、doPut(Event event),代碼如下:
 1 protected void doPut(Event event) throws InterruptedException {
 2       channelCounter.incrementEventPutAttemptCount();
 3 
 4       putCalled = true;    //說明是在put操作
 5       int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);//獲取這個event可以占用幾個slot
 6       if (!putList.offer(event)) {    //加入putList
 7         throw new ChannelFullException("Put queue in " + getName() +
 8                 " channel's Transaction having capacity " + putList.size() +
 9                 " full, consider reducing batch size of sources");
10       }
11       putListByteCount += eventByteSize;
12     }
View Code

  這個方法比較簡單,就是put開始;設置putCalled為true表示put操作;計算占用slot個數;將event放入putList等待commit操作;putListByteCount加上這個evnet占用的slot數。

  B、doTake(),代碼如下: 
 1 protected Event doTake() throws InterruptedException {
 2       channelCounter.incrementEventTakeAttemptCount();
 3       if (!totalStored.tryAcquire(overflowTimeout, TimeUnit.SECONDS)) {
 4         LOGGER.debug("Take is backing off as channel is empty.");
 5         return null;
 6       }
 7       boolean takeSuceeded = false;
 8       try {
 9         Event event;
10         synchronized(queueLock) {
11           int drainOrderTop = drainOrder.front();
12 
13           if (!takeCalled) {
14             takeCalled = true;
15             if (drainOrderTop < 0) {
16               useOverflow = true;
17               overflowTakeTx = getOverflowTx();        //獲取file channle的事務
18               overflowTakeTx.begin();
19             }
20           }
21 
22           if (useOverflow) {
23             if (drainOrderTop > 0) {
24               LOGGER.debug("Take is switching to primary");
25               return null;       // takes should now occur from primary channel
26             }
27 
28             event = overflowTakeTx.take();
29             ++takeCount;
30             drainOrder.takeOverflow(1);
31           } else {
32             if (drainOrderTop < 0) {
33               LOGGER.debug("Take is switching to overflow");
34               return null;      // takes should now occur from overflow channel
35             }
36 
37             event = memQueue.poll();    //獲取並移除此雙端隊列所表示的隊列的頭(換句話說,此雙端隊列的第一個元素);如果此雙端隊列為空,則返回 null。
38             ++takeCount;
39             drainOrder.takePrimary(1);
40             Preconditions.checkNotNull(event, "Queue.poll returned NULL despite"
41                     + " semaphore signalling existence of entry");
42           }
43         }
44 
45         int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
46         if (!useOverflow) {
47           // takeList is thd pvt, so no need to do this in synchronized block
48           takeList.offer(event);
49         }
50 
51         takeListByteCount += eventByteSize;
52         takeSuceeded = true;
53         return event;
54       } finally {
55         if(!takeSuceeded) {
56           totalStored.release();
57         }
58       }
59     }
View Code

   由於ArrayDeque是非線程安全的(memQueue就是ArrayDeque),所以take操作從memQueue獲取數據時,要獨占memQueue。任何對memQueue都要進行同步,這里是同步queueLock。

  doTake方法會先檢查totalStored中有無許可,即channel中有無數據;然后同步;再獲取drainOrder的頭元素,如果takeCalled為false(初始為false),則設置其為true,再判斷獲取到的drainOrder頭元素的值是否為負數,負數說明數據在溢出文件中,設置useOverflow為true表示要從溢出文件中讀取數據並且獲取file channel的FileBackedTransaction賦值給overflowTakeTx,begin()可以獲取數據。如果useOverflow為true則轉到調用overflowTakeTx.take獲取event,然后takeCount自增1,調用drainOrder.takeOverflow(1)修改隊列中溢出event數量的值。如果useOverflow為false說明數據在內存中,直接調用memQueue.poll()獲得event,然后takeCount自增1,調用drainOrder.takePrimary(1)修改隊列中內存中evnet數量的值。然后計算這個event占用的slot數。如果是從內存channel中讀取的event則將其放入takeList中;takeListByteCount加上這個evnet占用的slot數。最后返回event。

  C、doCommit()方法,如果putCalled為true就會調用putCommit()方法來處理put的操作,如果takeCalled為true就調用takeCommit()方法來處理take操作。
  1、putCommit()方法,會首先依據overflowActivated的真假來設置超時時間。內存channel的溢出情況由兩個信號量控制memQueRemaining和bytesRemaining,前者控制着event的數量,后者控制着物理內存的使用情況,如果這兩者中的任何一個不滿足都會觸發溢出,溢出會設置overflowActivated = true;useOverflow = true,如果useOverflow為true,就調用commitPutsToOverflow()方法來處理溢出,這個方法會創建一個file channel的FileBackedTransaction賦值給overflowPutTx,begin可以put數據,然后依次將putList中的event通過overflowPutTx.put(event)放入file channel中,調用commitPutsToOverflow_core方法來處理overflowPutTx提交事務,再調用drainOrder.putOverflow(putList.size())修改queue中溢出文件中event的數量,如果在overflowPutTx提交過程中失敗,最多再嘗試一次,中間等待overflowTimeout秒。返回到commitPutsToOverflow方法,將totalStored釋放putList.size的許可,溢出數量overflowPutCount增加putList.size。到這溢出的情況完成。如果putCommit()useOverflow為false則說明event在內存channel中,會調用commitPutsToPrimary()來處理,這個方法會將putList中的所有event放入memQueue中,然后調用drainOrder.putPrimary(putList.size())修改queue中內存中event的數量,修改maxMemQueueSize的值,將totalStored釋放putList.size的許可
  2、takeCommit()方法,如果overflowTakeTx不為null,說明是從溢出文件取得的event,就調用commit方法提交事務。然后獲得內存channel剩余空間的百分比,包括兩部分之和,一部分是內存channel還可以再存儲evnet的數量,另一部分就是takeCount,他們倆之和與memoryCapacity(不能為0)之比就是百分比memoryPercentFree。如果overflowActivated為true且memoryPercentFree不小於overflowDeactivationThreshold,說明內存中剩余空間已經達到了停止溢出的閾值,就設置overflowActivated為false停止溢出,這樣其實會導致內存滿了之后等待溢出的時間加長。如果take操作是從內存channel中取數據,memQueRemaining會釋放takeCount個許可,表示騰出takeCount個空間;bytesRemaining會釋放takeListByteCount個許可,表示騰出takeListByteCount個slot。
  D、doRollback(),代碼如下:
 1 protected void doRollback() {
 2       LOGGER.debug("Rollback() of " +
 3               (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
 4 
 5       if (putCalled) {
 6         if (overflowPutTx!=null) {
 7           overflowPutTx.rollback();
 8         }
 9         if (!useOverflow) {
10           bytesRemaining.release(putListByteCount);
11           putList.clear();
12         }
13         putListByteCount = 0;
14       } else if (takeCalled) {
15         synchronized(queueLock) {
16           if (overflowTakeTx!=null) {
17             overflowTakeTx.rollback();
18           }
19           if (useOverflow) {
20             drainOrder.putFirstOverflow(takeCount);
21           } else {
22             int remainingCapacity = memoryCapacity - memQueue.size();
23             Preconditions.checkState(remainingCapacity >= takeCount,
24                     "Not enough space in memory queue to rollback takes. This" +
25                             " should never happen, please report");
26             while (!takeList.isEmpty()) {
27               memQueue.addFirst(takeList.removeLast());
28             }
29             drainOrder.putFirstPrimary(takeCount);
30           }
31         }
32         totalStored.release(takeCount);
33       } else {
34         overflowTakeTx.rollback();
35       }
36       channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter);
37     }
View Code

   如果putCalled為true,則表明正在進行的是put操作。如果overflowPutTx不為null,說明是在溢出,執行overflowPutTx的roolback方法進行回滾。如果沒有溢出,則bytesRemaining釋放putListByteCount許可,表示騰出putListByteCount個slot;清空putList;最后將putListByteCount置為0。如果takeCalled為true,說明正在進行的操作是take,如果overflowTakeTx不為null,說明是在溢出,執行overflowTakeTx的roolback方法進行回滾;如果在溢出,則調用drainOrder.putFirstOverflow(takeCount)修改queue中溢出文件中的event的數量;如果在使用內存channel,則計算出內存channel中還可以最多存儲event的數量,如果這個數量小於takeCount,則報錯,否則將takeList中的所有event加入memQueue的頭部,執行drainOrder.putFirstPrimary(takeCount)來修改queue中內存channel存放的event的數量;然后totalStored釋放takeCount個許可,表示內存channel中增加了takeCount個event。

  五、stop方法,會調用父類file channel中的stop方法。

  六、createTransaction()方法,直接返回一個SpillableMemoryTransaction對象。這說明take和put可以並發執行,但是當涉及到memQueue時,還是需要同步。

 

  至此,這個新的channel介紹完了。總體來說SpillableMemoryChannel是精心設計的一個channel,兼顧Flume內置的file channel和memory channel的優點,又增加了一個選擇,大伙可根據需要選擇合適的channel。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM