Flume Introduction
1. Basic Introduction to Flume
Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store.
Flume event: a Flume event is defined as a unit of data flow with a byte payload and an optional set of string attributes.
Flume agent: a Flume agent is a JVM process that hosts the components through which events flow from an external source to the next destination (hop).
Flume source: a source receives events delivered to it by an external source (such as a web server) and turns them into Flume events. The external source sends events to Flume in a format that the target Flume source can recognize; for example, an Avro Flume source can be used to receive Avro events from clients or from other Flume agents whose Avro sink sends events to it. When a Flume source receives an event, it stores the event into one or more channels. A channel is a staging store that holds the event until it is consumed by a Flume sink; the file channel is one example, backed by the local file system. The sink removes the event from the channel and puts it into an external repository such as HDFS (via the HDFS sink), or forwards it to the Flume source of the next Flume agent (the next hop) in the flow. The source and sink within a given agent run asynchronously with the events staged in the channel. The flow is illustrated in the figure below.
Figure 1: Basic Flume data flow model
2. Flume Features
Flume allows a user to build multi-hop flows where events travel through multiple agents before reaching the final destination. It also allows fan-in and fan-out flows, contextual routing, and backup routes (failover) for failed hops.
Reliability
The events are staged in a channel on each agent and are deleted only after they reach the next destination. Flume uses a transactional approach to guarantee reliable delivery: sources and sinks encapsulate the storage and retrieval of events in transactions provided by the channel, which ensures that the set of events is reliably passed from point to point in the flow.
Recoverability
The events are staged in the channel, which manages recovery from failure. Flume supports a durable file channel that is backed by the local file system. There is also a memory channel, which simply stores the events in an in-memory queue; this is faster, but any events still sitting in a memory channel when the agent process dies cannot be recovered.
3. A Simple Flume Configuration and Run
Installing Flume is simple: download the Flume tarball (the current version is flume-1.3.0), unpack it, and set up the environment variables so that the flume-ng command under the bin directory can be run directly. Now we can write a simple configuration file and run Flume. The configuration file can be created under the conf directory, for example test-flume.conf.
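The original post does not reproduce the configuration file itself; the following is a minimal sketch of what test-flume.conf could look like for the single-node demo described below (agent a1 with a netcat source r1, a memory channel c1, and a logger sink k1; the bind address and port 44444 are assumptions):
# test-flume.conf - hypothetical single-node example
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# netcat source: each line received on the port becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
# memory channel: fast, but events are lost if the agent dies
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# logger sink: writes each event to the agent log at INFO level
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1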
A configuration file as simple as this is enough; the next step is to run Flume.
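The launch command is not shown in the original post; a typical flume-ng invocation for the sketch above (assuming the file is saved as conf/test-flume.conf and the agent is named a1) would be:
flume-ng agent --conf conf --conf-file conf/test-flume.conf --name a1 -Dflume.root.logger=INFO,console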
This starts the Flume agent. From the configuration file we can see that the source type is netcat, which listens for data sent by a client, for example via a telnet connection. As described above, the source can only collect data that arrives in a format it recognizes, so for the source to receive anything, the client must connect (e.g. with telnet) and send data. The collected events pass through the channel (here c1, a memory channel), while the sink keeps watching c1; as soon as data appears in c1, the sink pulls it out according to its own configuration. In this example the sink type is logger, so the events are simply written to the log.
Following the analysis above, the next step is to run the telnet command on the client. (This example uses a single node, so open another console window and run the command below.)
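Assuming the hypothetical bind address and port from the sketch above (localhost:44444), the client command would look like this; every line typed after the connection is established becomes one Flume event:
telnet localhost 44444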
At this point, the console window where the agent command was started will display the received events in its log output.
With this result, the simple demo is complete. Next we look at some important properties of Flume's three components: sources, channels, and sinks. The examples below use the following default names.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
Flume Source Configuration
Avro Source
The Avro source listens on an Avro port and receives event streams from external Avro clients. Required properties are in bold; see the example for how to set them in the configuration file (here the source is named r1).
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be avro, e.g. a1.sources.r1.type = avro |
| bind | – | Host name or IP address to bind to; clients send data to this host's Avro source, e.g. a1.sources.r1.bind = hostname/ip |
| port | – | Port to bind to, e.g. a1.sources.r1.port = 9998 |
| threads | – | Maximum number of worker threads to spawn |
| selector.type | | |
| selector.* | | |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec Source
The exec source runs a given Unix command when the agent starts and expects the command to continuously produce data on standard output; nothing needs to be run on a client. If the process exits for any reason, the source stops producing data. This means commands such as cat [named pipe] or tail -F [file] produce the desired continuous stream, whereas a command like date does not. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be exec |
| command | – | The command to execute |
| restartThrottle | 10000 | Amount of time (in milliseconds) to wait before attempting to restart the command |
| restart | false | Whether the executed cmd should be restarted if it dies |
| logStdErr | false | Whether the command's stderr should be logged |
| batchSize | 20 | The max number of lines to read and send to the channel at a time |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
The tail -F command above can be used to emulate the tailSource of Flume 0.9.0.
Spooling Directory Source
This source lets you ingest data by dropping files into a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails. Flume will watch the directory for new files and ingest them as they appear. After a given file has been fully read into the channel, it is renamed to indicate completion. This allows a cleaner process to remove completed files periodically. Note, however, that events may be duplicated if failures occur, consistent with the semantics offered by other Flume components. The source optionally inserts the full path of the origin file into a header field of each event. This source buffers file data in memory during reads; be sure to set the bufferMaxLineLength option to a number greater than the longest line you expect to see in your input data.
Warning
This source expects that only immutable, uniquely named files are dropped in the spooling directory. If duplicate names are used, or files are modified while being read, the source will fail with an error message. For some use cases this may require adding unique identifiers (such as a timestamp) to log file names when they are copied into the spooling directory.
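For example, one hypothetical way to satisfy this requirement is to append a timestamp to the file name while copying it into the spooling directory (the paths are assumed from the example below):
cp /var/log/apache/access.log /var/log/apache/flumeSpool/access.log.$(date +%s)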
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be spooldir |
| spoolDir | – | The directory where log files will be spooled |
| fileSuffix | .COMPLETED | Suffix to append to completely ingested files |
| fileHeader | false | Whether to add a header storing the filename |
| fileHeaderKey | file | Header key to use when appending filename to header |
| batchSize | 10 | Granularity at which to batch transfer to the channel |
| bufferMaxLines | 100 | Maximum number of lines the commit buffer can hold |
| bufferMaxLineLength | 5000 | Maximum length of a line in the commit buffer |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
NetCat Source
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be netcat |
| bind | – | Host name or IP address to bind to |
| port | – | Port # to bind to |
| max-line-length | 512 | Max line length per event body (in bytes) |
| ack-every-event | true | Respond with an "OK" for every event received |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Sequence Generator Source
A simple sequence generator that continuously generates events with a counter that starts from 0 and increments by 1. Useful mainly for testing. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channels | – | |
| type | – | The component type name, needs to be seq |
| selector.type | replicating | replicating or multiplexing |
| selector.* | | Depends on the selector.type value |
| interceptors | – | Space-separated list of interceptors |
| interceptors.* | | |
| batchSize | 1 | |
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Flume Channel Configuration
Flume Channels
Channels are the repositories where the events are staged on an agent. The source adds the events and the sink removes them.
Memory Channel
The events are stored in an in-memory queue with configurable max size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of agent failures. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, needs to be memory |
| capacity | 100 | The max number of events stored in the channel |
| transactionCapacity | 100 | The max number of events stored in the channel per transaction |
| keep-alive | 3 | Timeout in seconds for adding or removing an event |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
JDBC Channel
The events are stored in a persistent storage that’s backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that’s ideal for the flows where recoverability is important. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, needs to be jdbc |
| db.type | DERBY | Database vendor, needs to be DERBY |
| driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendor's JDBC driver |
| driver.url | (constructed from other properties) | JDBC connection URL |
| db.username | "sa" | User id for db connection |
| db.password | – | Password for db connection |
| connection.properties.file | – | JDBC connection property file path |
| create.schema | true | If true, then creates db schema if not there |
| create.index | true | Create indexes to speed up lookups |
| create.foreignkey | true | |
| transaction.isolation | "READ_COMMITTED" | Isolation level for db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
| maximum.connections | 10 | Max connections allowed to db |
| maximum.capacity | 0 (unlimited) | Max number of events in the channel |
| sysprop.* | | DB vendor specific properties |
| sysprop.user.home | | Home path to store embedded Derby database |
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
Recoverable Memory Channel
Warning
The Recoverable Memory Channel has been deprecated in favor of the File Channel. The File Channel is a durable channel and performs better than the Recoverable Memory Channel.
Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, needs to be org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel |
| wal.dataDir | ${user.home}/.flume/recoverable-memory-channel | |
| wal.rollSize | (0x04000000) | Max size (in bytes) of a single file before we roll |
| wal.minRetentionPeriod | 300000 | Min amount of time (in millis) to keep a log |
| wal.workerInterval | 60000 | How often (in millis) the background worker checks for old logs |
| wal.maxLogsSize | (0x20000000) | Total amt (in bytes) of logs to keep, excluding the current log |
| capacity | 100 | |
| transactionCapacity | 100 | |
| keep-alive | 3 | |
File Channel
Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| type | – | The component type name, needs to be file |
| checkpointDir | ~/.flume/file-channel/checkpoint | The directory where checkpoint file will be stored |
| dataDirs | ~/.flume/file-channel/data | The directory where log files will be stored |
| transactionCapacity | 1000 | The maximum size of transaction supported by the channel |
| checkpointInterval | 30000 | Amount of time (in millis) between checkpoints |
| maxFileSize | 2146435071 | Max size (in bytes) of a single log file |
| minimumRequiredSpace | 524288000 | Minimum required free space (in bytes) |
| capacity | 1000000 | Maximum capacity of the channel |
| keep-alive | 3 | Amount of time (in sec) to wait for a put operation |
| write-timeout | 3 | Amount of time (in sec) to wait for a write operation |
| checkpoint-timeout | 600 | Expert: Amount of time (in sec) to wait for a checkpoint |
| use-log-replay-v1 | false | Expert: Use old replay logic |
| use-fast-replay | false | Expert: Replay without using queue |
| encryption.activeKey | – | Key name used to encrypt new data |
| encryption.cipherProvider | – | Cipher provider type, supported types: AESCTRNOPADDING |
| encryption.keyProvider | – | Key provider type, supported types: JCEKSFILE |
| encryption.keyProvider.keyStoreFile | – | Path to the keystore file |
| encryption.keyProvider.keyStorePasswordFile | – | Path to the keystore password file |
| encryption.keyProvider.keys | – | List of all keys (e.g. history of the activeKey setting) |
| encryption.keyProvider.keys.*.passwordFile | – | Path to the optional key password file |
Note
By default the File Channel uses paths for checkpoint and data directories that are within the user home as specified above. As a result if you have more than one File Channel instances active within the agent, only one will be able to lock the directories and cause the other channel initialization to fail. It is therefore necessary that you provide explicit paths to all the configured channels, preferably on different disks. Furthermore, as file channel will sync to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance where multiple disks are not available for checkpoint and data directories.
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
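As the note above suggests, here is a sketch of running two File Channels in the same agent with explicit, separate directories (the disk paths and the second channel name c2 are assumptions):
a1.channels = c1 c2
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/data
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/disk2/flume/checkpoint
a1.channels.c2.dataDirs = /mnt/disk2/flume/data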
Encryption
Below are a few sample configurations:
Generating a key with a password separate from the key store password:
keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-keysize 128 -validity 9000 -keystore test.keystore \
-storetype jceks -storepass keyStorePassword
Generating a key with the password the same as the key store password:
keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-keystore src/test/resources/test.keystore -storetype jceks \
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0
Let’s say you have aged key-0 out and new files should be encrypted with key-1:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
The same scenario as above, however key-0 has its own password:
a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Flume Sink Configuration
HDFS Sink
This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, with compression in both file types. Files can be rolled (close the current file and create a new one) periodically based on the elapsed time, the size of data, or the number of events. It also buckets/partitions data by attributes like timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.
The following are the escape sequences supported:
| Alias | Description |
|---|---|
| %{host} | Substitute value of event header named "host". Arbitrary header names are supported. |
| %t | Unix time in milliseconds |
| %a | locale's short weekday name (Mon, Tue, ...) |
| %A | locale's full weekday name (Monday, Tuesday, ...) |
| %b | locale's short month name (Jan, Feb, ...) |
| %B | locale's long month name (January, February, ...) |
| %c | locale's date and time (Thu Mar 3 23:05:25 2005) |
| %d | day of month (01) |
| %D | date; same as %m/%d/%y |
| %H | hour (00..23) |
| %I | hour (01..12) |
| %j | day of year (001..366) |
| %k | hour ( 0..23) |
| %m | month (01..12) |
| %M | minute (00..59) |
| %p | locale's equivalent of am or pm |
| %s | seconds since 1970-01-01 00:00:00 UTC |
| %S | second (00..59) |
| %y | last two digits of year (00..99) |
| %Y | year (2010) |
| %z | +hhmm numeric timezone (for example, -0400) |
The file in use will have the name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This allows excluding partially complete files in the directory. Required properties are in bold.
Note
For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event. One way to add this automatically is to use the TimestampInterceptor.
| Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | The component type name, needs to be hdfs |
| hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
| hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
| hdfs.fileSuffix | – | Suffix to append to file (eg .avro - NOTE: period is not automatically added) |
| hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval) |
| hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0: never roll based on file size) |
| hdfs.rollCount | 10 | Number of events written to file before it rolled (0 = never roll based on number of events) |
| hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files) |
| hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS |
| hdfs.codeC | – | Compression codec, one of the following: gzip, bzip2, lzo, snappy |
| hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec. |
| hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed. |
| hdfs.writeFormat | – | "Text" or "Writable" |
| hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring. |
| hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for HDFS IO ops (open, write, etc.) |
| hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS sink for scheduling timed file rolling |
| hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS |
| hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS |
| hdfs.proxyUser | | |
| hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t) |
| hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. |
| hdfs.roundUnit | second | The unit of the round down value - second, minute or hour. |
| hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. |
| serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface. |
| serializer.* | | |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
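As the note before the property table points out, all time-based escape sequences require a timestamp header on each event. One way to add it, sketched here for the source r1 used in the earlier examples, is the built-in timestamp interceptor:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp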
Logger Sink
Logs events at the INFO level. Typically useful for testing/debugging purposes. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | The component type name, needs to be logger |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Avro Sink
This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | The component type name, needs to be avro |
| hostname | – | The hostname or IP address to bind to |
| port | – | The port # to listen on |
| batch-size | 100 | Number of events to batch together for send |
| connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request |
| request-timeout | 20000 | Amount of time (ms) to allow for requests after the first |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
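To complete the tiered-collection picture, the downstream agent (called a2 here, an assumed name) would pair this Avro sink with an Avro source listening on the same port:
a2.sources = r1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545
a2.sources.r1.channels = c1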
File Roll Sink
Stores events on the local filesystem. Required properties are in bold.
| Property Name | Default | Description |
|---|---|---|
| channel | – | |
| type | – | The component type name, needs to be file_roll |
| sink.directory | – | The directory where files will be stored |
| sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file. |
| sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of the EventSerializer.Builder interface. |
| batchSize | 100 | |
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Original source: http://www.cnblogs.com/caoyuanzhanlang