攔截器作用:攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件,在寫入channel之前,攔截器都可以進行轉換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件。可以自定義攔截器。
flume修改時間戳的插件見 https://github.com/haebin/flume-timestamp-interceptor
有一個缺陷是,DateUtils.parseDate(timestamp, dateFormat)里面的dateFormat不支持unix時間戳,只能自己手動添加了
原來是:
- String timestamp = get(index, data);
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- headers.put(TIMESTAMP, Long.toString(now));
修改后
- String timestamp = get(index, data);
- if (dateFormat[0].equals("tsecond")){
- now = Long.parseLong(timestamp)*1000;
- }
- else if(dateFormat[0].equals("tmillisecond")){
- now = Long.parseLong(timestamp);
- }
- else if(dateFormat[0].equals("tnanosecond")){
- now = Long.parseLong(timestamp)/1000000;
- }
- else {
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- }
- headers.put(TIMESTAMP, Long.toString(now));
flume配置:
- kafka_sn_hive.sources.s1.interceptors = timestamp
- kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
- kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
- kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond
表示按逗號做分隔符的第四個(從0開始)字段是一個秒單位的時間戳。
在flume里面,時間戳是毫秒級別的,所以要判斷這個字段是秒還是毫秒納秒
見http://lisux.me/lishuai/?p=867