Big Data 3 - Collecting Data with Flume and Landing It in HDFS


 

Flume

  A log collection system

    Flume is a highly available, highly reliable distributed system, originally provided by Cloudera, for collecting, aggregating, and transporting massive volumes of log data. Flume supports pluggable data senders on the log-producing side for collecting data, and it can apply simple processing to the data and write it to a variety of (customizable) data receivers.

    Flume currently has two major lines: the 0.9.x releases, collectively called Flume OG, and the 1.x releases, collectively called Flume NG. Flume NG was heavily refactored and differs significantly from Flume OG, so take care to distinguish between them.

  Basic concepts

    Event

      Each log record that Flume reads is wrapped into an object; that object is called a Flume Event.

      In essence it can be represented as a JSON string, e.g. {headers: ..., body: ...}

    Agent

      An agent is a Java process (a JVM) that hosts the components through which events flow from an external source to the next destination.

      It consists of three main parts: Source, Channel, and Sink.

    Source (data source)

      The Source component collects data. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.

    Channel (data channel)

      After the Source collects data, it is staged temporarily in the channel; the channel is the agent's buffer for in-flight data. The collected data can be cached in memory, jdbc, file, and other channel types.

    Sink (data destination)

      The Sink component delivers data to its destination, which can be hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, or a custom sink.

    How the components work together

      For reliability, data is wrapped into an Event for transport. The Source wraps the data it collects from a server into Events and stores them in the Channel buffer, whose structure is much like a FIFO queue. The Sink then fetches events from the Channel, deletes the corresponding data from the Channel once it has been fetched, and writes it to a target such as HDFS, or passes it on to the next agent's Source. Data is deleted from the Channel only after it has been transferred successfully; this is done for reliability, so that if the receiver crashes the data can be re-sent.
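
As a minimal sketch of this wiring (the component names a1, r1, c1, k1 and the netcat/logger types are only illustrative; the full single-node example appears later in this article):

# one source, one channel, one sink
a1.sources  =  r1
a1.channels  =  c1
a1.sinks  =  k1

a1.sources.r1.type  =  netcat
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.channels.c1.type  =  memory

a1.sinks.k1.type  =  logger

# the source writes events into the channel; the sink drains them from it
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1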

  2. Reliability

    When a node fails, logs can be forwarded to other nodes without being lost.

    Flume offers three levels of reliability guarantees, from strongest to weakest:

      end-to-end (the receiving agent first writes the event to disk and deletes it only after the data has been delivered successfully; if delivery fails, the data can be re-sent)

      Store on failure (the strategy also used by Scribe, Facebook's open-source log collection system: when the receiver crashes, data is written locally and sending resumes once the receiver recovers)

      Best effort (no acknowledgement is made after the data is sent to the receiver)

  3. Install the JDK

JDK installation

  4. Install Flume

Flume installation

  5. Directory structure

Directory structure

  Source components

      Focus on mastering the Avro Source and the Spooling Directory Source.

# Single-node Flume configuration
# Name the components of agent a1
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

# Describe/configure the Source
# netcat is a built-in type that receives data over the network
a1.sources.r1.type  =  netcat
# 0.0.0.0 listens on all network interfaces
a1.sources.r1.bind  =  0.0.0.0
# port the service listens on
a1.sources.r1.port  =  22222

# Describe the Sink (built-in logger type)
a1.sinks.k1.type  =  logger

# Describe the in-memory Channel
a1.channels.c1.type  =  memory
# at most 1000 events buffered in the channel
a1.channels.c1.capacity  =  1000
# up to 100 events per transaction
a1.channels.c1.transactionCapacity  =  100

# Bind the Source and Sink to the Channel
# a source can be bound to multiple channels
a1.sources.r1.channels  =  c1
# a sink can be bound to only one channel
a1.sinks.k1.channel  =  c1
flume.properties
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  spooldir
a1.sources.r1.spoolDir  =  /usr/local/src/flume/data

a1.sinks.k1.type  =  logger

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
flume-dir.properties
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  logger

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
flume-avro.properties
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname = 192.168.220.137
a1.sinks.k1.port = 22222

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
flume-http.properties
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname = 192.168.220.137
a1.sinks.k1.port = 22222

a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
flume-jt.properties
channels    –     
type    –    The component type name; must be avro
bind    –    Hostname or IP address to listen on
port    –    Port to listen on

threads    –    Maximum number of worker threads
selector.type          
selector.*          
interceptors    –    Space-separated list of interceptors
interceptors.*          
compression-type    none    Compression type; can be "none" or "deflate". This must match the compression type used by the sending Avro sink/client
ssl    false    Whether to enable SSL encryption; if enabled, a "keystore" and a "keystore-password" must also be configured.
keystore    –    Path to the Java keystore file used for SSL
keystore-password    –    Password for the Java keystore used for SSL
keystore-type    JKS    Keystore type; can be "JKS" or "PKCS12".
exclude-protocols    SSLv3    Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded in addition to the protocols specified.
ipFilter    false    Set to true to enable Netty-level IP filtering
ipFilterRules    –    IP filtering rule expressions for Netty
Parameter reference  flume-avro.properties
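
As a hedged example that layers a few of these options onto the flume-avro.properties above (the thread count and compression setting are illustrative; compression-type must match the compression used by the sending Avro sink or client):

a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
a1.sources.r1.threads  =  4
# "deflate" or "none"; must match the sender
a1.sources.r1.compression-type  =  deflate
a1.sources.r1.channels  =  c1
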
channels    –     
type    –    The component type; must be spooldir
spoolDir    –    Path of the directory to read files from (the "collection directory")

fileSuffix    .COMPLETED    Suffix appended to completely ingested files
deletePolicy    never    Whether to delete files after processing; must be "never" or "immediate"
fileHeader    false    Whether to add a header storing the absolute path filename.
fileHeaderKey    file    Header key to use when appending absolute path filename to event header.
basenameHeader    false    Whether to add a header storing the basename of the file.
basenameHeaderKey    basename    Header Key to use when appending basename of file to event header.
ignorePattern    ^$    Regular expression specifying which files to ignore
trackerDir    .flumespool    Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder    oldest    Order in which to consume files: oldest, youngest, or random
maxBackoff    4000    The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
batchSize    100    Granularity at which to batch transfer to the channel
inputCharset    UTF-8    Character set used when reading files.
decodeErrorPolicy    FAIL    What to do when an undecodable character is found in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the Unicode "replacement character" (U+FFFD). IGNORE: drop the unparseable character sequence.
deserializer    LINE    Deserializer used to parse the file into events; by default one line per event. The class must implement the EventDeserializer.Builder interface.
deserializer.*         Varies per event deserializer.
bufferMaxLines    –    (Obsolete) This option is now ignored.
bufferMaxLineLength    5000    (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type    replicating    replicating or multiplexing
selector.*         Depends on the selector.type value
interceptors    –    Space-separated list of interceptors
interceptors.*
Parameter reference  flume-dir.properties
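
For example, a spooling-directory source that records the absolute path of each file in an event header, skips temporary files, and deletes files once they are fully ingested might look like the sketch below (the ignore pattern and delete policy are illustrative choices, not requirements):

a1.sources.r1.type  =  spooldir
a1.sources.r1.spoolDir  =  /usr/local/src/flume/data
a1.sources.r1.fileHeader  =  true
a1.sources.r1.fileHeaderKey  =  file
# skip temporary files that are still being written
a1.sources.r1.ignorePattern  =  ^.*\.tmp$
a1.sources.r1.deletePolicy  =  immediate
a1.sources.r1.channels  =  c1
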
type         The component type; must be http
port    –    Port to listen on

bind    0.0.0.0    Hostname or IP to listen on
handler    org.apache.flume.source.http.JSONHandler    Handler class; must implement the HTTPSourceHandler interface
handler.*    –    Configuration parameters for the handler
selector.type    
selector.*         
interceptors    –    
interceptors.*          
enableSSL    false    Whether to enable SSL; set to true if needed. Note that SSLv3 is not supported.
excludeProtocols    SSLv3    Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
keystore         Location of the keystore file.
keystorePassword    –    Password for the keystore
Parameter reference  flume-http.properties
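
A sketch of an HTTP source that spells out the default JSON handler explicitly (the values simply restate the defaults already used by flume-http.properties above):

a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
# JSONHandler expects a JSON array of {"headers": ..., "body": ...} objects,
# which is exactly what the curl examples later in this article send
a1.sources.r1.handler  =  org.apache.flume.source.http.JSONHandler
a1.sources.r1.channels  =  c1
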
3.3.6.1 Error at startup; the agent does not continue
[root@localhost conf]# ../bin/flume-ng agent --conf conf --conf-file flume.properties --name a1 -Dflume.root.logger=INFO,console
Info: Including Hive libraries found via () for Hive access
+ exec /usr/local/src/java/jdk1.7.0_51/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp 'conf:/usr/local/src/flume/apache-flume-1.6.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file flume.properties --name a1
log4j:WARN No appenders could be found for logger (org.apache.flume.lifecycle.LifecycleSupervisor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

Cause:
The path to the log4j properties file is incorrect.
Fix:
[root@localhost bin]# ./flume-ng agent -c /usr/local/src/flume/apache-flume-1.6.0-bin/conf -f /usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume.properties -n a1 -Dflume.root.logger=INFO,console
or
[root@localhost bin]# ./flume-ng agent -c ../conf -f ../conf/flume.properties -n a1 -Dflume.root.logger=INFO,console
3.3.6.2 Re-used file name in the monitored directory
If a file name has already been processed, re-using it fails even if the completed file has been deleted.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /usr/local/src/flume/data/g.txt.COMPLETED
Fix:
Do not place files with duplicate names into the monitored directory.
If the files were deleted, how does Flume still know a file name was already processed?

Flume creates a hidden directory, .flumespool, inside the monitored directory; it contains a hidden file, .flumespool-main.meta, that records which files have been processed. Deleting this hidden directory allows a file with a previously used name to be processed again.
Common errors

 

[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-avro.properties -n a1 -Dflume.root.logger=INFO,console
Startup output:
2017-11-07 19:58:03,708 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source r1 started.
cd /usr/local/src/flume        # change to the directory
vi log.txt                     # create a data file with the following content
hi flume.
you are good tools.
Prepare the data
Use the Avro client that ships with Flume to send log data to the specified host and port:
./flume-ng -h                    # shows command formats and parameter usage
./flume-ng avro-client -c ../conf -H 0.0.0.0 -p 22222 -F ../../log.txt
The console receives the message:

Note: the printed event body is truncated; the console shows only a short portion of the content.
Sending an Avro message

 

  Channel components

Events are stored in an in-memory queue of configurable capacity. This channel is ideal for flows that need high throughput but can tolerate losing data if the agent fails.

Parameters:
type    –    The component type; must be memory
capacity    100    Maximum number of events stored in the channel
transactionCapacity    100    Maximum number of events per transaction
keep-alive    3    Timeout in seconds for add or remove operations
byteCapacityBufferPercentage    20    Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity    see description    Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
Memory Channel
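
A hedged sketch of the memory channel described above, with an additional cap on the total bytes of event bodies it will hold (the numbers are illustrative, not recommendations):

a1.channels  =  c1
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  10000
a1.channels.c1.transactionCapacity  =  1000
# cap event-body memory at roughly 100 MB, keeping 20% headroom for headers
a1.channels.c1.byteCapacity  =  104857600
a1.channels.c1.byteCapacityBufferPercentage  =  20
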
Events are persisted in a durable database. Currently an embedded Derby database is supported. Use this channel when recoverability is very important.
JDBC Channel
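
A minimal sketch of the JDBC channel described above (it uses the embedded Derby database by default, so only the type is strictly required; the channel name c1 is illustrative):

a1.channels  =  c1
a1.channels.c1.type  =  jdbc
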
Performance is lower, but data is not lost even if the process crashes.

Parameters:
type    –    The component type; must be file
checkpointDir    ~/.flume/file-channel/checkpoint    Directory where the checkpoint file is stored
useDualCheckpoints    false    Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
backupCheckpointDir    –    The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
dataDirs    ~/.flume/file-channel/data    Comma-separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance
transactionCapacity    10000    The maximum size of transaction supported by the channel
checkpointInterval    30000    Amount of time (in millis) between checkpoints
maxFileSize    2146435071    Maximum size (in bytes) of a single log file
minimumRequiredSpace    524288000    Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
capacity    1000000    Maximum capacity of the channel
keep-alive    3    Amount of time (in sec) to wait for a put operation
use-log-replay-v1    false    Expert: Use old replay logic
use-fast-replay    false    Expert: Replay without using queue
checkpointOnClose    true    Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
encryption.activeKey    –    Key name used to encrypt new data
encryption.cipherProvider    –    Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider    –    Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile    –    Path to the keystore file
encrpytion.keyProvider.keyStorePasswordFile    –    Path to the keystore password file
encryption.keyProvider.keys    –    List of all keys (e.g. history of the activeKey setting)
encyption.keyProvider.keys.*.passwordFile    –    Path to the optional key password file
File Channel
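
A sketch of the file channel described above with explicit checkpoint and data directories (the paths and capacities are assumptions for illustration; in practice, putting dataDirs on separate disks helps throughput):

a1.channels  =  c1
a1.channels.c1.type  =  file
a1.channels.c1.checkpointDir  =  /usr/local/src/flume/filechannel/checkpoint
a1.channels.c1.dataDirs  =  /usr/local/src/flume/filechannel/data
a1.channels.c1.capacity  =  1000000
a1.channels.c1.transactionCapacity  =  10000
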
A memory channel that can overflow to disk: events are stored in an in-memory queue and on disk.
The in-memory queue acts as the primary store and the disk as overflow storage. When the in-memory queue is full, subsequent events are stored in the file channel. This channel suits flows that use the memory channel for high throughput during normal operation and fall back to the file channel for resilience during load peaks, trading some throughput for durability. If the agent crashes, only the events stored on disk can be recovered; events still in memory are lost. This channel is experimental and not recommended for production use.

Parameters:
type    –    The component type; must be SPILLABLEMEMORY
memoryCapacity    10000    Maximum number of events stored in memory; set to 0 to disable the in-memory queue
overflowCapacity    100000000    Maximum number of events that can be stored on disk; set to 0 to disable disk overflow
overflowTimeout    3    The number of seconds to wait before enabling disk overflow when memory fills up.
byteCapacityBufferPercentage    20    Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
byteCapacity    see description    Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
avgEventSize    500    Estimated average size of events, in bytes, going into the channel
<file channel properties>    see file channel    Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.
Spillable Memory Channel
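
A sketch of the spillable memory channel described above, combining the in-memory queue with disk overflow (capacities and paths are illustrative; as noted, events still in memory are lost if the agent crashes):

a1.channels  =  c1
a1.channels.c1.type  =  SPILLABLEMEMORY
a1.channels.c1.memoryCapacity  =  10000
a1.channels.c1.overflowCapacity  =  1000000
# overflow is backed by a file channel, so file channel properties apply
a1.channels.c1.checkpointDir  =  /usr/local/src/flume/spill/checkpoint
a1.channels.c1.dataDirs  =  /usr/local/src/flume/spill/data
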

 

  Sink components

Logs events at INFO level; typically used for debugging.

Parameters:
channel    –     
type    –    The component type name, needs to be logger
maxBytesToLog    16    Maximum number of bytes of the Event body to log

A log4j configuration file, log4j.properties, is required in the directory specified by the --conf argument.
The log4j settings can also be specified manually at startup with -Dflume.root.logger=INFO,console
Logger Sink
Stores events on the local file system.
A new file is rolled at a configurable interval and holds the log data collected during that interval.

Parameters:
channel    –     
type    –    The component type; must be file_roll
sink.directory    –    Directory where the files are stored
sink.rollInterval    30    Roll to a new file every 30 seconds (i.e. the data from each 30-second window goes into its own file). Setting this to 0 disables rolling, so all data is written to a single file.
sink.serializer    TEXT    Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize    100
File Roll Sink
Changed settings:
# built-in http source
a1.sources.r1.type  =  http
# port to listen on
a1.sources.r1.port = 22222

# roll files on the local file system
a1.sinks.k1.type  =  file_roll
# directory where the files are written
a1.sinks.k1.sink.directory = /usr/local/src/flume/data
Configuration file flume-roll-sink.properties
Enable the first line and comment out the second to turn on console output (by default the first line is commented out and the second is enabled).
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222
Result
[root@localhost data]# pwd
/usr/local/src/flume/data                                # directory containing the data
 [root@localhost data]# ll
total 4
-rw-r--r--. 1 root root  0 Nov  9 16:21 1510273266537-1
-rw-r--r--. 1 root root 22 Nov  9 16:21 1510273266537-2
-rw-r--r--. 1 root root  0 Nov  9 16:22 1510273266537-3
-rw-r--r--. 1 root root  0 Nov  9 16:22 1510273266537-4
-rw-r--r--. 1 root root  0 Nov  9 16:23 1510273266537-5
-rw-r--r--. 1 root root  0 Nov  9 16:23 1510273266537-6

[root@localhost data]# tail 1510273266537-2      # the data has been written
hello file-roll flume
[root@localhost data]# tail 1510273266537-6      # a file is created even if there is no data
Note: by default a new log file is produced every 30 seconds, though the timing is not perfectly precise.
Simulating an HTTP request

 

  

  Avro Sink

  The Avro sink is the basis for multi-hop flows, fan-out flows (one to many), and fan-in flows (many to one).

 

channel    –     
type    –    The component type; must be avro
hostname    –    The hostname or IP address of the Avro source to send events to
port    –    The port of the destination Avro source

batch-size    100    number of event to batch together for send.
connect-timeout    20000    Amount of time (ms) to allow for the first (handshake) request.
request-timeout    20000    Amount of time (ms) to allow for requests after the first.
To demonstrate a multi-hop flow we need several sources, so we install two more servers.
Clone two new virtual machines: flume02 and flume03.
Parameter reference

  Multi-hop deployment layout

 

Changed settings:
a1.sources.r1.type  =  http
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
#描述Sink
a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname = 192.168.163.130
a1.sinks.k1.port = 22222
flume01 configuration file flume-avro-sink.properties
Copy the file to the other hosts
[root@localhost conf]# pwd
/usr/local/src/flume/apache-flume-1.6.0-bin/conf
[root@localhost conf]# scp flume-avro-sink.properties root@192.168.163.130:/usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume-avro-sink.properties 
The authenticity of host '192.168.163.130 (192.168.163.130)' can't be established.
RSA key fingerprint is 40:d6:4e:bd:3e:d0:90:3b:86:41:72:90:ec:dd:95:f9.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.163.130' (RSA) to the list of known hosts.
flume-avro-sink.properties                                                                                                            100%  477     0.5KB/s   00:00    
[root@localhost conf]# scp flume-avro-sink.properties root@192.168.163.131:/usr/local/src/flume/apache-flume-1.6.0-bin/conf/flume-avro-sink.properties 
The authenticity of host '192.168.163.131 (192.168.163.131)' can't be established.
RSA key fingerprint is 40:d6:4e:bd:3e:d0:90:3b:86:41:72:90:ec:dd:95:f9.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.163.131' (RSA) to the list of known hosts.
flume-avro-sink.properties                                                                                                            100%  477     0.5KB/s   00:00    
[root@localhost conf]#
Remote copy
Changed settings:
# built-in avro source
a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

# Describe the Sink
a1.sinks.k1.type  =  avro
a1.sinks.k1.hostname = 192.168.163.131
a1.sinks.k1.port = 22222
flume02 configuration file flume-avro-sink.properties
Changed settings:
# built-in avro source
a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222

# Describe the Sink
a1.sinks.k1.type  =  logger
flume03 configuration file flume-avro-sink.properties
[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-avro-sink.properties -n a1 -Dflume.root.logger=INFO,console
Start the agent on each Flume server
Send a message from the flume01 node
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222
Result:
2017-11-09 18:58:33,863 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdd9a2bfc, /192.168.163.130:34945 => /192.168.163.131:22222] BOUND: /192.168.163.131:22222
2017-11-09 18:58:33,863 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdd9a2bfc, /192.168.163.130:34945 => /192.168.163.131:22222] CONNECTED: /192.168.163.130:34945
2017-11-09 19:00:28,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{tester=tony} body: 68 65 6C 6C 6F 20 6D 6F 72 65 20 61 76 72 6F 20 hello more avro  }
Simulating an HTTP request
The nodes must be started in order: flume01 connects to 192.168.163.130:22222, so if flume02 has not been started yet, the error below is reported.

Fix: start the nodes in reverse order: flume03 first, then flume02, then flume01. The log then shows a successful bind.
Common errors

    

    flume01 takes HTTP data as its source and emits Avro; flume02 takes Avro as its source and emits Avro; flume03 takes Avro as its source and writes to log4j, printing the result to the console.

 

HDFS Sink

 

    HDFS provides distributed storage and backup for massive amounts of data.

    The HDFS Sink writes events to HDFS. It supports creating text files and sequence files, and both can be compressed.

    Files can be rolled based on elapsed time, data size, or number of events (for example, a fixed number of records per file; if every log record went into its own file, HDFS would hit the small-files problem and later processing would be very inefficient, so rolling rules control when a new file is started). The sink can also bucket/partition the data by timestamp or machine attributes. The HDFS path may contain escape sequences that are replaced at runtime to generate the directory or file name used to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with HDFS. Note that the Hadoop version used must support the sync() call, so that data can be appended to files.

 

 

# Name the components of agent a1
a1.sources  =  r1
a1.sinks  =  k1
a1.channels  =  c1

# Describe/configure the Source
a1.sources.r1.type  =  http
a1.sources.r1.port  =  22222

# Describe the Sink
a1.sinks.k1.type  =  hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume/data

# Describe the in-memory Channel
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100

# Bind the Source and Sink to the Channel
a1.sources.r1.channels  =  c1
a1.sinks.k1.channel  =  c1
Configuration file flume-hdfs.properties
channel    –     
type    –    The component type; must be hdfs
hdfs.path    –    HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
hdfs.fileType    SequenceFile    File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream does not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
The default is SequenceFile; the options are SequenceFile (sequence file), DataStream (plain text), CompressedStream (compressed file)

hdfs.filePrefix    FlumeData    Name prefix for the files Flume creates in the directory
hdfs.fileSuffix    –    Suffix to append to the file name (e.g. .avro; note: the date/time is not added automatically)
hdfs.inUsePrefix    –    Prefix applied to files Flume is currently writing
hdfs.inUseSuffix    .tmp    Suffix applied to files Flume is currently writing
hdfs.rollInterval    30    Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize    1024    File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount    10    Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout    0    Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize    100    number of events written to file before it is flushed to HDFS
hdfs.codeC    –    Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.maxOpenFiles    5000    Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas    –    Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat    –    Format for sequence file records. One of “Text” or “Writable” (the default).
hdfs.callTimeout    10000    Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize    10    Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize    1    Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal    –    Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab    –    Kerberos keytab for accessing secure HDFS
hdfs.proxyUser          
hdfs.round    false    Whether the timestamp should be rounded down (if true, this affects all time-based escape sequences except %t)
hdfs.roundValue    1    Value to round the timestamp down to (a multiple of this, in the unit given by hdfs.roundUnit)
hdfs.roundUnit    second    Unit of the rounding: second, minute, or hour
hdfs.timeZone    Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp    false    Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries    0    Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval    180    Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.
serializer    TEXT    Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
Parameter reference
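
Building on the minimal flume-hdfs.properties above, a hedged sketch that writes plain text, rolls files by time and size rather than the small defaults, and partitions the output into date-based directories (the path, roll values, and the hadoop01 host are assumptions for illustration; hdfs.useLocalTimeStamp is set because the HTTP events in this article carry no timestamp header):

a1.sinks.k1.type  =  hdfs
# %Y-%m-%d is expanded per event, giving one directory per day
a1.sinks.k1.hdfs.path  =  hdfs://hadoop01:9000/flume/data/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp  =  true
# plain text output instead of the default SequenceFile
a1.sinks.k1.hdfs.fileType  =  DataStream
a1.sinks.k1.hdfs.writeFormat  =  Text
# roll every 5 minutes or at roughly 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval  =  300
a1.sinks.k1.hdfs.rollSize  =  134217728
a1.sinks.k1.hdfs.rollCount  =  0
a1.sinks.k1.channel  =  c1
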

 

  Copy the dependency jars

    /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/common/lib      copy all of the jars from this directory

    /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/common          the 3 jars in this directory

    /usr/local/src/hadoop/hadoop-2.7.1/share/hadoop/hdfs            hadoop-hdfs-2.7.1.jar from this directory

 

 

[root@localhost conf]# ../bin/flume-ng agent -c ./ -f ./flume-hdfs.properties -n a1 -Dflume.root.logger=INFO,console

Result: output is printed to the console very rapidly.


Simulating an HTTP request

Send a message from the flume01 node
curl -X POST -d '[{"headers":{"tester":"tony"},"body":"hello http flume"}]' http://0.0.0.0:22222
Result:
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://hadoop01:9000/flume/data/FlumeData.1510560200492.tmp
Starting the agent and simulating an HTTP request

 

 

 

hadoop fs -put '/usr/local/src/hive/data/english.txt' /user/hive/warehouse/test.db/tb_book

 

