[Original] flume-1.3.0 installation and configuration, with notes on starting Flume


Introduction to Flume

1. Flume basics

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of data from many different sources to a centralized data store.

Flume event: an event is defined as a unit of data flow with a byte payload and an optional set of string attributes.

Flume agent: an agent is a JVM process that hosts the components through which events flow from an external source to the next destination (hop).

Flume source: a source receives events delivered to it by an external source (such as a web server); the external source sends events in a format the target Flume source recognizes. For example, an Avro source can receive events from Avro clients or from other Flume agents whose Avro sinks send events to it. When a source receives an event, it stores it into one or more channels. A channel is a passive store that holds the event until it is consumed by a sink; the file channel, for instance, is backed by the local file system. A sink removes the event from the channel and puts it into an external repository such as HDFS (via the HDFS sink), or forwards it to the source of the next Flume agent in the flow. Sources and sinks within a given agent run asynchronously with the events staged in the channel. The flow is illustrated below.

[Figure 1: Flume basic data-flow model]
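The source, channel, and sink roles above can be sketched as a toy pipeline. This is a plain Python illustration, not Flume code: a queue stands in for the channel, and two functions stand in for the source and the sink.

```python
from collections import deque

# A queue stands in for a Flume channel: a passive store between source and sink.
channel = deque()

def source_put(event):
    """The source stores a received event into the channel."""
    channel.append(event)

def sink_take():
    """The sink removes the next event from the channel, or None if it is empty."""
    return channel.popleft() if channel else None

# An event is a byte payload plus optional string attributes (headers).
source_put({"headers": {}, "body": b"hello flume"})
event = sink_take()
print(event["body"])  # prints b'hello flume'
```

In real Flume the source and sink run asynchronously in the same JVM, and the channel provides transactional put/take instead of plain queue operations.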

2. Flume features

Flume lets a user build multi-hop flows, i.e. events can travel through multiple agents before reaching the final destination. It also allows fan-in and fan-out flows, contextual routing, and backup routes (failover) for failed hops.

Reliability

Events are staged in a channel on each agent and are removed only after they have been delivered to the next agent or to the terminal repository. Flume uses a transactional approach to guarantee reliable delivery: sources and sinks encapsulate the storage and retrieval of events in transactions provided by the channel, which ensures that the set of events is reliably passed from point to point through the flow.

Recoverability

Events are staged in the channel, which manages recovery from failure. Flume supports a durable file channel backed by the local file system. There is also a memory channel, which simply stores events in an in-memory queue; this is faster, but any events still sitting in the memory channel when the agent process dies cannot be recovered.

A simple Flume configuration and run

Installing Flume is simple: download the Flume tarball (the current version is flume-1.3.0), unpack it, and set the environment variables so that the flume-ng command, located under the bin directory, can be run directly. Next we write a simple configuration file and run Flume. Create a new file under the conf directory, e.g. test-flume.conf.
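The original screenshot of the configuration file did not survive; based on the description that follows (a netcat source, a memory channel c1, and a logger sink), a minimal test-flume.conf consistent with it might look like this (agent name a1, port, and component names are assumptions):

```
# test-flume.conf - single-node sketch: netcat source -> memory channel -> logger sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```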
That is all a simple configuration file needs. The next step is to run Flume.
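The screenshot of the startup command is also missing; a typical flume-ng invocation consistent with the text (the config file name and agent name a1 are assumptions) would be:

```
bin/flume-ng agent --conf conf --conf-file conf/test-flume.conf \
    --name a1 -Dflume.root.logger=INFO,console
```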
This starts Flume on the agent side. From the configuration file we can see that the source is of type netcat, which listens for data sent by a telnet client. As explained above, a source only accepts data in a format it recognizes, so to get data into this source we must run telnet on a client and send text. The collected data passes through the channel (here c1, a memory channel); the sink keeps watching c1 and, as soon as data appears, pulls it from c1 according to its own configuration. In this example the sink type is logger, so events are simply written to the log.

Following this analysis, the next step is to run telnet on the client (this example uses a single node, so open another console window and run the telnet command there).
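The telnet screenshot is gone as well; assuming the netcat source above listens on localhost:44444, the client session would look roughly like this (the "OK" reply corresponds to the netcat source's ack-every-event = true default):

```
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello flume
OK
```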
At this point, the console window where the agent was started displays output like the following.
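The console screenshot did not survive either; with the logger sink, the agent window would print something along these lines (timestamp, thread name, and line numbers are illustrative):

```
2012-12-01 12:00:00,000 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)]
Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65    hello flume }
```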
With that, the simple demo is complete. Next we look at the important properties of Flume's three building blocks: sources, channels, and sinks. The examples below assume the following defaults.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

 

Flume source configuration

Avro Source

The Avro source listens on an Avro port and receives event streams from external Avro clients. Required properties are marked with an asterisk (*); the Description column shows how each property appears in a configuration file for a source named r1.

Property Name (* = required)  Default      Description
channels*                     -
type*                         -            Must be avro, e.g. a1.sources.r1.type = avro
bind*                         -            Hostname or IP address to listen on; clients send data to this host's Avro source, e.g. a1.sources.r1.bind = hostname/ip
port*                         -            Port to bind to, e.g. a1.sources.r1.port = 9998
threads                       -            Maximum number of worker threads to spawn
selector.type                 -
selector.*                    -
interceptors                  -            Space-separated list of interceptors
interceptors.*                -

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source

An exec source runs a given Unix command when the agent starts and keeps reading the command's standard output; nothing needs to be run on a client. The command is therefore expected to continuously produce data on stdout; if the process exits for any reason, the source exits as well and produces no more data. This means commands such as cat [named pipe] or tail -F [file] are suitable, while date is not. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default      Description
channels*                     -
type*                         -            The component type name, needs to be exec
command*                      -            The command to execute
restartThrottle               10000        Amount of time (in ms) to wait before re-running the command
restart                       false        Whether the executed cmd should be restarted if it dies
logStdErr                     false        Whether the command's stderr should be logged
batchSize                     20           The max number of lines to read and send to the channel at a time
selector.type                 replicating  replicating or multiplexing
selector.*                    -            Depends on the selector.type value
interceptors                  -            Space-separated list of interceptors
interceptors.*                -

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

This tail -F command can be used to emulate the tailSource of Flume 0.9.x.

Spooling Directory Source

This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails. Flume will watch the directory for new files and read and ingest them as they appear. After a given file has been fully read into the channel, it is renamed to indicate completion, which allows a cleaner process to remove completed files periodically. Note, however, that events may be duplicated if failures occur, consistent with the semantics offered by other Flume components. The channel optionally inserts the full path of the origin file into a header field of each event. This source buffers file data in memory during reads; be sure to set the bufferMaxLineLength option to a number greater than the longest line you expect to see in your input data.

Warning

 

This source expects that only immutable, uniquely named files are dropped in the spooling directory. If duplicate names are used, or files are modified while being read, the source will fail with an error message. For some use cases this may require adding unique identifiers (such as a timestamp) to log file names when they are copied into the spooling directory.
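The read-then-rename cycle described above can be sketched in Python. This is illustrative only; the real source parses events in batches, honours bufferMaxLines, and is transactional:

```python
import os

def ingest_spool_file(path, suffix=".COMPLETED"):
    """Read a spooled file line by line, then rename it to mark completion,
    mimicking the spooldir source's fileSuffix behaviour (sketch, not Flume code)."""
    with open(path) as f:
        events = [line.rstrip("\n") for line in f if line.strip()]
    # Completed files get the suffix appended so a cleaner can remove them later.
    os.rename(path, path + suffix)
    return events
```

A periodic cleaner process can then safely delete any file whose name ends in .COMPLETED.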

Property Name (* = required)  Default      Description
channels*                     -
type*                         -            The component type name, needs to be spooldir
spoolDir*                     -            The directory where log files will be spooled
fileSuffix                    .COMPLETED   Suffix to append to completely ingested files
fileHeader                    false        Whether to add a header storing the filename
fileHeaderKey                 file         Header key to use when appending filename to header
batchSize                     10           Granularity at which to batch transfer to the channel
bufferMaxLines                100          Maximum number of lines the commit buffer can hold
bufferMaxLineLength           5000         Maximum length of a line in the commit buffer
selector.type                 replicating  replicating or multiplexing
selector.*                    -            Depends on the selector.type value
interceptors                  -            Space-separated list of interceptors
interceptors.*                -

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/apache/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

NetCat Source

A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
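The line-to-event conversion can be sketched as follows. This is an illustration, not Flume code; the real source also replies "OK" per event when ack-every-event is true:

```python
def lines_to_events(data: bytes, max_line_length: int = 512):
    """Split a received byte stream into one event body per line, truncating
    each to max-line-length bytes (sketch of the netcat source's behaviour)."""
    return [line[:max_line_length] for line in data.split(b"\n") if line]

print(lines_to_events(b"hello\nworld\n"))  # prints [b'hello', b'world']
```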

Required properties are marked with an asterisk (*).

Property Name (* = required)  Default      Description
channels*                     -
type*                         -            The component type name, needs to be netcat
bind*                         -            Host name or IP address to bind to
port*                         -            Port # to bind to
max-line-length               512          Max line length per event body (in bytes)
ack-every-event               true         Respond with an "OK" for every event received
selector.type                 replicating  replicating or multiplexing
selector.*                    -            Depends on the selector.type value
interceptors                  -            Space-separated list of interceptors
interceptors.*                -

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Sequence Generator Source

A simple sequence generator that continuously generates events with a counter that starts from 0 and increments by 1. Useful mainly for testing. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default      Description
channels*                     -
type*                         -            The component type name, needs to be seq
selector.type                 replicating  replicating or multiplexing
selector.*                    -            Depends on the selector.type value
interceptors                  -            Space-separated list of interceptors
interceptors.*                -
batchSize                     1

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

 

Flume channel configuration

Flume Channels

Channels are the repositories where events are staged on an agent. Sources add events and sinks remove them.

Memory Channel

The events are stored in an in-memory queue with configurable max size. It is ideal for flows that need higher throughput and are prepared to lose the staged data in the event of an agent failure. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default  Description
type*                         -        The component type name, needs to be memory
capacity                      100      The max number of events stored in the channel
transactionCapacity           100      The max number of events stored in the channel per transaction
keep-alive                    3        Timeout in seconds for adding or removing an event

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

 

JDBC Channel

The events are stored in a persistent storage that's backed by a database. The JDBC channel currently supports embedded Derby. This is a durable channel that's ideal for flows where recoverability is important. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default                               Description
type*                         -                                     The component type name, needs to be jdbc
db.type                       DERBY                                 Database vendor, needs to be DERBY
driver.class                  org.apache.derby.jdbc.EmbeddedDriver  Class for vendor's JDBC driver
driver.url                    (constructed from other properties)   JDBC connection URL
db.username                   "sa"                                  User id for db connection
db.password                   -                                     Password for db connection
connection.properties.file    -                                     JDBC connection property file path
create.schema                 true                                  If true, then creates db schema if not there
create.index                  true                                  Create indexes to speed up lookups
create.foreignkey             true
transaction.isolation         "READ_COMMITTED"                      Isolation level for db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ
maximum.connections           10                                    Max connections allowed to db
maximum.capacity              0 (unlimited)                         Max number of events in the channel
sysprop.*                     -                                     DB vendor specific properties
sysprop.user.home             -                                     Home path to store embedded Derby database

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = jdbc

Recoverable Memory Channel

Warning

 

The Recoverable Memory Channel has been deprecated in favor of the FileChannel. FileChannel is a durable channel and performs better than the Recoverable Memory Channel.

Required properties are marked with an asterisk (*).

Property Name (* = required)  Default                                         Description
type*                         -                                               The component type name, needs to be org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
wal.dataDir                   ${user.home}/.flume/recoverable-memory-channel
wal.rollSize                  (0x04000000)                                    Max size (in bytes) of a single file before we roll
wal.minRetentionPeriod        300000                                          Min amount of time (in millis) to keep a log
wal.workerInterval            60000                                           How often (in millis) the background worker checks for old logs
wal.maxLogsSize               (0x20000000)                                    Total amt (in bytes) of logs to keep, excluding the current log
capacity                      100
transactionCapacity           100
keep-alive                    3

File Channel

Required properties are marked with an asterisk (*).

Property Name (* = required)                 Default                           Description
type*                                        -                                 The component type name, needs to be file
checkpointDir                                ~/.flume/file-channel/checkpoint  The directory where the checkpoint file will be stored
dataDirs                                     ~/.flume/file-channel/data        The directory where log files will be stored
transactionCapacity                          1000                              The maximum size of transaction supported by the channel
checkpointInterval                           30000                             Amount of time (in millis) between checkpoints
maxFileSize                                  2146435071                        Max size (in bytes) of a single log file
minimumRequiredSpace                         524288000                         Minimum required free space (in bytes)
capacity                                     1000000                           Maximum capacity of the channel
keep-alive                                   3                                 Amount of time (in sec) to wait for a put operation
write-timeout                                3                                 Amount of time (in sec) to wait for a write operation
checkpoint-timeout                           600                               Expert: amount of time (in sec) to wait for a checkpoint
use-log-replay-v1                            false                             Expert: use old replay logic
use-fast-replay                              false                             Expert: replay without using queue
encryption.activeKey                         -                                 Key name used to encrypt new data
encryption.cipherProvider                    -                                 Cipher provider type, supported types: AESCTRNOPADDING
encryption.keyProvider                       -                                 Key provider type, supported types: JCEKSFILE
encryption.keyProvider.keyStoreFile          -                                 Path to the keystore file
encryption.keyProvider.keyStorePasswordFile  -                                 Path to the keystore password file
encryption.keyProvider.keys                  -                                 List of all keys (e.g. history of the activeKey setting)
encryption.keyProvider.keys.*.passwordFile   -                                 Path to the optional key password file

Note

 

By default the File Channel uses paths for the checkpoint and data directories that are within the user home, as specified above. As a result, if you have more than one File Channel instance active within the agent, only one will be able to lock the directories, causing the other channel's initialization to fail. It is therefore necessary to provide explicit paths for all configured channels, preferably on different disks. Furthermore, since the file channel syncs to disk after every commit, coupling it with a sink/source that batches events together may be necessary to provide good performance when multiple disks are not available for the checkpoint and data directories.

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
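Following the note above, an agent with two file channels would give each channel explicit directories, ideally on separate disks. The paths below are illustrative:

```
a1.channels = c1 c2
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/data
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/disk2/flume/checkpoint
a1.channels.c2.dataDirs = /mnt/disk2/flume/data
```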

Encryption

Below are a few sample configurations:

Generating a key with a password separate from the key store password:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

Generating a key with the password the same as the key store password:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

Let’s say you have aged key-0 out and new files should be encrypted with key-1:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

The same scenario as above, however key-0 has its own password:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

 

Flume sink configuration

HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, with compression in both file types. Files can be rolled (close current file and create a new one) periodically based on the elapsed time, size of data, or number of events. It also buckets/partitions data by attributes like timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following are the escape sequences supported:

Alias    Description
%{host}  Substitute value of event header named "host". Arbitrary header names are supported.
%t       Unix time in milliseconds
%a       locale's short weekday name (Mon, Tue, ...)
%A       locale's full weekday name (Monday, Tuesday, ...)
%b       locale's short month name (Jan, Feb, ...)
%B       locale's long month name (January, February, ...)
%c       locale's date and time (Thu Mar 3 23:05:25 2005)
%d       day of month (01)
%D       date; same as %m/%d/%y
%H       hour (00..23)
%I       hour (01..12)
%j       day of year (001..366)
%k       hour ( 0..23)
%m       month (01..12)
%M       minute (00..59)
%p       locale's equivalent of am or pm
%s       seconds since 1970-01-01 00:00:00 UTC
%S       second (00..59)
%y       last two digits of year (00..99)
%Y       year (2010)
%z       +hhmm numeric timezone (for example, -0400)

The file in use will have its name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed, which allows excluding partially complete files in the directory. Required properties are marked with an asterisk (*).

Note

 

For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event. One way to add this automatically is to use the TimestampInterceptor.

Name (* = required)     Default       Description
channel*                -
type*                   -             The component type name, needs to be hdfs
hdfs.path*              -             HDFS directory path (e.g. hdfs://namenode/flume/webdata/)
hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in the hdfs directory
hdfs.fileSuffix         -             Suffix to append to file (e.g. .avro; note: the period is not automatically added)
hdfs.rollInterval       30            Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount          10            Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout        0             Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize          100           Number of events written to file before it is flushed to HDFS
hdfs.codeC              -             Compression codec, one of: gzip, bzip2, lzo, snappy
hdfs.fileType           SequenceFile  File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file; do not set codeC. (2) CompressedStream requires hdfs.codeC set to an available codec.
hdfs.maxOpenFiles       5000          Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.writeFormat        -             "Text" or "Writable"
hdfs.callTimeout        10000         Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize    10            Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize  1             Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal  -             Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab     -             Kerberos keytab for accessing secure HDFS
hdfs.proxyUser          -
hdfs.round              false         Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue         1             Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
hdfs.roundUnit          second        The unit of the round down value: second, minute or hour
hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles
serializer              TEXT          Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
serializer.*            -

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the hdfs path to become /flume/events/2012-06-12/1150/00.
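The rounding rule can be checked with a short Python sketch. The helper hdfs_bucket_path is hypothetical (not part of Flume); it applies hdfs.roundValue = 10 in minutes and then expands the escape sequences from the example config:

```python
from datetime import datetime

def hdfs_bucket_path(ts, round_value=10):
    """Round ts down to the nearest multiple of round_value minutes and
    expand /flume/events/%y-%m-%d/%H%M/%S (sketch of hdfs.round behaviour)."""
    rounded = ts.replace(minute=ts.minute - ts.minute % round_value,
                         second=0, microsecond=0)
    return rounded.strftime("/flume/events/%y-%m-%d/%H%M/%S")

print(hdfs_bucket_path(datetime(2012, 6, 12, 11, 54, 34)))
# prints /flume/events/12-06-12/1150/00  (%y yields a two-digit year)
```

Note that with %y in the path, the year appears as two digits; use %Y to get the four-digit form shown in the prose above.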

Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default  Description
channel*                      -
type*                         -        The component type name, needs to be logger

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default  Description
channel*                      -
type*                         -        The component type name, needs to be avro
hostname*                     -        The hostname or IP address to bind to
port*                         -        The port # to listen on
batch-size                    100      Number of events to batch together for send
connect-timeout               20000    Amount of time (ms) to allow for the first (handshake) request
request-timeout               20000    Amount of time (ms) to allow for requests after the first

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
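Since the Avro sink is one half of tiered collection, pairing it with the Avro source described earlier gives a two-tier sketch. The host name and port below are illustrative:

```
# First-tier agent a1 forwards events to the collector
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = collector.example.com
a1.sinks.k1.port = 4545

# Collector agent a2 receives them on its Avro source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545
```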

 

File Roll Sink

Stores events on the local filesystem. Required properties are marked with an asterisk (*).

Property Name (* = required)  Default  Description
channel*                      -
type*                         -        The component type name, needs to be file_roll
sink.directory*               -        The directory where files will be stored
sink.rollInterval             30       Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer               TEXT     Other possible options include avro_event or the FQCN of an implementation of the EventSerializer.Builder interface
batchSize                     100
 

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

Original source: http://www.cnblogs.com/caoyuanzhanlang
