org.apache.flume.sink.RollingFileSink 這個類比較簡單。
source的種類有兩種:一種是PollableSource;另外一種是EventDrivenSource。前者“必須有它自己的callback機制,該機制用於捕獲新數據並將數據存儲到通道中”,后者“不是由其自身的線程驅動”。在自定義source時,前者必須要實現process方法,通過調用這個方法將event放入channel中;后者沒有這個方法,可以自由發揮。
sink不像source,只有一種sink,需要extends AbstractSink implements Configurable。
sink用於從通道中提取事件,並將事件傳送到流中的下個flume或者將事件存儲到一個外部數據倉庫。一個sink與一個或者多個通道相連,在flume屬性文件中配置。
有一個SinkRunner實例與各個配置好的sink相連。當flume框架調用 SinkRunner.start(),一個新的線程產生來驅動該sink(使用SinkRunner.PollingRunner as the thread’s Runnable)。該線程管理這個sink的生命周期。sink需要實現start()和stop()方法,這兩個方法是LifecycleAware接口的一部分。Sink.start()方法會初始化sink,讓該sink可以將事件傳輸到下個目的地。Sink.process()方法是一個核心操作,它將事件從通道中提取並傳輸事件。Sink.stop()方法為必要的清理方法(例如釋放資源)。sink的實現還需要實現Configurable 接口,用於處理其自身的參數配置。
RollingFileSink類中需要理解的有start()方法和process()方法。
一、start()方法
主要作用在於啟動了一個線程,用來每隔rollInterval秒就創建一個新的文件(程序啟動的時間戳+文件編號為名稱,配置文件動態修改后,時間戳會變動)。並且通知process可以將正在寫的這個文件關閉,准備寫新的文件。
需要注意這個變量shouldRotate,初始的時候即start之前,是false的,執行start之后由於線程啟動之后首次運行需要等待rollInterval秒,所以這個時間段shouldRotate一直是false,在此期間process方法會一直向一個文件寫數據,直到shouldRotate=true,也就是線程每隔rollInterval秒運行之后(shouldRotate會設置為true,並且會獲得下一個寫入的文件名),這會周期性的運行。
二、process方法
這個方法會一直被重復調用。
它會首先判斷是否需要關閉當前的文件shouldRotate=true就會關閉文件。並且重新
shouldRotate=false;//可以對下一個文件一直寫
pathController.rotate();//表示當前的文件寫入完畢,並且可以准備寫入下一個文件
File currentFile = pathController.getCurrentFile();//在pathController.rotate()之后該方法就可以獲取下一個將要寫入的文件名,
//start方法中也有這句代碼,目測start中的這句代碼沒啥作用,因為shouldRotate = true會導致文件的滾動
rollService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.debug("Marking time to rotate file {}",
pathController.getCurrentFile());//這句沒啥作用。。。。。。。。
shouldRotate = true; //表示當前文件寫滿,准備寫下一個文件
}
}, rollInterval, rollInterval, TimeUnit.SECONDS);
serializer默認是org.apache.flume.serialization.BodyTextEventSerializer
接下來就是向channel發送數據了。。。
Status result = Status.READY;
transaction.begin();
event = channel.take();
//自己的處理邏輯,本類中就是序列化到文件中serializer.write(event)
transaction.commit();
transaction.rollback();
transaction.close();
這個比較簡單,可以用來熟悉sink的處理流程,以及學習如何自定義sink。