flume 攔截器(interceptor)
1、flume攔截器介紹
攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件event,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。
2、flume內置的攔截器
2.1 時間戳攔截器
flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置:
參數 |
默認值 |
描述 |
type |
timestamp |
類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder |
preserveExisting |
false |
如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值 |
參數 默認值 描述
type timestamp 類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder
preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值
source連接到時間戳攔截器的配置:
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false
2.2 主機攔截器
主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。事件報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置:
參數 |
默認值 |
描述 |
type |
host |
類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder |
hostHeader |
host |
事件頭的key |
useIP |
true |
如果設置為false,host鍵插入主機名 |
preserveExisting |
false |
如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值 |
參數 默認值 描述
type host 類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder
hostHeader host 事件頭的key
useIP true 如果設置為false,host鍵插入主機名
preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值
source連接到主機攔截器的配置:
a1.sources.r1.interceptors=i2
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.useIP=false
a1.sources.r1.interceptors.i2.preserveExisting=false
2.3 靜態攔截器
靜態攔截器的作用是將k/v插入到事件的報頭中。配置如下
參數 |
默認值 |
描述 |
type |
static |
類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder |
key |
key |
事件頭的key |
value |
value |
key對應的value值 |
preserveExisting |
true |
如果設置為true,若事件中報頭已經存在該key,不會替換value的值 |
參數 默認值 描述
type static 類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder
key key 事件頭的key
value value key對應的value值
preserveExisting true 如果設置為true,若事件中報頭已經存在該key,不會替換value的值
source連接到靜態攔截器的配置:
a1.sources.r1.interceptors = static
a1.sources.r1.interceptors.static.type=static
a1.sources.r1.interceptors.static.key=logs
a1.sources.r1.interceptors.static.value=logFlume
a1.sources.r1.interceptors.static.preserveExisting=false
2.4 正則過濾攔截器
在日志采集的時候,可能有一些數據是我們不需要的,這樣添加過濾攔截器,可以過濾掉不需要的日志,也可以根據需要收集滿足正則條件的日志。配置如下
參數 |
默認值 |
描述 |
type |
REGEX_FILTER |
類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
regex |
.* |
匹配除“\n”之外的任何個字符 |
excludeEvents |
false |
默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的 |
參數 默認值 描述
type REGEX_FILTER 類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
regex .* 匹配除“\n”之外的任何個字符
excludeEvents false 默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的
source連接到正則過濾攔截器的配置:
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false
這樣配置的攔截器就只會接收日志消息中帶有rm 或者kill的日志。
測試案例:
test_regex.conf
# 定義這個agent中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source組件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-001
a1.sources.r1.port = 44444
a1.sources.r1.interceptors=i4
a1.sources.r1.interceptors.i4.type=REGEX_FILTER
#保留內容中出現hadoop或者是spark的字符串的記錄
a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark)
a1.sources.r1.interceptors.i4.excludeEvents=false
# 描述和配置sink組件:k1
a1.sinks.k1.type = logger
# 描述和配置channel組件,此處使用是內存緩存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的連接關系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
輸入以下指令:
bin/flume-ng agent --conf conf --conf-file conf/test_regex.conf --name a1 -Dflume.root.logger=INFO,console
發送數據測試:
打印到控制台信息:
只接受到存在hadoop或者spark的記錄,驗證成功!