flume-攔截器
有的時候希望通過Flume將讀取的文件再細分存儲,比如講source的數據按照業務類型分開存儲,具體一點比如類似:將source中web、wap、media等的內容分開存儲;比如丟棄或修改一些數據。這時可以考慮使用攔截器Interceptor。
flume通過攔截器實現修改和丟棄事件的功能。攔截器通過定義類繼承org.apache.flume.interceptor.Interceptor接口來實現。用戶可以通過該節點定義規則來修改或者丟棄事件。Flume支持鏈式攔截,通過在配置中指定構建的攔截器類的名稱。在source的配置中,攔截器被指定為一個以空格為間隔的列表。攔截器按照指定的順序調用。一個攔截器返回的事件列表被傳遞到鏈中的下一個攔截器。當一個攔截器要丟棄某些事件時,攔截器只需要在返回事件列表時不返回該事件即可。若攔截器要丟棄所有事件,則其返回一個空的事件列表即可。
先解釋一下一個重要對象Event:event是flume傳輸的最小對象,從source獲取數據后會先封裝成event,然后將event發送到channel,sink從channel拿event消費。event由頭(Map<String, String> headers)和身體(body)兩部分組成:Headers部分是一個map,body部分可以是String或者byte[]等。其中body部分是真正存放數據的地方,headers部分用於本節所講的interceptor。
Flume中攔截器的作用就是對於event中header的部分可以按需塞入一些屬性,當然你如果想要處理event的body內容,也是可以的,但是event的body內容是系統下游階段真正處理的內容,如果讓Flume來修飾body的內容的話,那就是強耦合了,這就違背了當初使用Flume來解耦的初衷了。
1.初識攔截器接口
public interface Interceptor {
public void initialize(); public Event intercept(Event event); public List<Event> intercept(List<Event> events);
public void close(); public interface Builder extends Configurable { public Interceptor build(); } }
1、public void initialize()運行前的初始化,一般不需要實現(上面的幾個都沒實現這個方法);
2、public Event intercept(Event event)處理單個event;
3、public List<Event> intercept(List<Event> events)批量處理event,實際上市循環調用上面的2;
4、public void close()可以做一些清理工作,上面幾個也都沒有實現這個方法;
5、 public interface Builder extends Configurable 構建Interceptor對象,外部使用這個Builder來獲取Interceptor對象。
如果要自己定制,必須要完成上面的2,3,5。
這個地方你可能有個很眼熟Configurable 這個攔截器的接口也繼承了配置的接口,因為很多東西都是從配置讀取出來的,所以你自己開發的之后上來一定腦子里面要有一個必須要實現的方法,即便說你不需要讀取配置,你也給我寫上!
2.TimestampInterceptor攔截器示例分析
1.initialize()方法解析
@Override public void initialize() { // no-op }
這個地方官方的幾個攔截器都沒有實現,我也沒實現過。
2.close()方法解析
@Override public void close() { // no-op }
這個地方是一個關閉的方法。
3.Event intercept(Event event) 方法解析
Map<String, String> headers = event.getHeaders(); if (preserveExisting && headers.containsKey(TIMESTAMP)) { // we must preserve the existing timestamp } else { long now = System.currentTimeMillis(); headers.put(TIMESTAMP, Long.toString(now)); } return event;
簡單的循環調用了intercept對event逐一處理
4.public List<Event> intercept(List<Event> events)方法解析
for (Event event : events) { intercept(event); } return events;
批量的處理event,和上面的3相結合處理。
5.TimestampInterceptor的具體實現
public static class Builder implements Interceptor.Builder { private boolean preserveExisting = PRESERVE_DFLT; @Override public Interceptor build() { return new TimestampInterceptor(preserveExisting); } @Override public void configure(Context context) { preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT); } }
該內部類實現了Interceptor的接口Builder,必須得有一個無參的構造方法,通過該構造方法就實例化了一個攔截器對象
6.該方法即攔截器的核心內容
1、如果拿到的event的header中本身包括timestamp這個key並且預留保存屬性為true,我們就直接返回該event就行了。
2、否則的話,我們生成一個時間戳,並將這個時間戳放到event的header中,作為一個屬性保存,再返回給event。
攔截器總結
1.攔截器被指定為一個以空格為間隔的列表,攔截器按照指定的順序調用。
2.核心是返回一個攔截器對象。
3.實現自己的event處理機制。
自己寫的event打包攔截器

public class EventCompressor extends AbstractFlumeInterceptor { //static final String COMPRESS_FORMAT = "gzip"; @Override public void initialize() { // NOPE } @Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); byte[] body = Compressor.compress(event.getBody()); //headers添加是否打包標志 headers.put(HeaderConstants.DEF_COMPRESS, HeaderConstants.VAL_COMPRESS_GZIP); event.setBody(body); return event; } @Override public void close() { // NOPE } public static class Builder implements Interceptor.Builder { @Override public void configure(Context context) { // NOPE } @Override public Interceptor build() { return new EventCompressor(); } }
因為flume 的數據采集到發送到kafka,如果一次一條數據的話很小,因此我把body取出來打包成大約40k左右的包來發送,還有一點kafka官方給出的一條消息大小為10k的時候kafka吞吐量達到最大效果。