Flume的Sink


一、Logger Sink

記錄指定級別(比如INFO,DEBUG,ERROR等)的日志,通常用於調試

要求,在 --conf(-c )參數指定的目錄下有log4j的配置文件

根據設計,logger sink將體內容限制為16字節,從而避免屏幕充斥着過多的內容。如果想要查看調試的完整內容,那么你應該使用其他的sink,也許可以使用file_roll sink,它會將日志寫到本地文件系統中。

可配置項說明:

配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.channels=c1
a1.sinks=s1
 
#描述/配置a1的r1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=44444
 
#描述a1的s1
a1.sinks.s1.type=logger
#描述a1的c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

二、File Roll Sink

在本地系統中存儲事件。

每隔指定時長生成文件保存這段時間內收集到的日志信息。

可配置參數說明:

配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
 
#描述sink
a1.sinks.s1.type=file_roll
a1.sinks.s1.sink.directory=/home/work/rolldata
a1.sinks.s1.sink.rollInterval=60
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

創建指定的文件目錄 /home/work/rolldata

啟動測試

../bin/flume-ng agent -c ./ -f ./template.conf -n a1

三、Avro Sink

是實現多級流動、扇出流(1到多) 扇入流(多到1) 的基礎。

可配置項說明:

3.1  多級流動案例需求說明:

讓01機的flume通過netcat source源接收數據,然后通過avro sink 發給02機==》02機的flume利用avro source源收數據,然后通過avro sink 傳給03機==》03機通過avro source源收數據,通過logger sink 輸出到控制台上(本例中,02機的ip:192.168.234.212  || 03機的ip:192.168.234.213)

實現步驟:

1.准備三台虛擬機,並安裝好flume(關閉每台機器的防火牆)

2.配置每台flume的配置文件

3.啟動測試

01機的配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
 
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

02機的配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9999
 
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.213
a1.sinks.s1.port=9999
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

03機的配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9999
 
#描述sink
a1.sinks.s1.type=logger
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

3.2扇出流案例需求說明

01機發出的數據,讓02,03來接收

實現步驟:

1.准備三台虛擬機,並安裝好flume(關閉每台機器的防火牆)

2.配置每台flume的配置文件

3.啟動測試

01機的配置文件:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
 
#描述/配置a1的source1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
 
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
 
a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.234.213
a1.sinks.s2.port=9999
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
shanchu.conf

02,03配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9999
 
#描述sink
a1.sinks.s1.type=logger
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
shanchu.conf

3.3 扇入案列需求說明

02,03機收到的數據都發往01

02,03的配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
 
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.163
a1.sinks.s1.port=9999
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
shanru.conf

01機的配置示例:

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9999
 
#描述sink
a1.sinks.s1.type=logger
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
shanru.conf

四、HDFS Sink

此Sink將事件寫入到Hadoop分布式文件系統HDFS中。

目前它支持創建文本文件和序列化文件。

對這兩種格式都支持壓縮。

這些文件可以分卷,按照指定的時間或數據量或事件的數量為基礎。

它還通過類似時間戳或機器屬性對數據進行 buckets/partitions 操作    It also buckets/partitions data by attributes like timestamp or machine where the event originated.

HDFS的目錄路徑可以包含將要由HDFS替換格式的轉移序列用以生成存儲事件的目錄/文件名。

使用這個Sink要求haddop必須已經安裝好,以便Flume可以通過hadoop提供的jar包與HDFS進行通信。

可配置項說明

配置項

說明

channel

 

type

hdfs

hdfs.path

HDFS 目錄路徑 (hdfs://namenode/flume/webdata/)

hdfs.inUseSuffix

.tmp        Flume正在處理的文件所加的后綴

hdfs.rollInterval

30        Number of seconds to wait before
舉例:如果flume需要40s,30s=>1個文件  10s=>30s 1個文件

hdfs.rollSize

1024        File size to trigger roll, in bytes (0: never roll based on file size)

hdfs.rollCount

10        Number of events written to file before it rolled (0 = never roll based on number of events)

hdfs.fileType

SequenceFile        File format: currently SequenceFile, DataStream or CompressedStream

hdfs.retryInterval

80        Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.

 

#配置Agent a1 的組件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
 
#描述/配置a1的source1
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888
 
#描述sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://192.168.234.21:9000/flume
#處理數據的類型,DataStream為普通的文本類型
a1.sinks.s1.hdfs.fileType=DataStream
 
#描述內存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
 
#位channel 綁定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1

報錯是因為flume缺少相關hadoop的依賴jar包,找到以下的jar包,放到flume的lib目錄下即可。

commons-configuration-1.6.jar

hadoop-auth-2.5.2.jar

hadoop-common-2.5.2.jar

hadoop-hdfs-2.5.2.jar

hadoop-mapreduce-client-core-2.5.2.jar

但是一個一個找特別麻煩,所以解決辦法是將hadoop的jar包都拷貝到flume的lib目錄下:

進入到hadoop安裝目錄的share目錄下的hadoop目錄

執行:scp common/*   common/lib/*   hdfs/*   hdfs/lib/*   mapreduce/*   mapreduce/lib/*   tools/lib/* 192.168.234.163:/home/software/flume/lib/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM