[Translation] Flume 1.8.0 User Guide: Sink


Translated from the official Flume 1.8 User Guide. Original: Flume 1.8.0 User Guide

Due to its length, the translation is split into the following five parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Source

[Translation] Flume 1.8.0 User Guide: Sink

[Translation] Flume 1.8.0 User Guide: Channel

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sinks

1. HDFS Sink

This sink writes events into the Hadoop Distributed File System (HDFS). It currently supports creating text and sequence files, and supports compression for both file types. Files can be rolled (the current file is closed and a new one created) periodically based on the elapsed time, the size of data, or the number of events. It also buckets/partitions data by attributes such as the timestamp or the machine where the event originated. The HDFS directory path may contain formatting escape sequences that will be replaced by the HDFS sink to generate a directory/file name to store the events. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster. Note that a version of Hadoop that supports the sync() call is required.

The following escape sequences are supported:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%e day of month without padding (1)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%n month without padding (1..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)
%[localhost] Substitute the hostname of the host where the agent is running
%[IP] Substitute the IP address of the host where the agent is running
%[FQDN] Substitute the canonical hostname of the host where the agent is running

Note: The escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.

The file in use will have its name mangled to include ".tmp" at the end. Once the file is closed, this extension is removed. This makes it possible to exclude partially completed files in the directory. Required properties are in bold.

Note: For all of the time-related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor, as in the sketch below.
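A minimal sketch of adding the TimestampInterceptor on an upstream source, so the timestamp header is set automatically (the source name r1 is an illustrative assumption):

# assumes a source named r1 feeds the channel used by the HDFS sink
a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp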

 

Name Default Description
channel –
type – The component type name, needs to be hdfs
hdfs.path – HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix – Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix – Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix .tmp Suffix that is used for temporal files that flume actively writes into
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 Number of events written to file before it is flushed to HDFS
hdfs.codeC – Compression codec. One of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, please don't set codeC. (2) CompressedStream requires setting hdfs.codeC with an available codeC.
hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas – Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat Writable Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal – Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab – Kerberos keytab for accessing secure HDFS
hdfs.proxyUser –
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit second The unit of the round down value - second, minute or hour.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.* –

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

The above configuration will round down the timestamp to the last 10th minute. For example, an event with timestamp 11:54:34 AM, June 12, 2012 will cause the HDFS path to become /flume/events/2012-06-12/1150/00.
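A hedged variant of the same sink writing gzip-compressed output instead of the default SequenceFile format, based on the hdfs.fileType and hdfs.codeC properties listed above:

# added to the example above; values follow the property table
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip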

2. Hive Sink

This sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events are committed to Hive, they become immediately visible to Hive queries. Partitions to which Flume will stream can either be pre-created or, optionally, Flume can create them if they are missing. Fields from incoming event data are mapped to the corresponding columns in the Hive table.

Name Default Description
channel –
type – The component type name, needs to be hive
hive.metastore – Hive metastore URI (eg thrift://a.b.com:9083)
hive.database – Hive database name
hive.table – Hive table name
hive.partition – Comma separated list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country: string, time: string) then 'Asia,India,2014-02-26-01-21' will indicate continent=Asia,country=India,time=2014-02-26-01-21
hive.txnsPerBatchAsk 100 Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval 240 (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions true Flume will automatically create the necessary Hive partitions to stream to
batchSize 15000 Max number of events written to Hive in a single Hive transaction
maxOpenConnections 500 Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout 10000 (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer – Serializer is responsible for parsing out fields from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit minute The unit of the round down value - second, minute or hour.
roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone Local Time Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles.
useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.

The following serializers are provided for the Hive sink:

JSON: Handles UTF8 encoded JSON (strict syntax) events and requires no configuration. Object names in the JSON are mapped directly to columns with the same name in the Hive table. Internally uses org.apache.hive.hcatalog.data.JsonSerDe but is independent of the Serde of the Hive table. This serializer requires HCatalog to be installed.
DELIMITED: Handles simple delimited textual events. Internally uses LazySimpleSerde but is independent of the Serde of the Hive table.

Name Default Description
serializer.delimiter , (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t"
serializer.fieldnames – The mapping from input fields to columns in the hive table. Specified as a comma separated list (no spaces) of hive table column names, identifying the input fields in order of their occurrence. To skip fields leave the column name unspecified. Eg. 'time,,ip,message' indicates the 1st, 3rd and 4th fields in input map to time, ip and message columns in the hive table.
serializer.serdeSeparator Ctrl-A (Type: character) Customizes the separator used by underlying serde. There can be a gain in efficiency if the fields in serializer.fieldnames are in same order as table columns, the serializer.delimiter is same as the serializer.serdeSeparator and number of fields in serializer.fieldnames is less than or equal to number of table columns, as the fields in incoming event body do not need to be reordered to match order of table columns. Use single quotes for special characters like '\t'. Ensure input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character

The following escape sequences are supported:

 

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

Note: For all of the time-related escape sequences, a header with the key "timestamp" must exist among the headers of the event (unless useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.

Example Hive table:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

Example for agent named a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

The above configuration will round down the timestamp to the last 10th minute. For example, an event with the timestamp header set to 11:54:34 AM, June 12, 2012 and the 'country' header set to 'india' will evaluate to the partition (continent='asia', country='india', time='2012-06-12-11-50'). The serializer is configured to accept tab separated input containing three fields and to skip the second field.
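For events whose bodies are strict-syntax JSON, a hedged variant only swaps the serializer (assuming the JSON field names match the Hive table columns; the delimiter-related serializer properties are then not needed):

# same metastore/table as the example above; JSON serializer needs no extra config
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.serializer = JSON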

3. Logger Sink

Logs events at INFO level. Typically useful for testing/debugging purposes. Required properties are in bold. This sink is the only exception that does not require the extra configuration explained in the Logging raw data section.

 

Property Name Default Description
channel  
type The component type name, needs to be logger
maxBytesToLog 16 Maximum number of bytes of the Event body to log

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4. Avro Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size. Required properties are in bold.

Property Name Default Description
channel –
type – The component type name, needs to be avro.
hostname – The hostname or IP address to bind to.
port – The port # to listen on.
batch-size 100 Number of events to batch together for send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
reset-connection-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource
compression-level 6 The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression
ssl false Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
trust-all-certs false If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
truststore – The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password – The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
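A hedged sketch of enabling SSL on the same sink (the truststore path and password are illustrative placeholders, not real values):

# illustrative truststore settings; taken from the ssl/truststore properties above
a1.sinks.k1.ssl = true
a1.sinks.k1.trust-all-certs = false
a1.sinks.k1.truststore = /path/to/truststore.jks
a1.sinks.k1.truststore-password = <password to access the truststore>
a1.sinks.k1.truststore-type = JKS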

5. Thrift Sink

This sink forms one half of Flume's tiered collection support. Flume events sent to this sink are turned into Thrift events and sent to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size.
The Thrift sink can be configured to start in secure mode by enabling Kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode. client-principal and client-keytab are the properties used by the Thrift sink to authenticate to the Kerberos KDC. The server-principal represents the principal of the Thrift source this sink is configured to connect to in secure mode. Required properties are in bold.

Property Name Default Description
channel –
type – The component type name, needs to be thrift.
hostname – The hostname or IP address to bind to.
port – The port # to listen on.
batch-size 100 Number of events to batch together for send.
connect-timeout 20000 Amount of time (ms) to allow for the first (handshake) request.
request-timeout 20000 Amount of time (ms) to allow for requests after the first.
connection-reset-interval none Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when new hosts are added without having to restart the agent.
ssl false Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type"
truststore – The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password – The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude
kerberos false Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
client-principal – The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
client-keytab – The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the kerberos KDC.
server-principal – The kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect to.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
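A hedged sketch of running the same sink in secure mode (principal names and keytab path are illustrative; the remote Thrift source must also be started in secure mode):

# illustrative principals and keytab path; properties follow the table above
a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flume/client.example.com@EXAMPLE.COM
a1.sinks.k1.client-keytab = /path/to/keytabs/flume.keytab
a1.sinks.k1.server-principal = thrift/server.example.com@EXAMPLE.COM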

6. IRC Sink

The IRC sink takes messages from the attached channel and relays those to configured IRC destinations. Required properties are in bold.

Property Name Default Description
channel –
type – The component type name, needs to be irc
hostname – The hostname or IP address to connect to
port 6667 The port number of remote host to connect
nick – Nick name
user – User name
password – User password
chan – channel
name –
splitlines – (boolean)
splitchars n line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\n")

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

7. File Roll Sink

Stores events on the local filesystem. Required properties are in bold.

 

Property Name Default Description
channel  
type The component type name, needs to be file_roll.
sink.directory The directory where files will be stored
sink.pathManager DEFAULT The PathManager implementation to use.
sink.pathManager.extension The file extension if the default PathManager is used.
sink.pathManager.prefix A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval 30 Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer TEXT Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

8. Null Sink

Discards all events it receives from the channel. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be null.
batchSize 100  

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

9. HBase Sinks

9.1 HBaseSink

This sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction.
The HBaseSink supports writing data to secure HBase. To write to secure HBase, the user the agent is running as must have write permissions to the table the sink is configured to write to. The principal and keytab to use to authenticate against the KDC can be specified in the configuration. The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos (for details on how to do this, please refer to the HBase documentation).
For convenience, two serializers are provided with Flume. SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase and optionally increments a column in HBase. This is primarily an example implementation. RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body based on the given regex and writes each part into a different column.
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Required properties are in bold.

 

Property Name Default Description
channel  
type The component type name, needs to be hbase
table The name of the table in Hbase to write to.
columnFamily The column family in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol".
serializer.* Properties to be passed to the serializer.
kerberosPrincipal Kerberos user principal for accessing secure HBase
kerberosKeytab Kerberos keytab for accessing secure HBase

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
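A hedged sketch of configuring the regex-based serializer used in the example above. The serializer.regex and serializer.colNames keys are passed through via serializer.* and are assumptions based on the RegexHbaseEventSerializer shipped with Flume; the pattern and column names are purely illustrative:

# illustrative pattern splitting "time level message" into three columns of bar_cf
a1.sinks.k1.serializer.regex = ^([^ ]+) ([^ ]+) (.*)$
a1.sinks.k1.serializer.colNames = time,level,msg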

9.2 AsyncHBaseSink

This sink writes data to HBase using an asynchronous model. A class implementing AsyncHbaseEventSerializer, which is specified by the configuration, is used to convert the events into HBase puts and/or increments. These puts and increments are then written to HBase. This sink uses the Asynchbase API to write to HBase. It provides the same consistency guarantees as HBase, which is currently row-wise atomicity. In the event of HBase failing to write certain events, the sink will replay all events in that transaction. The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink. Required properties are in bold.

 

Property Name Default Description
channel  
type The component type name, needs to be asynchbase
table The name of the table in Hbase to write to.
zookeeperQuorum The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
columnFamily The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give better performance if there are multiple increments to a limited number of cells.
timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.* – Properties to be passed to the serializer.

 

Note that this sink takes the ZooKeeper quorum and parent znode information in the configuration. The ZooKeeper quorum and parent node configuration may be specified in the Flume configuration file.
If they are not provided in the configuration, the sink reads this information from the first hbase-site.xml in the classpath. A variant that sets them explicitly is sketched after the example.
Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
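A hedged variant that specifies the ZooKeeper quorum and parent znode directly in the Flume configuration instead of relying on hbase-site.xml (host names are illustrative):

# illustrative ZooKeeper hosts; property names follow the table above
a1.sinks.k1.zookeeperQuorum = zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
a1.sinks.k1.znodeParent = /hbase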

10. MorphlineSolrSink

This sink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
This sink is well suited for use cases that stream raw data into HDFS (via the HdfsSink) and simultaneously extract, transform and load the same data into Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to search applications.
The ETL functionality is customizable using a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline command is a bit like a Flume Interceptor. Morphlines can be embedded into Hadoop components such as Flume.
Commands to parse and transform a set of standard data formats such as log files, Avro, CSV, text, HTML, XML, PDF, Word, Excel, etc. are provided out of the box, and additional custom commands and parsers for other data formats can be added as morphline plugins. Any kind of data format can be indexed, any kind of Solr document for any kind of Solr schema can be generated, and any custom ETL logic can be registered and executed.
Morphlines manipulate continuous streams of records. The data model can be described as follows: a record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. (The implementation uses Guava's ArrayListMultimap, which is a ListMultimap.) Note that a field can have multiple values and any two records need not use common field names.
This sink fills the body of the Flume event into the _attachment_body field of the morphline record, and copies the headers of the Flume event into record fields of the same name. The commands can then act on this data.
Routing to a SolrCloud cluster is supported to improve scalability. Indexing load can be spread across a large number of MorphlineSolrSinks for improved scalability, and can be replicated across multiple MorphlineSolrSinks for high availability, for example using Flume features such as the Load balancing Sink Processor. MorphlineInterceptor can also help to implement dynamic routing to multiple Solr collections (e.g. for multi-tenancy).
The morphline and solr jars required for your environment must be placed in the lib directory of the Apache Flume installation.
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Required properties are in bold.

Property Name Default Description
channel –
type – The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile – The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId null Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
batchSize 1000 The maximum number of events to take per flume transaction.
batchDurationMillis 1000 The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first.
handlerClass org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
isProductionMode false This flag should be enabled for mission critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions.
recoverableExceptionClasses org.apache.solr.client.solrj.SolrServerException Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.
isIgnoringRecoverableExceptions false This flag should be enabled, if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

11. ElasticSearchSink

This sink writes data to an elasticsearch cluster. By default, events will be written so that the Kibana graphical interface can display them, just as if logstash wrote them.
The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version, first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library that matches the major version: a 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined, read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running, down to the minor version.
Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd, where <indexName> is the indexName parameter. The sink will start writing to a new index at midnight UTC.
Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of the more powerful ElasticSearchIndexRequestBuilderFactory.
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
Required properties are in bold.

Property Name Default Description
channel –
type – The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames – Comma separated list of hostname:port, if the port is not present the default port '9300' will be used
indexName flume The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
indexType logs The type to index the document to, defaults to 'log'. Arbitrary header substitution is supported, eg. %{header} replaces with value of named event header
clusterName elasticsearch Name of the ElasticSearch cluster to connect to
batchSize 100 Number of events to be written per txn.
ttl – TTL in days, when set will cause the expired documents to be deleted automatically, if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only e.g. a1.sinks.k1.ttl = 5 and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example a1.sinks.k1.ttl = 5d will set TTL to 5 days. Follow http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* – Properties to be passed to the serializer.

 

Note that header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. Use this feature with caution, because the event submitter then has control of the indexName and indexType. Furthermore, if the elasticsearch REST client is used, the event submitter also has control of the URL path used. A variant using header substitution is sketched after the example below.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
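As noted above, the index name and type can be resolved from event headers. A hedged variant (the es_index and es_type header names are illustrative assumptions; each event would need to carry those headers, e.g. set by an interceptor):

# illustrative header names; substitution syntax follows the indexName/indexType rows above
a1.sinks.k1.indexName = %{es_index}
a1.sinks.k1.indexType = %{es_type}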

12. Kite Dataset Sink

Experimental sink that writes events to a Kite Dataset. This sink will deserialize the body of each incoming event and store the resulting record in a Kite Dataset. It determines the target Dataset by loading a dataset by URI.
The only supported serialization is avro, and the record schema must be passed in the event headers, using either flume.avro.schema.literal with the JSON schema representation or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported). This is compatible with the Log4jAppender Flume client and the spooling directory source's Avro deserializer using deserializer.schemaType = LITERAL.
Note 1: The flume.avro.schema.hash header is not supported. Note 2: In some cases, file rolling may occur slightly after the roll interval has been exceeded. However, this delay will not exceed 5 seconds. In most cases, the delay is imperceptible.

Property Name Default Description
channel –
type – Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri – URI of the dataset to open
kite.repo.uri – URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace – Namespace of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.dataset.name – Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize 100 Number of records to process in each batch
kite.rollInterval 30 Maximum wait time (seconds) before data files are released
kite.flushable.commitOnBatch true If true, the Flume transaction will be committed and the writer will be flushed on each batch of kite.batchSize records. This setting only applies to flushable datasets. When true, it's possible for temp files with committed data to be left in the dataset directory. These files need to be recovered by hand for the data to be visible to DatasetReaders.
kite.syncable.syncOnBatch true Controls whether the sink will also sync data when committing the transaction. This setting only applies to syncable datasets. Syncing guarantees that data will be written on stable storage on the remote system while flushing only guarantees that data has left Flume's client buffers. When the kite.flushable.commitOnBatch property is set to false, this property must also be set to false.
kite.entityParser avro Parser that turns Flume Events into Kite entities. Valid values are avro and the fully-qualified class name of an implementation of the EntityParser.Builder interface.
kite.failurePolicy retry Policy that handles non-recoverable errors such as a missing Schema in the Event header. The default value, retry, will fail the current batch and try again which matches the old behavior. Other valid values are save, which will write the raw Event to the kite.error.dataset.uri dataset, and the fully-qualified class name of an implementation of the FailurePolicy.Builder interface.
kite.error.dataset.uri – URI of the dataset where failed events are saved when kite.failurePolicy is set to save. Required when kite.failurePolicy is set to save.
auth.kerberosPrincipal – Kerberos user principal for secure authentication to HDFS
auth.kerberosKeytab – Kerberos keytab location (local FS) for the principal
auth.proxyUser – The effective user for HDFS actions, if different from the kerberos principal
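A minimal hedged configuration sketch for an agent named a1 (the dataset URI is illustrative and assumes a pre-created Kite dataset on HDFS):

# illustrative dataset URI; property names follow the table above
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/flume/datasets/events
a1.sinks.k1.kite.batchSize = 100
a1.sinks.k1.kite.rollInterval = 30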

13. Kafka Sink

This is a Flume Sink implementation that can publish data to a Kafka topic. One of the objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming through various Flume sources. This currently supports the Kafka 0.9.x series of releases.
This version of Flume no longer supports older versions (0.8.x) of Kafka.
Required properties are marked in bold font.

Property Name Default Description
type – Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers – List of brokers Kafka-Sink will connect to, to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a "topic" field, the event will be published to that topic, overriding the topic configured here. Arbitrary header substitution is supported, eg. %{header} is replaced with value of event header named "header". (If using the substitution, it is recommended to set the "auto.create.topics.enable" property of the Kafka broker to true.)
flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks 1 How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.
useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
defaultPartitionId – Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer's partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader – When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
allowTopicOverride true When set, the sink will allow a message to be produced into a topic specified by the topicHeader property (if provided).
topicHeader topic When set in conjunction with allowTopicOverride will produce a message into the value of the header named using the value of this property. Care should be taken when using in conjunction with the Kafka Source topicHeader property to avoid creating a loopback.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props – If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on the producer.
Other Kafka Producer Properties – These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

Note: The Kafka Sink uses the topic and key properties from the FlumeEvent headers to send events to Kafka. If the topic exists in the headers, the event will be sent to that specific topic, overriding the topic configured for the sink. If the key exists in the headers, the key will be used by Kafka to partition the data between the topic partitions. Events with the same key will be sent to the same partition. If the key is null, events will be sent to random partitions. A hedged illustration of setting these headers is sketched below.
The Kafka sink also provides defaults for key.serializer (org.apache.kafka.common.serialization.StringSerializer) and value.serializer (org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.
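A hedged illustration of the header-based routing described above: static interceptors on an upstream source (the source name and header values are illustrative) set the "topic" and "key" headers that the Kafka sink then honours:

# illustrative source name, topic and key values
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = audit-events
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = key
a1.sources.r1.interceptors.i2.value = tenant-42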
Deprecated Properties:

Property Name Default Description
brokerList Use kafka.bootstrap.servers
topic default-flume-topic Use kafka.topic
batchSize 100 Use kafka.flumeBatchSize
requiredAcks 1 Use kafka.producer.acks

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer are passed to the Kafka producer when it is created; the properties that can be passed are not limited to the ones given in this example. It is also possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

Security and Kafka Sink:

 

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication, SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now, data encryption is solely provided by SSL/TLS.

Setting kafka.producer.security.protocol to any of the following values means:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning: There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561.

TLS and Kafka Sink:

 

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

Once enabled, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required, then the following should additionally be added to the Flume agent configuration. Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate by a single Root CA, which in turn is trusted by the Kafka brokers.

a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

If the keystore and key use different password protection, then the ssl.key.password property will provide the required additional secret for the producer keystore:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos and Kafka Sink: 

To use the Kafka sink with a Kafka cluster secured with Kerberos, set the kafka.producer.security.protocol property noted above for the producer. The Kerberos keytab and principal to be used with the Kafka brokers are specified in a JAAS file's "KafkaClient" section. The "Client" section describes the Zookeeper connection, if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and optionally the system wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

Sample JAAS file. For reference on its contents, please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation on SASL configuration. Unlike the Kafka Source or Kafka Channel, a "Client" section is not required unless it is needed by other connecting components. Also, please make sure that the operating system user of the Flume processes has read privileges on the jaas and keytab files.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

14. HTTP Sink

Behaviour of this sink is that it will take events from the channel and send those events to a remote service using an HTTP POST request. The event content is sent as the POST body.

Error handling behaviour of this sink depends on the HTTP response returned by the target server. The sink backoff/ready status is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server where the status code is not readable will result in a backoff signal and the event is not consumed from the channel.

Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be http.
endpoint The fully qualified URL endpoint to POST to
connectTimeout 5000 The socket connection timeout in milliseconds
requestTimeout 5000 The maximum request processing time in milliseconds
contentTypeHeader text/plain The HTTP Content-Type header
acceptHeader text/plain The HTTP Accept header value
defaultBackoff true Whether to backoff by default on receiving all HTTP status codes
defaultRollback true Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics false Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

 

Note that the most specific HTTP status code match is used for the backoff, rollback and incrementMetrics configuration options. If there are configuration values for both the 2XX and 200 status codes, then 200 HTTP codes will use the 200 value, and all other HTTP codes in the 201-299 range will use the 2XX value.

Any empty or null events are consumed without any request being made to the HTTP endpoint.

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

15. Custom Sink

A custom sink is your own implementation of the Sink interface. A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN. Required properties are in bold.

Property Name Default Description
channel  
type The component type name, needs to be your FQCN

Example for agent named a1:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

 
