Flume Configuration Explained


Flume:
=====================
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.
It has a simple and flexible architecture based on streaming data flows.
It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms.
It uses a simple, extensible data model that allows for online analytic applications.

 

source:
Relative to the channel it acts as the producer: it receives data in various formats and hands it to the channel for transport.

channel:
Acts as a data buffer: it takes events from the source and holds them for the sink.

sink:
Relative to the channel it acts as the consumer: it takes events from the channel and delivers them, in the configured format, to the destination.

 

Event:
===============
flume's basic unit of transport:
header + body

Flume installation:
================
1. Unpack the tarball
2. Create a symlink
3. Configure the environment variables and make them take effect
4. Edit the config files
1) rename flume-env.ps1.template to flume-env.ps1
2) rename flume-env.sh.template to flume-env.sh
3) edit flume-env.sh and point it at the jdk, adding
export JAVA_HOME=/soft/jdk

5. Check the flume version
flume-ng version

 

Flume usage:
=========================
//flume can also read its configuration from zookeeper

//flume run command
flume-ng agent -n a1 -f xxx.conf

agent:   a1
source:  r1
channel: c1
sink:    k1
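The short flags have long forms; -c/--conf additionally points at the directory holding flume-env.sh and the logging config:

flume-ng agent --conf /soft/flume/conf --conf-file xxx.conf --name a1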

Usage:
1. Write the config file r_nc.conf
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Start flume, pointing it at the config file
flume-ng agent -n a1 -f r_nc.conf

3. Open another session and test
nc localhost 8888


// User guide
http://flume.apache.org/FlumeUserGuide.html

Running the agent in the background:
=============================================

Ctrl+Z: suspends the foreground program =====> [1]+ Stopped flume-ng agent -n a1 -f r_nc.conf

bg %1 resumes the suspended job in the background

jobs lists the background jobs

fg %1 brings the job back to the foreground
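Alternatively, skip the suspend-and-resume dance and launch the agent in the background from the start; nohup keeps it alive after the shell exits (a standard shell pattern, not flume-specific):

nohup flume-ng agent -n a1 -f r_nc.conf > flume.log 2>&1 &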

 

 

flume:
collection, aggregation, and movement of massive amounts of log data


flume-ng agent -n a1 -f xxx.conf


source
relative to the channel, the producer //netcat
channel
acts like a buffer //memory
sink
relative to the channel, the consumer //logger


Event:
header + body
//header: k/v pairs; body: the data

source:
============================================
1. Sequence (seq) source: mostly used for testing
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = seq
# Total number of events to send
a1.sources.r1.totalEvents = 1000

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Stress source: mostly used for load testing
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = org.apache.flume.source.StressSource
# Size of a single event, in bytes
a1.sources.r1.size = 10240
# Total number of events
a1.sources.r1.maxTotalEvents = 1000000

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Spooling-directory (spooldir) source: watches a directory for newly arrived files and sends their contents as events
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = spooldir
# Directory to watch
a1.sources.r1.spoolDir = /home/centos/spooldir

# Suffix appended to a file once it has been fully consumed
#a1.sources.r1.fileSuffix = .COMPLETED

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
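A quick way to exercise the source: write a file elsewhere, then move the finished file into the watched directory (spooldir expects files to be complete and immutable once they arrive):

echo "hello spooldir" > /tmp/1.txt
mv /tmp/1.txt /home/centos/spooldir/
ls /home/centos/spooldir        # after consumption: 1.txt.COMPLETED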


4. exec source //generates data by running a linux command
//typical use: tail -F (follow a file and emit the appended data as it grows)
//no delivery guarantee; data may well be lost

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = exec
# The linux command to run
a1.sources.r1.command = tail -F /home/centos/readme.txt

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
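To watch events flow, append to the tailed file from another session:

echo "new log line" >> /home/centos/readme.txt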

5. Taildir source //monitors files under a directory
//files are selected with a regex
//fault tolerant: progress is kept in a position file

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = TAILDIR
# Define the file groups (several can be set)
a1.sources.r1.filegroups = f1
# Per-group watched directory and file pattern, as a regex; only files can be monitored
a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

# Location of the position file (defaults to ~/.flume/taildir_position.json)
# a1.sources.r1.positionFile = ~/.flume/taildir_position.json

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
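The position file is what makes taildir fault tolerant: it records how far each tracked file has been read, so the source resumes from there after a restart. Illustratively (the values are made up; the three fields are what the file stores):

[{"inode":2496001,"pos":14,"file":"/home/centos/taildir/1.txt"}]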


sink:
====================================
1. file_roll sink //often used for collecting data to local files
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = file_roll
# Target directory
a1.sinks.k1.sink.directory = /home/centos/file
# Roll interval, 30s by default; 0 disables rolling, producing a single file
a1.sinks.k1.sink.rollInterval = 0

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. hdfs sink //writes SequenceFiles by default
//k: LongWritable
//v: BytesWritable

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = hdfs
# Target directory
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
# File prefix
a1.sinks.k1.hdfs.filePrefix = events-
# Roll interval, in seconds
a1.sinks.k1.hdfs.rollInterval = 0
# File size that triggers a roll, in bytes
a1.sinks.k1.hdfs.rollSize = 1024
# Use the local timestamp (needed for the %y-%m-%d escapes above)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type, SequenceFile by default
# DataStream: plain text, no compression codec allowed
# CompressedStream: compressed text, a codec must be set
a1.sinks.k1.hdfs.fileType = DataStream


# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
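After pushing a few lines through nc, the result can be inspected from the hdfs side (the %y-%m-%d escape in the path is filled in from the local timestamp):

hdfs dfs -ls -R /flume/events
hdfs dfs -cat /flume/events/*/events-*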

3. hive sink: //hiveserver help: hive --service help
//1. hive --service metastore  starts hive's metastore service; metastore address: thrift://localhost:9083
//2. copy the hcatalog jars into /soft/hive/lib: cp hive-hcatalog* /soft/hive/lib  (they live in /soft/hive/hcatalog/share/hcatalog)
//3. create a transactional hive table:
SET hive.support.concurrency=true;
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;

create table myhive.weblogs(id int, name string, age int)
clustered by(id) into 2 buckets
row format delimited
fields terminated by '\t'
stored as orc
tblproperties('transactional'='true');


# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = myhive
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.useLocalTimeStamp = true
# Input format: DELIMITED or JSON
# DELIMITED: plain text
# JSON: json files
a1.sinks.k1.serializer = DELIMITED
# Input field delimiter, in double quotes
a1.sinks.k1.serializer.delimiter = ","
# Output field delimiter, in single quotes
a1.sinks.k1.serializer.serdeSeparator = '\t'
# Field names, comma-separated, no spaces allowed
a1.sinks.k1.serializer.fieldnames = id,name,age

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
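A minimal end-to-end check (assuming the agent and the metastore are both up): push one comma-delimited record through the netcat source, then query the table from hive.

echo "1,tom,20" | nc localhost 8888
hive -e "select * from myhive.weblogs"        # the row should appear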

4. hbase sink //SimpleHbaseEventSerializer uses fixed defaults for the rowKey and column; they cannot be customized
//RegexHbaseEventSerializer lets you name the rowKey and column fields yourself

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume_hbase
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer


# Columns are split out of the event body by regex
# rowKeyIndex picks the rowKey column; the index is 0-based
a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
a1.sinks.k1.serializer.rowKeyIndex = 0

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
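The sink does not create the table: flume_hbase must already exist with column family f1 before the agent starts. A minimal setup in the hbase shell, then a test record matching the regex (ROW_KEY,name,age):

hbase shell
> create 'flume_hbase', 'f1'

echo "row1,tom,20" | nc localhost 8888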


5. asynchbase sink //asynchronous hbase sink
//async writes make it faster
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = flume_hbase
a1.sinks.k1.columnFamily = f1
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

channel: the buffer
=====================================
1. memory channel
a1.channels.c1.type = memory
# Maximum number of events held in the channel
a1.channels.c1.capacity = 1000
# Maximum events per transaction that the channel accepts from a source
# or hands to a sink
a1.channels.c1.transactionCapacity = 100

2. file channel: //if several file channels keep their checkpoints and data in the default location
//at the same time, the files collide and the other channels crash

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
a1.channels.c1.dataDirs = /home/centos/flume/data

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


memory channel: fast, but data is lost if the machine loses power

file channel: slower, but data survives a power failure


Avro
===============================================
source
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444

# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start an avro client and send data:
flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt
(-H host, -p port, -R header file, -F data file)


sink
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

# Configure the sink (the avro sink takes hostname, not bind)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.23.101
a1.sinks.k1.port = 4444


# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

Flume hops (multi-agent chaining):
=====================================
1. Sync s101's flume installation to the other nodes
xsync.sh /soft/flume
xsync.sh /soft/apache-flume-1.8.0-bin/

2. Switch to root and distribute the environment file
su root
xsync.sh /etc/profile
exit

3. Config files
1) s101 //hop.conf
source: avro
sink:   hdfs

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444

# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 1024
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


2) s102-s104 //hop2.conf
source: taildir
sink:   avro

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.23.101
a1.sinks.k1.port = 4444


# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4. Create the ~/taildir directory on s102-s104
xcall.sh "mkdir ~/taildir"


5. Start flume on s101
flume-ng agent -n a1 -f /soft/flume/conf/hop.conf

6. Start flume on s102-s104, each in the background
flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf &


7. Test: write data into taildir on s102-s104 and watch it arrive in hdfs
s102]$ echo 102 > taildir/1.txt
s103]$ echo 103 > taildir/1.txt
s104]$ echo 104 > taildir/1.txt
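Then check on s101 that all three records reached hdfs:

hdfs dfs -ls -R /flume/hop
hdfs dfs -cat /flume/hop/*/events-*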


interceptor
==================================
A source-side component: modifies or drops events.
Each source can be configured with several interceptors ===> an interceptorChain



1. Timestamp Interceptor //adds a timestamp to the event header

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
# Name the interceptor
a1.sources.r1.interceptors = i1
# Set the interceptor type
a1.sources.r1.interceptors.i1.type = timestamp


# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


2. Static Interceptor //adds a fixed key/value pair to the event header

3. Host Interceptor //adds the agent host's address to the event header

4. Building an interceptor chain (the resulting headers are sketched after the config):

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i3.type = static
a1.sources.r1.interceptors.i3.key = location
a1.sources.r1.interceptors.i3.value = NEW_YORK


# Configure the sink
a1.sinks.k1.type = logger

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
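With this chain, every event reaching the sink carries three extra headers: timestamp (epoch millis, from i1), host (the agent's address, from i2), and location=NEW_YORK (from i3). Roughly like this (not actual logger output; the values are invented for illustration):

headers: { timestamp=1540000000000, host=192.168.23.101, location=NEW_YORK }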


channel selector
====================================
A source-side component: routes events to specific channels, much like partitioning.

When one source is wired to several channels, the default behavior is replication: every channel receives a copy of each event.


1. replicating selector //the default; the source sends a copy of every event to every channel
//setup: source x 1, channel x 3, sink x 3
// nc / memory / file_roll

# Name the agent's components
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sources.r1.selector.type = replicating

# Configure the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100


# Configure the sinks
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/centos/file1
a1.sinks.k1.sink.rollInterval = 0

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/centos/file2
a1.sinks.k2.sink.rollInterval = 0

a1.sinks.k3.type = file_roll
a1.sinks.k3.sink.directory = /home/centos/file3
a1.sinks.k3.sink.rollInterval = 0

# Bind the channels to the source and the sinks
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
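To verify the replication, send one line through nc and confirm each directory received a copy (the config-file name here is assumed):

flume-ng agent -n a1 -f selector_rep.conf
nc localhost 8888                        # type a line, e.g. hello
cat ~/file1/* ~/file2/* ~/file3/*        # the same line shows up three times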



2. Multiplexing selector //routes by header value; here an avro source supplies the events



# Name the agent's components
a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444
# Configure the channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = country
a1.sources.r1.selector.mapping.CN = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c3

# Configure the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100


# Configure the sinks
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/centos/file1
a1.sinks.k1.sink.rollInterval = 0

a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/centos/file2
a1.sinks.k2.sink.rollInterval = 0

a1.sinks.k3.type = file_roll
a1.sinks.k3.sink.directory = /home/centos/file3
a1.sinks.k3.sink.rollInterval = 0

# Bind the channels to the source and the sinks
a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3


1. Create the file1, file2, file3 directories in the home directory
mkdir file1 file2 file3

2. Create a country directory holding the header files and the data
header files CN.txt, US.txt, OTHER.txt:
CN.txt    ===> country CN
US.txt    ===> country US
OTHER.txt ===> country OTHER

data file 1.txt:
1.txt ====> helloworld

3. Run flume
flume-ng agent -n a1 -f /soft/flume/selector_multi.conf

4. Run the avro client
flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt     ===> check file2
flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt     ===> check file1
flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt  ===> check file3



sinkProcessor
=================================
A sink runner runs one sink group.

A sink group is made up of one or more sinks.

The sink runner tells the sink group to process the next batch of events.

The sink group contains a sink processor, which picks the sink that handles the batch.


failover //give every sink a priority
//the larger the number, the higher the priority
//incoming data is handled by the highest-priority sink
//when that sink dies, the next-highest-priority sink is activated and carries on
//channels and sinks must be paired one-to-one

a1.sources = r1
a1.sinks = s1 s2 s3
a1.channels = c1 c2 c3

# Describe/configure the source
a1.sources.r1.type = seq

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = s1 s2 s3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.s1 = 5
a1.sinkgroups.g1.processor.priority.s2 = 10
a1.sinkgroups.g1.processor.priority.s3 = 15
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.s1.type = file_roll
a1.sinks.s1.sink.directory = /home/centos/file1
a1.sinks.s2.type = file_roll
a1.sinks.s2.sink.directory = /home/centos/file2
a1.sinks.s3.type = file_roll
a1.sinks.s3.sink.directory = /home/centos/file3

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2 c3
a1.sinks.s1.channel = c1
a1.sinks.s2.channel = c2
a1.sinks.s3.channel = c3

 


An Event is created on the source side by wrapping the input data's byte array:
Event event = EventBuilder.withBody(body);
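A slightly fuller sketch of the same API, adding a header map (EventBuilder.withBody also has overloads taking a String plus a Charset):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("location", "NEW_YORK");                 // header: k/v pairs
        byte[] body = "helloworld".getBytes(StandardCharsets.UTF_8);
        Event event = EventBuilder.withBody(body, headers);  // header + body
        System.out.println(event.getHeaders());              // {location=NEW_YORK}
    }
}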

 

The Sink process() method returns one of two statuses:
1. READY   //one or more events were delivered successfully
2. BACKOFF //the channel had no data to give the sink

Lifecycle of a flume transaction:

tx.begin()    //open the transaction, then do the work
tx.commit()   //commit once the work has completed
tx.rollback() //roll back if an exception occurs
tx.close()    //always close the transaction at the end
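Putting the two together, the skeleton of a custom sink's process() follows exactly that lifecycle (a sketch against the flume SDK, not a production sink):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

public class SketchSink extends AbstractSink {
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();                          // open the transaction
        try {
            Event event = channel.take();    // pull one event from the channel
            if (event == null) {             // nothing available
                tx.commit();
                return Status.BACKOFF;
            }
            // ... deliver event.getBody() to the destination here ...
            tx.commit();                     // delivery succeeded
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();                   // undo the take on failure
            throw new EventDeliveryException(e);
        } finally {
            tx.close();                      // always close the transaction
        }
    }
}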

