Commonly used components (the official docs list many more):
I. Sources:
avro
Mostly used in replication (a1.sources.r1.selector.type = replicating), multiplexing (a1.sources.r1.selector.type = multiplexing), load-balancing, and failover (a1.sinkgroups.g1.processor.type = failover) topologies.
exec
Usually runs a command such as tail -F to follow a single file, e.g. the hive.log startup log.
netcat
Listens on a port and collects the data sent to it.
spooling directory
Watches a directory for new files; once a file has been fully ingested it is renamed with a .COMPLETED suffix. A file must not be modified after it has been placed in the directory, otherwise an error is thrown.
taildir
Description:
Tails a set of files in real time, supports files that keep being appended to, and supports regular expressions for matching file names.
The Taildir Source maintains a JSON-format position file and periodically writes the latest read offset of each file into it, so it can resume from where it left off after a restart.
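For reference, the position file is just a JSON array with one entry per tailed file; a sketch of what it might look like (the inode numbers, offsets, and paths below are made up for illustration):

[{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"},
 {"inode":2496275,"pos":0,"file":"/opt/module/flume/files/log1.log"}]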
II. Channels:
Memory Channel
Kafka Channel (writes events straight into Kafka; a minimal config sketch follows this list)
File Channel
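A minimal Kafka Channel sketch, assuming a Kafka broker at Linux201:9092 and a topic named flume-channel (both are placeholders, not used in the cases below):

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = Linux201:9092
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.parseAsFlumeEvent = false

With this channel the events land directly in the Kafka topic, so no separate Kafka sink is needed.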
III. Sinks:
avro
Sends events to a downstream avro source, typically the next Flume agent in a multi-hop flow.
File Roll
Stores events on the local filesystem.
HBase
hdfs
Writes events to HDFS.
logger
Writes events to the console.
IV. Cases
1. Monitoring port data
Requirement: use Flume to listen on a port, collect the data sent to that port, and print it to the console.
(source: netcat, channel: memory, sink: logger)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
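To run this case, start the agent roughly as below (job/netcat-logger.conf is an assumed name for the file holding the config above), then send data from another terminal with nc localhost 44444:

bin/flume-ng agent -c conf/ -n a1 -f job/netcat-logger.conf -Dflume.root.logger=INFO,console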
2. Monitoring a single appended file in real time
Requirement: monitor the Hive log in real time and upload it to HDFS.
(source: exec, channel: memory, sink: hdfs)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks.k1.type = hdfs
# HDFS path the files are uploaded to
a1.sinks.k1.hdfs.path = hdfs://Linux201:8020/flume/hive-events/%y-%m-%d/%H
# File prefix
a1.sinks.k1.hdfs.filePrefix = logs-
# Roll each file every 60 s
a1.sinks.k1.hdfs.rollInterval = 60
# Roll each file when it reaches 128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# Roll a file after this many events (0 = do not roll based on event count)
a1.sinks.k1.hdfs.rollCount = 0
# How many events to write to HDFS per batch (not related to file rolling)
a1.sinks.k1.hdfs.batchSize = 100
# File format; DataStream is plain text and is not compressed (other types support compression)
a1.sinks.k1.hdfs.fileType = DataStream
# Round down the timestamp used in the path
a1.sinks.k1.hdfs.round = true
# How many time units before a new directory is created
a1.sinks.k1.hdfs.roundValue = 1
# The unit of the rounding value
a1.sinks.k1.hdfs.roundUnit = hour
# Use the local time instead of the timestamp from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Monitoring a directory for new files in real time
Requirement: use Flume to watch a whole directory for new files and upload them to HDFS.
(source: spooldir, channel: memory, sink: hdfs)
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) all files ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://Linux201:8020/flume/upload/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used above
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; DataStream is plain text, other types support compression
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll each file at roughly 128 MB (kept slightly under 134217728)
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
4. Monitoring multiple appended files under a directory in real time
Requirement: use Flume to watch files in a directory that are being appended to in real time and upload them to HDFS.
(source: taildir, channel: memory, sink: hdfs)
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://Linux201:8020/flume/upload2/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used above
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; DataStream is plain text, other types support compression
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll to a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll each file at roughly 128 MB (kept slightly under 134217728)
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
5. Replicating and multiplexing
Requirement:
Flume-1 monitors changes to a file and passes the changes to Flume-2, which stores them in HDFS. At the same time, Flume-1 passes the changes to Flume-3, which writes them to the local filesystem.
flume1: (source: exec, channel: memory, sink: avro)
flume2: (source: avro, channel: memory, sink: hdfs)
flume3: (source: avro, channel: memory, sink: File Roll)
flume1:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = Linux201
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
flume2:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# An avro source acts as a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://Linux201:8020/flume2/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll directories based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used above
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; DataStream is plain text, other types support compression
a2.sinks.k1.hdfs.fileType = DataStream
# How often to roll to a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll each file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3:
Note: the local output directory must already exist; if it does not, file_roll will not create it.
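So create it up front, for example:

mkdir -p /opt/module/datas/flume3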
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux201
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
6. Load balancing and failover
Requirement: Flume1 monitors a port; the sinks in its sink group connect to Flume2 and Flume3 respectively, and a FailoverSinkProcessor is used to provide failover.
flume1: (source: netcat, channel: memory, sink: avro)
flume2: (source: avro, channel: memory, sink: logger)
flume3: (source: avro, channel: memory, sink: logger)
flume1:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = Linux201
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
flume2:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux201
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
Use jps -ml to check the Flume processes.
7. Aggregation
Requirement:
Flume-1 on Linux201 monitors the file /opt/module/group.log,
Flume-2 on Linux202 monitors the data stream on a port,
Flume-1 and Flume-2 send their data to Flume-3 on Linux203, and Flume-3 prints the final data to the console.
flume1: (source: exec, channel: memory, sink: avro)
flume2: (source: netcat, channel: memory, sink: avro)
flume3: (source: avro, channel: memory, sink: logger)
flume1:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux203
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
flume2:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = Linux202
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = Linux203
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
flume3:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux203
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
V. Custom Interceptor
Requirement
Use Flume to collect local server logs; depending on the log type, different kinds of logs must be sent to different analysis systems. In this case, data starting with a letter and data starting with a digit are sent to different consoles.
Analysis
In practice, one server can produce many types of logs, and different types may need to go to different analysis systems. This is where the Multiplexing topology comes in: it routes each event to a different channel based on the value of a certain key in the event's header. So we need a custom Interceptor that assigns a different header value to each type of event.
In this case we use port data to simulate logs, and single digits and single letters to simulate different log types. The custom interceptor distinguishes digits from letters and routes them to different analysis systems (channels); the snippet below shows the selector mapping that does the routing.
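Concretely, the channel-selector part of that routing looks like this (the header key AAA and the mapped values are the same ones used in the full Flume1 config further down):

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = AAA
a1.sources.r1.selector.mapping.XXX = c1
a1.sources.r1.selector.mapping.YYY = c2
a1.sources.r1.selector.mapping.ZZZ = c3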
Implementation steps
(1) Create a Maven project and add the following dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
(2) Define a custom interceptor class that implements the Interceptor interface
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

/**
 * Adds a different header depending on whether the first character
 * of the event body is a letter or a digit.
 */
public class MyInterceptor implements Interceptor {

    /**
     * Initialization method.
     */
    public void initialize() {
    }

    /**
     * Modifies a single event: we insert a header here.
     *
     * @param event the event to modify
     * @return the modified event
     */
    public Event intercept(Event event) {
        // Get the event's headers and body
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody(); // the body is a byte array

        // Handle the event differently depending on the first character of the body
        String line = null;
        try {
            // Convert the byte array to a string; without "utf-8" the platform default charset would be used
            line = new String(body, "utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        char first = line.charAt(0);
        if ((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z')) {
            // A letter
            headers.put("AAA", "XXX");
        } else if (first >= '0' && first <= '9') {
            // A digit
            headers.put("AAA", "YYY");
        } else {
            // Neither a letter nor a digit
            headers.put("AAA", "ZZZ");
        }
        return event;
    }

    /**
     * Modifies a batch of events.
     *
     * @param events the events to modify
     * @return the modified events
     */
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    /**
     * Releases resources.
     */
    public void close() {
    }

    /**
     * Builder used by Flume to construct the Interceptor instance.
     */
    public static class MyBuilder implements Interceptor.Builder {

        // Build method
        public Interceptor build() {
            return new MyInterceptor();
        }

        /**
         * Configuration method.
         *
         * @param context the configuration
         */
        public void configure(Context context) {
        }
    }
}
Package the project and put the jar under Flume's lib directory.
(3) Edit the Flume configuration files (the value of a1.sources.r1.interceptors.i1.type = MyInterceptor$MyBuilder must be the fully qualified class name of the Builder)
On Linux201, configure Flume1 with one netcat source, three channels, and three avro sinks (one per channel), plus the corresponding channel selector and interceptor.
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = MyInterceptor$MyBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = AAA
a1.sources.r1.selector.mapping.XXX = c1
a1.sources.r1.selector.mapping.YYY = c2
a1.sources.r1.selector.mapping.ZZZ = c3

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = Linux202
a1.sinks.k2.port = 4242

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = Linux203
a1.sinks.k3.port = 4343

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100

a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
Configure Flume2 on Linux201 with an avro source and a logger sink.
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1
Configure Flume3 on Linux202 with an avro source and a logger sink.
a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = Linux202
a3.sources.r1.port = 4242

a3.sinks.k1.type = logger

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
Configure Flume4 on Linux203 with an avro source and a logger sink.
a4.sources = r1
a4.sinks = k1
a4.channels = c1

a4.sources.r1.type = avro
a4.sources.r1.bind = Linux203
a4.sources.r1.port = 4343

a4.sinks.k1.type = logger

a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1
(4) Start the Flume processes on Linux201 (two agents), Linux202, and Linux203, paying attention to the start order (start the downstream agents first).
Linux201:
bin/flume-ng agent -c conf/ -n a1 -f job/flume-file-flume
bin/flume-ng agent -c conf/ -n a2 -f job/flume-flume-console1 -Dflume.root.logger=INFO,console
Linux202:
bin/flume-ng agent -c conf/ -n a3 -f job/flume-flume-console2 -Dflume.root.logger=INFO,console
Linux203:
bin/flume-ng agent -c conf/ -n a4 -f job/flume-flume-console3 -Dflume.root.logger=INFO,console
(5) On Linux201, use netcat to send letters and digits to localhost:44444 (see the sample session after this list).
(6) Observe the logs printed on Linux201, Linux202, and Linux203.
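For step (5), a session could look roughly like this (the lines after the nc command are sample inputs; OK is the netcat source's acknowledgement):

[zls@Linux201 ~]$ nc localhost 44444
hello
OK
1
OK
,
OK

With the interceptor and selector above, hello should appear on Flume2's console (Linux201), 1 on Flume3's (Linux202), and , on Flume4's (Linux203).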
VI. Custom Source
The official docs also describe the interface for a custom source:
https://flume.apache.org/FlumeDeveloperGuide.html#source  According to the docs, a custom source (MySource) must extend the AbstractSource class and implement the Configurable and PollableSource interfaces.
Use case: reading data from MySQL or other systems.
1. Requirement:
Use Flume to receive data, add a prefix to every record, and print it to the console. The prefix can be set in the Flume configuration file.
2. Methods to implement:
getBackOffSleepIncrement()    // how much the back-off time grows after each failure
getMaxBackOffSleepInterval()  // the maximum back-off time before the source is polled again
configure(Context context)    // initialization from the context (reads the configuration file)
process()                     // fetches data, wraps it in events, and writes them to the channel; called in a loop
3. Code
(1) Add the pom dependency
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
(2) Write the code
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/**
 * @version 1.0
 * @Author: zls
 * @Date: 2020/10/10 12:40
 * @Desc:
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Prefix added to every event body
    private String prefix;
    // Interval between two events
    private Long interval;

    /**
     * Called by the framework to pull data and process it.
     *
     * @return the status of event processing
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Get the ChannelProcessor
        ChannelProcessor channelProcessor = getChannelProcessor();
        try {
            // Build an event and hand it to the channel
            Event e = getSomeData();
            channelProcessor.processEvent(e);
            status = Status.READY;
        } catch (Exception e) {
            // Handle the exception
            e.printStackTrace();
            status = Status.BACKOFF;
        }
        return status;
    }

    /**
     * How this custom source obtains its data.
     *
     * @return the event that was built
     */
    private Event getSomeData() throws InterruptedException {
        Event event = new SimpleEvent();
        event.setBody((prefix + "Test content").getBytes());
        Thread.sleep(interval);
        return event;
    }

    /**
     * How much the back-off time grows each time an exception occurs.
     *
     * @return the increment in milliseconds
     */
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    /**
     * The maximum back-off time before the source is polled again.
     *
     * @return the maximum back-off in milliseconds
     */
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Configures the custom source.
     *
     * @param context the configuration
     */
    public void configure(Context context) {
        prefix = context.getString("XXX", "DD");
        interval = context.getLong("YYY", 500L);
    }
}
4. Testing
(1) Packaging
Package the code and put the resulting jar under Flume's lib directory, e.g. as sketched below.
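A sketch of the packaging step, assuming a standard Maven project and Flume installed under /opt/module/flume (the jar name is hypothetical):

mvn clean package
cp target/my-flume-source-1.0.jar /opt/module/flume/lib/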
(2) Write the configuration file (the value of a1.sources.r1.type = MySource must be the fully qualified class name of the custom source)
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = MySource
a1.sources.r1.XXX = Myprefix
a1.sources.r1.YYY = 1000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Start Flume and test it
[zls@Linux201 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
Result: every 1 s, an event whose body is "MyprefixTest content" is printed to the console.
VII. Custom Sink
The official docs also describe the interface for a custom sink:
https://flume.apache.org/FlumeDeveloperGuide.html#sink  According to the docs, a custom sink (MySink) must extend the AbstractSink class and implement the Configurable interface.
Methods to implement:
configure(Context context)  // initialization from the context (reads the configuration file)
process()                   // reads data (events) from the channel; called in a loop
Use case: reading data from the channel and writing it to MySQL or another system.
Requirement:
Use Flume to receive data and, in the sink, add a prefix and a suffix to every record, then print it to the console. The prefix and suffix can be set in the Flume job configuration file.
Code
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.IOException;

/**
 * @version 1.0
 * @Author: zls
 * @Date: 2020/10/10 15:07
 * @Desc: prints every event it receives to the console
 */
public class MySink extends AbstractSink implements Configurable {

    private String prefix;
    private String suffix;

    /**
     * The sink pulls data from the channel and processes it.
     *
     * @return
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Get the channel this sink is bound to
        Channel channel = getChannel();
        // Transaction handling
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            // 1. Take an event from the channel
            Event take = channel.take();
            // 2. Write the event to the sink's destination
            storeSomeData(take);
            status = Status.READY;
            transaction.commit();
        } catch (Exception e) {
            // Handle the exception
            status = Status.BACKOFF;
            transaction.rollback();
        } finally {
            transaction.close();
        }
        return status;
    }

    /**
     * Processes (stores or consumes) the event.
     *
     * @param take the event taken from the channel
     */
    private void storeSomeData(Event take) throws IOException, InterruptedException {
        if (take != null) {
            System.out.print(prefix);
            System.out.write(take.getBody());
            System.out.println(suffix);
        } else {
            Thread.sleep(5000L);
        }
    }

    /**
     * Configures the custom sink.
     *
     * @param context
     */
    public void configure(Context context) {
        prefix = context.getString("XXX", "DPre");
        suffix = context.getString("YYY", "DSuf");
    }
}
Testing
(1) Packaging
Package the code and put the resulting jar under Flume's lib directory.
(2) Configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = MySink
a1.sinks.k1.XXX = zls:
a1.sinks.k1.YYY = :zls

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(3) Start the job
[zls@Linux201 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

[zls@Linux201 ~]$ nc localhost 44444
1
OK
hello
OK
Result: each line sent through nc is printed on the Flume console with the configured prefix and suffix, e.g. zls:1:zls and zls:hello:zls.