Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. Flume lets you define custom data senders inside a logging system to collect data, and it can also perform simple processing on the data and write it out to a variety of (customizable) data receivers. It is currently an Apache subproject.
Flume deployed on servers generally runs in a Unix/Linux environment, but for testing and debugging it is sometimes convenient to install it on Windows as well. With the current latest release, 1.6, using Flume on Windows is fairly straightforward, because the distribution ships with its own Windows execution environment.
This execution environment is a set of PowerShell scripts; the launcher is flume-ng.cmd, located under apache-flume-1.6.0-bin\bin.
Open a command prompt and run flume-ng.cmd help to see the program's usage, as shown in the figure:
Taking a Flume agent that collects logs from a spooling directory source as an example, the agent can be run with the following command:
flume-ng.cmd agent --conf ..\conf --conf-file ..\conf\t1.conf --name a1
t1.conf:
a1.sources = r1
a1.channels = memoryChannel
a1.sinks = spoolSink

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = spool_dir
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = memoryChannel

#interceptors
a1.sources.r1.interceptors = e1
a1.sources.r1.interceptors.e1.type = regex_extractor
a1.sources.r1.interceptors.e1.regex = ^([\\D]+[\\d]+[\\s]+[\\d\\:]+)\\s+([\\d\\-]+[\\s]+[\\d\\:]+[\\s]+[\\d\\:]+)\\s([\\S\\-]+)\\s([\\S\\/]+)\\s.[\\w]+\\S([\\d\\.]+)\\S\\s[\\w]+\\S([\\w]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s[\\W]+([\\d\\.]+\\S[\\d]+)\\S\\s\\S([\\d\\/]+[\\s]+[\\d\\:]+)\\s\\S\\s([\\d\\/]+[\\s][\\d\\:]+)\\S\\s[\\w\\s]+\\S([\\d]+)\\s[\\w\\s]+\\S([\\d]+)\\S\\s[\\w]+\\S([\\d\\#]+)[\\w\\s]+\\S([\\d\\.]+);$
a1.sources.r1.interceptors.e1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 s14 s15
a1.sources.r1.interceptors.e1.serializers.s1.name = time1
a1.sources.r1.interceptors.e1.serializers.s2.name = time2
a1.sources.r1.interceptors.e1.serializers.s3.name = xy
a1.sources.r1.interceptors.e1.serializers.s4.name = session
a1.sources.r1.interceptors.e1.serializers.s5.name = devip
a1.sources.r1.interceptors.e1.serializers.s6.name = protocol
a1.sources.r1.interceptors.e1.serializers.s7.name = ip1
a1.sources.r1.interceptors.e1.serializers.s8.name = ip2
a1.sources.r1.interceptors.e1.serializers.s9.name = ip3
a1.sources.r1.interceptors.e1.serializers.s10.name = starttime
a1.sources.r1.interceptors.e1.serializers.s11.name = endtime
a1.sources.r1.interceptors.e1.serializers.s12.name = srcvpn
a1.sources.r1.interceptors.e1.serializers.s13.name = desvpn
a1.sources.r1.interceptors.e1.serializers.s14.name = status
a1.sources.r1.interceptors.e1.serializers.s15.name = username

#channels
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 300
a1.channels.memoryChannel.transactionCapacity = 300

#sink
a1.sinks.spoolSink.type = com.hzfi.flume.PatternTestSink
a1.sinks.spoolSink.channel = memoryChannel
The example above uses a regex_extractor interceptor to match each record in the spooling-directory logs against a regular expression, splitting the record into 15 capture groups, each of which is added to the Flume event's headers under the name configured for the corresponding serializer.
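The group-to-header mapping that regex_extractor performs can be illustrated in plain JDK code. This is a hypothetical sketch, not Flume's actual implementation: the regex and the sample log line are deliberately simplified to three fields, and only three of the serializer names from t1.conf (devip, protocol, username) are reused.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of what regex_extractor does: each capture
// group value is stored in the event header map under the name that
// the corresponding serializer (s1, s2, ...) is configured with.
public class RegexExtractorDemo {
    // Hypothetical simplified pattern: three whitespace-separated fields.
    static final Pattern LINE = Pattern.compile("^(\\S+)\\s+(\\S+)\\s+(\\S+)$");
    // Header names, playing the role of serializers.sN.name in t1.conf.
    static final String[] NAMES = {"devip", "protocol", "username"};

    static Map<String, String> extract(String logLine) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = LINE.matcher(logLine);
        if (m.matches()) {
            for (int i = 1; i <= m.groupCount(); i++) {
                headers.put(NAMES[i - 1], m.group(i)); // group i -> sN.name
            }
        }
        return headers; // empty map when the line does not match
    }

    public static void main(String[] args) {
        System.out.println(extract("192.168.1.1 tcp alice"));
    }
}
```

The real interceptor works the same way at a larger scale: 15 groups, 15 serializer names, and the resulting headers travel with the event to the sink.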
The sink is a custom PatternTestSink, with the following code:
package com.hzfi.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class PatternTestSink extends AbstractSink implements Configurable {

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        try {
            transaction.begin();
            event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), "utf-8");
                System.out.println("----->event headers...");
                System.out.println("header content:[" + event.getHeaders().toString() + "]");
                System.out.println("----->event body...");
                System.out.println("body content:[" + body + "]");
            } else {
                // Channel is empty: tell the sink runner to back off for a while.
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to get pattern event: " + event, ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void configure(Context context) {
        // This test sink takes no configuration parameters.
    }
}
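The control flow in process() above can be sketched without any Flume dependency. In this hypothetical pure-JDK sketch a Queue stands in for the Flume channel, and the transaction bookkeeping is omitted; what it keeps is the READY/BACKOFF contract that the sink runner relies on to decide whether to poll again immediately or sleep.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Pure-JDK sketch of the PatternTestSink.process() loop: consume one
// event per call, report READY when something was consumed and BACKOFF
// when the channel was empty, so the runner does not spin on an idle channel.
public class SinkLoopDemo {
    enum Status { READY, BACKOFF }

    static Status process(Queue<String> channel) {
        String event = channel.poll();      // stands in for channel.take()
        if (event == null) {
            return Status.BACKOFF;          // empty channel: back off
        }
        System.out.println("body content:[" + event + "]");
        return Status.READY;                // consumed one event: poll again
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("line-1");
        System.out.println(process(channel)); // consumes line-1
        System.out.println(process(channel)); // channel now empty
    }
}
```

In the real sink the take/commit pair runs inside a channel transaction, so a failed delivery rolls the event back into the channel instead of losing it.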