Flume 入門--幾種不同的Sinks


主要介紹幾種常見Flume的Sink--匯聚點

1.Logger Sink 

記錄INFO級別的日志,一般用於調試。前面介紹Source時候用到的Sink都是這個類型的Sink

必須配置的屬性:

屬性說明:
            !channel    –    
            !type    –    The component type name, needs to be logger
            maxBytesToLog    16    Maximum number of bytes of the Event body to log

            要求必須在 --conf 參數指定的目錄下有 log4j的配置文件
            可以通過-Dflume.root.logger=INFO,console在命令啟動時手動指定log4j參數

案例:前面的例子都是這種類型的Sink

2.File Roll Sink

在本地文件系統中存儲事件。每隔指定時長生成文件保存這段時間內收集到的日志信息。

屬性說明:
            !channel    –    
            !type    –    類型,必須是"file_roll"
            !sink.directory    –    文件被存儲的目錄
            sink.rollInterval    30    滾動文件每隔30秒(應該是每隔30秒鍾單獨切割數據到一個文件的意思)。如果設置為0,則禁止滾動,從而導致所有數據被寫入到一個文件。
            sink.serializer    TEXT    Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
            batchSize    100    

案例:

              編寫配置文件:
				#命名Agent a1的組件
				a1.sources  =  r1
				a1.sinks  =  k1
				a1.channels  =  c1

				#描述/配置Source
				a1.sources.r1.type  = http
				a1.sources.r1.port  = 6666

				#描述Sink
				a1.sinks.k1.type  = file_roll
				a1.sinks.k1.sink.directory = /home/park/work/apache-flume-1.6.0-bin/mysink
				#描述內存Channel
				a1.channels.c1.type  =  memory
				a1.channels.c1.capacity  =  1000
				a1.channels.c1.transactionCapacity  =  100

				#為Channle綁定Source和Sink
				a1.sources.r1.channels  =  c1
				a1.sinks.k1.channel  =  c1

 啟動flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template7.conf --name a1 -Dflume.root.logger=INFO,console

測試:

通過curl命令向目標主機發送請求,就會發現在指定的文件夾下出現記錄收集日志的文件

3.Avro Sink

是實現多級流動 和 扇出流(1到多) 扇入流(多到1) 的基礎。非常重要 但是需要多台機器

必要屬性說明:
            !channel    –    
            !type    –    The component type name, needs to be avro.
            !hostname    –    The hostname or IP address to bind to.
            !port    –    The port # to listen on.

案例1.多級流動  h1流動到h2

h2:
				配置配置文件:
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1

					#描述/配置Source
					a1.sources.r1.type=avro
					a1.sources.r1.bind=0.0.0.0
					a1.sources.r1.port=9988
					#描述Sink
					a1.sinks.k1.type=logger
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1
					a1.sinks.k1.channel=c1
				啟動flume:
					./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

				
			h1:
				配置配置文件
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1

					#描述/配置Source
					a1.sources.r1.type=http
					a1.sources.r1.port=8888
					#描述Sink
					a1.sinks.k1.type=avro
					a1.sinks.k1.hostname=192.168.242.138
					a1.sinks.k1.port=9988
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.chafile:///C:/Users/park/Desktop/Day01_Flume/%E6%96%87%E6%A1%A3/Flume%201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm#irc-sinknnels=c1
					a1.sinks.k1.channel=c1

 啟動flume

發送http請求到h1:

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://192.168.242.133:8888

 稍等幾秒后,發現h2最終收到了這條消息

案例2:扇出流(h1扇出到h2,h3)

