flume在windows環境下的使用


      Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,Flume支持在日志系統中定制各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定制)的能力。目前屬於apache的一個子項目。

       一般來說,部署到服務器上的flume是安裝在unix/linux環境下使用的,但是有時為了測試和調試方便,我們也會有在windows系統上安裝的需求。對當前flume最新版本1.6來說,在windows上使用相對比較方便,因為其自帶了一套執行環境shell。

       這是一套使用powershell編制的執行環境,啟動程序在apache-flume-1.6.0-bin\bin目錄下,flume-ng.cmd。

       打開命令行,輸入flume-ng.cmd help可以查看該程序使用方法。  如圖:

     

以一個采集脫機目錄日志源的flume agent為例,可以以如下命令運行這個agent :

flume-ng.cmd agent --conf ..\conf --conf-file ..\conf\t1.conf --name a1

 

t1.conf:

a1.sources = r1
a1.channels = memoryChannel
a1.sinks = spoolSink


a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = spool_dir
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = memoryChannel

#interceptors
a1.sources.r1.interceptors = e1
a1.sources.r1.interceptors.e1.type = regex_extractor
a1.sources.r1.interceptors.e1.regex = ^([\\D]+[\\d]+[\\s]+[\\d\\:]+)\\s+([\\d\\-]+[\\s]+[\\d\\:]+[\\s]+[\\d\\:]+)\\s([\\S\\-]+)\\s([\\S\\/]+)\\s.[\\w]+\\S([\\d\\.]+)\\S\\s[\\w]+\\S([\\w]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s([\\d\\.]+\\S[\\d]+)\\S\\s[\\W]+([\\d\\.]+\\S[\\d]+)\\S\\s\\S([\\d\\/]+[\\s]+[\\d\\:]+)\\s\\S\\s([\\d\\/]+[\\s][\\d\\:]+)\\S\\s[\\w\\s]+\\S([\\d]+)\\s[\\w\\s]+\\S([\\d]+)\\S\\s[\\w]+\\S([\\d\\#]+)[\\w\\s]+\\S([\\d\\.]+);$
a1.sources.r1.interceptors.e1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 s14 s15
a1.sources.r1.interceptors.e1.serializers.s1.name = time1
a1.sources.r1.interceptors.e1.serializers.s2.name = time2
a1.sources.r1.interceptors.e1.serializers.s3.name = xy
a1.sources.r1.interceptors.e1.serializers.s4.name = session
a1.sources.r1.interceptors.e1.serializers.s5.name = devip
a1.sources.r1.interceptors.e1.serializers.s6.name = protocol
a1.sources.r1.interceptors.e1.serializers.s7.name = ip1
a1.sources.r1.interceptors.e1.serializers.s8.name = ip2
a1.sources.r1.interceptors.e1.serializers.s9.name = ip3
a1.sources.r1.interceptors.e1.serializers.s10.name = starttime
a1.sources.r1.interceptors.e1.serializers.s11.name = endtime
a1.sources.r1.interceptors.e1.serializers.s12.name = srcvpn
a1.sources.r1.interceptors.e1.serializers.s13.name = desvpn
a1.sources.r1.interceptors.e1.serializers.s14.name = status
a1.sources.r1.interceptors.e1.serializers.s15.name = username

#channels
a1.channels.memoryChannel.type = memory
a1.channels.memoryChannel.capacity = 300
a1.channels.memoryChannel.transactionCapacity= 300

#sink
a1.sinks.spoolSink.type = com.hzfi.flume.PatternTestSink
a1.sinks.spoolSink.channel = memoryChannel

上例中使用了一個interceptor regex_extractor來對脫機目錄下的日志中的記錄進行正則表達式模式識別,將記錄切分為15個子模式,分別加入到flume event的header里邊。

sink為一個自定義的PatternTestSink,代碼如下:

package com.hzfi.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class PatternTestSink extends AbstractSink implements Configurable {

    @Override
    public Status process() throws EventDeliveryException {
        
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        try {
            transaction.begin();
            event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), "utf-8");
                System.out.println("----->event headers...");
                System.out.println("header content:[" + event.getHeaders().toString() + "]");
                System.out.println("----->event body...");
                System.out.println("body content:[" + body + "]");
            } else {
                result = Status.BACKOFF;
            }
            transaction.commit();
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to got pattern event: " + event, ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void configure(Context arg0) {
        // TODO Auto-generated method stub

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM