flume 攔截器(interceptor)
1、flume攔截器介紹
攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件event,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。
2、flume內置的攔截器
2.1 時間戳攔截器
flume中一個最經常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置:
| 參數 |
默認值 |
描述 |
| type |
timestamp |
類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder |
| preserveExisting |
false |
如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值 |
參數 默認值 描述
type timestamp 類型名稱timestamp,也可以使用類名的全路徑org.apache.flume.interceptor.TimestampInterceptor$Builder
preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值
source連接到時間戳攔截器的配置:
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false
| a1.sources.r1.interceptors=i1 a1.sources.r1.interceptors.i1.type=timestamp a1.sources.r1.interceptors.i1.preserveExisting=false |
2.2 主機攔截器
主機攔截器插入服務器的ip地址或者主機名,agent將這些內容插入到事件的報頭中。事件報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置:
參數 默認值 描述
type host 類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder
hostHeader host 事件頭的key
useIP true 如果設置為false,host鍵插入主機名
preserveExisting false 如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值
| 參數 |
默認值 |
描述 |
| type |
host |
類型名稱host,也可以使用類名的全路徑org.apache.flume.interceptor.HostInterceptor$Builder |
| hostHeader |
host |
事件頭的key |
| useIP |
true |
如果設置為false,host鍵插入主機名 |
| preserveExisting |
false |
如果設置為true,若事件中報頭已經存在,不會替換時間戳報頭的值 |
source連接到主機攔截器的配置:
a1.sources.r1.interceptors=i2
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.useIP=false
a1.sources.r1.interceptors.i2.preserveExisting=false
| a1.sources.r1.interceptors=i2 a1.sources.r1.interceptors.i2.type=host a1.sources.r1.interceptors.i2.useIP=false a1.sources.r1.interceptors.i2.preserveExisting=false |
2.3 靜態攔截器
靜態攔截器的作用是將k/v插入到事件的報頭中。配置如下
| 參數 |
默認值 |
描述 |
| type |
static |
類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder |
| key |
key |
事件頭的key |
| value |
value |
key對應的value值 |
| preserveExisting |
true |
如果設置為true,若事件中報頭已經存在該key,不會替換value的值 |
參數 默認值 描述
type static 類型名稱static,也可以使用類全路徑名稱org.apache.flume.interceptor.StaticInterceptor$Builder
key key 事件頭的key
value value key對應的value值
preserveExisting true 如果設置為true,若事件中報頭已經存在該key,不會替換value的值
source連接到靜態攔截器的配置:
a1.sources.r1.interceptors= i3
a1.sources.r1.interceptors.static.type=static a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false
a1.sources.r1.interceptors= i3
a1.sources.r1.interceptors.static.type=static a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false
2.4 正則過濾攔截器
在日志采集的時候,可能有一些數據是我們不需要的,這樣添加過濾攔截器,可以過濾掉不需要的日志,也可以根據需要收集滿足正則條件的日志。配置如下
| 參數 |
默認值 |
描述 |
| type |
REGEX_FILTER |
類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder |
| regex |
.* |
匹配除“\n”之外的任何個字符 |
| excludeEvents |
false |
默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的 |
參數 默認值 描述
type REGEX_FILTER 類型名稱REGEX_FILTER,也可以使用類全路徑名稱org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
regex .* 匹配除“\n”之外的任何個字符
excludeEvents false 默認收集匹配到的事件。如果為true,則會刪除匹配到的event,收集未匹配到的
source連接到正則過濾攔截器的配置:
| a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER a1.sources.r1.interceptors.i4.regex=(rm)|(kill) a1.sources.r1.interceptors.i4.excludeEvents=false |
這樣配置的攔截器就只會接收日志消息中帶有rm 或者kill的日志。
測試案例:
test_regex.conf
| # 定義這個agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1
# 描述和配置source組件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = itcast01 a1.sources.r1.port = 44444 a1.sources.r1. a1.sources.r1.interceptors=i4 a1.sources.r1.interceptors.i4.type=REGEX_FILTER #保留內容中出現hadoop或者是spark的字符串的記錄 a1.sources.r1.interceptors.i4.regex=(hadoop)|(spark) a1.sources.r1.interceptors.i4.excludeEvents=false
# 描述和配置sink組件:k1 a1.sinks.k1.type = logger
# 描述和配置channel組件,此處使用是內存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的連接關系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
發送數據測試:

打印到控制台信息:

只接受到存在hadoop或者spark的記錄,驗證成功!
自定義攔截器
1. 背景介紹
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。Flume有各種自帶的攔截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor等,通過使用不同的攔截器,實現不同的功能。但是以上的這些攔截器,不能改變原有日志數據的內容或者對日志信息添加一定的處理邏輯,當一條日志信息有幾十個甚至上百個字段的時候,在傳統的Flume處理下,收集到的日志還是會有對應這么多的字段,也不能對你想要的字段進行對應的處理。
2. 自定義攔截器
根據實際業務的需求,為了更好的滿足數據在應用層的處理,通過自定義Flume攔截器,過濾掉不需要的字段,並對指定字段加密處理,將源數據進行預處理。減少了數據的傳輸量,降低了存儲的開銷。
3. 實現
本技術方案核心包括二部分:
① 編寫java代碼,自定義攔截器;
內容包括:
1. 定義一個類CustomParameterInterceptor實現Interceptor接口。
2. 在CustomParameterInterceptor類中定義變量,這些變量是需要到 Flume的配置文件中進行配置使用的。每一行字段間的分隔符(fields_separator)、通過分隔符分隔后,所需要列字段的下標(indexs)、多個下標使用的分隔符(indexs_separator)、多個下標使用的分隔符(indexs_separator)。
3. 添加CustomParameterInterceptor的有參構造方法。並對相應的變量進行處理。將配置文件中傳過來的unicode編碼進行轉換為字符串。
4. 寫具體的要處理的邏輯intercept()方法,一個是單個處理的,一個是批量處理。
5. 接口中定義了一個內部接口Builder,在configure方法中,進行一些參數配置。並給出,在flume的conf中沒配置一些參數時,給出其默認值。通過其builder方法,返回一個CustomParameterInterceptor對象。
6. 定義一個靜態類,類中封裝MD5加密方法
7. 通過以上步驟,自定義攔截器的代碼開發已完成,然后打包成jar, 放到Flume的根目錄下的lib中

② 修改Flume的配置信息
新增配置文件spool-interceptor-hdfs.conf,內容為:
a1.channels = c1
a1.sources = r1
a1.sinks = s1
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=50000
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0
a1.sources.r1.interceptors.i2.type = timestamp
#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 1500
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60
啟動:
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
5. 項目實現截圖:

圖一 原始文件內容

圖二 HDFS上產生收集到的處理數據
