Flume Sink的目的是從Flume Channel中獲取數據然后輸出到存儲或者其他Flume Source中。Flume Agent啟動的時候,它會為每一個Sink都啟動一個SinkRunner的對象,SinkRunner.start()方法會啟動一個新的線程去管理每一個Sink的生命周期。每一個Sink需要實現start()、Stop()和process()方法。你可以在start方法中去初始化Sink的參數和狀態,在stop方法中清理Sink的資源。最關鍵的是process方法,它將處理從Channel中拿出來的數據。另外如果Sink有一些配置則需要實現Configurable接口。
由於Flume官方提供的Sink往往不能滿足要求,所以我們自定義Sink來實現定制化的需求,這里以ElasticSearch為例。在Sink中實現所以文檔的簡單的Insert功能。例子使用Flume 1.7。
1. 編寫代碼
首先新建類ElasticSearchSink類繼承AbstractSink類,由於還希望有自定義的Sink的配置,所以實現Configurable接口。
public class ElasticSearchSink extends AbstractSink implements Configurable
ElasticSearch的IP以及索引的名稱可以配置在配置文件里面,配置文件就是使用flume的conf文件。你可以重寫Configurable的configure的方法去獲取配置,代碼如下:
@Override public void configure(Context context) { esHost = context.getString("es_host"); esIndex = context.getString("es_index"); }
注意里面的配置項“es_host”和“es_index”在conf配置文件中的語法:
agent.sinks = sink1
agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test
接下來就是實現process方法,在這個方法中需要獲取channel,因為數據都是從channel中獲得的。獲取消息之前,需要先獲取一個Channel是事務,處理完成之后需要commit和關閉這個事務。這樣才能讓channel知道這個消息已經消費完成,它可以從它的內部隊列中刪除這個消息。如果消費失敗,需要重新消費的話,可以rollback這個事務。事務的引入是flume對消息可靠性保證的關鍵。
process方法需要返回一個Status類型的枚舉,Ready和BackOff。如果你到了一個消息,並正常處理了,需要使用Ready。如果拿到的消息是null,則可以返回BackOff。所謂BackOff(失效補償)就是當sink獲取不到 消息的時候, Sink的PollingRunner 線程需要等待一段backoff時間,等channel中的數據得到了補償再來進行pollling 操作。
完整的代碼如下:
public class ElasticSearchSink extends AbstractSink implements Configurable { private String esHost; private String esIndex; private TransportClient client; @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { Event event = ch.take(); if (event != null) { String body = new String(event.getBody(), "UTF-8"); BulkRequestBuilder bulkRequest = client.prepareBulk(); List<JSONObject> jsons = new ArrayList<JSONObject>(); JSONObject obj = JSONObject.parseObject(body); String vehicleId = obj.getString("vehicle_id"); String eventBeginCode = obj.getString("event_begin_code"); String eventBeginTime = obj.getString("event_begin_time"); //doc id in index String id = (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim(); JSONObject json = new JSONObject(); json.put("vehicle_id", vehicleId); bulkRequest.add(client.prepareIndex(esIndex, esIndex).setSource(json)); BulkResponse bulkResponse = bulkRequest.get(); status = Status.READY; } else { status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) { txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally { txn.close(); } return status; } @Override public void configure(Context context) { esHost = context.getString("es_host"); esIndex = context.getString("es_index"); } @Override public synchronized void stop() { super.stop(); } @Override public synchronized void start() { try { Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300)); super.start(); System.out.println("finish start"); } catch (Exception ex) { ex.printStackTrace(); } } }
2. 打包、配置和運行
由於是自定義的Sink,所以需要打成jar包,然后copy到flume的lib文件夾下。然后配置agent的配置文件,最后啟動flume就可以了。本例中,我使用了kafkasource、memorychannel和自定義的sink,完整的配置文件如下:
agent.sources = source1
agent.channels = channel1
agent.sinks = sink1
agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source1.channels = channel1
agent.sources.source1.batchSize = 1
agent.sources.source1.batchDurationMillis = 2000
agent.sources.source1.kafka.bootstrap.servers = 192.168.50.116:9092,192.168.50.117:9092,192.168.50.118:9092,192.168.50.226:9092
agent.sources.source1.kafka.topics = iov-vehicle-event
agent.sources.source1.kafka.consumer.group.id = flume-vehicle-event-nick
agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test
agent.sinks.sink1.channel = channel1
agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000