Flume Configuration Guide


Flume:
=====================
    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
    It has a simple and flexible architecture based on streaming data flows.
    It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms.
    It uses a simple extensible data model that allows for online analytic applications.

    

    source:
        Acts as a producer relative to the channel; it receives data in various formats and hands it to the channel for transport.

    channel:
        Acts as a data buffer; it receives events from the source and holds them for the sink.

    sink:
        Acts as a consumer relative to the channel; it takes events from the channel and delivers them to a target in the specified format.



Event:
===============
    The basic unit of transfer in Flume:
        header + body
        
    

flume安裝:
================
    1. Unpack the tarball
    2. Create a symbolic link
    3. Configure the environment variables and make them take effect
    4. Edit the configuration files
        1) Rename flume-env.ps1.template to flume-env.ps1
        2) Rename flume-env.sh.template to flume-env.sh
        3) Edit flume-env.sh and add the JDK directory:
            export JAVA_HOME=/soft/jdk

    5. Check the Flume version
         flume-ng version

        

flume使用:
=========================
    //Flume can also store its configuration in ZooKeeper

    //Command to run Flume
    flume-ng agent -n a1 -f xxx.conf

    agent:    a1
    source:    r1
    channel:c1
    sink:    k1

    Usage:
        1. Write the configuration file r_nc.conf
            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # Configure the source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888

            # Configure the sink
            a1.sinks.k1.type = logger

            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1

        2. Start Flume with the configuration file
            flume-ng agent -n a1 -f r_nc.conf

        3. Open another session and test
            nc localhost 8888
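The netcat test above can be simulated end to end in a few lines. This is a minimal sketch of what the netcat source does, not Flume itself: it accepts a TCP connection, treats each newline-terminated line as one event body, and acknowledges each line with "OK" (the source's default behavior). The r_nc.conf above uses port 8888; the demo binds an ephemeral port so it can run anywhere.

```python
import socket
import threading

events = []

def netcat_like_source(server_sock):
    conn, _ = server_sock.accept()
    with conn, conn.makefile("rb") as reader:
        for line in reader:
            events.append(line.rstrip(b"\n"))  # event body; headers stay empty
            conn.sendall(b"OK\n")              # per-line ack, like the real source

server = socket.socket()
server.bind(("localhost", 0))                  # 0 = any free port, for the demo
server.listen(1)
port = server.getsockname()[1]
t = threading.Thread(target=netcat_like_source, args=(server,))
t.start()

# This client plays the role of `nc localhost 8888`
client = socket.create_connection(("localhost", port))
client.sendall(b"hello flume\n")
ack = client.recv(16)
client.close()
t.join()
server.close()

print(events)   # [b'hello flume']
```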


    //User guide
        http://flume.apache.org/FlumeUserGuide.html

Running the agent in the background:
=============================================

    Ctrl+Z suspends the foreground program =====> [1]+  Stopped                 flume-ng agent -n a1 -f r_nc.conf

    bg %1 resumes the suspended job in the background

    jobs lists the background jobs

    fg %1 brings the job back to the foreground



 



flume:
    collection, aggregation, and movement of massive volumes of log data


    flume-ng agent -n a1 -f xxx.conf


    source
        producer relative to the channel    //netcat
    channel
        acts like a buffer            //memory
    sink
        consumer relative to the channel    //logger


Event:
    header + body
    k/v      data
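The header/body split above can be modeled in a few lines. This is a toy model for illustration, not Flume's actual Java API (org.apache.flume.Event): headers are string key/value pairs and the body is a raw byte array.

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    body: bytes                               # raw payload, never reinterpreted
    headers: dict = field(default_factory=dict)  # string k/v metadata

# A timestamp interceptor, for example, only touches the headers;
# the body bytes pass through unchanged. The values are illustrative.
e = Event(body=b"192.168.23.101 GET /index.html")
e.headers["timestamp"] = "1714000000000"

print(e.headers)   # {'timestamp': '1714000000000'}
print(e.body)      # b'192.168.23.101 GET /index.html'
```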


source:
============================================
    1. Sequence (seq) source: mostly used for testing
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = seq
        # Total number of events to send
        a1.sources.r1.totalEvents = 1000

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    
    2. Stress source: mostly used for load testing
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = org.apache.flume.source.StressSource
        # Size of a single event, in bytes
        a1.sources.r1.size = 10240
        # Total number of events
        a1.sources.r1.maxTotalEvents = 1000000

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    3. Spooling directory (spooldir) source: watches a directory for new files and sends the contents of each new file as events
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = spooldir
        # Directory to watch
        a1.sources.r1.spoolDir = /home/centos/spooldir

        # Suffix appended to a file once it has been fully consumed
        #a1.sources.r1.fileSuffix = .COMPLETED

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1


    4. Exec source    //generates events by running a Linux command
            //typical use: tail -F (follow a file and emit data as it is appended)
            //does not guarantee delivery; data may well be lost

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = exec
        # Linux command to run
        a1.sources.r1.command = tail -F /home/centos/readme.txt

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    5. Taildir source        //tails files under a directory
                //file names can be matched with a regex
                //keeps a position file, so it can recover after a crash

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = TAILDIR
        # Declare the file groups (more than one may be set)
        a1.sources.r1.filegroups = f1
        # Directory and file pattern for the group, given as a regex; only files can be matched
        a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

        # Location of the position file
        # a1.sources.r1.positionFile = ~/.flume/taildir_position.json

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1


sink:
====================================
    1. File sink (file_roll)    //often used for data collection
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = file_roll
        # Target directory
        a1.sinks.k1.sink.directory = /home/centos/file
        # Roll interval; default 30s, 0 disables rolling so everything goes to a single file
        a1.sinks.k1.sink.rollInterval = 0

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2. HDFS sink        //writes SequenceFile format by default
                //k: LongWritable
                //v: BytesWritable

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hdfs
        # Target directory
        a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
        # File name prefix
        a1.sinks.k1.hdfs.filePrefix = events-
        # Roll interval, in seconds (0 disables time-based rolling)
        a1.sinks.k1.hdfs.rollInterval = 0
        # File size that triggers a roll, in bytes
        a1.sinks.k1.hdfs.rollSize = 1024
        # Use the local timestamp (needed by the %y-%m-%d escapes when events carry no timestamp header)
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        # Output file type; default is SequenceFile
        # DataStream: plain text, no compression codec allowed
        # CompressedStream: compressed text, a codec must be set
        a1.sinks.k1.hdfs.fileType = DataStream


        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    3. Hive sink:        //hiveserver help: hive --service help
                //1. hive --service metastore starts the Hive metastore service, at thrift://localhost:9083
                //2. copy the hcatalog jars into /soft/hive/lib: cp hive-hcatalog* /soft/hive/lib    (from /soft/hive/hcatalog/share/hcatalog)
                //3. create a transactional Hive table:
                  SET hive.support.concurrency=true;
                  SET hive.enforce.bucketing=true;
                  SET hive.exec.dynamic.partition.mode=nonstrict;
                  SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
                  SET hive.compactor.initiator.on=true;
                  SET hive.compactor.worker.threads=1;

                  create table myhive.weblogs(id int, name string, age int)
                  clustered by(id) into 2 buckets
                  row format delimited
                  fields terminated by '\t'
                  stored as orc
                  tblproperties('transactional'='true');


        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hive
        a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
        a1.sinks.k1.hive.database = myhive
        a1.sinks.k1.hive.table = weblogs
        a1.sinks.k1.useLocalTimeStamp = true
        # Input format: DELIMITED or JSON
        # DELIMITED    plain text
        # JSON         JSON records
        a1.sinks.k1.serializer = DELIMITED
        # Input field separator, in double quotes
        a1.sinks.k1.serializer.delimiter = ","
        # Output field separator, in single quotes
        a1.sinks.k1.serializer.serdeSeparator = '\t'
        # Field names, comma-separated, no spaces
        a1.sinks.k1.serializer.fieldnames = id,name,age

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    4. HBase sink            //SimpleHbaseEventSerializer uses fixed default rowKey and column names; they cannot be customized
                    //RegexHbaseEventSerializer lets you specify the rowKey and the column names

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = hbase
        a1.sinks.k1.table = flume_hbase
        a1.sinks.k1.columnFamily = f1
        a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer


        # Columns are extracted with the regex below
        # rowKeyIndex picks which captured group becomes the rowKey; indexes start at 0
        a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
        a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
        a1.sinks.k1.serializer.rowKeyIndex = 0

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
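The regex extraction above is easy to misread, so here is an illustrative sketch of what RegexHbaseEventSerializer does with that config: split the event body with the regex, map the captured groups onto colNames, and use the group at rowKeyIndex as the rowKey. The sample body "u001,tom,20" is made up for the demo.

```python
import re

colNames = ["ROW_KEY", "name", "age"]
regex = re.compile(r"(.*),(.*),(.*)")
rowKeyIndex = 0

body = "u001,tom,20"
groups = regex.match(body).groups()          # ('u001', 'tom', '20')
row_key = groups[rowKeyIndex]                # group 0 becomes the rowKey
columns = {name: value for name, value in zip(colNames, groups)
           if name != "ROW_KEY"}             # the rest become columns in f1

print(row_key)   # u001
print(columns)   # {'name': 'tom', 'age': '20'}
```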

    
    5. asynchbase sink        //asynchronous HBase sink
                    //writes faster thanks to its asynchronous design
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = asynchbase
        a1.sinks.k1.table = flume_hbase
        a1.sinks.k1.columnFamily = f1
        a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        

channel: the buffer
=====================================
    1. Memory channel
        a1.channels.c1.type = memory
        # Maximum number of events held in the channel
        a1.channels.c1.capacity = 1000
        # Maximum number of events per transaction, both when the channel
        # takes from the source and when it hands events to the sink
        a1.channels.c1.transactionCapacity = 100
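The relationship between capacity and transactionCapacity can be sketched with a bounded queue. This toy model mirrors the two settings above; it is not Flume's implementation, just the bookkeeping it implies.

```python
from collections import deque

class MemoryChannel:
    """Toy memory channel: capacity bounds the buffer, and
    transactionCapacity bounds how many events one transaction drains."""

    def __init__(self, capacity=1000, transaction_capacity=100):
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self.queue = deque()

    def put(self, event):
        if len(self.queue) >= self.capacity:
            raise RuntimeError("channel full: capacity reached")
        self.queue.append(event)

    def take_batch(self):
        """One sink transaction: drain at most transactionCapacity events."""
        batch = []
        while self.queue and len(batch) < self.transaction_capacity:
            batch.append(self.queue.popleft())
        return batch

ch = MemoryChannel(capacity=1000, transaction_capacity=100)
for i in range(250):
    ch.put(i)

print(len(ch.take_batch()))  # 100
print(len(ch.take_batch()))  # 100
print(len(ch.take_batch()))  # 50
```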

    2. File channel:    //if several file channels keep their checkpoint and data
                //in the default locations, the files collide and the channels crash,
                //so give each channel its own directories

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = file
        a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
        a1.channels.c1.dataDirs = /home/centos/flume/data

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1


    Memory channel: fast, but data is lost if the machine loses power

    File channel:   slower, but data survives a power failure


Avro
===============================================
    source
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = avro
        a1.sources.r1.bind = 0.0.0.0
        a1.sources.r1.port = 4444

        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    Start an Avro client and send data:
        flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt
        # -H host    -p port    -R header file    -F data file


    sink
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.filegroups = f1
        a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

        # Configure the sink
        # the avro sink connects out, so it takes hostname, not bind
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = 192.168.23.101
        a1.sinks.k1.port = 4444


        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
        
        



Flume hops:
=====================================
    1. Distribute the Flume installation from s101 to the other nodes
        xsync.sh /soft/flume
        xsync.sh /soft/apache-flume-1.8.0-bin/

    2. Switch to root and distribute the environment variables file
        su root
        xsync.sh /etc/profile
        exit

    3. Configuration files
        1) On s101        //hop.conf
            source: avro
            sink:   hdfs

            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # Configure the source
            a1.sources.r1.type = avro
            a1.sources.r1.bind = 0.0.0.0
            a1.sources.r1.port = 4444

            # Configure the sink
            a1.sinks.k1.type = hdfs
            a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/
            a1.sinks.k1.hdfs.filePrefix = events-
            a1.sinks.k1.hdfs.rollInterval = 0
            a1.sinks.k1.hdfs.rollSize = 1024
            a1.sinks.k1.hdfs.useLocalTimeStamp = true
            a1.sinks.k1.hdfs.fileType = DataStream

            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1


        2) On s102-s104        //hop2.conf
            source: taildir
            sink:   avro

            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1

            # Configure the source
            a1.sources.r1.type = TAILDIR
            a1.sources.r1.filegroups = f1
            a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

            # Configure the sink
            a1.sinks.k1.type = avro
            a1.sinks.k1.hostname = 192.168.23.101
            a1.sinks.k1.port = 4444


            # Configure the channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100

            # Bind the source and sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1

    4. Create the ~/taildir directory on s102-s104
        xcall.sh "mkdir ~/taildir"


    5. Start Flume on s101
        flume-ng agent -n a1 -f /soft/flume/conf/hop.conf

    6. Start Flume on each of s102-s104, running it in the background
        flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf &


    7. Test: create data under taildir on each of s102-s104 and watch the data arrive in HDFS
        s102]$ echo 102 > taildir/1.txt
        s103]$ echo 103 > taildir/1.txt
        s104]$ echo 104 > taildir/1.txt

    
interceptor:
==================================
    A source-side component that modifies or drops events.
    Each source may be configured with several interceptors    ===> interceptorChain

    

    1. Timestamp Interceptor    //adds a timestamp to the event header

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
        # Name the interceptor
        a1.sources.r1.interceptors = i1
        # Interceptor type
        a1.sources.r1.interceptors.i1.type = timestamp


        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

        
    2. Static Interceptor    //adds a fixed key/value to the event header

    3. Host Interceptor    //adds the agent's host to the event header

    4. Configuring an interceptor chain:

        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        a1.sources.r1.interceptors = i1 i2 i3
        a1.sources.r1.interceptors.i1.type = timestamp
        a1.sources.r1.interceptors.i2.type = host
        a1.sources.r1.interceptors.i3.type = static
        a1.sources.r1.interceptors.i3.key = location
        a1.sources.r1.interceptors.i3.value = NEW_YORK


        # Configure the sink
        a1.sinks.k1.type = logger

        # Configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
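The chain semantics can be sketched in a few lines: each interceptor takes an event (headers dict + body bytes) and returns it modified, and events flow through i1, i2, i3 in order, just as configured above. The hostname value is an assumption for the demo, not something the host interceptor would hard-code.

```python
import time

def timestamp_interceptor(event):
    event["headers"]["timestamp"] = str(int(time.time() * 1000))
    return event

def host_interceptor(event):
    event["headers"]["host"] = "192.168.23.101"   # agent host, assumed
    return event

def static_interceptor(event, key="location", value="NEW_YORK"):
    event["headers"][key] = value                 # matches i3.key / i3.value
    return event

chain = [timestamp_interceptor, host_interceptor, static_interceptor]

event = {"headers": {}, "body": b"hello"}
for interceptor in chain:              # events pass through in order
    event = interceptor(event)

print(sorted(event["headers"]))        # ['host', 'location', 'timestamp']
print(event["body"])                   # b'hello' (only headers were touched)
```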
    


channel selector:
====================================
    A source-side component that decides which channel(s) each event goes to, similar to partitioning.

    When a source is wired to several channels, the default is to replicate: every channel gets a copy of each event.


    1. Replicating channel selector    //the default; the source sends a copy of every event to all channels
                    //setup: source x 1, channel x 3, sink x 3
                    //    nc       memory    file
    
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1 k2 k3
        a1.channels = c1 c2 c3

        # Configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888
        a1.sources.r1.selector.type = replicating

        # Configure the channels
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        a1.channels.c2.type = memory
        a1.channels.c2.capacity = 1000
        a1.channels.c2.transactionCapacity = 100

        a1.channels.c3.type = memory
        a1.channels.c3.capacity = 1000
        a1.channels.c3.transactionCapacity = 100


        # Configure the sinks
        a1.sinks.k1.type = file_roll
        a1.sinks.k1.sink.directory = /home/centos/file1
        a1.sinks.k1.sink.rollInterval = 0

        a1.sinks.k2.type = file_roll
        a1.sinks.k2.sink.directory = /home/centos/file2
        a1.sinks.k2.sink.rollInterval = 0

        a1.sinks.k3.type = file_roll
        a1.sinks.k3.sink.directory = /home/centos/file3
        a1.sinks.k3.sink.rollInterval = 0

        # Bind the source and sinks to the channels
        a1.sources.r1.channels = c1 c2 c3
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        a1.sinks.k3.channel = c3


    
    2. Multiplexing channel selector    //routes each event to a channel chosen by a header value
                        //tested below by sending files with the avro client
        # Name the agent's components
        a1.sources = r1
        a1.sinks = k1 k2 k3
        a1.channels = c1 c2 c3

        # Configure the source
        a1.sources.r1.type = avro
        a1.sources.r1.bind = 0.0.0.0
        a1.sources.r1.port = 4444
        # Configure the channel selector
        a1.sources.r1.selector.type = multiplexing
        a1.sources.r1.selector.header = country
        a1.sources.r1.selector.mapping.CN = c1
        a1.sources.r1.selector.mapping.US = c2
        a1.sources.r1.selector.default = c3

        # Configure the channels
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        a1.channels.c2.type = memory
        a1.channels.c2.capacity = 1000
        a1.channels.c2.transactionCapacity = 100

        a1.channels.c3.type = memory
        a1.channels.c3.capacity = 1000
        a1.channels.c3.transactionCapacity = 100


        # Configure the sinks
        a1.sinks.k1.type = file_roll
        a1.sinks.k1.sink.directory = /home/centos/file1
        a1.sinks.k1.sink.rollInterval = 0

        a1.sinks.k2.type = file_roll
        a1.sinks.k2.sink.directory = /home/centos/file2
        a1.sinks.k2.sink.rollInterval = 0

        a1.sinks.k3.type = file_roll
        a1.sinks.k3.sink.directory = /home/centos/file3
        a1.sinks.k3.sink.rollInterval = 0

        # Bind the source and sinks to the channels
        a1.sources.r1.channels = c1 c2 c3
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        a1.sinks.k3.channel = c3


        1. Create the file1, file2, and file3 directories in the home directory
            mkdir file1 file2 file3

        2. Create a country directory holding the header files and the data
            header files CN.txt, US.txt, OTHER.txt
                CN.txt ===> country CN
                US.txt ===> country US
                OTHER.txt ===> country OTHER

            data file 1.txt
                1.txt ====> helloworld

        3. Run Flume
            flume-ng agent -n a1 -f /soft/flume/selector_multi.conf

        4. Run the Avro client
            flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt    ===> check file2
            flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt    ===> check file1
            flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt    ===> check file3
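The routing decision the selector makes can be sketched directly from the config above: read the "country" header, route CN to c1, US to c2, and anything else to the default channel c3. This is an illustration of the mapping logic, not Flume's code.

```python
mapping = {"CN": "c1", "US": "c2"}   # selector.mapping.CN / .US
default_channel = "c3"               # selector.default
header_name = "country"              # selector.header

def select_channel(event):
    value = event["headers"].get(header_name)
    return mapping.get(value, default_channel)

print(select_channel({"headers": {"country": "CN"}, "body": b"helloworld"}))     # c1
print(select_channel({"headers": {"country": "US"}, "body": b"helloworld"}))     # c2
print(select_channel({"headers": {"country": "OTHER"}, "body": b"helloworld"}))  # c3
```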


        
sinkProcessor
=================================
    A sink runner runs one sink group.

    A sink group consists of one or more sinks.

    The sink runner tells the sink group to process the next batch of events.

    The sink group contains a sink processor, which picks the sink that will handle the batch.


    failover processor    //give every sink a priority
                //the larger the number, the higher the priority
                //incoming data goes to the highest-priority sink
                //when that sink fails, the next-highest-priority sink takes over
                //channels and sinks must be paired one-to-one

        a1.sources = r1
        a1.sinks = s1 s2 s3
        a1.channels = c1 c2 c3

        # Describe/configure the source
        a1.sources.r1.type = seq

        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = s1 s2 s3
        a1.sinkgroups.g1.processor.type = failover
        a1.sinkgroups.g1.processor.priority.s1 = 5
        a1.sinkgroups.g1.processor.priority.s2 = 10
        a1.sinkgroups.g1.processor.priority.s3 = 15
        a1.sinkgroups.g1.processor.maxpenalty = 10000

        # Describe the sink
        a1.sinks.s1.type = file_roll
        a1.sinks.s1.sink.directory = /home/centos/file1
        a1.sinks.s2.type = file_roll
        a1.sinks.s2.sink.directory = /home/centos/file2
        a1.sinks.s3.type = file_roll
        a1.sinks.s3.sink.directory = /home/centos/file3

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c2.type = memory
        a1.channels.c3.type = memory

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1 c2 c3
        a1.sinks.s1.channel = c1
        a1.sinks.s2.channel = c2
        a1.sinks.s3.channel = c3
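The failover rule above can be sketched in a few lines: sinks carry priorities (s3=15 > s2=10 > s1=5, as configured), each batch goes to the highest-priority sink that is still alive, and a failed sink drops out until it recovers. This illustrates the selection rule only; the real processor also applies a backoff penalty (maxpenalty) before retrying failed sinks.

```python
priorities = {"s1": 5, "s2": 10, "s3": 15}
alive = {"s1": True, "s2": True, "s3": True}

def pick_sink():
    live = [s for s, ok in alive.items() if ok]
    if not live:
        raise RuntimeError("all sinks failed")
    return max(live, key=lambda s: priorities[s])  # highest priority wins

print(pick_sink())   # s3
alive["s3"] = False  # simulate s3 crashing
print(pick_sink())   # s2: next-highest takes over
```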




An Event is built on the source side by wrapping the input byte array:
    Event event = EventBuilder.withBody(body);



A sink's process method returns one of two statuses:
    1. READY    //one or more events were delivered successfully
    2. BACKOFF    //the channel had no data to give the sink
        

Transaction lifecycle in Flume:

    tx.begin()    //open the transaction, then do the work
    tx.commit()    //commit once the work succeeds
    tx.rollback()    //roll back if an exception occurs
    tx.close()    //always close the transaction at the end
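The begin/commit/rollback/close pattern above can be demonstrated with a toy transaction over a list, so the effect of a rollback is visible. The class is illustrative; Flume's real Transaction is a Java interface obtained from the channel.

```python
class Transaction:
    def __init__(self, store):
        self.store = store
        self.staged = []

    def begin(self):
        self.staged = []

    def put(self, event):
        self.staged.append(event)     # buffered, not yet visible in store

    def commit(self):
        self.store.extend(self.staged)
        self.staged = []

    def rollback(self):
        self.staged = []              # discard everything since begin()

    def close(self):
        pass                          # a real channel releases resources here

store = []
tx = Transaction(store)
tx.begin()
try:
    tx.put(b"event-1")
    tx.put(b"event-2")
    tx.commit()
except Exception:
    tx.rollback()                     # on failure nothing reaches the store
finally:
    tx.close()                        # always close, even after a rollback

print(store)   # [b'event-1', b'event-2']
```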

 

