flume中的一些重要概念:
1. flume Event:flume 事件,被定義為一個具有有效荷載的字節數據流和可選的字符串屬性集。(json格式的字符串,由headers和body兩部分組成)
2. flume Agent:flume 代理,是一個進程承載從外部源事件流到下一個目的地的過程。包含source channel和sink。
3. Source:數據源,消耗外部傳遞給他的事件,外部源將數據按照flume Source 能識別的格式將Flume 事件發送給flume Source。
4. Channel:數據通道,是一個被動的存儲,用來保持事件,直到由一個flume Sink消耗。
5. Sink : 數據匯聚點,代表外部數據存放位置。發送flume event到指定的外部目標.
配置文件模版:在 conf目錄里面新建一個文件,文件名自定義
#example.conf:單節點Flume配置
#命名Agent a1的組件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#描述/配置Source
a1.sources.r1.type = netcat Source 類型(還有其他很多)
a1.sources.r1.bind = 0.0.0.0 綁定ip
a1.sources.r1.port = 44444 端口號
#描述Sink
a1.sinks.k1.type = logger sink類型
#描述內存Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#為Channle綁定Source和Sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意:
(1)一個配置文件中可以配置多個Agent,一個Agent中可以包含多個Source、Sink、Channel。
(2)一個Source 可以綁定到多個通道,但一個Sink只能綁定到一個通道
Source類型:
Avro Source 序列化
Exec Source 命令輸出作為源
#描述/配置Source
a1.sources.r1.type = exec
a1.sources.r1.command = ping 192.168.242.102
Spooling Directory Source
這個Source允許你將將要收集的數據放置到"自動搜集"目錄中。這個Source將監視該目錄,並將解析新文件的出現。事件處理邏輯是可插拔的,當一個文件被完全讀入通道,它會被重命名或可選的直接刪除。
要注意的是,放置到自動搜集目錄下的文件不能修改,如果修改,則flume會報錯。另外,也不能產生重名的文件,如果有重名的文件被放置進來,則flume會報錯
#描述/配置Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata
NetCat Source 監聽一個指定端口,並將接收到的數據的每一行轉換為一個事件
Sequence Generator Source 簡單的序列發生器,不斷的產生事件,值是從0開始每次遞增1。
HTTP Source 接受HTTP的GET和POST請求作為Flume的事件,其中GET方式應該只用於試驗(默認jsonhandler,文件上傳 blobhandler)
#描述/配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 66666
主要用來進行測試
通過flume的工具啟動agent
$ bin/flume-ng agent --conf ../conf --conf-file ../conf/example.conf --name a1 -Dflume.root.logger=INFO,console
Interceptor
#描述/配置Source
a1.sources.r1.type = http
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1 i2 i3 i4 i5 i6
a1.sources.r1.interceptors.i1.type = timestamp 時間戳攔截器
#ip是攔截者所在機器的ip
a1.sources.r1.interceptors.i2.type = host host攔截器
a1.sources.r1.interceptors.i3.type = static 靜態攔截器
a1.sources.r1.interceptors.i3.key = country
a1.sources.r1.interceptors.i3.value = China
a1.sources.r1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder UUid攔截器
a1.sources.r1.interceptors.i5.type = search_replace 查找替換攔截器
#將所有的數字替換成*
a1.sources.r1.interceptors.i5.searchPattern = [0-9]
a1.sources.r1.interceptors.i5.replaceString = *
a1.sources.r1.interceptors.i6.type = regex_filter 正則攔截器
#只要是a開頭的拋棄
a1.sources.r1.interceptors.i6.regex = ^a.*
a1.sources.r1.interceptors.i6.excludeEvents = true
Processor
......
#描述Sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover 失敗重連
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
......
#為Channel綁定Source和Sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1
------------------------------------------------------
#描述Sink 負載均衡方式
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
