需求背景:
在使用flume收集日志的時候,需要將本機的IP地址添加到body中再包裝在event中,以便后期的日志分類,和快速的故障定位,但是沒有發現有現成的interceptor來實現以上功能。
實現步驟:
1:新建一個類,實現Interceptor接口
2:重寫intercept(Event arg0)方法
3:打成jar包,放入Flume_Home 目錄的lib文件夾下
4: 修改flume配置文件,將攔截器的type值指向自定義類的地址,如果有自定義屬性,需要配置該自定義屬性
源碼如下:
1: 自定義類 AppendIPInterceptor
package com.xxx.flume.interceptor; /** * Created by vincent.yu on 2018/07/23. */ import com.google.common.base.Charsets; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List; public class AppendIPInterceptor implements Interceptor { private String serviceId=null; // 自定義屬性 serviceId public AppendIPInterceptor(String _serviceId){ serviceId=_serviceId; } public Event intercept(Event arg0) { String eventBody = new String(arg0.getBody(),Charsets.UTF_8); String fmt="%s %s"; // 添加serviceId 到event的開頭 arg0.setBody(String.format(fmt, serviceId,eventBody).getBytes()); return arg0; } public List<Event> intercept(List<Event> events) { for (Event event : events) { intercept(event); } return events; } public void close() { //~ null } public void initialize() { //~ null } /*public static class Builder implements Interceptor.Builder { public void configure(Context context) { } public Interceptor build() { return new AppendIPInterceptor(); } } */ }
2: 自定義 Builder ---> AppendIPBuilder
package com.xxx.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.interceptor.Interceptor; /** * Created by vincent.yu on 2018/07/23. */ public class AppendIPBuilder implements Interceptor.Builder{ private String serviceId=null; public void configure(Context context) { // set argument serviceId String configServiceId=context.getString("serviceId"); serviceId=configServiceId; } public Interceptor build() { return new AppendIPInterceptor(serviceId); } }
3: 修改flume配置文件
a1.sources.r1.interceptors = i2 #a1.sources.r1.interceptors.i1.type = com.xxx.flume.interceptor.MyMultiLineExecBuilder a1.sources.r1.interceptors.i2.type = com.xxx.flume.interceptor.AppendIPBuilder a1.sources.r1.interceptors.i2.serviceId = [source_ip]10.0.53.237
