Flume(3)source組件之NetcatSource使用介紹


一、概述:

本節首先提供一個基於netcat的source+channel(memory)+sink(logger)的數據傳輸過程。然后剖析一下NetcatSource中的代碼執行邏輯。

二、flume配置文件:

下面的配置文件netcat.conf中定義了source使用netcat,它會監聽44444端口。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = locahost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三、命令行啟動:

切換到flume的安裝目錄下,執行下述代碼:

bin/flume-ng agent --conf conf --conf-file study/netcat.conf --name a1 -Dflume.root.logger=INFO,console

四、利用telnet來直接訪問且發送數據:

在命令行中鍵入以下代碼:其中node5是flume所在的主機名。

telnet node5 44444

在telnet命令行輸入信息:

image

在flume的啟動界面就會輸出接收到的數據:

image

由此,使用netcat作為source的功能即演示成功了。

除了利用telnet來發送數據以外,也可以自己實現一個socket編程來向node5主機的44444端口發送數據。

當然,我們發現了一個問題,明明在telnet中發送的數據是:This is flume netcat source!,接收到的數據卻是This is flume ne。數據不完整。后面通過分析一下源碼,看能不能找到原因。

出現上述的顯示不完整的情況,是因為我們使用的是LoggerSink組件,它內部的實現邏輯導致了僅打印了16個字符。

image

 

五、agent啟動的基本步驟:

image

六、NetcatSource源碼剖析:

該類的全路徑為org.apache.flume.source.NetcatSource,繼承了AbstractSource 並實現了Configurable接口。

由於NetcatSource一個監聽服務,所以它是通過EventDrivenSourceRunner來啟動一個線程,調用其start()方法的。

image

首先在正式啟動source之前,會首先執行configure方法,初始化配置文件中提供的參數:bind\port\ack-every-event\max-line-length。

start()方法如下:

image

該方法內創建一個AcceptHandler內部類實例,實際的監聽工作就是在該類的run方法中來實現的。

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM