Flume:
=====================
    Flume is a distributed, reliable, and available service for efficiently collecting,
    aggregating, and moving large amounts of log data. It has a simple, flexible
    architecture based on streaming data flows. It is robust and fault tolerant, with
    tunable reliability mechanisms and many failover and recovery mechanisms. It uses a
    simple, extensible data model that allows for online analytic applications.

    source:  producer relative to the channel; receives data in various formats and hands it to the channel for transport
    channel: data buffer; receives events from the source and passes them on to the sink
    sink:    consumer relative to the channel; takes events from the channel and delivers them, in the configured format, to the configured destination

Event:
===============
    The basic unit of transport in flume: header + body

Flume installation:
================
    1. Unpack the tarball
    2. Create a symlink
    3. Configure the environment variables and make them take effect
    4. Edit the configuration files
        1) Rename flume-env.ps1.template to flume-env.ps1
        2) Rename flume-env.sh.template to flume-env.sh
        3) Edit flume-env.sh and point it at the jdk directory, adding
            export JAVA_HOME=/soft/jdk
    5. Check the flume version
        flume-ng version

Flume usage:
=========================
    //flume can also load its configuration from zk; a command sketch follows at the end of this section
    //command to run an agent:
        flume-ng agent -n a1 -f xxx.conf

    agent:   a1
    source:  r1
    channel: c1
    sink:    k1

    Steps:
    1. Write a configuration file, r_nc.conf

        # name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # configure the sink
        a1.sinks.k1.type = logger

        # configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # bind channel-source and channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2. Start flume with that configuration file
        flume-ng agent -n a1 -f r_nc.conf

    3. Open another session and test
        nc localhost 8888

    //user guide
        http://flume.apache.org/FlumeUserGuide.html

Running a program in the background:
=============================================
    ctrl + z : suspends the foreground program  =====>  [1]+  Stopped    flume-ng agent -n a1 -f r_nc.conf
    bg %1    : resumes the suspended job in the background
    jobs     : lists the background jobs
    fg %1    : brings the job back to the foreground
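    Two related command sketches (assumptions: the nohup redirection is plain shell
    practice, the -z/-p ZooKeeper flags follow the flume user guide, and the host s101
    and znode path /flume are placeholders):

        # start the agent detached from the terminal, logging to flume.log
        nohup flume-ng agent -n a1 -f r_nc.conf > flume.log 2>&1 &

        # load the agent configuration from zk instead of a local file
        flume-ng agent -n a1 -z s101:2181 -p /flume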
flume: collection, aggregation, and movement of massive amounts of log data
    flume-ng agent -n a1 -f xxx.conf

    source   producer relative to the channel    //netcat
    channel  acts like a buffer                  //memory
    sink     consumer relative to the channel    //logger

    Event:  header + body
            (k-v)    (data)

source:
============================================
1. Sequence (seq) source: mostly used for testing

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = seq
    # total number of events to send
    a1.sources.r1.totalEvents = 1000

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Stress source: mostly used for load testing

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = org.apache.flume.source.StressSource
    # size of a single event, in bytes
    a1.sources.r1.size = 10240
    # total number of events
    a1.sources.r1.maxTotalEvents = 1000000

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3. Spooling-directory (spooldir) source: watches a directory for new files and sends each new file's contents as events

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = spooldir
    # directory to watch
    a1.sources.r1.spoolDir = /home/centos/spooldir
    # suffix appended to a file once it has been fully consumed
    #a1.sources.r1.fileSuffix = .COMPLETED

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4. Exec source    //produces data by running a linux command
        //typical use: tail -F (watch a file and emit whatever gets appended to it)
        //no delivery guarantee; data can easily be lost

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = exec
    # the linux command to run
    a1.sources.r1.command = tail -F /home/centos/readme.txt

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

5. Taildir source    //watches the files under a directory
        //the file set is selected with a regex
        //has a recovery mechanism (position file)

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = TAILDIR
    # define the file groups; several can be set
    a1.sources.r1.filegroups = f1
    # each group's watch directory and file pattern, given as a regex; only files can be watched
    a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*
    # location of the position file
    # a1.sources.r1.positionFile = ~/.flume/taildir_position.json

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

sink:
====================================
1. fileSink    //mostly used for data collection

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = file_roll
    # target directory
    a1.sinks.k1.sink.directory = /home/centos/file
    # roll interval, default 30s; 0 disables rolling, so everything goes into a single file
    a1.sinks.k1.sink.rollInterval = 0

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
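    A quick way to exercise the fileSink example above (a usage sketch; the config
    file name file_sink.conf is an assumption):

        # terminal 1: start the agent
        flume-ng agent -n a1 -f file_sink.conf

        # terminal 2: send a few lines
        nc localhost 8888

        # the events accumulate in a single file under /home/centos/file
        cat /home/centos/file/*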
2. hdfsSink    //writes SequenceFiles by default
        //k: LongWritable
        //v: BytesWritable

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = hdfs
    # target directory
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
    # file name prefix
    a1.sinks.k1.hdfs.filePrefix = events-
    # roll interval, in seconds
    a1.sinks.k1.hdfs.rollInterval = 0
    # file size that triggers a roll, in bytes
    a1.sinks.k1.hdfs.rollSize = 1024
    # use the local timestamp (needed here for the %y-%m-%d escapes)
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # output file type, default SequenceFile
    # DataStream:       plain text; no compression codec can be set
    # CompressedStream: compressed text; a codec must be set
    a1.sinks.k1.hdfs.fileType = DataStream

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3. hiveSink:
    //hiveserver help: hive --service help
    //1. hive --service metastore    starts hive's metastore service; metastore address: thrift://localhost:9083
    //2. put the hcatalog dependencies into /hive/lib:  cp hive-hcatalog* /soft/hive/lib  (from /soft/hive/hcatalog/share/hcatalog)
    //3. create a transactional hive table:
    //    SET hive.support.concurrency=true;
    //    SET hive.enforce.bucketing=true;
    //    SET hive.exec.dynamic.partition.mode=nonstrict;
    //    SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    //    SET hive.compactor.initiator.on=true;
    //    SET hive.compactor.worker.threads=1;
    //    create table myhive.weblogs(id int, name string, age int)
    //    clustered by(id) into 2 buckets
    //    row format delimited fields terminated by '\t'
    //    stored as orc tblproperties('transactional'='true');

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = hive
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = myhive
    a1.sinks.k1.hive.table = weblogs
    a1.sinks.k1.useLocalTimeStamp = true
    # input format: DELIMITED or json
    # DELIMITED  plain text
    # json       json files
    a1.sinks.k1.serializer = DELIMITED
    # input field delimiter, in double quotes
    a1.sinks.k1.serializer.delimiter = ","
    # output field delimiter, in single quotes
    a1.sinks.k1.serializer.serdeSeparator = '\t'
    # field names, separated by "," with no spaces
    a1.sinks.k1.serializer.fieldnames = id,name,age

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4. hbaseSink
    //SimpleHbaseEventSerializer uses fixed defaults for the rowKey and column; they cannot be customized
    //RegexHbaseEventSerializer lets you specify the rowKey and the column names by hand

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = flume_hbase
    a1.sinks.k1.columnFamily = f1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
    # columns are assigned from the regex capture groups
    # rowKeyIndex picks the rowKey column by hand; the index is 0-based
    a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
    a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
    a1.sinks.k1.serializer.rowKeyIndex = 0

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

5. asynchbaseSink
    //asynchronous hbaseSink
    //the asynchronous mechanism makes writes fast

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = asynchbase
    a1.sinks.k1.table = flume_hbase
    a1.sinks.k1.columnFamily = f1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
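    Both hbase examples assume the target table already exists; flume does not create
    it. A minimal preparation sketch in the hbase shell (table and column family names
    taken from the configs above):

        hbase shell
        > create 'flume_hbase', 'f1'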
channel: the buffer
=====================================
1. memoryChannel
    a1.channels.c1.type = memory
    # maximum number of events held in the buffer
    a1.channels.c1.capacity = 1000
    # maximum number of events the channel takes from the source per transaction
    # maximum number of events the channel hands to the sink per transaction
    a1.channels.c1.transactionCapacity = 100

2. fileChannel:
    //if the checkpoint and data directories are left at their defaults and several
    //file channels are opened at the same time, the files collide and the other
    //channels crash

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
    a1.channels.c1.dataDirs = /home/centos/flume/data

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    memoryChannel: fast, but data is lost if the machine loses power
    fileChannel:   slower, but data survives a power failure

Avro
===============================================
source

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    ***********************************************************************************************
    * start an avro client and send data:                                                         *
    *                                                                                             *
    *   flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt        *
    *                           host        port     header file           data file              *
    ***********************************************************************************************

sink

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

    # configure the sink (the avro sink takes hostname, not bind)
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = 192.168.23.101
    a1.sinks.k1.port = 4444

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
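    The two configs above chain together: the taildir->avro agent ships events to the
    avro->logger agent. A usage sketch (the config file names avro_source.conf and
    avro_sink.conf are assumptions; 192.168.23.101 is the host running the avro source):

        # on 192.168.23.101: start the receiving agent first
        flume-ng agent -n a1 -f avro_source.conf

        # on the sending host: start the taildir agent, then append some data
        flume-ng agent -n a1 -f avro_sink.conf
        echo hello >> ~/taildir/1.txt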
Flume hops (agent chaining):
=====================================
1. Send the flume installation from s101 to the other nodes
    xsync.sh /soft/flume
    xsync.sh /soft/apache-flume-1.8.0-bin/

2. Switch to root and distribute the environment file
    su root
    xsync.sh /etc/profile
    exit

3. Configuration files
    1) s101    //hop.conf
        source: avro
        sink:   hdfs

        # name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # configure the source
        a1.sources.r1.type = avro
        a1.sources.r1.bind = 0.0.0.0
        a1.sources.r1.port = 4444

        # configure the sink
        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/
        a1.sinks.k1.hdfs.filePrefix = events-
        a1.sinks.k1.hdfs.rollInterval = 0
        a1.sinks.k1.hdfs.rollSize = 1024
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        a1.sinks.k1.hdfs.fileType = DataStream

        # configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # bind channel-source and channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2) s102-s104    //hop2.conf
        source: taildir
        sink:   avro

        # name the agent components
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # configure the source
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.filegroups = f1
        a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

        # configure the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = 192.168.23.101
        a1.sinks.k1.port = 4444

        # configure the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # bind channel-source and channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

4. Create the ~/taildir directory on s102-s104
    xcall.sh "mkdir ~/taildir"

5. Start flume on s101
    flume-ng agent -n a1 -f /soft/flume/conf/hop.conf

6. Start flume on each of s102-s104, putting it in the background
    flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf &

7. Test: write data into taildir on each of s102-s104 and watch the data arrive in hdfs
    s102]$ echo 102 > taildir/1.txt
    s103]$ echo 103 > taildir/1.txt
    s104]$ echo 104 > taildir/1.txt

interceptor:
==================================
A source-side component, responsible for modifying or dropping events.
Each source can be configured with multiple interceptors  ===>  interceptorChain

1. Timestamp Interceptor    //adds a timestamp to the header

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # name the interceptor
    a1.sources.r1.interceptors = i1
    # set the interceptor type
    a1.sources.r1.interceptors.i1.type = timestamp

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Static Interceptor    //adds a fixed key-value pair to the header (configured in the chain below)
3. Host Interceptor      //adds the agent host to the header (configured in the chain below)
4. Setting up an interceptor chain:

    # name the agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i2.type = host
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = location
    a1.sources.r1.interceptors.i3.value = NEW_YORK

    # configure the sink
    a1.sinks.k1.type = logger

    # configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

channel selector:
====================================
A source-side component, responsible for sending each event to specific channels; works like partitioning.
When one source is wired to several channels, the default is to send a copy of every event to each channel.

1. replicating selector    //the default; the source sends an event copy to every channel
    //setup: source x 1,  channel x 3,  sink x 3
    //        nc           memory        file

    # name the agent components
    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    # configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    a1.sources.r1.selector.type = replicating

    # configure the channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    a1.channels.c3.type = memory
    a1.channels.c3.capacity = 1000
    a1.channels.c3.transactionCapacity = 100

    # configure the sinks
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/centos/file1
    a1.sinks.k1.sink.rollInterval = 0
    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /home/centos/file2
    a1.sinks.k2.sink.rollInterval = 0
    a1.sinks.k3.type = file_roll
    a1.sinks.k3.sink.directory = /home/centos/file3
    a1.sinks.k3.sink.rollInterval = 0

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3
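    A quick check of the replicating selector (usage sketch; the config file name
    selector_rep.conf is an assumption, and the target directories must exist first):

        mkdir ~/file1 ~/file2 ~/file3
        flume-ng agent -n a1 -f selector_rep.conf
        nc localhost 8888                      # type a line, e.g. hello
        cat ~/file1/* ~/file2/* ~/file3/*      # the same line shows up three times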
2. Multiplexing selector    //routes by header value; here an avro source is used to send files

    # name the agent components
    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    # configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444

    # configure the channel selector
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = country
    a1.sources.r1.selector.mapping.CN = c1
    a1.sources.r1.selector.mapping.US = c2
    a1.sources.r1.selector.default = c3

    # configure the channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    a1.channels.c3.type = memory
    a1.channels.c3.capacity = 1000
    a1.channels.c3.transactionCapacity = 100

    # configure the sinks
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/centos/file1
    a1.sinks.k1.sink.rollInterval = 0
    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /home/centos/file2
    a1.sinks.k2.sink.rollInterval = 0
    a1.sinks.k3.type = file_roll
    a1.sinks.k3.sink.directory = /home/centos/file3
    a1.sinks.k3.sink.rollInterval = 0

    # bind channel-source and channel-sink
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3

    1. Create the file1 file2 file3 directories under the home directory
        mkdir file1 file2 file3
    2. Create a country directory holding the header files and the data
        header files CN.txt, US.txt, OTHER.txt:
            CN.txt    ===> country CN
            US.txt    ===> country US
            OTHER.txt ===> country OTHER
        data file 1.txt:
            1.txt ====> helloworld
    3. Run flume
        flume-ng agent -n a1 -f /soft/flume/selector_multi.conf
    4. Run the avro client
        flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt        ===> check file2
        flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt        ===> check file1
        flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt     ===> check file3

sinkProcessor
=================================
A sink runner runs one sink group.
A sink group consists of one or more sinks.
The sink runner tells its sink group to process the next batch of events.
The sink group holds a sink processor, which picks the sink that handles the batch.

2. failover    //give every sink a priority
    //the larger the number, the higher the priority
    //incoming data is handled by the highest-priority sink
    //when that sink dies, the next-highest-priority sink is activated and keeps processing
    //channels and sinks must be paired one-to-one

    a1.sources = r1
    a1.sinks = s1 s2 s3
    a1.channels = c1 c2 c3

    # Describe/configure the source
    a1.sources.r1.type = seq

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = s1 s2 s3
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.s1 = 5
    a1.sinkgroups.g1.processor.priority.s2 = 10
    a1.sinkgroups.g1.processor.priority.s3 = 15
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    # Describe the sink
    a1.sinks.s1.type = file_roll
    a1.sinks.s1.sink.directory = /home/centos/file1
    a1.sinks.s2.type = file_roll
    a1.sinks.s2.sink.directory = /home/centos/file2
    a1.sinks.s3.type = file_roll
    a1.sinks.s3.sink.directory = /home/centos/file3

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.s1.channel = c1
    a1.sinks.s2.channel = c2
    a1.sinks.s3.channel = c3

An Event is built by the source by wrapping the input data's byte array:
    Event event = EventBuilder.withBody(body);

A sink's process method returns one of two states:
    1. READY      //one or more events were delivered successfully
    2. BACKOFF    //the channel had no data to give the sink

Transaction lifecycle in flume:
    tx.begin()     //open the transaction, then do the work
    tx.commit()    //commit once the work has finished
    tx.rollback()  //roll back if an exception occurs
    tx.close()     //always close the transaction at the end
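The two return states and the transaction lifecycle come together in a custom sink's
process() method. A minimal sketch following the pattern in the flume developer guide
(the class name MySink is made up; the actual delivery step is left as a comment):

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    public class MySink extends AbstractSink {
        @Override
        public Status process() throws EventDeliveryException {
            Channel ch = getChannel();
            Transaction tx = ch.getTransaction();
            tx.begin();                      // open the transaction, then do the work
            try {
                Event event = ch.take();     // null when the channel is empty
                if (event == null) {
                    tx.commit();
                    return Status.BACKOFF;   // no data for this sink
                }
                // deliver event.getBody() to the destination here
                tx.commit();                 // commit after successful delivery
                return Status.READY;         // one or more events delivered
            } catch (Exception e) {
                tx.rollback();               // undo the take on failure
                throw new EventDeliveryException(e);
            } finally {
                tx.close();                  // always close the transaction
            }
        }
    }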