一、概述:
本節首先提供一個基於netcat的source+channel(memory)+sink(logger)的數據傳輸過程。然后剖析一下NetcatSource中的代碼執行邏輯。
二、flume配置文件:
下面的配置文件netcat.conf中定義了source使用netcat,它會監聽44444端口。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = locahost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
三、命令行啟動:
切換到flume的安裝目錄下,執行下述代碼:
bin/flume-ng agent --conf conf --conf-file study/netcat.conf --name a1 -Dflume.root.logger=INFO,console
四、利用telnet來直接訪問且發送數據:
在命令行中鍵入以下代碼:其中node5是flume所在的主機名。
telnet node5 44444
在telnet命令行輸入信息:
在flume的啟動界面就會輸出接收到的數據:
由此,使用netcat作為source的功能即演示成功了。
除了利用telnet來發送數據以外,也可以自己實現一個socket編程來向node5主機的44444端口發送數據。
當然,我們發現了一個問題,明明在telnet中發送的數據是:This is flume netcat source!,接收到的數據卻是This is flume ne。數據不完整。后面通過分析一下源碼,看能不能找到原因。
出現上述的顯示不完整的情況,是因為我們使用的是LoggerSink組件,它內部的實現邏輯導致了僅打印了16個字符。
五、agent啟動的基本步驟:
六、NetcatSource源碼剖析:
該類的全路徑為org.apache.flume.source.NetcatSource,繼承了AbstractSource 並實現了Configurable接口。
由於NetcatSource一個監聽服務,所以它是通過EventDrivenSourceRunner來啟動一個線程,調用其start()方法的。
首先在正式啟動source之前,會首先執行configure方法,初始化配置文件中提供的參數:bind\port\ack-every-event\max-line-length。
start()方法如下:
該方法內創建一個AcceptHandler內部類實例,實際的監聽工作就是在該類的run方法中來實現的。