h2 h3:
				配置配置文件:
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1

					#描述/配置Source
					a1.sources.r1.type=avro
					a1.sources.r1.bind=0.0.0.0
					a1.sources.r1.port=9988
					#描述Sink
					a1.sinks.k1.type=logger
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1
					a1.sinks.k1.channel=c1
				啟動flume:
					./flume-ng agent --conf ../conf --conf-file ../conf/template8.conf --name a1 -Dflume.root.logger=INFO,console

			h1:
				配置配置文件
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1 k2
					a1.channels=c1 c2

					#描述/配置Source
					a1.sources.r1.type=http
					a1.sources.r1.port=8888
					#描述Sink
					a1.sinks.k1.type=avro
					a1.sinks.k1.hostname=192.168.242.138
					a1.sinks.k1.port=9988
					a1.sinks.k2.type=avro
					a1.sinks.k2.hostname=192.168.242.135
					a1.sinks.k2.port=9988
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					a1.channels.c2.type=memory
					a1.channels.c2.capacity=1000
					a1.channels.c2.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1 c2
					a1.sinks.k1.channel=c1	
					a1.sinks.k2.channel=c2	

案例3:扇入流()

m3:
				編寫配置文件:
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1
					#描述/配置Source
					a1.sources.r1.type=avro
					a1.sources.r1.bind=0.0.0.0
					a1.sources.r1.port=4141
					#描述Sink
					a1.sinks.k1.type=logger
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1
					a1.sinks.k1.channel=c1
				啟動flume:
					./flume-ng agent --conf ../conf --conf-file ../conf/template.conf --name a1 -Dflume.root.logger=INFO,console
			
			m1、m2:
				編寫配置文件:
					#命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1

					#描述/配置Source
					a1.sources.r1.type=http
					a1.sources.r1.port=8888
					#描述Sink
					a1.sinks.k1.type=avro
					a1.sinks.k1.hostname=192.168.242.135
					a1.sinks.k1.port=4141
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1
					a1.sinks.k1.channel=c1
				啟動flume:
					./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console
				m1通過curl發送一條http請求,由於默認使用的是jsonHandler,數據格式必須是指定的json格式:
					[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
			 	m2通過curl發送一條http請求,由於默認使用的是jsonHandler,數據格式必須是指定的json格式:
					[root@localhost conf]# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
				發現m3均能正確收到消息

 

4、HDFS Sink

此Sink將事件寫入到Hadoop分布式文件系統HDFS中。
            目前它支持創建文本文件和序列化文件。對這兩種格式都支持壓縮。 這些文件可以分卷,按照指定的時間或數據量或事件的數量為基礎。
            它還通過類似時間戳或機器屬性對數據進行 buckets/partitions 操作  
            HDFS的目錄路徑可以包含將要由HDFS替換格式的轉移序列用以生成存儲事件的目錄/文件名。
            使用這個Sink要求hadoop必須已經安裝好,以便Flume可以通過hadoop提供的jar包與HDFS進行通信。
            注意,此版本hadoop必須支持sync()調用。

必要屬性說明:
                !channel    –    
                !type    –    類型名稱,必須是“HDFS”
                !hdfs.path    –    HDFS 目錄路徑 (eg hdfs://namenode/flume/webdata/)
                hdfs.filePrefix    FlumeData    Flume在目錄下創建文件的名稱前綴
                hdfs.fileSuffix    –    追加到文件的名稱后綴 (eg .avro - 注: 日期時間不會自動添加)
                hdfs.inUsePrefix    –    Flume正在處理的文件所加的前綴
                hdfs.inUseSuffix    .tmp    Flume正在處理的文件所加的后綴

案例:

                #命名Agent組件
					a1.sources=r1
					a1.sinks=k1
					a1.channels=c1

					#描述/配置Source
					a1.sources.r1.type=http
					a1.sources.r1.port=8888
					#描述Sink
					a1.sinks.k1.type=hdfs
					a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/ppp
					#描述內存Channel
					a1.channels.c1.type=memory
					a1.channels.c1.capacity=1000
					a1.channels.c1.transactionCapacity=1000
					#為Channel綁定Source和Sink
					a1.sources.r1.channels=c1
					a1.sinks.k1.channel=c1
				           

 啟動flume:

./flume-ng agent --conf ../conf --conf-file ../conf/template9.conf --name a1 -Dflume.root.logger=INFO,console 

 測試:通過利用curl給目的主機發送命令,會發現在HDFS中會生成相應的記錄文件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM