Flume-NG源碼閱讀之FileChannel


  FileChannel是flume一個非常重要的channel組件,非常常用。這個channel非常復雜,涉及的文件更多涉及三個包:org.apache.flume.channel.file、org.apache.flume.channel.file.encryption(加密)、org.apache.flume.channel.file.proto共計40個源碼文件。

  一、configure(Context context)方法:

  1、首先獲取配置文件中的checkpointDir和dataDirs屬性,這是存放檢查點和數據的目錄,默認使用$user.home/.flume/file-channel/checkpoint和$user.home/.flume/file-channel/data來;checkpointDir是一個目錄,而dataDirs可以是多個以“,”分割;且這兩個目錄最好不要來回修改,因為里面存儲着數據呢;

  2、獲取容量capacity,並做一些檢查比如是否<0,是否是動態加載(有無變化?),默認1000000;這指的是checkpoint文件存放event信息的最大容量。

  3、keepAlive超時時間,就是如果channel中沒有數據最長等待時間,默認3s;

  4、transactionCapacity事務的最大容量,默認1000;

  5、checkpointInterval檢查點寫入間隔,默認30000ms;

  6、maxFileSize,data文件大小的上限,用戶設置的和1623195647 之間較小的那個;

  7、最少需要多少空間minimumRequiredSpace,max((用戶配置的,500M),1M);

  8、useLogReplayV1,默認false;

  9、useFastReplay,默認false;

  10、encryptionActiveKey,加密密鑰別名默認為null;

  11、encryptionContext加密配置信息;

  12、encryptionCipherProvider加密密碼提供者,缺省值為null

  13、encryptionKeyProviderName,加密密鑰提供者,缺省值為null;

  14、queueRemaining,隊列是否有剩余空間信號量,初始化容量為capacity;

  15、設置Log log的檢查間隔checkpointInterval和maxFileSize最大文件大小。

  16、是否新建一個計數器channelCounter。

  二、start()方法。

  1、通過Log.Builder()構建了一個builder對象,並設置了相應的參數,然后log = builder.build(),Log的構造方法會對checkpointDir及logDirs嘗試獲取鎖操作,所以如果存在多個file channel則checkpointDir及logDirs最好配置在多個磁盤下或者多個目錄下,否則只能一個獲得初始化;Log用來將封裝好的FlumeEvent寫入磁盤並將指向這些event的指針存入一個內存隊列queue。會創建一個線程工作內容就是每隔checkpointInterval毫秒,默認30s寫一次檢查點log.writeCheckpoint(),會將checpoint、inflightTakes、inflightPuts都刷新至磁盤,會先后將inflightPuts、inflightTakes、checkpoint.meta重建,更新checkpoint文件並刷新至磁盤,這些文件都在checkpointDir目錄下;更新log-ID.meta文件;同時肩負起刪除log文件及其對應的meta文件的責任。

  2、log.replay(),一旦一個Log對象被創建,則需要調用replay()方法使用queue最新的檢查點來調整磁盤上的write ahead log。會獲取最大的fileID;然后讀取log文件根據record的類型進行相應的操作,進行恢復;遍歷所有的data目錄,然后roll(index)創建LogFile.Writer(空的);然后將queue刷新到相關文件。

  3、 open = true,表示channel打開;

  4、depth = getDepth(),FlumeEventQueue的大小,然后需要判斷一下queueRemaining是否有足夠的剩余量queueRemaining.tryAcquire(depth);

  5、如果open==true,計數器開始工作。

  三、createTransaction()方法主要是構造一個FileBackedTransaction對象用來直接操作channel,並返回。

  四、stop()停止channel,清理數據。

  五、靜態類FileBackedTransaction extends BasicTransactionSemantics。類似可參考memory channel,必須要實現doTake()、doCommit()、doRollback()、doPut()四個方法。put和take不能同時操作。

  1、doPut(Event event)方法,該方法source會通過transaction.put()方法調用。檢查LinkedBlockingDeque<FlumeEventPointer> putList是否有剩余空間(putList.remainingCapacity() == 0);檢查queue是否有剩余空間,如果沒有則等待keepAlive秒(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)),獲取一個許可;獲取log的共享鎖;FlumeEventPointer ptr = log.put(transactionID, event)將event寫入數據文件(是RandomAccessFile),transactionID指的是每次事務的編號,且都是在上一次基礎之上加一而來,每個event都要調用put(Event)方法,該方法會獲取要寫入的數據文件的目錄(配置文件中可以配置多個data文件目錄,這里會依據transactionID輪訓式的向所有的目錄寫數據),由於start()方法中有log.replay()方法,該方法會遍歷所有的data目錄並roll(index)創建LogFile.Writer,logFiles.get(logFileIndex).getUsableSpace()不會為0,檢查是否剩余空間夠,然后獲取transactionID對應文件的寫LogFile.Writer(其實是其子類LogFileV3.Writer),如果沒有則調用方法 roll(logFileIndex, buffer)創建一個LogFileV3.Writer,放入logFiles(這個維持着每個data目錄對應着一個正在活動的可以用於寫的文件)可能會根據buffer的大小滾動文件因為單個文件有最大限制,LogFileV3.Writer的構造方法會在這個data目錄創建一個meta文件,寫入一些基本數據,FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer)會把event封裝成的ByteBuffer,通過LogFile.Writer.write(buffer)方法寫入磁盤文件(buffer會再次被封裝,最終封裝成的格式順序是:OP_RECORD、buffer.limit()、buffer。buffer的內容是(RecordType(這是1)、TransactionID、LogWriteOrderID、event.getHeaders、event.getBody())),返回Pair.of(getLogFileID(), offset);ptr由兩部分組成:fileID和在這個文件中的偏移量offset。fileID就是Log.nextFileID,不管有幾個data目錄,始終根據這個變量設置文件編號(文件名的后面的編號);putList.offer(ptr)加入到putList之中;queue.addWithoutCommit(ptr, transactionID)將這個方法在每個put操作中必須要被調用,確保檢查點之后事務的提交,它會調用inflightPuts.addEvent(transactionID, e.toLong())此時e.toLong()=fileID+offset,addEvent方法會執行inflightEvents.put(transactionID, pointer)、inflightFileIDs.put(transactionID,FlumeEventPointer.fromLong(pointer).getFileID())、syncRequired = true,inflightEvents中存放的就是<transactionID,fileID+offset>而inflightFileIDs存放的是<transactionID,fileID>;success = true表示put成功;log.unlockShared()解除共享鎖。put事件並未刷新至磁盤文件,因為並沒有commit,commit操作會導致刷新至磁盤的操作。queue中累計的數量不能超過capacity,超過就會等待一定時間后異常。

  2、doTake()方法,該方法sink會通過transaction.take()方法調用。檢查LinkedBlockingDeque<FlumeEventPointer> takeList時刻有剩余空間;然后獲取共享鎖;ptr = queue.removeHead(transactionID)取出頭部的數據,ptr的內容<fileID, offset>,頭部是邏輯上的0位置,但是物理上的頭位置會一邊take一邊變化,是從checkpoint中取出的數據,期間會inflightTakes.addEvent(transactionID, value)將數據緩存在inflightTakes之中;然后放入takeList.offer(ptr);log.take(transactionID, ptr)會封裝數據(buffer會再次被封裝,最終封裝成的格式順序 是:OP_RECORD、buffer.limit()、buffer。buffer的內容是(RecordType(這是1)、 TransactionID、LogWriteOrderID、fileID、offset))然后寫入緩存等待commit刷新至磁盤;event = log.get(ptr)是從data文件中取出數據event;最后釋放共享鎖。take操作queue.removeHead(transactionID)會從overwriteMap或者內存映射elementsBuffer中取出對應的head位置數據。

  3、doCommit()方法,該方法source和sink會通過transaction.commit()方法調用。首先獲取takeList和putList的大小;putList和takeList不能同時都>0,其中有一個得是==0;如果putList>0,獲取共享鎖,log.commitPut(transactionID)會調用Log.commit(long transactionID, short type)方法把commit操作封裝成一個ByteBuffer buffer(最終封裝成的格式順序是:OP_RECORD、 buffer.limit()、buffer。buffer的內容是(RecordType(這是4)、TransactionID、 LogWriteOrderID、type))寫入數據文件,並刷新至磁盤文件,此刷新也會將這次的put或者take中的所有事件寫入磁盤文件;然后是將所有putList中的數據放入queue中queue.addTail(putList.removeFirst())當添加第一個時會從checkpointfile的最后一個位置開始先寫入overwriteMap(邏輯位置轉為物理位置)中,后續會再從0開始循環寫入overwriteMap,take操作也會從最后一個位置取會先檢查overwriteMap中有無對應的數據,沒有就再檢查checkpoint的內存映射elementsBuffer中有無(控制take位置的是head位置,控制put位置的是size),每次更新檢查點時都會把overwriteMap寫入內存映射elementsBuffer中並刷新至磁盤文件checkpoint;queue.completeTransaction(transactionID)會執行清除操作即如果inflightPuts和inflightTakes執行其一,如果inflightPuts包含transactionID則清空inflightPuts,否則清空inflightTakes;然后解鎖。如果takes>0,則獲取共享鎖,log.commitTake(transactionID)會進行封裝寫入data文件,commit中的類型標記除了自己的表示還有要提交類型的標記這里是TAKE,上面是PUT;queue.completeTransaction(transactionID)和上面的一樣;解除共享鎖;queueRemaining.release(takes)釋放。最后將putList和takeList清空。

  4、doRollback()方法該方法source和sink會通過transaction.rollback()方法調用。首先會獲取takeList和putList的大小;然后獲取共享鎖;如果takes>0,

並且puts==0,將putList中的所有有數據queue.addHead(takeList.removeLast()),addHead操作和聲明的addTail操作相似,只不過是要在調用add(int index, long value)

方法時index是0,會插入到第一個位置;清空putList、takeList;queue.completeTransaction(transactionID)上面已經講過;log.rollback(transactionID)會調用Log.rollback(long transactionID, short type)方法把commit操作封裝成一個ByteBuffer buffer(最 終封裝成的格式順序是:OP_RECORD、 buffer.limit()、buffer。buffer的內容是(RecordType(這是3)、TransactionID、 LogWriteOrderID))寫入緩存中;釋放共享鎖;queueRemaining.release(puts)釋放許可。(這一段的格式亂了,這編輯器,我屮艸芔茻!!無語了。。。不自動換行了。。手動換的行。)

  1中的put操作將寫入log文件的指針添加進了緩存putList中;2中的take操作從緩存中的取出指針,然如takeList中,然后寫入log文件,從log文件中獲取數據還原為event;3中的commit操作無論是對put的還是對take的都會講commit信息寫入log文件,都會清理queue中的緩存(inflightPuts和inflightTakes),如果對put還要將putList中的所有數據添加進queue的隊尾,實際上是overwriteMap中,如果是對take則要釋放queueRemaining的takes個許可量,還要清空putList、takeList;4中的rollback操作會將takeList中所有數據放入queue的頭部,再清空putList、takeList,再清空queue中的緩存(inflightPuts和inflightTakes),將rollback信息寫入log,要釋放queueRemaining的puts個許可量。

 

  ps:

  1、data/log-ID,這種類型的文件存放的是put、take、commit、rollback的操作記錄及數據。

  2、checkpoint/checkpoint存放的是event在那個data文件logFileID,的什么位置offset等信息。

  2、checkpoint/inflightTakes存放的是事務take的緩存數據,每隔段時間就重建文件。內容:1、16字節是校驗碼;2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

  3、checkpoint/inflightPuts存放的是事務對應的put緩存數據,每隔段時間就重建文件。內容:1、16字節是校驗碼;2、transactionID1+eventsCount1+eventPointer11+eventPointer12+...;3、transactionID2+eventsCount2+eventPointer21+eventPointer22+...

  4、checkpoint/checkpoint.meta主要存儲的是logfileID及對應event的數量等信息。

  5、data/log-ID.meta,主要記錄log-ID下一個寫入位置以及logWriteOrderID等信息。

    6、每個data目錄里data文件保持一般不超過2個,會刪除文件編號比當前正在使用的文件編號小的數據文件。

  7、putList和takeList是緩存存儲的是相應的FlumeEventPointer,但是inflightTakes和inflightPuts其實也是緩存存儲的也是相應的信息,只不過比兩者多存一些信息罷了,功能重合度很高,為什么會這樣呢?我想是一個只能在內存,一個可以永久存儲(當然是不斷重建的),后者可以用來進行flume再啟動的恢復。

  file channel太過復雜了,比配置的文件的加載復雜更多,涉及的知識非常多,還不能一下子就消耗了。。。。大體的已經了解了,剩下的都是細節!!后續會再慢慢咀嚼!!爭取吃透file。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM