一次flume exec source采集日志到kafka因為單條日志數據非常大同步失敗的踩坑帶來的思考


本次遇到的問題描述,日志采集同步時,當單條日志(日志文件中一行日志)超過2M大小,數據無法采集同步到kafka,分析后,共踩到如下幾個坑。
1、flume采集時,通過shell+EXEC(tail -F xxx.log 的方式) source來獲取日志時,當單條日志過大超過1M時,source端無法從日志中獲取到Event。
2、日志超過1M后,flume的kafka sink 作為生產者發送給日志給kafka失敗,kafka無法收到消息。
以下針對踩的這兩個坑做分析,flume 我使用的是1.9.0版本。 kafka使用的是2.11-2.0.0版本

問題一、flume采集時,通過shell+EXEC(tail -F  xxx.log 的方式) source來獲取日志時,當單條日志過大超過1M時,source端無法從日志中獲取到Event。flume的配置如下:

 ......
 agent.sources = seqGenSrc
 ......
 # For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = exec
#agent.sources.seqGenSrc.command = tail -F /opt/logs/test.log|grep businessCollection|awk -F '- {' '{print "{"$2}'
agent.sources.seqGenSrc.command = tail -F /opt/logs/test.log|grep businessCollection
agent.sources.seqGenSrc.shell = /bin/bash -c
agent.sources.seqGenSrc.batchSize = 1
agent.sources.seqGenSrc.batchTimeout = 90000
......

  原因:采用shell+EXEC方式的時候,flume的源碼中使用的是如下的方式來獲取日志

    private Process process = null;
	//使用這種方式來執行命令。
process = Runtime.getRuntime().exec(commandArgs);
//讀取日志
 reader = new BufferedReader(  new InputStreamReader(process.getInputStream(), charset));

  

在一行日志超過1M后,這個代碼就假死了,一直宕住,導致無法獲取到數據。

針對這個問題處理方式:
方式一:修改源碼的實現方式。(1.9.0的源碼 對應的是源碼中的flume-ng-core 項目中的org.apache.flume.source.ExecSource.java 這個類)

//process的采用如下方式獲和執行命令,就改一行代碼。增加.redirectErrorStream(true)后,輸入流就都可以獲取到,哪怕超過1M
process = new ProcessBuilder(commandArgs).redirectErrorStream(true).start();

  

  

 

修改完成后,重新打包編譯,然后將生成的jar包替換原來老的jar包。

  方式二:放棄EXECSource,使用TAILDIR Source。 使用這個source時,對應的配置如下:

 ......
 agent.sources = seqGenSrc
 ......
 # For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = TAILDIR
agent.sources.seqGenSrc.positionFile = ./taildir_position.json
agent.sources.seqGenSrc.filegroups = seqGenSrc
agent.sources.seqGenSrc.filegroups.seqGenSrc = /opt/logs/test.log
agent.sources.seqGenSrc.fileHeader = false
agent.sources.seqGenSrc.batchSize = 1
......

  建議采用TAILDIR Source 比較好,這個可以對多個日志進行監控和采集,而且日志采集時會記錄日志采集位置到positionFile 中,這樣日志采集不會重復。EXEC SOURCE在重啟采集時數據會重復采集,還需要其他的方式去避免重復采集

問題二、日志超過1M后,flume的kafka sink 作為生產者發送給日志給kafka失敗,kafka無法收到消息
原因:kafka 在默認情況下,只能接收1M大小以內的消息,在沒有做自定義設置時。所以單條消息大於1M后是無法處理的。
處理方式如下:

1)、修改kafka 服務端server.properties文件,做如下設置(修改大小限制)

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=502400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=502400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
message.max.bytes=5242880
replica.fetch.max.bytes=6291456

2)、修改producer.properties,做如下設置(修改大小限制)

# the maximum size of a request in bytes
max.request.size= 9242880

3)、java代碼中在初始化kafka 生產者時,也需要指定max.request.size= 9242880

 本文作者:張永清,轉載請注明出處:一次flume exec source采集日志到kafka因為單條日志數據非常大同步失敗的踩坑帶來的思考

        Properties properties = new Properties();
		...
		      properties.put("max.request.size", 5242880);
			  ...
			KafkaProducer<Object,Object>  kafkaProducer = new KafkaProducer<Object,Object>(properties);

  4)、消費者在消費kafka數據時,也需要注意設置消費消息的大小限制

 本文作者:張永清,轉載請注明出處:一次flume exec source采集日志到kafka因為單條日志數據非常大同步失敗的踩坑帶來的思考

            Properties properties = new Properties();
			...
            properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 6291456);		
				...
				 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  

對於flume不了的同學,可以看flume 1.9中文版用戶指南:https://www.h3399.cn/201906/700076.html  

flume1.9 用戶指南 (中文版)

概述

Apache Flume 是一個分布式, 可靠且可用的系統, 用於有效地從許多不同的 source 收集, 聚合和移動大量日志數據到集中式數據存儲.

Apache Flume 的使用不僅限於日志數據聚合. 由於數據 source 是可定制的, 因此 Flume 可用於傳輸大量 event 數據, 包括但不限於網絡流量數據, 社交媒體生成的數據, 電子郵件消息以及幾乎任何可能的數據 source.

Apache Flume 是 Apache Software Foundation 的頂級項目.

系統要求

Java 運行時環境 - Java 1.8 或更高版本

內存 - 為 source,channel 或 sink 配置的內存

磁盤空間 - channel 或 sink 配置的磁盤空間

目錄權限 - agent 使用的目錄的讀 / 寫權限

架構

數據流模型

Flume event 被定義為具有字節有效負載和可選字符串屬性集的數據流單元. Flume agent 是一個 (JVM) 進程, 它承載 event 從外部 source 流向下一個目標 (躍點) 的組件.

Flume source 消耗由外部 source(如 Web 服務器)傳遞給它的 event . 外部 source 以目標 Flume source 識別的格式向 Flume 發送 event . 例如, Avro Flume source 可用於從 Avro 客戶端或從 Avrosink 發送 event 的流中的其他 Flume agent 接收 Avroevent . 可以使用 Thrift Flume Source 定義類似的流程, 以接收來自 Thrift Sink 或 Flume Thrift Rpc 客戶端或 Thrift 客戶端的 event , 這些客戶端使用 Flume thrift 協議生成的任何語言編寫. 當 Flume source 接收 event 時, 它將其存儲到一個或多個 channels . 該 channel 是一個被動存儲器, 可以保持 event 直到它被 Flume sink 消耗. 文件 channel 就是一個例子 - 它由本地文件系統支持. sink 從 channel 中移除 event 並將其放入外部存儲庫 (如 HDFS(通過 Flume HDFS sink)) 或將其轉發到流中下一個 Flume agent (下一跳)的 Flume source. 給定 agent 中的 source 和 sink 與 channel 中暫存的 event 異步運行.

復雜的流程

Flume 允許用戶構建多跳流, 其中 event 在到達最終目的地之前經過多個 agent . 它還允許 fan-in 和 fan-out, 上下文路由和故障跳躍的備份路由(故障轉移).

可靠性

event 在每個 agent 的 channel 中進行. 然后將 event 傳遞到流中的下一個 agent 或終端存儲庫(如 HDFS). 只有將 event 存儲在下一個 agent 的 channel 或終端存儲庫中后, 才會從 channel 中刪除這些 event . 這就是 Flume 中的單跳消息傳遞語義如何提供流的端到端可靠性.

Flume 使用事務方法來保證 event 的可靠傳遞. source 和 sink 分別在事務中封裝由 channel 提供的事務中放置或提供的 event 的存儲 / 檢索. 這可確保 event 集在流中從一個點到另一個點可靠地傳遞. 在多跳流的情況下, 來自前一跳的 sink 和來自下一跳的 source 都運行其事務以確保數據安全地存儲在下一跳的 channel 中.

可恢復性

event 在 channel 中進行, 該 channel 管理從故障中恢復. Flume 支持由本地文件系統支持的持久文件 channel. 還有一個內存 channel, 它只是將 event 存儲在內存中的隊列中, 這更快, 但是當 agent 進程死亡時仍然留在內存 channel 中的任何 event 都無法恢復.

設置

設置 agent

Flume agent 配置存儲在本地配置文件中. 這是一個遵循 Java 屬性文件格式的文本文件. 可以在同一配置文件中指定一個或多個 agent 的配置. 配置文件包括 agent 中每個 source,sink 和 channel 的屬性以及它們如何連接在一起以形成數據流.

配置單個組件

流中的每個組件 (source,sink 或 channel) 都具有特定於類型和實例化的名稱, 類型和屬性集. 例如, Avrosource 需要主機名 (或 IP 地址) 和端口號來接收數據. 內存 channel 可以具有最大隊列大小 ("容量"),HDFS sink 需要知道文件系統 URI, 創建文件的路徑, 文件輪換頻率("hdfs.rollInterval") 等. 組件的所有此類屬性需要在托管 Flume agent 的屬性文件中設置.

將各個部分連接在一起

agent 需要知道要加載哪些組件以及它們如何連接以構成流程. 這是通過列出 agent 中每個 source,sink 和 channel 的名稱, 然后為每個 sink 和 source 指定連接 channel 來完成的. 例如, agent 通過名為 file-channel 的文件 channel 將 event 從名為 avroWeb 的 Avrosource 流向 HDFS sink hdfs-cluster1. 配置文件將包含這些組件的名稱和文件 channel, 作為 avroWebsource 和 hdfs-cluster1 sink 的共享 channel.

啟動 agent

使用名為 flume-ng 的 shell 腳本啟動 agent 程序, 該腳本位於 Flume 發行版的 bin 目錄中. 您需要在命令行上指定 agent 名稱, config 目錄和配置文件:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

現在, agent 將開始運行在給定屬性文件中配置的 source 和 sink.

一個簡單的例子

在這里, 我們給出一個示例配置文件, 描述單節點 Flume 部署. 此配置允許用戶生成 event , 然后將其記錄到控制台.

# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  

此配置定義名為 a1 的單個 agent .a1 有一個監聽端口 44444 上的數據的 source, 一個緩沖內存中 event 數據的 channel, 以及一個將 event 數據記錄到控制台的 sink. 配置文件命名各種組件, 然后描述其類型和配置參數. 給定的配置文件可能會定義幾個命名的 agent 當一個給定的 Flume 進程啟動時, 會傳遞一個標志, 告訴它要顯示哪個命名 agent.

鑒於此配置文件, 我們可以按如下方式啟動 Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

請注意, 在完整部署中, 我們通常會包含一個選項: --conf=<conf-dir> . 所述 <conf-dir> 目錄將包括一個 shell 腳本 f lume-env.sh 和潛在的一個 log4j 的屬性文件. 在這個例子中, 我們傳遞一個 Java 選項來強制 Flume 登錄到控制台, 我們沒有自定義環境腳本.

從一個單獨的終端, 我們可以 telnet 端口 44444 並向 Flume 發送一個 event :

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

  

原始的 Flume 終端將在日志消息中輸出 event .

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: {
headers:{
} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!.
}

  

恭喜 - 您已成功配置並部署了 Flume agent ! 后續部分更詳細地介紹了 agent 配置.

在配置文件中使用環境變量

Flume 能夠替換配置中的環境變量. 例如:

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${
NC_PORT
}
a1.sources.r1.channels = c1

  

注意: 它目前僅適用於 values , 不適用於 keys . (IE. only on the "right side" of the = mark of the config lines.)

通過設置 propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties, 可以通過 agent 程序調用上的 Java 系統屬性啟用此功能.

例如:

$ NC_PORT=44444 bin/flume-ng agent -conf conf -conf-file example.conf -name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

請注意, 上面只是一個示例, 可以通過其他方式配置環境變量, 包括在 conf/flume-env.sh.

記錄原始數據

在許多生產環境中記錄流經攝取 pipeline 的原始數據流不是所希望的行為, 因為這可能導致泄漏敏感數據或安全相關配置 (例如密鑰) 泄漏到 Flume 日志文件. 默認情況下, Flume 不會記錄此類信息. 另一方面, 如果數據管道被破壞, Flume 將嘗試提供調試 DEBUG 的線索.

調試 event 管道問題的一種方法是設置 連接到 Logger Sink 的附加內存 channel, 它將所有 event 數據輸出到 Flume 日志. 但是, 在某些情況下, 這種方法是不夠的.

為了能夠記錄 event 和配置相關的數據, 除了 log4j 屬性外, 還必須設置一些 Java 系統屬性.

要啟用與配置相關的日志記錄, 請設置 Java 系統屬性 - Dorg.apache.flume.log.printconfig=true . 這可以在命令行上傳遞, 也可以在 flume-env.sh 中的 JAVA_OPTS 變量中設置.

要啟用數據記錄, 請 按照上述相同方式設置 Java 系統屬性 -Dorg.apache.flume.log.rawdata=true . 對於大多數組件, 還必須將 log4j 日志記錄級別設置為 DEBUG 或 TRACE, 以使特定於 event 的日志記錄顯示在 Flume 日志中.

下面是啟用配置日志記錄和原始數據日志記錄的示例, 同時還將 Log4j 日志級別設置為 DEBUG 以用於控制台輸出:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorgwdata=true

基於 Zookeeper 的配置

Flume 通過 Zookeeper 支持 agent 配置. 這是一個實驗性功能. 配置文件需要在可配置前綴下的 Zookeeper 中上傳. 配置文件存儲在 Zookeeper 節點數據中. 以下是 agent 商 a1 和 a2 的 Zookeeper 節點樹的外觀

- /flume
|- /a1 [Agent config file]
|- /a2 [Agent config file]

 

上載配置文件后, 使用以下選項啟動 agent

$ bin/flume-ng agent -conf conf -z zkhost:2181,zkhost1:2181 -p /flume -name a1 -Dflume.root.logger=INFO,console

Argument Name Default Description
z Zookeeper 連接字符串。以逗號分隔的主機名列表:port
p /flume Zookeeper 中的基本路徑,用於存儲 agent 配置

Flume 擁有完全基於插件的架構. 雖然 Flume 附帶了許多開箱即用的 source,channels,sink,serializers 等, 但許多實現都與 Flume 分開運行. 安裝第三方插件

雖然通過將自己的 jar 包添加到 flume-env.sh 文件中的 FLUME_CLASSPATH 變量中, 始終可以包含自定義 Flume 組件, 但 Flume 現在支持一個名為 plugins.d 的特殊目錄, 該目錄會自動獲取以特定格式打包的插件. 這樣可以更輕松地管理插件打包問題, 以及更簡單的調試和幾類問題的故障排除, 尤其是庫依賴性沖突.

目錄

該 plugins.d 目錄位於 $FLUME_HOME/plugins.d . 在啟動時, flume-ng 啟動腳本在 plugins.d 目錄中查找符合以下格式的插件, 並在啟動 java 時將它們包含在正確的路徑中.

插件的目錄布局

plugins.d 中的每個插件 (子目錄) 最多可以有三個子目錄:

lib - the plugin's jar(s)
libext - the plugin's dependency jar(s)
native - any required native libraries, such as .so files

 

plugins.d 目錄中的兩個插件示例:

plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so

 

數據攝取

Flume 支持許多從外部來 source 攝取數據的機制.

RPC

Flume 發行版中包含的 Avro 客戶端可以使用 avro RPC 機制將給定文件發送到 Flume Avrosource:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

 

 

上面的命令會將 /usr/logs/log.10 的內容發送到監聽該端口的 Flume source.

執行命令

有一個 exec source 執行給定的命令並消耗輸出. 輸出的單 "行" 即. 文本后跟回車符 ('\ r') 或換行符 ('\ n') 或兩者一起.

網絡流

Flume 支持以下機制從常用日志流類型中讀取數據, 例如:

  1. Avro
  2. Thrift
  3. Syslog
  4. Netcat

設置多 agent 流程

為了跨多個 agent 或跳數據流, 先前 agent 的 sink 和當前跳的 source 需要是 avro 類型, sink 指向 source 的主機名 (或 IP 地址) 和端口.

合並

日志收集中非常常見的情況是大量日志生成客戶端將數據發送到連接到存儲子系統的少數消費者 agent . 例如, 從數百個 Web 服務器收集的日志發送給寫入 HDFS 集群的十幾個 agent .

這可以通過使用 avrosink 配置多個第一層 agent 在 Flume 中實現, 所有這些 agent 都指向單個 agent 的 avrosource(同樣, 您可以在這種情況下使用 thriftsource/sink / 客戶端). 第二層 agent 上的此 source 將接收的 event 合並到單個信道中, 該信道由信宿器消耗到其最終目的地.

多路復用流程

Flume 支持將 event 流多路復用到一個或多個目的地. 這是通過定義可以復制或選擇性地將 event 路由到一個或多個信道的流復用器來實現的.

上面的例子顯示了來自 agent "foo" 的 source 代碼將流程擴展到三個不同的 channel. 扇出可以復制或多路復用. 在復制流的情況下, 每個 event 被發送到所有三個 channel. 對於多路復用情況, 當 event 的屬性與預配置的值匹配時, event 將被傳遞到可用 channel 的子集. 例如, 如果一個名為 "txnType" 的 event 屬性設置為 "customer", 那么它應該轉到 channel1 和 channel3, 如果它是 "vendor", 那么它應該轉到 channel2, 否則轉到 channel3. 可以在 agent 的配置文件中設置映射.

配置

如前面部分所述, Flume agent 程序配置是從類似於具有分層屬性設置的 Java 屬性文件格式的文件中讀取的.

定義流程

要在單個 agent 中定義流, 您需要通過 channel 鏈接 source 和 sink. 您需要列出給定 agent 的 source,sink 和 channel, 然后將 source 和 sink 指向 channel.source 實例可以指定多個 channel, 但 sink 實例只能指定一個 channel. 格式如下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

  

例如, 名為 agent_foo 的 agent 正在從外部 avro 客戶端讀取數據並通過內存 channel 將其發送到 HDFS.

配置文件 weblog.config 可能如下所示:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

 

這將使 event 從 avro-AppSrv-source 流向 hdfs-Cluster1-sink, 通過內存 channelmem-channel-1.

當使用 weblog.config 作為其配置文件啟動 agent 程序時, 它將實例化該流程.

配置單個組件

定義流后, 您需要設置每個 source,sink 和 channel 的屬性. 這是以相同的分層命名空間方式完成的, 您可以在其中設置組件類型以及特定於每個組件的屬性的其他值:

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

 

需要為 Flume 的每個組件設置屬性 "type", 以了解它需要什么類型的對象. 每個 source,sink 和 channel 類型都有自己的一組屬性, 使其能夠按預期運行. 所有這些都需要根據需要進行設置. 在前面的示例中, 我們有一個從 avro-AppSrv-source 到 hdfs-Cluster1-sink 的流程通過內存 channelmem-channel-1. 這是一個顯示每個組件配置的示例:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...

 

在 agent 中添加多個流

單個 Flume agent 可以包含多個獨立流. 您可以在配置中列出多個 source,sink 和 channel. 可以鏈接這些組件以形成多個流:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

 

然后, 您可以將 source 和 sink 鏈接到 channel(用於 sink)的相應 channel(用於 source), 以設置兩個不同的流. 例如, 如果您需要在 agent 中設置兩個流, 一個從外部 avro 客戶端到外部 HDFS, 另一個從尾部輸出到 avrosink, 那么這是一個配置來執行此操作:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

 

配置多 agent 流程

要設置多層流, 您需要有 sink 指向下一跳的 avro/thrift source. 這將導致第一個 Flume agent 將 event 轉發到下一個 Flume agent . 例如, 如果您使用 avro 客戶端定期向本地 Flume agent 發送文件(每個 event 1 個文件), 則此本地 agent 可以將其轉發到另一個已安裝存儲的 agent

Weblog agent 配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000
# configure other pieces
#...

 

HDFS agent 配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...

 

在這里, 我們將 weblog agent 的 avro-forward-sink 鏈接到 hdfs agent 的 avro-collection-source. 這將導致來自外部應用程序服務器 source 的 event 最終存儲在 HDFS 中.

扇出流量

如前一節所述, Flume 支持扇出從一個 source 到多個 channel 的流量. 扇出有兩種模式 : 復制和多路復用. 在復制流程中, event 將發送到所有已配置的 channel. 在多路復用的情況下, event 僅被發送到合格 channels 的子集. 為了散開流量, 需要指定 source 的 channel 列表以及扇出它的策略. 這是通過添加可以復制或多路復用的 channel"選擇器" 來完成的. 如果它是多路復用器, 則進一步指定選擇規則. 如果您沒有指定選擇器, 那么默認情況下它會復制:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating

 

多路復用選擇具有另一組屬性以分流流. 這需要指定 event 屬性到 channel 集的映射. 選擇器檢查 event 頭中的每個已配置屬性. 如果它與指定的值匹配, 則該 event 將發送到映射到該值的所有 channel. 如果沒有匹配項, 則將 event 發送到配置為默認值的 channel 集:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>

 

映射允許為每個值重疊 channel.

以下示例具有多路復用到兩個路徑的單個流. 名為 agent_foo 的 agent 具有單個 avrosource 和兩個鏈接到兩個 sink 的 channel:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

  

選擇器檢查名為 "State" 的標頭. 如果該值為 "CA", 則將其發送到 mem-channel-1, 如果其為 "AZ", 則將其發送到文件 channel-2, 或者如果其為 "NY" 則為兩者. 如果 "狀態" 標題未設置或與三者中的任何一個都不匹配, 則它將轉到 mem-channel-1, 其被指定為 "default".

選擇器還支持可選 channel. 要為標頭指定可選 channel, 可通過以下方式使用 config 參數 "optional":

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

  

選擇器將首先嘗試寫入所需的 channel, 如果其中一個 channel 無法使用 event , 則會使事務失敗. 在所有渠道上重新嘗試交易. 一旦所有必需的 channel 消耗了 event , 則選擇器將嘗試寫入可選 channel. 任何可選 channel 使用該 event 的失敗都會被忽略而不會重試.

如果可選信道與特定報頭的所需信道之間存在重疊, 則認為該信道是必需的, 並且信道中的故障將導致重試所有必需信道集. 例如, 在上面的示例中, 對於標題 "CA",mem-channel-1 被認為是必需的 channel, 即使它被標記為必需和可選, 並且寫入此 channel 的失敗將導致該 event 在為選擇器配置的所有 channel 上重試.

請注意, 如果標頭沒有任何所需的 channel, 則該 event 將被寫入默認 channel, 並將嘗試寫入該標頭的可選 channel. 如果未指定所需的 channel, 則指定可選 channel 仍會將 event 寫入默認 channel. 如果沒有將 channel 指定為默認 channel 且沒有必需 channel, 則選擇器將嘗試將 event 寫入可選 channel. 在這種情況下, 任何失敗都會被忽略.

支持

多個 Flume 組件支持 SSL / TLS 協議, 以便安全地與其他系統通信.

Component SSL server or client
Avro Source server
Avro Sink client
Thrift Source server
Thrift Sink client
Kafka Source client
Kafka Channel client
Kafka Sink client
HTTP Source server
JMS Source client
Syslog TCP Source server
Multiport Syslog TCP Source server

SSL 兼容組件具有若干配置參數來設置 SSL, 例如啟用 SSL 標志, 密鑰庫 / 信任庫參數 (位置, 密碼, 類型) 和其他 SSL 參數(例如禁用的協議)

始終在 agent 配置文件的組件級別指定為組件啟用 SSL. 因此, 某些組件可能配置為使用 SSL, 而其他組件則不配置(即使具有相同的組件類型)

密鑰庫 / 信任庫設置可以在組件級別或全局指定.

在組件級別設置的情況下, 通過組件特定參數在 agent 配置文件中配置密鑰庫 / 信任庫. 此方法的優點是組件可以使用不同的密鑰庫(如果需要). 缺點是必須為 agent 配置文件中的每個組件復制密鑰庫參數. 組件級別設置是可選的, 但如果已定義, 則其優先級高於全局參數.

使用全局設置, 只需定義一次密鑰庫 / 信任庫參數, 並對所有組件使用相同的設置, 這意味着更少和更集中的配置.

可以通過系統屬性或通過環境變量來配置全局設置.

系統屬性 環境變量 描述
javax.net.ssl.keyStore FLUME_SSL_KEYSTORE_PATH 密鑰庫位置
javax.net.ssl.keyStorePassword FLUME_SSL_KEYSTORE_PASSWORD 密鑰庫密碼
javax.net.ssl.keyStoreType FLUME_SSL_KEYSTORE_TYPE 密鑰庫類型(默認為 JKS)
javax.net.ssl.trustStore FLUME_SSL_TRUSTSTORE_PATH 信任庫位置
javax.net.ssl.trustStorePassword FLUME_SSL_TRUSTSTORE_PASSWORD 信任庫密碼
javax.net.ssl.trustStoreType FLUME_SSL_TRUSTSTORE_TYPE 信任庫類型(默認為 JKS)
flume.ssl.include.protocols FLUME_SSL_INCLUDE_PROTOCOLS 計算啟用的協議時要包括的協議。逗號(,)分隔列表。如果提供,排除的協議將從此列表中排除。
flume.ssl.exclude.protocols FLUME_SSL_EXCLUDE_PROTOCOLS 計算啟用的協議時要排除的協議。逗號(,)分隔列表。
flume.ssl.include.cipherSuites FLUME_SSL_INCLUDE_CIPHERSUITES 在計算啟用的密碼套件時包含的密碼套件。逗號(,)分隔列表。如果提供,排除的密碼套件將被排除在此列表之外。
flume.ssl.exclude.cipherSuites FLUME_SSL_EXCLUDE_CIPHERSUITES 在計算啟用的密碼套件時要排除的密碼套件。逗號(,)分隔列表。

可以在命令行上傳遞 SSL 系統屬性, 也可以在 conf / flume-env.sh 中設置 JAVA_OPTS 環境變量(盡管使用命令行是不可取的, 因為包含密碼的命令將保存到命令歷史記錄中.)

export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"

 

Flume 使用 JSSE(Java 安全套接字擴展)中定義的系統屬性, 因此這是設置 SSL 的標准方法. 另一方面, 在系統屬性中指定密碼意味着可以在進程列表中看到密碼. 對於不可接受的情況, 也可以在環境變量中定義參數. 在這種情況下, Flume 在內部從相應的環境變量初始化 JSSE 系統屬性.

SSL 環境變量可以在啟動 Flume 之前在 shell 環境中設置, 也可以在 conf / flume-env.sh 中設置(盡管使用命令行是不可取的, 因為包含密碼的命令將保存到命令歷史記錄中.)

export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password

 

** 請注意:**

必須在組件級別啟用 SSL. 僅指定全局 SSL 參數不會產生任何影響.

如果在多個級別指定全局 SSL 參數, 則優先級如下(從高到低):

agent 配置中的組件參數

系統屬性

環境變量

如果為組件啟用了 SSL, 但未以上述任何方式指定 SSL 參數, 則

在密鑰庫的情況下: 配置錯誤

在 truststores 的情況下: 將使用默認信任庫(Oracle JDK 中的 jssecacerts / cacerts)

在所有情況下, 可信任密碼都是可選的. 如果未指定, 則在 JDK 打開信任庫時, 不會對信任庫執行完整性檢查.

source 和接收批量大小和 channel 事務容量

source 和 sink 可以具有批量大小參數, 該參數確定它們在一個批次中處理的最大 event 數. 這發生在具有稱為事務容量的上限的 channel 事務中. 批量大小必須小於渠道的交易容量. 有一個明確的檢查, 以防止不兼容的設置. 只要讀取配置, 就會進行此檢查.

  1. Flume Source
  2. Avro Source

監聽 Avro 端口並從外部 Avro 客戶端流接收 event . 當與另一個(上一跳)Flume agent 上的內置 Avro Sink 配對時, 它可以創建分層集合拓撲. 必需屬性以粗體顯示

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 avro
bind - 要偵聽的主機名或 IP 地址
port - 要綁定的端口號
threads - 生成的最大工作線程數
selector.type    
selector.*    
interceptors - 以空格分隔的攔截器列表
interceptors.*    
compression-type none 這可以是 “none” 或“deflate”。壓縮類型必須與匹配 AvroSource 的壓縮類型匹配
SSL false 將其設置為 true 以啟用 SSL 加密。如果啟用了 SSL,則還必須通過組件級參數(請參閱下文)或全局 SSL 參數(請參閱 SSL / TLS 支持部分)指定 “密鑰庫” 和“密鑰庫密碼” 。
keysore - 這是 Java 密鑰庫文件的路徑。如果未在此處指定,則將使用全局密鑰庫(如果已定義,則配置錯誤)。
keystore-password   - Java 密鑰庫的密碼。如果未在此處指定,則將使用全局密鑰庫密碼(如果已定義,則配置錯誤)。
keystore-type JKS Java 密鑰庫的類型。這可以是 “JKS” 或“PKCS12”。如果未在此處指定,則將使用全局密鑰庫類型(如果已定義,則默認為 JKS)。
exclude-protocols   SSLv3 要排除的以空格分隔的 SSL / TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
include-protocols - 要包含的以空格分隔的 SSL / TLS 協議列表。啟用的協議將是包含的協議,沒有排除的協議。如果包含協議為空,則它包括每個支持的協議。
exclude-cipher-suites - 要排除的以空格分隔的密碼套件列表。
include-cipher-suites - 以空格分隔的密碼套件列表。啟用的密碼套件將是包含的密碼套件,不包括排除的密碼套件。如果 included-cipher-suites 為空,則包含每個支持的密碼套件。
ipFilter false  將此設置為 true 以啟用 ipFiltering for netty
ipFilterRules - 使用此配置定義 N netty ipFilter 模式規則。

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

  

ipFilterRules 的示例

ipFilterRules 定義由逗號分隔的 N 個 netty ipFilters 模式規則必須采用此格式.

<'allow' or deny>:<'ip' or 'name' for computer name>:<pattern> or allow/deny:ip/name:pattern
example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

  

請注意, 匹配的第一個規則將適用, 如下例所示, 來自 localhost 上的客戶端

這將允許 localhost 上的客戶端拒絕來自任何其他 ip 的客戶端 "allow:name:localhost,deny:ip: 這將拒絕 localhost 上的客戶端允許來自任何其他 ip 的客戶端"deny:name:localhost,allow:ip:

Thrift Source

偵聽 Thrift 端口並從外部 Thrift 客戶端流接收 event . 當與另一個(上一跳)Flume agent 上的內置 ThriftSink 配對時, 它可以創建分層集合拓撲. 可以通過啟用 kerberos 身份驗證將 Thriftsource 配置為以安全模式啟動. agent-principal 和 agent-keytab 是 Thriftsource 用於向 kerberos KDC 進行身份驗證的屬性. 必需屬性以粗體顯示

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要節儉
bind - 要偵聽的主機名或 IP 地址
port - 要綁定的端口號
threads - 生成的最大工作線程數
selector.type    
selector.*    
interceptors - 空格分隔的攔截器列表
interceptors.*    
SSL false  將其設置為 true 以啟用 SSL 加密。如果啟用了 SSL,則還必須通過組件級參數(請參閱下文)或全局 SSL 參數(請參閱 SSL / TLS 支持部分)指定 “密鑰庫” 和“密鑰庫密碼”。
keystore - 這是 Java 密鑰庫文件的路徑。如果未在此處指定,則將使用全局密鑰庫(如果已定義,則配置錯誤)。
keystore-password - Java 密鑰庫的密碼。如果未在此處指定,則將使用全局密鑰庫密碼(如果已定義,則配置錯誤)。
keystore-type JKS Java 密鑰庫的類型。這可以是 “JKS” 或“PKCS12”。如果未在此處指定,則將使用全局密鑰庫類型(如果已定義,則默認為 JKS)。
exclude-protocols   要排除的以空格分隔的 SSL / TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
include-protocols - 要包含的以空格分隔的 SSL / TLS 協議列表。啟用的協議將是包含的協議,沒有排除的協議。如果包含協議為空,則它包括每個支持的協議。
exclude-cipher-suites - 要排除的以空格分隔的密碼套件列表。
include-cipher-suites - 以空格分隔的密碼套件列表。啟用的密碼套件將是包含的密碼套件,不包括排除的密碼套件。
kerberos   設置為 true 以啟用 kerberos 身份驗證。在 kerberos 模式下,成功進行身份驗證需要 agent-principal 和 agent-keytab。安全模式下的 Thriftsource 僅接受來自已啟用 kerberos 且已成功通過 kerberos KDC 驗證的 Thrift 客戶端的連接。
agent-principal - Thrift Source 使用的 kerberos 主體對 kerberos KDC 進行身份驗證。
 agent-keytab - Thrift Source 與 agent 主體結合使用的 keytab 位置,用於對 kerberos KDC 進行身份驗證。

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Exec Source

  

Exec source 在啟動時運行給定的 Unix 命令, 並期望該進程在標准輸出上連續生成數據 (stderr 被簡單地丟棄, 除非屬性 logStdErr 設置為 true). 如果進程因任何原因退出, 則 source 也會退出並且不會生成其他數據. 這意味着 cat [named pipe] 或 tail -F [file] 等配置將產生所需的結果, 而日期 可能不會 - 前兩個命令產生數據流, 而后者產生單個 event 並退出. 必需屬性以粗體顯示

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 exec
command - 要執行的命令
shell - 用於運行命令的 shell 調用。例如 /bin/sh -c 。僅適用於依賴 shell 功能的命令,如通配符,后退標記,管道等。
restartThrottle 10000 嘗試重新啟動之前等待的時間(以毫秒為單位)
restart false 是否應該重新執行已執行的 cmd
logStdErr false  是否應記錄命令的 stderr
BATCHSIZE 20 一次讀取和發送到 channel 的最大行數
batchTimeout 3000 在向下游推送數據之前,如果未達到緩沖區大小,則等待的時間(以毫秒為單位)
selector.type replication 復制或多路復用
selector.*   取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    

警告

Exec Source 和其他異步 source 的問題在於, 如果無法將 event 放入 Channel 中, 則 source 無法保證客戶端知道它. 在這種情況下, 數據將丟失. 例如, 最常請求的功能之一是 tail -F [file] 類似用例, 其中應用程序寫入磁盤上的日志文件, Flume 將文件作為尾部發送, 將每一行作為 event 發送. 雖然這是可能的, 但是有一個明顯的問題; 如果 channel 填滿並且 Flume 無法發送 event , 會發生什么?由於某種原因, Flume 無法向編寫日志文件的應用程序指示它需要保留日志或 event 尚未發送. 如果這沒有意義, 您只需要知道: 當使用 Exec Source 等單向異步接口時, 您的應用程序永遠無法保證已收到數據!

作為此警告的延伸 - 並且完全清楚 - 使用此 source 時, event 傳遞絕對沒有保證.

為了獲得更強的可靠性保證, 請考慮 Spooling Directory Source,Taildir Source 或通過 SDK 直接與 Flume 集成.

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

  

'shell'配置用於通過命令 shell(例如 Bash 或 Powershell)調用'命令'.'command'作為參數傳遞給'shell'執行. 這允許'命令'使用 shell 中的功能, 例如通配符, 后退標記, 管道, 循環, 條件等. 如果沒有'shell'配置, 將直接調用'command'.'shell'的常用值: '/bin/sh -c', '/bin/ksh -c', 'cmd /c', 'powershell -Command', etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
JMS Source

  

JMS Source 從 JMS 目標 (例如隊列或主題) 讀取消息. 作為 JMS 應用程序, 它應該與任何 JMS 提供程序一起使用, 但僅使用 ActiveMQ 進行測試. JMSsource 提供可配置的批量大小, 消息選擇器, 用戶 / 傳遞和消息到水槽 event 轉換器. 請注意, 供應商提供的 JMS jar 應該包含在 Flume 類路徑中, 使用 plugins.d 目錄(首選), 命令行上的 - classpath 或 flume-env.sh 中的 FLUME_CLASSPATH 變量. 必需屬性以粗體顯示

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 jms
initialContextFactory - Inital Context Factory,例如:org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory - 連接工廠應顯示為的 JNDI 名稱
providerURL - JMS 提供程序 URL
destinationName - 目的地名稱
destinationType - 目的地類型(隊列或主題)
messageSelector - 創建使用者時使用的消息選擇器
userName - 目標 / 提供商的用戶名
PASSWORDFILE - 包含目標 / 提供程序密碼的文件
BATCHSIZE 100 一批中要使用的消息數
converter.type DEFAULT 用於將消息轉換為水槽 event 的類。見下文。
converter.* - 轉換器屬性。
converter.charset UTF-8 僅限默認轉換器。在將 JMS TextMessages 轉換為字節數組時使用的字符集。
createDurableSubscription false 是否創建持久訂閱。持久訂閱只能與 destinationType 主題一起使用。如果為 true,則必須指定 “clientId” 和“durableSubscriptionName”。
clientId - JMS 客戶端標識符在創建后立即在 Connection 上設置。持久訂閱必需。
durableSubscriptionName - 用於標識持久訂閱的名稱。持久訂閱必需。

消息轉換器

JMSsource 允許可插拔轉換器, 盡管默認轉換器可能適用於大多數用途. 默認轉換器能夠將字節, 文本和對象消息轉換為 FlumeEvents. 在所有情況下, 消息中的屬性都將作為標題添加到 FlumeEvent 中.

BytesMessage:

消息的字節被復制到 FlumeEvent 的主體. 每封郵件無法轉換超過 2GB 的數據.

TextMessage 的:

消息文本轉換為字節數組並復制到 FlumeEvent 的主體. 默認轉換器默認使用 UTF-8, 但這是可配置的.

ObjectMessage:

Object 被寫入包含在 ObjectOutputStream 中的 ByteArrayOutputStream, 並將生成的數組復制到 FlumeEvent 的主體.

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

  

SSL 和 JMS Source

JMS 客戶端實現通常支持通過 JSSE(Java 安全套接字擴展)定義的某些 Java 系統屬性來配置 SSL / TLS. 為 Flume 的 JVM 指定這些系統屬性, JMSsource(或更准確地說是 JMSsource 使用的 JMS 客戶端實現)可以通過 SSL 連接到 JMS 服務器(當然, 僅當 JMS 服務器也已設置為使用 SSL 時). 它應該可以與任何 JMS 提供程序一起使用, 並且已經過 ActiveMQ,IBM MQ 和 Oracle WebLogic 的測試.

以下部分僅介紹 Flume 方面所需的 SSL 配置步驟. 您可以在 Flume Wiki 上找到有關不同 JMS 提供程序的服務器端設置以及完整工作配置示例的更詳細說明.

** SSL 傳輸 / 服務器身份驗證:**

如果 JMS 服務器使用自簽名證書或其證書由不受信任的 CA(例如公司自己的 CA)簽名, 則需要設置信任庫 (包含正確的證書) 並傳遞給 Flume. 它可以通過全局 SSL 參數完成. 有關全局 SSL 設置的更多詳細信息, 請參閱 SSL / TLS 支持部分.

使用 SSL 時, 某些 JMS 提供程序需要 SSL 特定的 JNDI 初始上下文工廠和 / 或提供程序 URL 設置(例如, ActiveMQ 使用 ssl:// URL 前綴而不是 tcp://). 在這種情況下, 必須在 agent 配置文件中調整 source 屬性(initialContextFactory 和 / 或 providerURL)

客戶端證書身份驗證(雙向 SSL):

JMS Source 可以通過客戶端證書身份驗證而不是通常的用戶 / 密碼登錄來對 JMS 服務器進行身份驗證(使用 SSL 並且 JMS 服務器配置為接受此類身份驗證時).

包含用於身份驗證的 Flume 密鑰的密鑰庫需要再次通過全局 SSL 參數進行配置. 有關全局 SSL 設置的更多詳細信息, 請參閱 SSL / TLS 支持部分.

密鑰庫應該只包含一個密鑰(如果存在多個密鑰, 則將使用第一個密鑰). 密鑰密碼必須與密鑰庫密碼相同.

在客戶端證書身份驗證的情況下, 不需要在 Flume agent 配置文件中為 JMSsource 指定 userName / passwordFile 屬性.

請注意:

與其他組件不同, JMS Source 沒有組件級別的配置參數. 也沒有啟用 SSL 標志. SSL 設置由 JNDI / Provider URL 設置 (最終是 JMS 服務器設置) 以及 truststore / keystore 的存在 / 不存在控制.

Spooling Directory Source

此 source 允許您通過將要攝取的文件放入磁盤上的 "spooling" 目錄來攝取數據. 此 source 將查看新文件的指定目錄, 並將在新文件出現時解析 event .event 解析邏輯是可插入的. 在給定文件完全讀入 channel 后, 默認情況下通過重命名文件來指示完成, 或者可以刪除它或使用 trackerDir 來跟蹤已處理的文件.

與 Exec source 不同, 即使 Flume 重新啟動或被殺死, 此 source 也是可靠的並且不會遺漏數據. 作為這種可靠性的交換, 只有不可變的, 唯一命名的文件必須被放入 spooling directory. 中. Flume 試圖檢測這些問題, 如果違反則會聲明失敗:

如果在放入 spooling directory 后寫入文件, Flume 會將錯誤打印到其日志文件並停止處理.

如果稍后重復使用文件名, Flume 將在其日志文件中輸出錯誤並停止處理.

為避免上述問題, 在將文件名移動到 spooling directory 中時, 添加唯一標識符 (例如時間戳) 可能很有用.

盡管該 source 的可靠性保證, 但仍存在如果發生某些下游故障則可能重復 event 的情況. 這與其他 Flume 組件提供的保證一致.

屬性名稱 默認 描述
channels -  
type - 組件類型名稱需要是 spooldir。
spoolDir - 從中讀取文件的目錄。
fileSuffix .COMPLETED 后綴附加到完全攝取的文件
deletePolicy never 何時刪除已完成的文件:從 never 或 immediate
FileHeader false 是否添加存儲絕對路徑文件名的標頭。
fileHeaderKey file 將絕對路徑文件名附加到 event 標題時使用的標題鍵。
basenameHeader false 是否添加存儲文件基本名稱的標頭。
basenameHeaderKey basename 標題將文件的基本名稱附加到 event 標題時使用的標題。
includePattern .*$ 正則表達式,指定要包含的文件。它可以與 ignorePattern 一起使用。如果一個文件同時匹配 ignorePattern 和 includePattern 正則表達式,該文件將被忽略。
ignorePattern $ 正則表達式,指定要忽略的文件(跳過)。它可以與 includePattern 一起使用。如果一個文件同時匹配 ignorePattern 和 includePattern 正則表達式,該文件將被忽略。
trackerDir .flumespool 用於存儲與文件處理相關的元數據的目錄。如果此路徑不是絕對路徑,則將其解釋為相對於 spoolDir。
trackingPolicy rename 跟蹤策略定義如何跟蹤文件處理。它可以是 “重 rename” 或“tracker_dir”。此參數僅在 deletePolicy 為 “never” 時有效。“重 rename” - 處理完文件后,會根據 fileSuffix 參數重命名。“tracker_dir” - 不重命名文件,但會在 trackerDir 中創建新的空文件。新的跟蹤器文件名 source 自攝取的文件名和 fileSuffix。
consumeOrder oldest spooling directory  中的文件將以 oldest, youngest 和 random 的方式使用。如果是 oldest 和 youngest 的,文件的最后修改時間將用於比較文件。如果出現相同,將首先消耗具有最小字典順序的文件。在 random 的情況下,任何文件將被隨機挑選。當使用 oldest 和 youngest 時,整個目錄將被掃描以選擇 oldest/youngest 的文件,如果存在大量文件,這可能會很慢,而使用 random 可能會導致舊文件在新文件不斷進入時很晚被消耗 & nbsp;spooling directory。
pollDelay 500 輪詢新文件時使用的延遲(以毫秒為單位)。
recursiveDirectorySearch false 是否監視子目錄以查找要讀取的新文件。
maxBackoff 4000 如果 channel 已滿,則在連續嘗試寫入 channel 之間等待的最長時間(以毫秒為單位)。source 將以低退避開始,並在每次 channel 拋出 ChannelException 時以指數方式增加,直到此參數指定的值。
BATCHSIZE 100 批量傳輸到 channel 的粒度
inputCharset UTF-8 反序列化器使用的字符集,將輸入文件視為文本。
decodeErrorPolicy FAIL 當我們在輸入文件中看到不可解碼的字符時該怎么辦。FAIL:拋出異常並且無法解析文件。 REPLACE:用“替換字符”char 替換不可解析的字符,通常是 Unicode U+FFFD 。 IGNORE:刪除不可解析的字符序列。
deserializer LINE 指定用於將文件解析為 event 的反序列​​化程序。默認將每行解析為 event 。指定的類必須實現 EventDeserializer.Builder。
deserializer.*   每個 event 反序列化器不同。
bufferMaxLines - (Obselete)此選項現在被忽略。
bufferMaxLineLength 5000 (已棄用)提交緩沖區中行的最大長度。請改用 deserializer.maxLineLength。
selector.type replicating   replicating  or   multiplexing
selector.*   取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    

名為 agent-1 的 agent 示例:

a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
Event Deserializers

  

以下 event 反序列化器隨 Flume 一起提供.

LINE

此解串器為每行文本輸入生成一個 event

物業名稱 默認 描述
deserializer.maxLineLength 2048 單個 event 中包含的最大字符數。如果一行超過此長度,則會被截斷,並且該行上的其余字符將出現在后續 event 中。
deserializer.outputCharset UTF-8 用於編碼放入 channel 的 event 的字符集。

AVRO

此解串器能夠讀取 Avro 容器文件, 並在文件中為每個 Avro 記錄生成一個 event . 每個 event 都使用標頭進行注釋, 該標頭指示所使用的模式. event 的主體是二進制 Avro 記錄數據, 不包括模式或容器文件元素的其余部分.

請注意, 如果假脫機目錄 source 必須重試將其中一個 event 放入 channel(例如, 因為 channel 已滿), 則它將重置並從最新的 Avro 容器文件同步點重試. 為了減少此類故障情況下的潛在 event 重復, 請在 Avro 輸入文件中更頻繁地寫入同步標記.

物業名稱 默認 描述
deserializer.schemaType HASH 如何表示模式。默認情況下,或者 & nbsp; 指定值 HASH 時,會對 Avro 架構進行哈希處理,並將哈希值存儲在 event 頭 “flume.avro.schema.hash” 中的每個 event 中。如果指定了 LITERAL,則 JSON 編碼的模式本身存儲在 event 頭 “flume.avro.schema.literal” 中的每個 event 中。與 HASH 模式相比,使用 LITERAL 模式效率相對較低。

BlobDeserializer>

此反序列化器為每個 event 讀取二進制大對象(BLOB), 通常每個文件一個 BLOB. 例如 PDF 或 JPG 文件. 請注意, 此方法不適用於非常大的對象, 因為整個 BLOB 都緩存在 RAM 中.

物業名稱 默認 描述
解串器 - 此類的 FQCN:org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000  要讀取的最大字節數和給定請求的緩沖區

Taildir Source

注意: 此 source 作為預覽功能提供. 它不適用於 Windows.

觀察指定的文件, 並在檢測到添加到每個文件的新行后幾乎實時地拖尾它們. 如果正在寫入新行, 則此 source 將重試讀取它們以等待寫入完成.

此 source 是可靠的, 即使 tail 文件旋轉也不會丟失數據. 它定期以 JSON 格式寫入給定位置文件上每個文件的最后讀取位置. 如果 Flume 由於某種原因停止或停止, 它可以從寫在現有位置文件上的位置重新開始 tail.

在其他用例中, 此 source 也可以使用給定的位置文件從每個文件的任意位置開始拖尾. 當指定路徑上沒有位置文件時, 默認情況下它將從每個文件的第一行開始拖尾.

文件將按修改時間順序使用. 將首先使用具有最早修改時間的文件.

此 source 不會重命名或刪除或對正在掛載的文件執行任何修改. 目前此 source 不支持 tail 二進制文件. 它逐行讀取文本文件.

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 TAILDIR。
filegroups - 以空格分隔的文件組列表。每個文件組都指示一組要掛起的文件。
filegroups. - 文件組的絕對路徑。正則表達式(而不是文件系統模式)只能用於文件名。
positionFile ~/.flume/taildir_position.json 以 JSON 格式文件以記錄每個尾部文件的 inode,絕對路徑和最后位置。
headers. - 標題值,使用標題鍵設置。可以為一個文件組指定多個標頭。
byteOffsetHeader false 是否將 tailed line 的字節偏移量添加到名為 “byteoffset” 的標頭中。
skipToEnd false 在未寫入位置文件的文件的情況下是否跳過位置到 EOF。
idleTimeout 120000 關閉非活動文件的時間(毫秒)。如果關閉的文件附加了新行,則此 source 將自動重新打開它。
writePosInterval 3000 寫入位置文件上每個文件的最后位置的間隔時間(ms)。
BATCHSIZE 100 一次讀取和發送到 channel 的最大行數。使用默認值通常很好。
maxBatchCount Long.MAX_VALUE 控制從同一文件連續讀取的批次數。如果 source 正在拖尾多個文件,並且其中一個文件以快速寫入,則可以防止處理其他文件,因為繁忙文件將在無限循環中讀取。在這種情況下,降低此值。
backoffSleepIncrement 1000 在最后一次嘗試未找到任何新數據時,重新嘗試輪詢新數據之前的時間延遲增量。
maxBackoffSleep 5000 每次重新嘗試輪詢新數據時的最大時間延遲,當最后一次嘗試未找到任何新數據時。
cachePatternMatching true 對於包含數千個文件的目錄,列出目錄並應用文件名正則表達式模式可能非常耗時。緩存匹配文件列表可以提高性能。消耗文件的順序也將被緩存。要求文件系統以至少 1 秒的粒度跟蹤修改時間。
FileHeader false 是否添加存儲絕對路徑文件名的標頭。
fileHeaderKey file 將絕對路徑文件名附加到 event 標題時使用的標題鍵。

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.JSON
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
Kafka Source

  

Kafka Source 是一個 Apache Kafka 消費者, 它從 Kafka 主題中讀取消息. 如果您運行了多個 Kafka source, 則可以使用相同的使用者組配置它們, 以便每個 source 都讀取一組唯一的主題分區. 這目前支持 Kafka 服務器版本 0.10.1.0 或更高版本. 測試完成了 2.0.1, 這是發布時最高的可用版本.

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers - source 代碼使用的 Kafka 集群中的 agent 列表
kafka.consumer.group.id flume 獨特的消費者群體。在多個 source 或 agent 中設置相同的 ID 表示它們是同一個使用者組的一部分
kafka.topics - 以逗號分隔的主題列表,kafka 消費者將從中讀取消息。
kafka.topics.regex - 正則表達式,用於定義 source 訂閱的主題集。此屬性具有比 kafka.topics 更高的優先級,並覆蓋 kafka.topics(如果存在)。
BATCHSIZE 1000 一批中寫入 Channel 的最大消息數
batchDurationMillis 1000 將批次寫入 channel 之前的最長時間(以毫秒為單位)只要達到第一個大小和時間,就會寫入批次。
backoffSleepIncrement 1000 Kafka Topic  顯示為空時觸發的初始和增量等待時間。等待時間將減少對空 kafka Topic  的激進 ping 操作。一秒鍾是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
maxBackoffSleep 5000 Kafka Topic  顯示為空時觸發的最長等待時間。5 秒是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
useFlumeEventFormat false 默認情況下,event 將從 Kafka Topic 直接作為字節直接進入 event 主體。設置為 true 以讀取 event 作為 Flume Avro 二進制格式。與 Kafka Sink 上的相同屬性或 Kafka Channel 上的 parseAsFlumeEvent 屬性一起使用時,這將保留在生成端發送的任何 Flume 標頭。
setTopicHeader true 設置為 true 時,將檢索到的消息的主題存儲到由 topic Header 屬性定義的標頭中 & nbsp;。
topicHeader topic 如果 setTopicHeader 屬性設置為 true,則定義用於存儲接收消息主題名稱的標頭名稱。如果與 Kafka SinktopicHeader 屬性結合使用,應該小心,以避免將消息發送回循環中的同一主題。
kafka.consumer.security.protocol PLAINTEXT 如果使用某種級別的安全性寫入 Kafka,則設置為 SASL_PLAINTEXT,SASL_SSL 或 SSL。有關安全設置的其他信息,請參見下文。
more consumer security props   如果使用 SASL_PLAINTEXT,SASL_SSL 或 SSL,請參閱 Kafka 安全性以獲取需要在使用者上設置的其他屬性。
Other Kafka Consumer Properties - 這些屬性用於配置 Kafka Consumer。可以使用 Kafka 支持的任何消費者財產。唯一的要求是使用前綴 kafka.consumer 添加屬性名稱 & nbsp;。例如:kafka.consumer.auto.offset.reset

** 注意 **

Kafka Source 會覆蓋兩個 Kafka 使用者參數: source 和每次 batch 提交的時候會將 auto.commit.enable 設置為 "false", 並提交每個批處理. Kafka Source 至少保證一次消息檢索策略.

source 啟動時可以存在重復項.

Kafka Source 還提供了 key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默認值.

不建議修改這些參數.

不推薦使用的屬性

屬性名稱 默認 描述
topic - 使用 kafka.topics
groupId flume 使用 kafka.consumer.group.id
zookeeperConnect - 自 0.9.x 起不再受 kafka 消費者客戶端的支持。使用 kafka.bootstrap.servers 與 kafka 集群建立連接
migrateZookeeperOffsets true 如果找不到 Kafka 存儲的偏移量,請在 Zookeeper 中查找偏移量並將它們提交給 Kafka。這應該是支持從舊版本的 Flume 無縫 Kafka 客戶端遷移。遷移后,可以將其設置為 false,但通常不需要這樣做。如果未找到 Zookeeper 偏移量,則 Kafka 配置 kafka.consumer.auto.offset.reset 定義如何處理偏移量。  有關詳細信息,請查看 Kafka 文檔

通過逗號分隔的主題列表進行主題訂閱的示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.Bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

  

正則表達式主題訂閱的示例:

ier1.source.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.source.source1.channel = channel1

  

tier1.source.source1.kafka.Bootstrap.servers = 本地主機: 9092

tier1.source.source1.kafka .topics.regex = ^ topic [0-9] $

#默認使用 kafka.consumer.group.id = flume

** Security 和 Kafka Source:**

Flume 和 Kafka 之間的通信渠道支持安全認證和數據加密. 對於安全身份驗證, 可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使該參數名為 SSL, 實際協議是 TLS 實現)

截至目前, 數據加密僅由 SSL / TLS 提供.

將 kafka.consumer.security.protocol 設置為以下任何值意味着:

SASL_PLAINTEXT - 無數據加密的 Kerberos 或純文本身份驗證

SASL_SSL - 使用數據加密的 Kerberos 或純文本身份驗證

SSL - 基於 TLS 的加密, 帶有可選的身份驗證

** 警告 **

啟用 SSL 時性能會下降, 其大小取決於 CPU 類型和 JVM 實現. 參考: Kafka 安全概述 和跟蹤此問題的 jira: https://issues/jira/browse/KAFKA-2561

** TLS 和 Kafka Source:**

請閱讀配置 Kafka 客戶端 SSL 中描述的步驟, 以了解用於微調的其他配置設置, 例如以下任何一項: 安全提供程序, 密碼套件, 啟用的協議, 信任庫或密鑰庫類型.

配置服務器端身份驗證和數據加密的示例:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

  

在此處指定信任庫是可選的, 可以使用全局信任庫. 有關全局 SSL 設置的更多詳細信息, 請參閱 SSL / TLS 支持部分

注意: 默認情況下, 未定義屬性 ssl.endpoint.identification.algorithm, 因此不會執行主機名驗證. 要啟用主機名驗證, 請設置以下屬性

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

啟用后, 客戶端將根據以下兩個字段之一驗證服務器的完全限定域名(FQDN):

通用名稱(CN) https://tools.ietf.org/html/rfc6125#section-2.3 https://tools.ietf.org/html/rfc6125#section-2.3

主題備選名稱(SAN)

如果還需要客戶端身份驗證, 則還需要將以下內容添加到 Flume agent 配置中, 或者可以使用全局 SSL 設置(請參閱 SSL / TLS 支持部分). 每個 Flume agent 都必須擁有其客戶證書, Kafka 經紀人必須單獨或通過其簽名鏈來信任. 常見示例是由單個根 CA 簽署每個客戶端證書, 而后者又由 Kafka 經紀人信任.

# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

  

** Kerberos 和 Kafka source **

要將 Kafka source 與使用 Kerberos 保護的 Kafka 群集一起使用, 請為消費者設置上面提到的 consumer.security.protocol 屬性.與 Kafka agent 一起使用的 Kerberos 密鑰表和主體在 JAAS 文件的 "KafkaClient" 部分中指定."客戶端" 部分描述了 Zookeeper 連接(如果需要). 有關 JAAS 文件內容的信息, 請參閱 Kafka doc. 可以通過 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可選的系統范圍的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

 

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

  

使用 SASL_SSL 的安全配置示例:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

  

示例 JAAS 文件. 有關其內容的參考, 請參閱 SASL 配置的 Kafka 文檔中所需認證機制 (GSSAPI / PLAIN) 的客戶端配置部分. 由於 Kafka Source 也可能連接到 Zookeeper 以進行偏移遷移, 因此 "Client" 部分也添加到此示例中. 除非您需要偏移遷移, 否則不需要這樣做, 或者您需要此部分用於其他安全組件. 另外, 請確保 Flume 進程的操作系統用戶對 jaas 和 keytab 文件具有讀權限.

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
NetCat TCP source

  

類似於 netcat 的 source, 它偵聽給定端口並將每行文本轉換為 event . 像 nc -k -l [host] [port]這樣的行為. 換句話說, 它打開一個指定的端口並監聽數據. 期望是提供的數據是換行符分隔的文本. 每行文本都轉換為 Flume event , 並通過連接的 channel 發送. 必需屬性以 粗體顯示

Property Name Default Description
channels  
type The component type name, needs to be netcat
bind Host name or IP address to bind to
port Port # to bind to
max-line-length 512 Max line length per event body (in bytes)
ack-every-event true Respond with an “OK” for every event received
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
NetCat UDP source

  

根據原始的 Netcat(TCP)source, 該 source 監聽給定端口並將每行文本轉換為 event 並通過連接的 channel 發送. 像 nc -u -k -l [host] [port]這樣的行為. 必需屬性以 粗體顯示

Property Name Default Description
channels  
type The component type name, needs to be netcatudp
bind Host name or IP address to bind to
port Port # to bind to
remoteAddressHeader  
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

agent 名為 a1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
Sequence Generator Source

  

一個簡單的 sequence 生成器, 它使用從 0 開始的計數器連續生成 event , 遞增 1 並在 totalEvents 處停止. 無法向 channel 發送 event 時重試. 主要用於測試. 在重試期間, 它使重試消息的主體保持與以前相同, 以便在目的地重復數據刪除之后, 唯一 event 的數量應等於指定的 totalEvents. 必需屬性以粗體顯示

Property Name Default Description
channels  
type The component type name, needs to be seq
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
batchSize 1 Number of events to attempt to process per request loop.
totalEvents Long.MAX_VALUE Number of unique events sent by the source.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Syslog Source

  

 


讀取 syslog 數據並生成 Flume event .UDP source 將整個消息視為單個 event .TCPsource 為每個由換行符 ('n') 分隔的字符串創建一個新 event

必需屬性以粗體顯示

Syslog TCPsource

原始的, 經過驗證的 syslog TCPsource 代碼

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 & nbsp;syslogtcp
host - 要綁定的主機名或 IP 地址
port - 要綁定的端口號
eventSize 2500 單個 event 行的最大大小(以字節為單位)
keepFields none 將此設置為 “all” 將保留 event 正文中的 Priority,Timestamp 和 Hostname。還允許包含間隔開的字段列表。目前,可以包括以下字段:優先級,版本,時間戳,主機名。值'true'和'false'已被棄用,采用'all'和'none'。
clientIPHeader - 如果指定,客戶端的 IP 地址將使用此處指定的標頭名稱存儲在每個 event 的標頭中。這允許攔截器和 channel 選擇器基於客戶端的 IP 地址定制路由邏輯。不要在此處使用標准 Syslog 標頭名稱(如_host_),因為在這種情況下將覆蓋 event 標頭。
clientHostnameHeader - 如果指定,則客戶端的主機名將使用此處指定的標頭名稱存儲在每個 event 的標頭中。這允許攔截器和 channel 選擇器基於客戶端的主機名自定義路由邏輯。檢索主機名可能涉及名稱服務反向查找,這可能會影響性能。不要在此處使用標准 Syslog 標頭名稱(如_host_),因為在這種情況下將覆蓋 event 標頭。
selector.type   replicating or multiplexing
selector.* replicating 取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    
SSL false 將其設置為 true 以啟用 SSL 加密。如果啟用了 SSL,則還必須通過組件級參數(請參閱下文)或全局 SSL 參數(請參閱 SSL / TLS 支持部分)指定 “密鑰庫” 和“密鑰庫密碼” 。
keystore - 這是 Java 密鑰庫文件的路徑。如果未在此處指定,則將使用全局密鑰庫(如果已定義,則配置錯誤)。
keystore-password - Java 密鑰庫的密碼。如果未在此處指定,則將使用全局密鑰庫密碼(如果已定義,則配置錯誤)。
keystore-type JKS Java 密鑰庫的類型。這可以是 “JKS” 或“PKCS12”。如果未在此處指定,則將使用全局密鑰庫類型(如果已定義,則默認為 JKS)。
exclude-protocols SSLv3 要排除的以空格分隔的 SSL / TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
include-protocols - 要包含的以空格分隔的 SSL / TLS 協議列表。啟用的協議將是包含的協議,沒有排除的協議。如果包含協議為空,則它包括每個支持的協議。
exclude-cipher-suites - 要排除的以空格分隔的密碼套件列表。
include-cipher-suites - 以空格分隔的密碼套件列表。啟用的密碼套件將是包含的密碼套件,不包括排除的密碼套件。如果 included-cipher-suites 為空,則包含每個支持的密碼套件。
For example, a syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

  

 


多端口 Syslog TCP source

這是 Syslog TCPsource 的更新, 更快, 多端口版本. 請注意, 端口配置設置已替換端口. 多端口功能意味着它可以以有效的方式一次監聽多個端口. 此 source 使用 Apache Mina 庫來執行此操作. 提供對 RFC-3164 和許多常見 RFC-5424 格式消息的支持. 還提供了配置基於每個端口的字符集的功能.

屬性名稱 默認 描述
 channels -  
type - 組件類型名稱,需要是 multiport_syslogtcp
host - 要綁定的主機名或 IP 地址。
ports - 要綁定到的空格分隔列表(一個或多個)。
eventSize 2500 單個 event 行的最大大小(以字節為單位)。
keepFields none 將此設置為 “all” 將保留 event 正文中的 Priority,Timestamp 和 Hostname。還允許包含間隔開的字段列表。目前,可以包括以下字段:優先級,版本,時間戳,主機名。值'true'和'false'已被棄用,采用'all'和'none'。
portHeader - 如果指定,端口號將使用此處指定的標頭名稱存儲在每個 event 的標頭中。這允許攔截器和 channel 選擇器基於傳入端口定制路由邏輯。
clientIPHeader - 如果指定,客戶端的 IP 地址將使用此處指定的標頭名稱存儲在每個 event 的標頭中。這允許攔截器和 channel 選擇器基於客戶端的 IP 地址定制路由邏輯。不要在此處使用標准 Syslog 標頭名稱(如_host_),因為在這種情況下將覆蓋 event 標頭。
clientHostnameHeader - 如果指定,則客戶端的主機名將使用此處指定的標頭名稱存儲在每個 event 的標頭中。這允許攔截器和 channel 選擇器基於客戶端的主機名自定義路由邏輯。檢索主機名可能涉及名稱服務反向查找,這可能會影響性能。不要在此處使用標准 Syslog 標頭名稱(如_host_),因為在這種情況下將覆蓋 event 標頭。
charset.default UTF-8 將 syslogevent 解析為字符串時使用的默認字符集。
charset.port - 字符集可基於每個端口進行配置。
BATCHSIZE 100 每個請求循環嘗試處理的最大 event 數。使用默認值通常很好。
readBufferSize 1024 內部 Mina 讀緩沖區的大小。提供性能調整。使用默認值通常很好。
numProcessors (auto-detected) 處理消息時系統上可用的處理器數量。默認是使用 Java Runtime API 自動檢測 CPU 數量。Mina 將為每個檢測到的 CPU 生成 2 個請求處理線程,這通常是合理的。
selector.type replicating replicating, multiplexing, or custom
selector.* - 取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表。
interceptors.*    
SSL false 將其設置為 true 以啟用 SSL 加密。如果啟用了 SSL,則還必須通過組件級參數(請參閱下文)或全局 SSL 參數(請參閱 SSL / TLS 支持部分)指定 “密鑰庫” 和“密鑰庫密碼” 。
keystore - 這是 Java 密鑰庫文件的路徑。如果未在此處指定,則將使用全局密鑰庫(如果已定義,則配置錯誤)。
keystore-password - Java 密鑰庫的密碼。如果未在此處指定,則將使用全局密鑰庫密碼(如果已定義,則配置錯誤)。
keystore-type JKS Java 密鑰庫的類型。這可以是 “JKS” 或“PKCS12”。如果未在此處指定,則將使用全局密鑰庫類型(如果已定義,則默認為 JKS)。
exclude-protocols SSLv3 要排除的以空格分隔的 SSL / TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
include-protocols - 要包含的以空格分隔的 SSL / TLS 協議列表。啟用的協議將是包含的協議,沒有排除的協議。如果包含協議為空,則它包括每個支持的協議。
exclude-cipher-suites - 要排除的以空格分隔的密碼套件列表。
include-cipher-suites - 以空格分隔的密碼套件列表。啟用的密碼套件將是包含的密碼套件,不包括排除的密碼套件。如果 included-cipher-suites 為空,則包含每個支持的密碼套件。
For example, a multiport syslog TCP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
Syslog UDPsource

  

 

 

Property Name Default Description
channels  
type The component type name, needs to be syslogudp
host Host name or IP address to bind to
port Port # to bind to
keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
clientIPHeader If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
clientHostnameHeader If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
selector.type   replicating or multiplexing
selector.* replicating Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
For example, a syslog UDP source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

  

 


HTTP Source < 通過 HTTP POST 和 GET 接受 Flumeevent 的 source.GET 應僅用於實驗. HTTP 請求由可插入的 "處理程序" 轉換為 flume event , 該處理程序必須實現 HTTPSourceHandler 接口. 該處理程序獲取 HttpServletRequest 並返回 flume event 列表.從一個 Http 請求處理的所有 event 都在一個事務中提交給 channel, 從而允許在諸如文件 channel 之類的 channel 上提高效率. 如果處理程序拋出異常, 則此 source 將返回 HTTP 狀態 400. 如果 channel 已滿, 或者 source 無法將 event 附加到 channel, 則 source 將返回 HTTP 503 - 暫時不可用狀態

在一個發布請求中發送的所有 event 都被視為一個批處理, 並在一個事務中插入到 channel 中

此 source 基於 Jetty 9.4, 並提供了設置其他 Jetty 特定參數的功能, 這些參數將直接傳遞給 Jetty 組件

屬性名稱 默認 描述
type   組件類型名稱需要為 http
port - source 應綁定的端口。
bind 0.0.0.0 要偵聽的主機名或 IP 地址
handler org.apache.flume.source.http.JSONHandler 處理程序類的 FQCN。
handler.* - 配置處理程序的參數
selector.type replicating replicating or multiplexing
selector.*   取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    
SSL 將屬性設置為 true,以啟用 SSL.HTTP Source 不支持 SSLv3。
exclude-protocols 在 SSLv3 要排除的以空格分隔的 SSL / TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
include-protocols - 要包含的以空格分隔的 SSL / TLS 協議列表。啟用的協議將是包含的協議,沒有排除的協議。如果包含協議為空,則它包括每個支持的協議。
exclude-cipher-suites - 要排除的以空格分隔的密碼套件列表。
include-cipher-suites - 以空格分隔的密碼套件列表。啟用的密碼套件將是包含的密碼套件,不包括排除的密碼套件。
keystore   密鑰庫的位置,包括密鑰庫文件名。如果啟用了 SSL 但未在此處指定密鑰庫,則將使用全局密鑰庫(如果已定義,則配置錯誤)。
keystore-password   密鑰庫密碼。如果啟用了 SSL 但未在此處指定密鑰庫密碼,則將使用全局密鑰庫密碼(如果已定義,則配置錯誤)。
keystore-type JKS 密鑰庫類型。這可以是 “JKS” 或“PKCS12”。
QueuedThreadPool.*   要在 org.eclipse.jetty.util.thread.QueuedThreadPool 上設置的 Jetty 特定設置。NB QueuedThreadPool 僅在設置了此類的至少一個屬性時使用。
HttpConfiguration.*   要在 org.eclipse.jetty.server.HttpConfiguration 上設置 Jetty 特定設置
SslContextFactory.*   要在 org.eclipse.jetty.util.ssl.SslContextFactory 上設置的 Jetty 特定設置(僅在 ssl 設置為 true 時適用)。
ServerConnector.*   要在 org.eclipse.jetty.server.ServerConnector 上設置的 Jetty 特定設置

不推薦使用的屬性

物業名稱 默認 描述
keystorePassword - 使用密鑰庫密碼。不推薦的值將被新的值覆蓋。
excludeProtocols SSLv3 使用 exclude-protocols。不推薦的值將被新的值覆蓋。
enableSSL false 使用 ssl。不推薦的值將被新的值覆蓋。

NB 使用上面列出的對象上的 setter-methods 設置 Jetty 特定設置. 有關完整的詳細信息, 請參閱這些類的 Javadoc(, , 和 )

使用特定於 Jetty 的設置時, 上面命名的屬性將優先(例如, excludeProtocols 將優先於 SslContextFactory.ExcludeProtocols). 所有房產都是小寫的.

An example http source for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.REST.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
JSONHandler

  

開箱即用的處理程序可以處理以 JSON 格式表示的 event , 並支持 UTF-8,UTF-16 和 UTF-32 字符集. 處理程序接受一個 event 數組(即使只有一個 event ,event 必須在數組中發送), 並根據請求中指定的編碼將它們轉換為 Flumeevent . 如果未指定編碼, 則假定為 UTF-8.JSON 處理程序支持 UTF-8,UTF-16 和 UTF-32.event 表示如下.

[{
"headers" : {
"timestamp" : "434324343",
"host" : "random_host.example.com"
},
"body" : "random_body"
},
{
"headers" : {
"namenode" : "namenode.example.com",
"datanode" : "random_datanode.example.com"
},
"body" : "really_random_body"
}]

  

要設置 charset, 請求必須具有指定為 application/JSON 的 內容類型 ; charset = UTF-8(根據需要用 UTF-16 或 UTF-32 替換 UTF-8)

以此處理程序所期望的格式創建 event 的一種方法是使用 Flume SDK 中提供的 JSONEvent 並使用 Google Gson 使用 Gson#fromJson(Object,Type)方法創建 JSON 字符串. 要作為 event 列表的此方法的第二個參數傳遞的類型標記可以通過以下方式創建:

Type type = new TypeToken<List<JSONEvent>>() {
}.getType();
BlobHandler

  

默認情況下, HTTPSource 將 JSON 輸入拆分為 Flumeevent . 作為替代方案, BlobHandler 是 HTTPSource 的處理程序, 它返回包含請求參數的 event 以及使用此請求上載的二進制大對象(BLOB). 例如 PDF 或 JPG 文件. 請注意, 此方法不適用於非常大的對象, 因為它會將整個 BLOB 緩存在 RAM 中.

屬性名稱 默認 描述
handler - 此類的 FQCN:org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 要讀取的最大字節數和給定請求的緩沖區

Stress source

StressSource 是一個內部負載生成 source 實現, 對壓力測試非常有用. 它允許用戶使用空標頭配置 event 有效負載的大小. 用戶可以配置要發送的 event 總數以及要傳遞的最大成功 event 數.

必需屬性以粗體顯示

物業名稱 默認 描述
type - 組件類型名稱,需要是 org.apache.flume.source.StressSource
size 500 每個 event 的有效載荷大小。單位:字節
maxTotalEvents -1 要發送的最大 event 數
maxSuccessfulEvents -1 成功發送的最大 event 數
BATCHSIZE 1 一批中要發送的 event 數
maxEventsPerSecond 0 設置為大於零的整數時,在 source 上強制執行速率限制。
Example for agent named a1:
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
Legacy Sources

  

 


legacy source 允許 Flume 1.x agent 從 Flume 0.9.4 agent 接收 event . 它接受 Flume 0.9.4 格式的 event , 將它們轉換為 Flume 1.0 格式, 並將它們存儲在連接的 channel 中. 0.9.4event 屬性 (如 timestamp,pri,host,nanos 等) 將轉換為 1.xevent 頭屬性. 舊版 source 支持 Avro 和 Thrift RPC 連接. 要在兩個 Flume 版本之間使用此橋接, 您需要使用 avroLegacy 或 thriftLegacysource 啟動 Flume 1.x agent .0.9.4 agent 應該讓 agent Sink 指向 1.x agent 的主機 / 端口.

注意

Flume 1.x 的可靠性語義與 Flume 0.9.x 的可靠性語義不同. 舊版 source 不支持 Flume 0.9.x agent 的 E2E 或 DFO 模式. 唯一支持的 0.9.x 模式是盡力而為, 盡管 1.x 流的可靠性設置將適用於傳統 source 保存到 Flume 1.xchannel 后的 event

必需屬性以粗體顯示

Avro Legacy Source

  屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 org.apache.flume.source.avroLegacy.AvroLegacySource
host - 要綁定的主機名或 IP 地址
port - 要聽的端口#
selector.type   replicating or multiplexing
selector.* replicating 取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Thrift Legacy Source

  

 

 

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是 org.apache.flume.source.thriftLegacy.ThriftLegacySource
host - 要綁定的主機名或 IP 地址
port - 要聽的端口#
selector.type   replicating or multiplexing
selector.* replicating 取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
Custom Source

  

 


自定義 source 是您自己的 Source 接口實現. 啟動 Flume agent 時, 自定義 source 的類及其依賴項必須包含在 agent 程序的類路徑中. 自定義 source 的類型是其 FQCN.

屬性名稱 默認 描述
channels -  
type - 組件類型名稱,需要是您的 FQCN
selector.type   replicating or multiplexing
selector.* replicating 取決於 selector.type 值
interceptors - 以空格分隔的攔截器列表
interceptors.*    
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1
Scribe Source

  

 


Scribe 是另一種攝取系統. 要采用現有的 Scribe 攝取系統, Flume 應該使用基於 Thrift 的 ScribeSource 和兼容的傳輸協議. 要部署 Scribe, 請遵循 Facebook 的指南. 必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件類型名稱,需要是 org.apache.flume.source.scribe.ScribeSource
port 1499 應該連接 Scribe 的端口
maxReadBufferBytes  16384000 Thrift 默認 FrameBuffer 大小
workerThreads 5 在 Thrift 中處理線程數
selector.type    
selector.*    

 

Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
Flume Sinks
HDFS Sink

  


此 sink 將 event 寫入 Hadoop 分布式文件系統 (HDFS). 它目前支持創建文本和序列文件. 它支持兩種文件 type 的壓縮. 可以根據經過的時間或數據大小或 event 數量定期滾動文件(關閉當前文件並創建新文件). 它還根據 event source 自的時間戳或機器等屬性對數據進行分區 / 分區. HDFS 目錄路徑可能包含格式轉義序列, 將由 HDFS sink 替換, 以生成用於存儲 event 的目錄 / 文件名. 使用此 sink 需要安裝 hadoop, 以便 Flume 可以使用 Hadoop jar 與 HDFS 集群進行通信. 請注意, 需要支持 sync() 調用的 Hadoop 版本.

以下是支持的轉義序列:

別號 描述
%{host} 名為 “host” 的 event 標頭的替換值。支持任意標題名稱。
%T Unix 時間,以毫秒為單位
%a locale 的工作日短名稱(周一,周二,......)
%A locale 的完整工作日名稱(周一,周二......)
%b locale 的短月名(Jan,Feb,...)
%B locale 的長月名(1 月,2 月......)
%C locale 的日期和時間(2005 年 3 月 3 日 23:05:25)
%d 每月的一天(01)
%E 沒有填充的月份日(1)
%d 日期;  與%m /%d /%y 相同
%H 小時(00..23)
%I 小時(01..12)
%j 一年中的一天(001..366)
%K 小時(0..23)
%M 月(01..12)
%N 沒有填充的月份(1..12)
%M 分鍾(00..59)
%p locale 相當於 am 或 pm
%S 自 1970-01-01 00:00:00 UTC 以來的秒數
%S 第二(00..59)
%Y 年份的最后兩位數(00..99)
%Y 年(2010 年)
%Z + hhmm 數字時區(例如,-0400)
%[localhost] 替換運行 agent 程序的主機的 hostname
%[IP] 替換運行 agent 程序的主機的 IP 地址
%[FQDN] 替換運行 agent 程序的主機的規范 hostname

注意: 轉義字符串%[localhost],%[IP]和%[FQDN]都依賴於 Java 獲取 hostname 的能力, 這在某些網絡環境中可能會失敗.

正在使用的文件將在名稱末尾包含 ".tmp". 文件關閉后, 將刪除此擴展程序. 這允許排除目錄中的部分完整文件. 必需屬性以粗體顯示

注意

對於所有與時間相關的轉義序列, event 標題中必須存在帶有 "timestamp" 鍵的標頭(除非 hdfs.useLocalTimeStamp 設置為 true).自動添加此方法的一種方法是使用 TimestampInterceptor.

名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 hdfs
hdfs.path - HDFS 目錄路徑( eg hdfs://namenode/flume/webdata/  )
hdfs.filePrefix FlumeData 名稱前綴為 Flume 在 hdfs 目錄中創建的文件
hdfs.fileSuffix - 附加到文件的后綴(例如. avro -  注意:不會自動添加句點)
hdfs.inUsePrefix - 用於水槽主動寫入的臨時文件的前綴
hdfs.inUseSuffix .TMP 用於臨時文件的后綴,flume 主動寫入
hdfs.emptyInUseSuffix false 如果為 false,則在寫入輸出時使用 hdfs.inUseSuffix。關閉輸出后,hdfs.inUseSuffix 將從輸出文件名中刪除。如果為 true,則忽略 hdfs.inUseSuffix 參數,而是使用空字符串。
hdfs.rollInterval 30 滾動當前文件之前等待的秒數(0 = 根據時間間隔從不滾動)
hdfs.rollSize 1024 觸發滾動的文件大小,以字節為單位(0:永不基於文件大小滾動)
hdfs.rollCount 10 在滾動之前寫入文件的 event 數(0 = 從不基於 event 數滾動)
hdfs.idleTimeout 0 超時后非活動文件關閉(0 = 禁用自動關閉空閑文件)
hdfs.batchSize 100 在將文件刷新到 HDFS 之前寫入文件的 event 數
hdfs.codeC - 壓縮編解碼器。以下之一:gzip,bzip2,lzo,lzop,snappy
hdfs.fileType SequenceFile 文件格式:當前 SequenceFile,DataStream 或 CompressedStream (1)DataStream 不會壓縮輸出文件,請不要設置 codeC(2)CompressedStream 需要使用可用的 codeC 設置 hdfs.codeC
hdfs.maxOpenFiles 5000 僅允許此數量的打開文件。如果超過此數量,則關閉最舊的文件。
hdfs.minBlockReplicas - 指定每個 HDFS 塊的最小副本數。如果未指定,則它來自類路徑中的默認 Hadoop 配置。
hdfs.writeFormat 可寫 序列文件記錄的格式。一個文本或可寫。在使用 Flume 創建數據文件之前設置為 Text,否則 Apache Impala(孵化)或 Apache Hive 無法讀取這些文件。
hdfs.threadsPoolSize 10 HDFS IO 操作的每個 HDFS  sink 的線程數(open, write, etc. )
hdfs.rollTimerPoolSize 1 每個 HDFS sink 用於調度定時文件滾動的線程數
hdfs.kerberosPrincipal - 用於訪問安全 HDFS 的 Kerberos 用戶主體
hdfs.kerberosKeytab - 用於訪問安全 HDFS 的 Kerberos 密鑰表
hdfs.proxyUser    
hdfs.round false 是否應將時間戳向下舍入(如果為 true,則影響除%t 之外的所有基於時間的轉義序列)
hdfs.roundValue 1 舍入到此最高倍(在使用 hdfs.roundUnit 配置的單位中),小於當前時間。
hdfs.roundUnit second 舍入值的單位 -  second, minute or hour.
hdfs.timeZone Local Time 應該用於解析目錄路徑的時區名稱,例如 America/Los_Angeles。
hdfs.useLocalTimeStamp false 替換轉義序列時,請使用本地時間(而不是 event 頭中的時間戳)。
hdfs.closeTries 0 啟動近距離嘗試后,sink 必須嘗試重命名文件的次數。如果設置為 1,則此 sink 將不會重新嘗試失敗的重命名(例如,由於 NameNode 或 DataNode 失敗),並且可能使文件處於打開狀態,擴展名為. tmp。如果設置為 0,sink 將嘗試重命名該文件,直到最終重命名該文件(它將嘗試的次數沒有限制)。如果關閉調用失敗但數據將保持不變,則文件可能仍保持打開狀態,在這種情況下,只有在 Flume 重啟后文件才會關閉。
hdfs.retryInterval 180 連續嘗試關閉文件之間的時間(以秒為單位)。每次關閉調用都會花費多次 RPC 往返 Namenode,因此將此設置得太低會導致名稱節點上的大量負載。如果設置為 0 或更小,則如果第一次嘗試失敗,sink 將不會嘗試關閉文件,並且可能使文件保持打開狀態或擴展名為 “.tmp”。
serializer TEXT 其他可能的選項包括 avro_event 或 EventSerializer.Builder 接口的實現的完全限定類名 & nbsp;。
serializer.*    

不推薦使用的屬性:

名稱 默認 描述
hdfs.callTimeout   30000 HDFS 操作允許的毫秒數,例如 open,write,flush,close。 如果發生許多 HDFS 超時操作,則應增加此數量。
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

  

 


以上配置將時間戳向下舍入到最后 10 分鍾. 例如, 時間戳為 2012 年 6 月 12 日上午 11:54:34 的 event 將導致 hdfs 路徑變為

  1. /flume/events/2012-06-12/1150/00.
  2. Hive Sink

此 sink 將包含分隔文本或 JSON 數據的 event 直接流式傳輸到 Hive 表或分區. event 使用 Hive 事務編寫. 一旦將一組 event 提交給 Hive, 它們就會立即顯示給 Hive 查詢. 水槽將流入的分區既可以預先創建, 也可以選擇 Flume 創建它們, 如果它們缺失的話. 傳入 event 數據中的字段將映射到 Hive 表中的相應列

名稱 默認 描述
channel -  
type - 組件 type 名稱需要是 hive
hive.metastore - Hive Metastore URI(  eg thrift://a.b.com:9083  )
hive.database - Hive 數據庫名稱
hive.table - Hive 表名
hive.partition - 逗號分隔標識要寫入的分區的分區值列表。可能包含轉義序列。例如:如果表格被分區(大陸:字符串,國家:字符串,時間:字符串),那么'亞洲,印度,2014-02-26-01-21'將表示大陸 = 亞洲,國家 = 印度,時間 = 2014 -02-26-01-21
hive.txnsPerBatchAsk 100 Hive 向 Flume 等流媒體客戶端授予一批交易而非單筆交易。此設置配置每個事務批處理所需的事務數。來自單個批次中所有事務的數據最終都在一個文件中。Flume 將在批處理中的每個事務中寫入最多 batchSizeevent 。此設置與 batchSize 一起提供對每個文件大小的控制。請注意,最終 Hive 會將這些文件透明地壓縮為更大的文件。
heartBeatInterval 240 (以秒為單位)發送到 Hive 的連續心跳之間的間隔,以防止未使用的事務過期。將此值設置為 0 可禁用心跳。
autoCreatePartitions true Flume 將自動創建必要的 Hive 分區以進行流式傳輸
BATCHSIZE 15000 在單個 Hive 事務中寫入 Hive 的最大 event 數
maxOpenConnections 500 僅允許此數量的打開連接。如果超過此數量,則關閉最近最少使用的連接。
callTimeout 10000 (以毫秒為單位)Hive 和 HDFS   I/O 操作的超時,例如 openTxn,write,commit,abort。
serializer   Serializer 負責解析 event 中的字段並將它們映射到 hive 表中的列。serializer 器的選擇取決於 event 中數據的格式。支持的序列化程序:DELIMITED 和 JSON
roundUnit minute 舍入值的單位 -  秒,分鍾或小時。
roundValue 1 舍入到此最高倍數(在使用 hive.roundUnit 配置的單位中),小於當前時間
時區 Local Time 應該用於解析分區中轉義序列的時區名稱,例如 America / Los_Angeles。
useLocalTimeStamp false 替換轉義序列時,請使用本地時間(而不是 event 頭中的時間戳)。

為 Hive sink 提供了以下序列化程序:

JSON: 處理 UTF8 編碼的 JSON(嚴格語法)event , 不需要配置. JSON 中的對象名稱直接映射到 Hive 表中具有相同名稱的列. 內部使用 org.apache.hive.hcatalog.data.JsonSerDe, 但獨立於 Hive 表的 Serde. 此序列化程序需要安裝 HCatalog.

DELIMITED: 處理簡單的分隔文本 event . 內部使用 LazySimpleSerde, 但獨立於 Hive 表的 Serde.

名稱 默認 描述
serializer.delimiter (type:字符串)傳入數據中的字段分隔符。要使用特殊字符,請用雙引號括起來,例如 “\t”
serializer.fieldnames - 從輸入字段到 hive 表中的列的映射。指定為 hive 表列名稱的逗號分隔列表(無空格),按發生順序標識輸入字段。要跳過字段,請保留未指定的列名稱。例如。'time ,, ip,message'表示輸入映射到 hive 表中的 time,ip 和 message 列的第 1,第 3 和第 4 個字段。
serializer.serdeSeparator Ctrl-A (type:字符)自定義基礎 serde 使用的分隔符。如果 serializer.fieldnames 中的字段與表列的順序相同,則 serializer.delimiter 與 serializer.serdeSeparator 相同,並且 serializer.fieldnames 中的字段數小於或等於表的數量,可以提高效率列,因為傳入 event 正文中的字段不需要重新排序以匹配表列的順序。對於'\t'這樣的特殊字符使用單引號。確保輸入字段不包含此字符。注意:如果 serializer.delimiter 是單個字符,最好將其設置為相同的字符

以下是支持的轉義序列:

別號 描述
%{host} 名為 “host” 的 event 標頭的替換值。支持任意標題名稱。
%T Unix 時間,以毫秒為單位
%a locale 的工作日短名稱(周一,周二,......)
%A locale 的完整工作日名稱(周一,周二......)
%b locale 的短月名(Jan,Feb,...)
%B locale 的長月名(1 月,2 月......)
%C locale 的日期和時間(2005 年 3 月 3 日 23:05:25)
%d 每月的一天(01)
%d 日期;  與%m /%d /%y 相同
%H 小時(00..23)
%I 小時(01..12)
%j 一年中的一天(001..366)
%K 小時(0..23)
%M 月(01..12)
%M 分鍾(00..59)
%p locale 相當於 am 或 pm
%S 自 1970-01-01 00:00:00 UTC 以來的秒數
%S 第二(00..59)
%Y 年份的最后兩位數(00..99)
%Y 年(2010 年)
%Z + hhmm  數字時區(例如,-0400)

注意

對於所有與時間相關的轉義序列, event 標題中必須存在具有鍵 "timestamp" 的標頭(除非 useLocalTimeStamp 設置為 true). 自動添加此方法的一種方法是使用 TimestampInterceptor.

示例 Hive 表:

create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

  

以上配置將時間戳向下舍入到最后 10 分鍾. 例如, 將時間戳標頭設置為 2012 年 6 月 12 日上午 11:54:34 且 "country" 標頭設置為 "india" 的 event 將評估為分區(continent ='asia',country ='india',time ='2012-06-12-11-50'. 序列化程序配置為接受包含三個字段的制表符分隔輸入並跳過第二個字段.

Logger Sink

在 INFO 級別記錄 event . 通常用於測試 / 調試目的. 必需屬性以粗體顯示. 此 sink 是唯一的例外, 它不需要在 "記錄原始數據" 部分中說明的額外配置.

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 & nbsp; logger
maxBytesToLog 16 要記錄的 event 主體的最大字節數
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Avro Sink

  

 


這個 Sink 形成了 Flume 的分層收集支持的一半. 發送到此 sink 的 Flume event 將轉換為 Avro event 並發送到配置的 hostname/port pair .event 將從配置的 channel 中批量獲取配置的批處理大小. 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 avro。
hostname - 要綁定的 hostname 或 IP 地址。
port - 要聽的端口#。
batch-size 100 要一起批量發送的 event 數。
connect-timeout 20000 允許第一個(握手)請求的時間量(ms)。
request-timeout 20000 在第一個之后允許請求的時間量(ms)。
reset-connection-interval none 重置連接到下一跳之前的時間量。這將迫使 Avro Sink 重新連接到下一跳。這將允許 sink 在添加新聞主機時連接到硬件負載平衡器后面的主機,而無需重新啟動 agent 。
compression-type none 這可以是 “ none ” 或“  deflate ”。壓縮 type 必須與匹配 AvroSource 的壓縮 type 匹配
compression-level 6 壓縮 event 的壓縮級別。0 = 無壓縮,1-9 是壓縮。數字越大,壓縮越多
SSL false 設置為 true 以啟用此 AvroSink 的 SSL。配置 SSL 時,您可以選擇設置“truststore”,“truststore-password”,“truststore-type”,並指定是否“trust-all-certs”。
trust-all-certs false 如果將此設置為 true,則不會檢查遠程服務器(Avrosource)的 SSL 服務器證書。這不應該在生產中使用,因為它使攻擊者更容易執行中間人攻擊並 “監聽” 加密連接。
truststore - 自定義 Java 信任庫文件的路徑。Flume 使用此文件中的證書頒發機構信息來確定是否應該信任遠程 Avro Source 的 SSL 身份驗證憑據。如果未指定,則將使用全局密鑰庫。如果未指定全局密鑰庫,則將使用缺省 Java JSSE 證書頒發機構文件(通常為 Oracle JRE 中的 “jssecacerts” 或“cacerts”)。
truststore-password - 信任庫的密碼。如果未指定,則將使用全局密鑰庫密碼(如果已定義)。
truststore-type JKS Java 信任庫的 type。這可以是 “JKS” 或其他受支持的 Java 信任庫 type。如果未指定,則將使用全局密鑰庫 type(如果已定義,則 defautl 為 JKS)。
exclude-protocols 在 SSLv3 要排除的以空格分隔的 SSL/TLS 協議列表。除指定的協議外,將始終排除 SSLv3。
maxIoWorkers 2 * 機器中可用處理器的數量 I / O 工作線程的最大數量。這是在 NettyAvroRpcClient NioClientSocketChannelFactory 上配置的。
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Thrift Sink

  

 


這個水槽形成了 Flume 的分層收集支持的一半. 發送到此 sink 的 Flume event 將轉換為 Thrift event 並發送到配置的 hostname/port pair.event 將從配置的 channel 中批量獲取配置的批處理大小.

可以通過啟用 kerberos 身份驗證將 Thrift sink 配置為以安全模式啟動. 要與以安全模式啟動的 Thriftsource 通信, Thriftsink 也應該以安全模式運行. client-principal 和 client-keytab 是 Thriftsink 用於向 kerberos KDC 進行身份驗證的屬性. server-principal 表示 Thriftsource 的主體, 此 sink 配置為以安全模式連接. 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要節儉。
hostname - 要綁定的 hostname 或 IP 地址。
port - 要聽的端口#。
batch-size 100 要一起批量發送的 event 數。
connect-timeout 20000 允許第一個(握手)請求的時間量(ms)。
request-timeout 20000 在第一個之后允許請求的時間量(ms)。
connection-reset-interval none 重置連接到下一跳之前的時間量。這將迫使 Thrift Sink 重新連接到下一跳。這將允許 sink 在添加新聞主機時連接到硬件負載平衡器后面的主機,而無需重新啟動 agent 。
SSL false 設置為 true 以為此 ThriftSink 啟用 SSL。配置 SSL 時,您可以選擇設置 “truststore”,“truststore-password” 和“truststore-type”
truststore - 自定義 Java 信任庫文件的路徑。Flume 使用此文件中的證書頒發機構信息來確定是否應該信任遠程 Thrift Source 的 SSL 身份驗證憑據。如果未指定,則將使用全局密鑰庫。如果未指定全局密鑰庫,則將使用缺省 Java JSSE 證書頒發機構文件(通常為 Oracle JRE 中的 “jssecacerts” 或“cacerts”)。
truststore-password - 信任庫的密碼。如果未指定,則將使用全局密鑰庫密碼(如果已定義)。
truststore-type JKS Java 信任庫的 type。這可以是 “JKS” 或其他受支持的 Java 信任庫 type。如果未指定,則將使用全局密鑰庫 type(如果已定義,則 defautl 為 JKS)。
exclude-protocols SSLv3 要排除的以空格分隔的 SSL / TLS 協議列表
Kerberos 的 false 設置為 true 以啟用 kerberos 身份驗證。在 kerberos 模式下,需要 client-principal,client-keytab 和 server-principal 才能成功進行身份驗證並與啟用 kerberos 的 Thrift Source 進行通信。
client-principal - Thrift Sink 使用的 kerberos 校長對 kerberos KDC 進行身份驗證。
client-keytab - Thrift Sink 與客戶端主體結合使用的 keytab 位置,用於對 kerberos KDC 進行身份驗證。
server-principal - Thrift Sink 配置為連接到的 Thrift Source 的 kerberos 主體。
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
IRC Sink

  

 


IRC sink 從附加 channel 接收消息, 並將這些消息中繼到已配置的 IRC 目標. 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 irc
hostname - 要連接的 hostname 或 IP 地址
port 6667 要連接的遠程主機的端口號
nick - 昵稱
user - 用戶名
password - 用戶密碼
chan - channel
name    
splitlines - (布爾值)
splitChars 中 n 行分隔符(如果你要在配置文件中輸入默認值,那么你需要轉義反斜杠,如下所示:“\ n”)
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
File Roll Sink

  

 


在本地文件系統上存儲 event . 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 file_roll。
sink.directory - 將存儲文件的目錄
sink.pathManager DEFAULT 要使用的 PathManager 實現。
sink.pathManager.extension - 如果使用默認的 PathManager,則為文件擴展名。
sink.pathManager.prefix - 如果使用默認 PathManager,則添加到文件名開頭的字符串
sink.rollInterval 30 每 30 秒滾動一次文件。指定 0 將禁用滾動並導致所有 event 都寫入單個文件。
sink.serializer TEXT 其他可能的選項包括 avro_event 或 EventSerializer.Builder 接口實現的 FQCN。
sink.batchSize 100  
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Null Sink

  

 


丟棄從 channel 收到的所有 event . 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱必須為 null。
BATCHSIZE 100  
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1
HBase Sink

  

 


此 sink 將數據寫入 HBase.Hbase 配置是從類路徑中遇到的第一個 hbase-site.xml 中獲取的. 實現由配置指定的 HbaseEventSerializer 的類用於將 event 轉換為 HBase put 和 / 或 increments . 然后將這些 put 和 increments 寫入 HBase. 該 sink 提供與 HBase 相同的一致性保證, HBase 是當前行的原子性. 如果 Hbase 無法寫入某些 event , 則 sink 將重播該事務中的所有 event

HBase Sink 支持將數據寫入安全 HBase. 要寫入安全 HBase, agent 程序正在運行的用戶必須具有對 sink 配置為寫入的表的寫入權限. 可以在配置中指定用於對 KDC 進行身份驗證的主體和密鑰表. Flume agent 程序類路徑中的 hbase-site.xml 必須將身份驗證設置為 kerberos(有關如何執行此操作的詳細信息, 請參閱 HBase 文檔)

為方便起見, Flume 提供了兩個序列化器.

SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)按原樣將 event 主體寫入 HBase, 並可選擇增加 Hbase 中的列. 這主要是一個示例實現.

RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根據給定的正則表達式打破 event 體, 並將每個部分寫入不同的列.

type 是 FQCN:org.apache.flume.sink.hbase.HBaseSink.

必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 hbase
table - Hbase 中要寫入的表的名稱。
ColumnFamily - Hbase 中的列族寫入。
zookeeperQuorum - 法定人數規格。這是 hbase-site.xml 中屬性 hbase.zookeeper.quorum 的值
znodeParent /hbase -ROOT - 區域的 znode 的基本路徑。價值 zookeeper.znode.parent 在 HBase 的 - site.xml 中
BATCHSIZE 100 每個 txn 要寫入的 event 數。
coalesceIncrements false 如果 sink 將多個增量合並到每個批次的單元格中。如果有限數量的單元格有多個增量,這可能會提供更好的性能。
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer 默認增量列 =“iCol”,有效負載列 =“pCol”。
serializer.* - 要傳遞給序列化程序的屬性。
kerberosPrincipal - 用於訪問安全 HBase 的 Kerberos 用戶主體
kerberosKeytab - 用於訪問安全 HBase 的 Kerberos 密鑰表
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
HBase2 Sink

  

 


HBase2Sink 相當於 HBase 版本 2 的 HBaseSink. 提供的功能和配置參數與 HBaseSink 的情況相同(除了 sinktype 中的 hbase2 標簽和包 / 類名稱)

type 是 FQCN:org.apache.flume.sink.hbase2.HBase2Sink

必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 hbase2
table - HBase 中要寫入的表的名稱。
ColumnFamily - HBase 中的列族寫入。
zookeeperQuorum - 這是 hbase-site.xml 中屬性 hbase.zookeeper.quorum 的值
znodeParent /hbase -ROOT - 區域的 znode 的基本路徑。價值 zookeeper.znode.parent 在 HBase 的 - site.xml 中
BATCHSIZE 100 每個 txn 要寫入的 event 數。
coalesceIncrements false 如果 sink 將多個增量合並到每個批次的單元格中。如果有限數量的單元格有多個增量,這可能會提供更好的性能。
serializer org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer 默認增量列 =“iCol”,有效負載列 =“pCol”。
serializer.* - 要傳遞給序列化程序的屬性。
kerberosPrincipal - 用於訪問安全 HBase 的 Kerberos 用戶主體
kerberosKeytab - 用於訪問安全 HBase 的 Kerberos 密鑰表
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
AsyncHBaseSink

  

 


此 sink 使用異步模型將數據寫入 HBase. 實現由配置指定的 AsyncHbaseEventSerializer 的類用於將 event 轉換為 HBase put 和 / 或 increments. 然后將這些放置和增量寫入 HBase. 此 sink 使用 Asynchbase API https://github.com/OpenTSDB/asynchbase 寫入 HBase. 該 sink 提供與 HBase 相同的一致性保證, HBase 是當前行的原子性. 如果 Hbase 無法寫入某些 event , 則 sink 將重播該事務中的所有 event .AsyncHBaseSink 只能與 HBase 1.x 一起使用. AsyncHBaseSink 使用的異步客戶端庫不適用於 HBase 2.type 為 FQCN:org.apache.flume.sink.hbase.AsyncHBaseSink. 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 asynchbase
table - Hbase 中要寫入的表的名稱。
zookeeperQuorum - 法定人數規格。這是 hbase-site.xml 中屬性 hbase.zookeeper.quorum 的值
znodeParent /hbase -ROOT - 區域的 znode 的基本路徑。價值 zookeeper.znode.parent 在 HBase 的 - site.xml 中
ColumnFamily - Hbase 中的列族寫入。
batchSize 100 每個 txn 要寫入的 event 數。
coalesceIncrements false 如果 sink 將多個增量合並到每個批次的單元格中。如果有限數量的單元格有多個增量,這可能會提供更好的性能。
timeout 60000 sink 為事務中的所有 event 等待來自 hbase 的 acks 的時間長度(以毫秒為單位)。
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer  
serializer.* - 要傳遞給序列化程序的屬性。
async.* - 要傳遞給 asyncHbase 庫的屬性。這些屬性優先於舊的 zookeeperQuorum 和 znodeParent 值。您可以在 AsyncHBase 的文檔頁面中找到可用屬性的列表 & nbsp;。

請注意, 此 sink 在配置中獲取 Zookeeper Quorum 和父 znode 信息. 可以在 flume 配置文件中指定 Zookeeper Quorum 和父節點配置. 或者, 這些配置值取自類路徑中的第一個 hbase-site.xml 文件.

如果配置中未提供這些, 則 sink 將從類路徑中的第一個 hbase-site.xml 文件中讀取此信息.

Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1
MorphlineSolrSink

  

此 sink 從 Flume event 中提取數據, 對其進行轉換, 並將其近乎實時地加載到 Apache Solr 服務器中, 后者又向最終用戶或搜索應用程序提供查詢.

此 sink 非常適合將原始數據流式傳輸到 HDFS(通過 HdfsSink)並同時提取, 轉換並將相同數據加載到 Solr(通過 MorphlineSolrSink)的用例. 特別是, 此 sink 可以處理來自不同數據 source 的任意異構原始數據, 並將其轉換為對搜索應用程序有用的數據模型

ETL 功能可使用 morphline 配置文件進行自定義, 該文件定義了一系列轉換命令, 用於將 event 記錄從一個命令傳遞到另一個命令

Morphlines 可以看作是 Unix 管道的演變, 其中數據模型被推廣為使用通用記錄流, 包括任意二進制有效載荷. morphline 命令有點像 Flume 攔截器. Morphlines 可以嵌入到 Flume 等 Hadoop 組件中.

用於解析和轉換一組標准數據格式 (如日志文件, Avro,CSV, 文本, HTML,xml,PDF,Word,Excel 等) 的命令是開箱即用的, 還有其他自定義命令和解析器用於其他數據格式可以添加為 morphline 插件. 可以索引任何 type 的數據格式, 並且可以生成任何 type 的 Solr 模式的任何 Solr 文檔, 並且可以注冊和執行任何自定義 ETL 邏輯.

Morphlines 操縱連續的記錄流. 數據模型可以描述如下: 記錄是一組命名字段, 其中每個字段具有一個或多個值的有序列表. 值可以是任何 Java 對象. 也就是說, 記錄本質上是一個哈希表, 其中每個哈希表條目包含一個 String 鍵和一個 Java 對象列表作為值.(該實現使用 Guava 的 ArrayListMultimap, 它是一個 ListMultimap). 請注意, 字段可以具有多個值, 並且任何兩個記錄都不需要使用公共字段名稱.

這個 sink 將 Flumeevent 的主體填充到_attachment_body 中記錄字段中, 並將 Flumeevent 的標頭復制到同名的記錄字段中. 然后命令可以對此數據執行操作.

支持路由到 SolrCloud 集群以提高可伸縮性. 索引負載可以分布在大量 MorphlineSolrSink 上, 以提高可伸縮性. 可以跨多個 MorphlineSolrSink 復制索引負載以實現高可用性, 例如使用 Flume 功能, 例如負載平衡 sink 處理器. MorphlineInterceptor 還可以幫助實現到多個 Solr 集合的動態路由(例如, 用於多租戶)

必須將環境所需的 morphline 和 Solr jar 放在 Apache Flume 安裝的 lib 目錄中.

type 是 FQCN:org.apache.flume.sink.Solr.morphline.MorphlineSolrSink

必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile - 本地文件系統與 morphline 配置文件的相對或絕對路徑。示例:/etc/flume-ng/conf/morphline.conf
morphlineId null 如果 morphline 配置文件中有多個 morphlines,則用於標識 morphline 的可選名稱
batchSize 1000 每個水槽事務要采取的最大 event 數。
batchDurationMillis 1000 每個水槽事務的最大持續時間(ms)。事務在此持續時間之后或超過 batchSize 時提交,以先到者為准。
handlerClass org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl 實現 org.apache.flume.sink.solr.morphline.MorphlineHandler 的類的 FQCN
isProductionMode false 應該為關鍵任務,大規模在線生產系統啟用此標志,這些系統需要在發生不可恢復的異常時無需停機即可取得進展。與未知 Solr 架構字段相關的損壞或格式錯誤的解析器輸入數據,解析器錯誤和錯誤會產生不可恢復的異常。
recoverableExceptionClasses org.apache.solr.client.solrj.SolrServerException 以逗號分隔的可恢復異常列表,這些異常往往是暫時的,在這種情況下,可以重試相應的任務。示例包括網絡連接錯誤,超時等。當生產模式標志設置為 true 時,使用此參數配置的可恢復異常將不會被忽略,因此將導致重試。
isIgnoringRecoverableExceptions false 如果不可恢復的異常被意外錯誤分類為可恢復,則應啟用此標志。這使得 sink 能夠取得進展並避免永遠重試 event 。
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.Solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
ElasticSearchSink

  

 


此 sink 將數據寫入彈性搜索集群. 默認情況下, 將寫入 event 以便 Kibana http://kibana.org/ 圖形界面可以顯示它們 - 就像 logstash 一樣 https://logstash.net/ 編寫它們一樣

必須將環境所需的 Elasticsearch 和 lucene-core jar 放在 Apache Flume 安裝的 lib 目錄中. Elasticsearch 要求客戶端 JAR 的主要版本與服務器的主要版本匹配, 並且兩者都運行相同的 JVM 次要版本. 如果這不正確, 將出現 SerializationExceptions. 要選擇所需的版本, 請首先確定 Elasticsearch 的版本以及目標群集正在運行的 JVM 版本. 然后選擇與主要版本匹配的 Elasticsearch 客戶端庫.0.19.x 客戶端可以與 0.19.x 群集通信; 0.20.x 可以與 0.20.x 對話, 0.90.x 可以與 0.90.x 對話. 確定 Elasticsearch 版本后, 讀取 pom.xml 文件以確定要使用的正確 lucene-core JAR 版本.

event 將每天寫入新索引. 名稱將是 < indexName> -yyyy-MM-dd, 其中 < indexName > 是 indexName 參數. sink 將在午夜 UTC 開始寫入新索引.

默認情況下, ElasticSearchLogStashEventSerializer 會為彈性搜索序列化 event . 可以使用 serializer 參數覆蓋此行為. 此參數接受 org.apache.flume.sink.Elasticsearch.ElasticSearchEventSerializer 或 org.apache.flume.sink.Elasticsearch.ElasticSearchIndexRequestBuilderFactory 的實現. 不推薦使用 ElasticSearchEventSerializer 來支持更強大的 ElasticSearchIndexRequestBuilderFactory

type 是 FQCN:org.apache.flume.sink.Elasticsearch.ElasticSearchSink

必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是 org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostname - 逗號分隔的 hostname:port 列表,如果端口不存在,將使用默認端口'9300'
indexName flume 將附加日期的索引的名稱。示例'flume' - >'flume-yyyy-MM-dd'支持任意標題替換,例如。%{header} 替換為命名 event 標頭的值
indexType logs 索引文檔的 type,默認為'log'支持任意標頭替換,例如。%{header} 替換為命名 event 標頭的值
clusterName elasticsearch 要連接的 ElasticSearch 集群的名稱
batchSize 100 每個 txn 要寫入的 event 數。
ttl - TTL 以天為單位,設置時會導致過期文檔自動刪除,如果沒有設置,文檔將永遠不會被自動刪除。TTL 在早期的整數形式中都被接受,例如 a1.sink.k1.ttl = 5,還有限定符 ms(毫秒),s(秒),m(分鍾),h(小時),d(天)和 w(周)。示例 a1.sink.k1.ttl = 5d 將 TTL 設置為 5 天。按照 < a href="http://www.elasticsearch.org/guide/reference/mapping/ttl-field/以獲取更多信息。" ztid="2867" ow="472" oh="36">http://www.elasticsearch.org/guide/reference/mapping/ttl-field / 以獲取更多信息。
serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer 要使用的 ElasticSearchIndexRequestBuilderFactory 或 ElasticSearchEventSerializer。接受任一類的實現,但首選 ElasticSearchIndexRequestBuilderFactory。
serializer.* - 要傳遞給序列化程序的屬性。

注意

使用標頭替換可以方便地使用 event 標頭的值來動態決定在存儲 event 時要使用的 indexName 和 indexType. 使用此功能時應謹慎, 因為 event 提交者現在可以控制 indexName 和 indexType. 此外, 如果使用 Elasticsearch REST 客戶端, 則 event 提交者可以控制所使用的 URL 路徑

Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = Elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.Elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
Kite Dataset Sink

  

將 event 寫入 Kite 數據集的 http://kitesdk.org/docs/current/guide/ 實驗 sink. 此 sink 將反序列化每個傳入 event 的主體, 並將結果記錄存儲在風箏數據集中. 它通過按 URI 加載數據集來確定目標數據集.

唯一支持的序列化的 Avro 和記錄模式必須在 event 標題使用或者被通過, flume.avro.schema.literal 與 JSON 模式表示或 flume.avro.schema.url 有一個網址, 該模式可能會發現(hdfs:/ ... 支持 URI). 這與使用 deserializer.schemaType = LITERAL 的 Log4jAppender 水槽客戶端和 false 脫機目錄 source 的 Avro 反序列化器 兼容.

注 1: 不支持 flume.avro.schema.hash 標頭. 注意 2: 在某些情況下, 在超過滾動間隔后可能會略微發生文件滾動. 但是, 此延遲不會超過 5 秒. 在大多數情況下, 延遲是可以忽略的.

屬性名稱 默認 描述
channel -  
type - 必須是 org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri - 要打開的數據集的 URI
kite.repo.uri - 要打開的存儲庫的 URI(不建議使用; 請改用 kite.dataset.uri)
kite.dataset.namespace - 數據集的命名空間,其中將寫入記錄(不建議使用; 請改用 kite.dataset.uri)
kite.dataset.name - 將寫入記錄的數據集的名稱(不建議使用; 請改用 kite.dataset.uri)
kite.batchSize 100 每批中要處理的記錄數
kite.rollInterval 30 釋放數據文件之前的最長等待時間(秒)
kite.flushable.commitOnBatch true 如果為 true,將提交 Flume 事務,並在每批 kite.batchSize  記錄上刷新 writer。此設置僅適用於可刷新數據集。如果為 & nbsp;true,則可以將具有提交數據的臨時文件保留在數據集目錄中。需要手動恢復這些文件,以使數據對 DatasetReaders 可見。
kite.syncable.syncOnBatch true 控制 sink 在提交事務時是否也將同步數據。此設置僅適用於可同步數據集。同步 gaurentees 數據將寫入遠程系統上的穩定存儲,同時只刷新數據已離開 Flume 的客戶端緩沖區的 gaurentees。當 kite.flushable.commitOnBatch 屬性設置為 false 時,此屬性也必須設置為 false。
kite.entityParser Avro  將 Flume Events 變為 Kite 實體的 Parser 。有效值是 avro 和 EntityParser.Builder 接口的實現的完全限定類名。
kite.failurePolicy retry 政策處理不可恢復的錯誤,如缺少 & nbsp; 架構在 event 頭。默認值 retry 將使當前批處理失敗,並再次嘗試與舊行為匹配。其他有效值是保存,這將寫原始 event 到 kite.error.dataset.uri 數據集,以及一個實現的完全限定類名 FailurePolicy.Builder 接口。
kite.error.dataset.uri - 當 kite.failurePolicy 設置為 save 時,保存失敗 event 的數據集的 URI 。需要的時候 kite.failurePolicy 設置為保存。
auth.kerberosPrincipal - 用於 HDFS 安全身份驗證的 Kerberos 用戶主體
auth.kerberosKeytab - 主體的 Kerberos keytab 位置(本地 FS)
auth.proxyUser - HDFS 操作的有效用戶,如果與 kerberos 主體不同

Kafka Sink

這是一個 Flume Sink 實現, 可以將數據發布到 Kafka http://kafka.apache.org/ 主題. 其中一個目標是將 Flume 與 Kafka 集成, 以便基於拉的處理系統可以處理來自各種 Flume source 的數據.

這目前支持 Kafka 服務器版本 0.10.1.0 或更高版本. 測試完成了 2.0.1, 這是發布時最高的可用版本

必需屬性以粗體字體標記

屬性名稱 默認 描述
type - 必須設置為 org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers - Kafka-Sink 將連接到的經紀人列表,以獲取主題分區列表這可以是經紀人的部分列表,但我們建議至少兩個用於 HA。格式為逗號分隔的 hostname:port 列表
kafka.topic default-flume-topic Kafka 中將發布消息的主題。如果配置了此參數,則會將消息發布到此主題。如果 event 標頭包含 “主題” 字段,則 event 將發布到該主題,從而覆蓋此處配置的主題。支持任意頭部替換,例如。%{header}將替換為名為 “header” 的 event 標頭的值。(如果使用替換,建議將 Kafka agent 的 “auto.create.topics.enable” 屬性設置為 true。)
flumeBatchSize 100 一批中要處理的消息數。較大批量可提高吞吐量,同時增加延遲。
kafka.producer.acks 1 在考慮成功寫入之前,有多少副本必須確認消息。接受的值為 0(從不等待確認),1(僅等待前導),-1(等待所有副本)將此值設置為 - 1,以避免在某些領導失敗的情況下丟失數據。
useFlumeEventFormat false 默認情況下,event 直接從 event 主體作為字節放到 Kafka 主題上。設置為 true 以將 event 存儲為 Flume Avro 二進制格式。與 KafkaSource 上的相同屬性或 Kafka Channel 上的 parseAsFlumeEvent 屬性一起使用時,這將保留生成端的任何 Flume 頭。
defaultPartitionId - 指定要發送到此 channel 中的所有 event 的 Kafka 分區 ID(整數),除非被 partitionIdHeader 覆蓋。默認情況下,如果未設置此屬性,則 event 將由 Kafka Producer 的分區程序分發 - 包括按鍵(如果指定)(或由 kafka.partitioner.class 指定的分區程序)。
partitionIdHeader - 設置后,sink 將使用 event 標頭中使用此屬性的值命名的字段的值,並將消息發送到主題的指定分區。如果該值表示無效分區,則將拋出 EventDeliveryException。如果存在標頭值,則此設置將覆蓋 defaultPartitionId。
allowTopicOverride true 設置后,sink 將允許將消息生成到 topicHeader 屬性指定的主題中(如果提供)。
topicHeader topic 當與 allowTopicOverride 一起設置時,將生成一個消息,該消息將使用此屬性的值命名為標頭的值。與 Kafka Source topicHeader 屬性一起使用時應小心,以避免創建環回。
kafka.producer.security.protocol PLAINTEXT 如果使用某種級別的安全性寫入 Kafka,則設置為 SASL_PLAINTEXT,SASL_SSL 或 SSL。有關安全設置的其他信息,請參見下文。
more producer security props   如果使用 SASL_PLAINTEXT,則 SASL_SSL 或 SSL 會引用 Kafka 安全性以獲取需要在生產者上設置的其他屬性。
其他 Kafka Producer Properties - 這些屬性用於配置 Kafka Producer。可以使用 Kafka 支持的任何生產者屬性。唯一的要求是使用前綴 kafka.producer 添加屬性名稱 & nbsp;。例如:kafka.producer.linger.ms

注意

Kafka Sink 使用 Flume Event 標頭中的 topic 和 key 屬性將 event 發送到 Kafka. 如果 topic 中存在 headers , 則會將 event 發送到該特定 topic, 從而覆蓋為 Sink 配置的主題. 如果 key 在 topic 存在, 關鍵還是使用 Kafkatopic 分區分區數據. 具有相同密鑰的 event 將發送到同一分區. 如果密鑰為空, 則將 event 發送到隨機分區.

Kafka sink 還為 key.serializer(org.apache.kafka.common.serialization.StringSerializer)和 value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)

提供默認值. 不建議修改這些參數

不推薦使用的屬性:

屬性名稱 默認 描述
brokerList - 使用 kafka.bootstrap.servers
topic default-flume-topic 使用 kafka.topic
BATCHSIZE 100 使用 kafka.flumeBatchSize
requiredAcks 1 使用 kafka.producer.acks

下面給出 Kafka sink 的示例配置. 以 kafka 生產者前綴 kafka.producer 開頭的屬性. 創建 Kafka 生成器時傳遞的屬性不限於此示例中給出的屬性. 此外, 可以在此處包含您的自定義屬性, 並通過作為方法參數傳入的 Flume Context 對象在預處理器中訪問它們.

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.Bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

  

安全和 Kafka Sink

Flume 和 Kafka 之間的通信 channel 支持安全認證和數據加密. 對於安全身份驗證, 可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使該參數名為 SSL, 實際協議是 TLS 實現)

截至目前, 數據加密僅由 SSL / TLS 提供.

將 kafka.producer.security.protocol 設置為以下任何值意味着:

SASL_PLAINTEXT - 無數據加密的 Kerberos 或純文本身份驗證

SASL_SSL - 使用數據加密的 Kerberos 或純文本身份驗證

SSL - 基於 TLS 的加密, 帶有可選的身份驗證

警告

啟用 SSL 時性能會下降, 其大小取決於 CPUtype 和 JVM 實現. 參考: Kafka 安全概述 和跟蹤此問題的 jira: https://issues/jira/browse/KAFKA-2561

TLS 和 Kafka Sink:

請閱讀配置 Kafka 客戶端 SSL 中描述的步驟, 以了解用於微調的其他配置設置, 例如以下任何一項: 安全提供程序, 密碼套件, 啟用的協議, 信任庫或密鑰庫 type.

配置服務器端身份驗證和數據加密的示例.

在此處指定信任庫是可選的, 可以使用全局信任庫. 有關全局 SSL 設置的更多詳細信息, 請參閱 SSL / TLS 支持部分.

注意: 默認情況下, 未定義屬性 ssl.endpoint.identification.algorithm, 因此不會執行 hostname 驗證. 要啟用 hostname 驗證, 請設置以下屬性

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

啟用后, 客戶端將根據以下兩個字段之一驗證服務器的完全限定域名(FQDN):

通用名稱(CN) https://tools.ietf.org/html/rfc6125#section-2.3 https://tools.ietf.org/html/rfc6125#section-2.3

主題備選名稱(SAN)

如果還需要客戶端身份驗證, 則還需要將以下內容添加到 Flume agent 配置中, 或者可以使用全局 SSL 設置(請參閱 SSL / TLS 支持部分). 每個 Flume agent 都必須擁有其客戶證書, Kafka 經紀人必須單獨或通過其簽名鏈來信任. 常見示例是由單個根 CA 簽署每個客戶端證書, 而后者又由 Kafka 經紀人信任

# optional, the global keystore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

  

如果密鑰庫和密鑰使用不同的密碼保護, 則 ssl.key.password 屬性將為生產者密鑰庫提供所需的額外密鑰

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos 和 Kafka Sink

要將 Kafkasink 與使用 Kerberos 保護的 Kafka 集群一起使用, 請為生產者設置上面提到的 producer.security.protocol 屬性. 與 Kafka agent 一起使用的 Kerberos 密鑰表和主體在 JAAS 文件的 "KafkaClient" 部分中指定."客戶端" 部分描述了 Zookeeper 連接(如果需要). 有關 JAAS 文件內容的信息, 請參閱 Kafka doc. 可以通過 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可選的系統范圍的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

 

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

  

使用 SASL_SSL 的安全配置示例:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

  

示例 JAAS 文件. 有關其內容的參考, 請參閱 SASL 配置的 Kafka 文檔中所需認證機制 (GSSAPI / PLAIN) 的客戶端配置部分. 與 Kafka Source 或 Kafka Channel 不同, 不需要 "Client" 部分, 除非其他連接組件需要它. 另外, 請確保 Flume 進程的操作系統用戶對 jaas 和 keytab 文件具有讀權限.

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
HTTP Sink

  

此 sink 的行為是它將從 channel 獲取 event , 並使用 HTTP POST 請求將這些 event 發送到遠程服務. event 內容作為 POST 正文發送

此 sink 的錯誤處理行為取決於目標服務器返回的 HTTP 響應. sink 退避 / 就緒狀態是可配置的, 事務提交 / 回滾結果以及 event 是否有助於成功的 event 排放計數也是可配置的

狀態代碼不可讀的服務器返回的任何格式錯誤的 HTTP 響應都將導致退避信號, 並且不會從該 channel 中消耗該 event

必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱需要為 http。
endpoint - POST 到的完全限定的 URL 端點
connectTimeout 5000 套接字 connect-timeout(以毫秒為單位)
request-timeout 5000 最大請求處理時間(以毫秒為單位)
contentTypeHeader text/plain HTTP Content-Type 標頭
acceptHeader text/plain HTTP Accept 標頭值
defaultBackoff true 是否在接收所有 HTTP 狀態代碼時默認退避
defaultRollback true 是否在接收所有 HTTP 狀態代碼時默認回滾
defaultIncrementMetrics false 是否在接收所有 HTTP 狀態代碼時默認增加指標
backoff.CODE - 為個人(即 200)代碼或組(即 2XX)代碼配置特定退避
rollback.CODE - 為單個(即 200)代碼或組(即 2XX)代碼配置特定回滾
incrementMetrics.CODE - 配置單個(即 200)代碼或組(即 2XX)代碼的特定度量增量

請注意, 最具體的 HTTP 狀態代碼匹配用於 backoff,rollback 和 incrementMetrics 配置選項. 如果存在 2XX 和 200 狀態代碼的配置值, 則 200 個 HTTP 代碼將使用 200 值, 而 201-299 范圍內的所有其他 HTTP 代碼將使用 2XX 值.

消耗任何空或空 event 而不向 HTTP 端點發出任何請求.

Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/JSON
a1.sinks.k1.contentTypeHeader = application/JSON
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true
Custom Sink

  

自定義 sink 是您自己的 Sink 接口實現. 啟動 Flume agent 程序時, 必須在 agent 程序的類路徑中包含自定義 sink 的類及其依賴項.自定義 sink 的 type 是其 FQCN. 必需屬性以粗體顯示

屬性名稱 默認 描述
channel -  
type - 組件 type 名稱,需要是您的 FQCN
Example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1
Flume Channels

  

 


channel 是 event 在 agent 上暫存的存儲庫. Source 添加 event ,Sink 刪除它.

Memory Channel

event 存儲在具有可配置最大大小的內存中隊列中. 它非常適合需要更高吞吐量的流量, 並且在 agent 發生故障時准備丟失分階段數據. 必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件 type 名稱,需要是 & nbsp;memory
capacity 100 channel 中存儲的最大 event 數
transactionCapacity 100 每個事務 channel 從 source 或提供給 sink 的最大 event 數
keep-alive 3 添加或刪除 event 的超時(以秒為單位)
byteCapacityBufferPercentage 20 定義 byteCapacity 與 channel 中所有 event 的估計總大小之間的緩沖區百分比,以計算標頭中的數據。見下文。
byteCapacity 見說明 允許的最大內存總字節數,作為此 channel 中所有 event 的總和。該實現僅計算 Event  主體,這也是提供 byteCapacityBufferPercentage 配置參數的原因。默認值為計算值,等於 JVM 可用的最大內存的 80%(即命令行傳遞的 - Xmx 值的 80%)。請注意,如果在單個 JVM 上有多個內存 channel,並且它們碰巧保持相同的物理 event (即,如果您使用來自單個 source 的復制 channel 選擇器),那么這些 event 大小可能會被重復計算以用於 channelbyteCapacity 目的。將此值設置為 0 將導致此值回退到大約 200 GB 的內部硬限制。
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
JDBC Channel

  

 


event 存儲在由數據庫支持的持久存儲中. JDBC Channel 當前支持嵌入式 Derby. 這是一個持久的 channel, 非常適合可恢復性很重要的流程. 必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件 type 名稱,需要是 & nbsp;  jdbc
db.type DERBY 數據庫供應商需要是 DERBY。
driver.class org.apache.derby.jdbc.EmbeddedDriver 供應商的 JDBC 驅動程序的類
driver.url (由其他屬性建造) JDBC 連接 URL
db.username “sa” 數據庫連接的用戶標識
db.password - 數據庫連接的密碼
connection.properties.file - JDBC 連接屬性文件路徑
create.schema true 如果為 true,則創建 db 模式(如果不存在)
create.index true 創建索引以加快查找速度
create.foreignkey true  
transaction.isolation “READ_COMMITTED” db 會話的隔離級別 READ_UNCOMMITTED,READ_COMMITTED,SERIALIZABLE,REPEATABLE_READ
maximum.connections 10 允許 db 的最大連接數
最大容量 0(無限制) channel 中的最大 event 數
sysprop。*   數據庫供應商特定屬性
sysprop.user.home   存儲嵌入式 Derby 數據庫的主路徑

 

Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = jdbc
Kafka Channel

  


event 存儲在 Kafka 群集中(必須單獨安裝).Kafka 提供高可用性和復制, 因此如果 agent 或 kafka agent 崩潰, event 可立即用於其他 sinks

Kafka 頻道可用於多種場景:

使用 Flume source 和 sink - 它為 event 提供了可靠且高度可用的 channel

使用 Flume source 和攔截器但沒有 sink - 它允許將 Flumeevent 寫入 Kafka 主題, 供其他應用程序使用

使用 Flume sink, 但沒有 source - 它是一種低延遲, 容錯的方式將 event 從 Kafka 發送到 Flumesink, 如 HDFS,HBase 或 Solr

這目前支持 Kafka 服務器版本 0.10.1.0 或更高版本. 測試完成了 2.0.1, 這是發布時最高的可用版本.

配置參數組織如下:

通常與 channel 相關的配置值應用於 channel 配置級別, 例如: a1.channel.k1.type =

與 Kafka 相關的配置值或 Channel 運行的前綴以 "kafka." 為前綴(這對 CommonClient Configs 是有效的), 例如: a1.channel.k1.kafka.topic 和 a1.channel.k1.kafka.Bootstrap.servers. 這與 hdfssink 的運行方式沒有什么不同

特定於生產者 / 消費者的屬性以 kafka.producer 或 kafka.consumer 為前綴

在可能的情況下, 使用 Kafka 參數名稱, 例如: Bootstrap.servers 和 acks

此版本的 flume 與以前的版本向后兼容, 但是下表中顯示了已棄用的屬性, 並且在配置文件中存在時會在啟動時記錄警告消息.

必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件 type 名稱,需要是 org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers - channel 使用的 Kafka 集群中的經紀商列表這可以是經紀人的部分列表,但我們建議至少兩個用於 HA。格式為逗號分隔的 hostname:port 列表
kafka.topic 水槽溝道 頻道將使用的 Kafka 主題
kafka.consumer.group.id 水槽 channel 用於向 Kafka 注冊的消費者群組 ID。多個 channel 必須使用相同的主題和組,以確保當一個 agent 程序發生故障時,另一個 agent 程序可以獲取數據請注意,具有相同 ID 的非 channel 使用者可能會導致數據丟失。
parseAsFlumeEvent 真正 期望在頻道中使用 FlumeEvent 架構的 Avro 基准。如果 Flume source 寫入 channel,則應該為 true; 如果其他生成器正在寫入 channel 正在使用的主題,則應為 false。通過使用 flume-ng-sdk 工件提供的 org.apache.flume.source.avro.AvroFlumeEvent,可以在 Flume 之外解析到 Kafka 的 Flume source 消息
的 pollTimeout 500 在消費者的 “poll()” 調用中等待的時間量(以毫秒為單位)。https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
defaultPartitionId - 指定要發送到此 channel 中的所有 event 的 Kafka 分區 ID(整數),除非被 partitionIdHeader 覆蓋。默認情況下,如果未設置此屬性,則 event 將由 Kafka Producer 的分區程序分發 - 包括按鍵(如果指定)(或由 kafka.partitioner.class 指定的分區程序)。
partitionIdHeader - 設置時,生產者將從 event 頭中獲取使用此屬性的值命名的字段的值,並將消息發送到主題的指定分區。如果該值表示無效分區,則該 event 將不被接受。如果存在標頭值,則此設置將覆蓋 defaultPartitionId。
kafka.consumer.auto.offset.reset 最新 當 Kafka 中沒有初始偏移量或服務器上當前偏移量不再存在時(例如因為該數據已被刪除)該怎么辦:最早:自動將偏移量重置為最早的最新偏移量:自動重置偏移量到最新的偏移量無:如果沒有為消費者的組找到任何其他偏移量,則向消費者拋出異常:向消費者拋出異常。
kafka.producer.security.protocol 純文本 如果使用某種級別的安全性寫入 Kafka,則設置為 SASL_PLAINTEXT,SASL_SSL 或 SSL。有關安全設置的其他信息,請參見下文。
kafka.consumer.security.protocol 純文本 與 kafka.producer.security.protocol 相同,但是用於從 Kafka 閱讀 / 消費。
更多生產者 / 消費者安全道具   如果使用 SASL_PLAINTEXT,則 SASL_SSL 或 SSL 會引用 Kafka 安全性以獲取需要在生產者 / 消費者上設置的其他屬性。

不推薦使用的屬性:

屬性名稱 默認 描述
brokerList - channel 使用的 Kafka 集群中的經紀商列表這可以是經紀人的部分列表,但我們建議至少兩個用於 HA。格式為逗號分隔的 hostname:port 列表
話題 水槽溝道 使用 kafka.topic
的 groupId 水槽 使用 kafka.consumer.group.id
readSmallestOffset false 使用 kafka.consumer.auto.offset.reset
migrateZookeeperOffsets 真正 如果找不到 Kafka 存儲的偏移量,請在 Zookeeper 中查找偏移量並將它們提交給 Kafka。這應該是支持從舊版本的 Flume 無縫 Kafka 客戶端遷移。遷移后,可以將其設置為 false,但通常不需要這樣做。如果未找到 Zookeeper 偏移量,則 kafka.consumer.auto.offset.reset 配置定義如何處理偏移量。

注意

Example for agent named a1:
a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.Bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

  

安全和 Kafka 頻道:

Flume 和 Kafka 之間的通信 channel 支持安全認證和數據加密. 對於安全身份驗證, 可以使用 Kafka 0.9.0 版中的 SASL / GSSAPI(Kerberos V5)或 SSL(即使該參數名為 SSL, 實際協議是 TLS 實現)

截至目前, 數據加密僅由 SSL / TLS 提供

將 kafka.producer | consumer.security.protocol 設置為以下任何值意味着:

SASL_PLAINTEXT - 無數據加密的 Kerberos 或純文本身份驗證

SASL_SSL - 使用數據加密的 Kerberos 或純文本身份驗證

SSL - 基於 TLS 的加密, 帶有可選的身份驗證.

警告

啟用 SSL 時性能會下降, 其大小取決於 CPUtype 和 JVM 實現. 參考: Kafka 安全概述 和跟蹤此問題的 jira: https://issues/jira/browse/KAFKA-2561

** TLS 和 Kafka 頻道:**

請閱讀配置 Kafka 客戶端 SSL 中描述的步驟 以了解用於微調的其他配置設置, 例如以下任何一項: 安全提供程序, 密碼套件, 啟用的協議, 信任庫或密鑰庫 type.

配置服務器端身份驗證和數據加密的示例.

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

  

在此處指定信任庫是可選的, 可以使用全局信任庫. 有關全局 SSL 設置的更多詳細信息, 請參閱 SSL / TLS 支持部分.

注意: 默認情況下, 未定義屬性 ssl.endpoint.identification.algorithm, 因此不會執行 hostname 驗證. 要啟用 hostname 驗證, 請設置以下屬性

  1. a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
  2. a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

啟用后, 客戶端將根據以下兩個字段之一驗證服務器的完全限定域名(FQDN):

通用名稱(CN) https://tools.ietf.org/html/rfc6125#section-2.3 https://tools.ietf.org/html/rfc6125#section-2.3

主題備選名稱(SAN)

如果還需要客戶端身份驗證, 則還需要將以下內容添加到 Flume agent 配置中, 或者可以使用全局 SSL 設置(請參閱 SSL / TLS 支持部分). 每個 Flume agent 都必須擁有其客戶證書, Kafka 經紀人必須單獨或通過其簽名鏈來信任. 常見示例是由單個根 CA 簽署每個客戶端證書, 而后者又由 Kafka 經紀人信任.

# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

  

如果密鑰庫和密鑰使用不同的密碼保護, 則 ssl.key.password 屬性將為使用者和生產者密鑰庫提供所需的額外密鑰:

a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

  

** Kerberos 和 Kafka 頻道:**

要將 Kafkachannel 與使用 Kerberos 保護的 Kafka 群集一起使用, 請為生產者和 / 或使用者設置上面提到的 producer / consumer.security.protocol 屬性. 與 Kafka agent 一起使用的 Kerberos 密鑰表和主體在 JAAS 文件的 "KafkaClient" 部分中指定."客戶端" 部分描述了 Zookeeper 連接(如果需要). 有關 JAAS 文件內容的信息, 請參閱 Kafka doc. 可以通過 flume-env.sh 中的 JAVA_OPTS 指定此 JAAS 文件的位置以及可選的系統范圍的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

 

使用 SASL_PLAINTEXT 的示例安全配置:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

  

使用 SASL_SSL 的安全配置示例:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.Bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

  

示例 JAAS 文件. 有關其內容的參考, 請參閱 SASL 配置的 Kafka 文檔中所需認證機制 (GSSAPI / PLAIN) 的客戶端配置部分. 由於 Kafka Source 也可能連接到 Zookeeper 以進行偏移遷移, 因此 "Client" 部分也添加到此示例中. 除非您需要偏移遷移, 否則不需要這樣做, 或者您需要此部分用於其他安全組件. 另外, 請確保 Flume 進程的操作系統用戶對 jaas 和 keytab 文件具有讀權限.

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/path/to/keytabs/flume.keytab"
principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};
File Channel

  

必需屬性以粗體顯示

屬性名稱默認值 描述  
type - 組件 type 名稱,需要是 & nbsp; file 。
checkpointDir ~/.flume/file-channel/checkpoint 將存儲檢查點文件的目錄
useDualCheckpoints false 備份檢查點。如果將其設置為 true,則 & nbsp; 必須設置 backupCheckpointDir
backupCheckpointDir - 備份檢查點的目錄。此目錄不能與數據目錄或檢查點目錄相同
dataDirs ~/.flume/file-channel/data 逗號分隔的目錄列表,用於存儲日志文件。在不同磁盤上使用多個目錄可以提高文件 channel 的性能
transactionCapacity 10000 channel 支持的最大事務大小
checkpointInterval 30000 檢查點之間的時間量(以毫秒為單位)
maxFileSize 為 2146435071 單個日志文件的最大大小(以字節為單位)  2G
minimumRequiredSpace 524288000 最小所需可用空間(以字節為單位)。為避免數據損壞,當可用空間低於此值時,文件 channel 將停止接受接收 / 放置請求 500M
capacity 1000000 channel 的最大容量 & nbsp;  100 萬
keep-alive 3 等待放置操作的時間量(以秒為單位)
use-log-replay-v1 false 推薦:使用原有的重播邏輯
use-fast-replay false 推薦:不使用隊列重播
checkpointOnClose true 控制是否在關閉 channel 時創建檢查點。通過避免重放,在關閉時創建檢查點可以提高文件 channel 的后續啟動速度。
encryption.activeKey - 用於加密新數據的密鑰名稱
encryption.cipherProvider - 密碼提供程序 type,支持的 type:AESCTRNOPADDING
encryption.keyProvider - 密鑰提供程序 type,支持的 type:JCEKSFILE
encryption.keyProvider.keyStoreFile - 密鑰庫文件的路徑
encrpytion.keyProvider.keyStorePasswordFile - 密鑰庫密碼文件的路徑
encryption.keyProvider.keys - 所有鍵的列表(例如 activeKey 設置的歷史記錄)
encyption.keyProvider.keys.*.passwordFile - 可選密鑰密碼文件的路徑

注意

默認情況下, 文件 channel 使用上面指定的用戶主目錄內的檢查點和數據目錄的路徑. 因此, 如果 agent 中有多個活動的文件 channel 實例, 則只有一個實例可以鎖定目錄並導致其他 channel 初始化失敗.

因此, 有必要為所有已配置的 channel 提供顯式路徑, 最好是在不同的磁盤上

此外, 由於文件 channel 將在每次提交后同步到磁盤, 因此將其與將 event 一起批處理的 sink/source 耦合可能是必要的, 以便在多個磁盤不可用於檢查點和數據目錄時提供良好的性能.

Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

  

加密

以下是一些示例配置:

使用密鑰存儲密碼分隔生成密碼密鑰:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
-keysize 128 -validity 9000 -keystore test.keystore \
-storetype jceks -storepass keyStorePassword

 

使用與密鑰庫密碼相同的密碼生成密鑰:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
-keystore src/test/resources/test.keystore -storetype jceks \
-storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

  

false 設您已使用密鑰 0 輸出密鑰, 並且應使用密鑰 1 加密新文件:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

  

與上面相同的場景, 但 key-0 有自己的密碼:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password
Custom Channel

  

自定義 channel 是您自己的 Channel 接口實現. 啟動 Flume agent 程序時, 必須在 agent 程序的類路徑中包含自定義 channel 的類及其依賴項. 自定義 channel 的 type 是其 FQCN. 必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件 type 名稱需要是 FQCN
Example for agent named a1:
a1.channels = c1
a1.channels.c1.type = org.example.MyChannel
Flume Channel Selectors

  

 


如果未指定 type, 則默認為 "replicating"

Replicating Channel Selector (default)

必需屬性以粗體顯示

屬性名稱 默認 描述
selector.type replicating 組件 type 名稱需要 & nbsp;  replicating
selector.optional - 要標記為可選的 channel 集
Example for agent named a1 and it's source called r1:
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

  


在上面的配置中, c3 是可選的 channel.

無法寫入 c3 只是被忽略了. 由於 c1 和 c2 未標記為可選, 因此無法寫入這些 channel 將導致事務失敗.

Multiplexing Channel Selector (多路復用)

必需屬性以粗體顯示

屬性名稱 默認 描述
selector.type replicating 組件 type 名稱需要進行多路復用 (multiplexing)
selector.header flume.selector.header  
selector.default -  
selector.mapping。* -  
Example for agent named a1 and it's source called r1:
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Custom Channel Selector

  

 


自定義 channel 選擇器是您自己的 Channelelector 接口實現. 啟動 Flume agent 程序時, 自定義 channel 選擇器的類及其依賴項必須包含在 agent 程序的類路徑中. 自定義 channel 選擇器的 type 是其 FQCN.

屬性名稱 默認 描述
selector.type - 組件 type 名稱,需要是您的 FQCN 
Example for agent named a1 and its source called r1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector
Flume Sink Processors

  

 


接收組允許用戶將多個 sink 分組到一個實體中. sink 處理器可用於在組內的所有 sink 上提供負載平衡功能, 或在時間故障的情況下實現從一個 sink 到另一個 sink 的故障轉移.

必需屬性以粗體顯示

屬性名稱 默認 描述
sinks - 以空格分隔的參與組的 sink 列表
processor.type default 組件 type 名稱需要是 default,failover 或 load_balance
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
Default Sink Processor

  

 


默認 sink 只接受一個 sink. 用戶不必為單個 sink 創建處理器(sink 組). 相反, 用戶可以遵循本用戶指南中上面解釋的 source - channel - sink 模式

Failover Sink Processor

故障轉移 sink 維護一個優先級的 sink 列表, 保證只要有一個可用的 event 將被處理(傳遞)

故障轉移機制的工作原理是將故障 sink 降級到池中, 在池中為它們分配一個冷卻期, 在重試之前隨順序故障而增加. sink 成功發送 event 后, 它將恢復到實時池. sink 具有與之相關的優先級, 數量越大, 優先級越高. 如果在發送 event 時 sink 發生故障, 則接下來將嘗試下一個具有最高優先級的 sink 以發送 event . 例如, 在優先級為 80 的 sink 之前激活優先級為 100 的 sink. 如果未指定優先級, 則根據配置中指定 sink 的順序確定 Sinks 優先級.

要進行配置, 請將 sink 組處理器設置為故障轉移並為所有單個 sink 設置優先級. 所有指定的優先級必須是唯一的 此外, 可以使用 maxpenalty 屬性設置故障轉移時間的上限(以毫秒為單位)

必需屬性以粗體顯示

屬性名稱 默認 描述
sinks - 以空格分隔的參與組的 sink 列表
processor.type default 組件 type 名稱需要進行故障轉移 & nbsp;  failover
processor.priority - 優先價值。 必須是與當前 sink 組關聯的 sink 實例之一。較高優先級值 Sink 較早被激活。絕對值越大表示優先級越高
processor.maxpenalty 30000 失敗的 sink 的最大退避時間(以毫秒為單位)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load balancing Sink Processor(負載均衡接收處理器)

  

 


負載平衡 sink 處理器提供了在多個 sink 上進行負載均衡流量的功能. 它維護一個索引的活動 sink 列表, 必須在其上分配負載. 實現支持使用 round_robin 或 random 機制分配負載. 選擇機制的選擇默認為 round_robin 類型, 但可以通過配置覆蓋. 通過從 AbstractSinkSelector 繼承的自定義類支持自定義選擇機制.

調用時, 此選擇器使用其配置的選擇機制選擇下一個 sink 並調用它. 對於 round_robin 和 random 如果所選的 sink 無法傳遞 event , 則處理器通過其配置的選擇機制選擇下一個可用的 sink. 此實現不會將失敗的 sink 列入黑名單, 而是繼續樂觀地嘗試每個可用的 sink. 如果所有 sink 調用都導致失敗, 則選擇器將故障傳播到 sink 運行器.

如果啟用了 backoff , 則 sink 處理器會將失敗的 sink 列入黑名單, 將其刪除以供給定超時的選擇. 當超時結束時, 如果 sink 仍然沒有響應, 則超時會以指數方式增加, 以避免在無響應的 sink 上長時間等待時卡住. 在禁用此功能的情況下, 在循環中, 所有失敗的 sink 負載將被傳遞到下一個 sink, 因此不均衡

必需屬性以粗體顯示

屬性名稱 默認 描述
processor.sink - 以空格分隔的參與組的 sink 列表
processor.type default 組件 type 名稱需要是 load_balance
processor.backoff false 失敗的 sink 是否會以指數方式退回。
processor.selector  round_robin 選擇機制。必須是 round_robin,random  或自定義類的 FQCN,它繼承自 AbstractSinkelector
processor.selector.maxTimeOut 30000 由退避選擇器用於限制指數 backoff (以毫秒為單位)
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
Custom Sink Processor

  

 


目前不支持自定義 sink 處理器

Event Serializers

該 file_roll sink 和 HDFS sink 都支持 EventSerializer 接口. 下面提供了隨 Flume 一起提供的 Event Serializer 的詳細信息.

Body Text Serializer

別名: text . 此攔截器將 event 的主體寫入輸出流, 而不進行任何轉換或修改. event 標題將被忽略. 配置選項如下:

屬性名稱 默認 描述
appendNewline 真正 是否在寫入時將換行符附加到每個 event 。由於遺留原因,默認值為 truefalse 定 event 不包含換行符。
Example for agent named a1:
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
"Flume Event" Avro Event Serializer

  

 


Alias: avro_event.

此攔截器將 Flumeevent 序列化為 Avro 容器文件. 使用的模式與 Avro RPC 機制中用於 Flumeevent 的模式相同.

此序列化程序繼承自 AbstractAvroEventSerializer 類

配置選項如下:

屬性名稱 默認 描述
syncIntervalBytes 2048000 Avro 同步間隔,大約為字節。  2M ???
compressionCodec null  Avro 壓縮編解碼器。有關受支持的編解碼器,請參閱 Avro 的 CodecFactory 文檔。
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

  

 


Avro event 序列化器

別名: 此序列化程序沒有別名, 必須使用完全限定的類名類名指定.

這將 Flume event 序列化為 Avro 容器文件, 如 "Flume Event" Avro Event Serializer, 但記錄模式是可配置的. 記錄模式可以指定為 Flume 配置屬性, 也可以在 event 頭中傳遞.

要將記錄模式作為 Flume 配置的一部分傳遞, 請使用下面列出的屬性 schemaURL.

要在 event 標頭中傳遞記錄模式, 請指定 包含模式的 JSON 格式表示的 event 標頭 flume.avro.schema.literal 或包含可以找到模式的 URL 的 flume.avro.schema.url( hdfs:/... URIs are supported I)

此序列化程序繼承自 AbstractAvroEventSerializer 類.

配置選項如下:

屬性名稱 默認 描述
syncIntervalBytes 2048000 Avro 同步間隔,大約為字節。
compressionCodec null Avro 壓縮編解碼器。有關受支持的編解碼器,請參閱 Avro 的 CodecFactory 文檔。
schemaURL   Avro 架構 URL。標題中指定的模式 ovverride 此選項。
Example for agent named a1:
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
Flume Interceptors

  

 


Flume 具有在運行中修改 / 刪除 event 的能力. 這是在攔截器的幫助下完成的. 攔截器是實現 org.apache.flume.interceptor.Interceptor 的類接口. 攔截器可以根據攔截器開發者選擇的任何標准修改甚至刪除 event .Flume 支持攔截器的鏈接. 通過在配置中指定攔截器構建器類名列表, 可以實現此目的. 攔截器在 source 配置中指定為以空格分隔的列表. 指定攔截器的順序是它們被調用的順序. 一個攔截器返回的 event 列表將傳遞給鏈中的下一個攔截器. 攔截器可以修改或刪除 event . 如果攔截器需要刪除 event , 它就不會在它返回的列表中返回該 event . 如果要刪除所有 event , 則只返回一個空列表. 攔截器是命名組件, 下面是它們如何通過配置創建的示例:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{
CollectorHost
}.%Y-%m-%d
a1.sinks.k1.channel = c1

  

請注意, 攔截器構建器將傳遞給 type config 參數. 攔截器本身是可配置的, 可以傳遞配置值, 就像傳遞給任何其他可配置組件一樣. 在上面的示例中, event 首先傳遞給 HostInterceptor, 然后 HostInterceptor 返回的 event 傳遞給 TimestampInterceptor. 您可以指定完全限定的類名 (FQCN) 或別名時間戳. 如果您有多個收集器寫入相同的 HDFS 路徑, 那么您也可以使用 HostInterceptor

時間戳攔截器 (Timestamp Interceptor)

此攔截器將 event 標頭插入到 event 標頭中, 以毫秒為單位處理 event . 此攔截器插入帶有鍵時間戳 (或由 header 屬性指定) 的標頭, 其值為相關時間戳. 如果已在配置中存在, 則此攔截器可以保留現有時間戳

屬性名稱 默認 描述
type - 組件 type 名稱必須是時間戳或 FQCN
headerName timestamp 用於放置生成的時間戳的標頭的名稱。
preserveExisting false 如果時間戳已存在,是否應保留 - true 或 false
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

  

 


主機攔截器(Host Interceptor)

此攔截器插入運行此 agent 程序的主機的 hostname 或 IP 地址. 它根據配置插入帶有密鑰主機或已配置密鑰的標頭, 其值為主機的 hostname 或 IP 地址.

屬性名稱 默認 描述
type - 組件 type 名稱必須是 & nbsp;host
preserveExisting false 如果主機頭已存在,是否應保留 - true 或 false
useIP true 如果為 true,請使用 IP 地址,否則使用 hostname。
hostHeader host 要使用的標頭密鑰。
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

  

 


靜態攔截器 (Static Interceptor)

靜態攔截器允許用戶將具有靜態值的靜態頭附加到所有 event

當前實現不允許一次指定多個標頭. 相反, 用戶可能會鏈接多個靜態攔截器, 每個靜態攔截器定義一個

屬性名稱 默認 描述
type - 組件 type 名稱必須是 & nbsp;static 
preserveExisting true 如果已配置的標頭已存在,則應保留它 - true 或 false
key key 應創建的標頭的名稱
value value 應該創建的靜態值
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
Remove Header Interceptor (刪除標題攔截器)

  


此攔截器通過刪除一個或多個 headers 來操縱 Flume event headers. 它可以刪除靜態定義的 headers, 基於正則表達式的 headers 或列表中的 headers. 如果這些都沒有定義, 或者沒有符合條件的 headers, 則不會修改 Flume 事件.

請注意, 如果只需要刪除一個 headers, 則按名稱指指定 header 可提供相對於其他兩種方法的性能優勢.

屬性名稱 默認 描述
type - 組件 type 名稱必須是 remove_header
withName - 要刪除的 header 的名稱
fromList - 要刪除的 header 列表,使用 fromListSeparator 指定的分隔符分隔
fromListSeparator \ S * \ S * 正則表達式,用於分隔 fromList 指定的列表中的多個 header 名稱。默認值是由任意數量的空白字符包圍的逗號
matching - 將刪除名稱與此正則表達式匹配的所有 header

UUID 攔截器 (UUID Interceptor)

此攔截器在所有截獲的 event 上設置通用唯一標識符. 示例 UUID 是 b5755073-77a9-43c1-8fad-b7a586fc1b97, 表示 128 位值.

如果 event 的應用程序級別唯一鍵不可用, 請考慮使用 UUIDInterceptor 自動為 event 分配 UUID. 一旦 UUID 進入 Flume 網絡就將其分配給 event 非常重要; 也就是說, 在流量的第一個 Flume Source 中. 這使得在 Flume 網絡中面對復制和重新發送時 event 的后續重復數據刪除可以實現高可用性和高性能. 如果應用程序級別密鑰可用, 則優於自動生成的 UUID, 因為它使用所述公知的應用程序級別密鑰在數據存儲中啟用后續更新和 event 刪除.

屬性名稱 默認 描述
type - 組件 type 名稱必須是 org.apache.flume.sink.solr.morphline.UUIDInterceptor $ Builder
headerName id 要修改的 Flume header 的名稱
preserveExisting true 如果 UUID 標頭已存在,是否應保留 - true 或 false
prefix “” 前綴字符串常量,用於預先添加到每個生成的 UUID

Morphline Interceptor (形態攔截器)

此攔截器通過 morphline 配置文件過濾 event , 該文件定義了一系列轉換命令, 用於將記錄從一個命令傳遞到另一個命令. 例如, morphline 可以忽略某些 event 或通過基於正則表達式的模式匹配來更改或插入某些 event 頭, 或者它可以通過 Apache Tika 在截獲的 event 上自動檢測和設置 MIMEtype. 例如, 這種數據包嗅探可用於 Flume 拓撲中基於內容的動態路由. MorphlineInterceptor 還可以幫助實現到多個 Apache Solr 集合的動態路由(例如, 用於多租戶)

目前, 存在一個限制, 即攔截器的形態線不能為每個輸入 event 生成多個輸出記錄. 此攔截器不適用於重型 ETL 處理 - 如果需要, 請考慮將 ETL 處理從 Flume Source 移至 Flume Sink, 例如移至 MorphlineSolrSink.

必需屬性以粗體顯示

屬性名稱 默認 描述
type - 組件 type 名稱必須是 org.apache.flume.sink.solr.morphline.MorphlineInterceptor $ Builder
morphlineFile - 本地文件系統與 morphline 配置文件的相對或絕對路徑。示例:/etc/flume-ng/conf/morphline.conf
morphlineId null 如果 morphline 配置文件中有多個 morphlines,則用於標識 morphline 的可選名稱
Sample flume.conf file:
a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.Solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

  

 


搜索和替換攔截器 (Search and Replace Interceptor)

此攔截器基於 Java 正則表達式提供簡單的基於字符串的搜索和替換功能. 還可以進行回溯 / 群組捕捉. 此攔截器使用與 Java Matcher.replaceAll()方法中相同的規則

屬性名稱 默認 描述
type - 組件 type 名稱必須是 search_replace
searchPattern - 要搜索和替換的模式。
replaceString - 替換字符串。
charset UTF-8 event 正文的字符集。默認 false 設為 UTF-8。
Example configuration:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =
Another example:
a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

  

 


正則表達式過濾攔截器 (Regex Filtering Interceptor)

此攔截器通過將 event 主體解釋為文本並將文本與配置的正則表達式進行匹配來有選擇地過濾 event . 提供的正則表達式可用於包括 event 或排除 event

屬性名稱 默認 描述
type - 組件 type 名稱必須是 regex_filter
regex ".*"  用於匹配 event 的正則表達式
excludeEvents false 如果為 true,則 regex 確定要排除的 event ,否則 regex 確定要包括的 event 。

正則表達式提取器攔截器(Regex Extractor Interceptor)

此攔截器使用指定的正則表達式提取正則表達式匹配組, 並將匹配組作為標題附加到 event 上. 它還支持可插入序列化程序, 用於在將匹配組添加為 event 標頭之前對其進行格式化.

屬性名稱 默認 描述
type - 組件 type 名稱必須是 regex_extractor
regex - 用於匹配 event 的正則表達式
serializer - 以空格分隔的序列化程序列表,用於映射與標題名稱匹配並序列化其值。(參見下面的示例)Flume 為以下序列化程序提供內置支持: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializer 器。.TYPE default 必須是 default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer),org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer,或實現 org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 的自定義類的 FQCN
serializers..name -  
serializers.* - Serializer-specific properties

序列化器用於將匹配映射到標題名稱和格式化標題值; 默認情況下, 您只需指定標題名稱, 將使用默認的 org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer. 此序列化程序只是將匹配映射到指定的標題名稱, 並在正則表達式提取時傳遞值. 您可以使用完全限定的類名 (FQCN) 將自定義序列化程序實現插入到提取器中, 以便以您喜歡的方式格式化匹配.

例 1:

如果 Flumeevent 主體包含 1:2:3.4foobar5 , 則使用以下配置

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

  

提取的 event 將包含相同的主體, 但后面的標題將添加一個 => 1, 兩個 => 2, 三個 => 3

例 2:

如果 Flumeevent 正文包含 2012-10-18 18:47:57,614 某些 日志 行, 則使用以下配置

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

  

提取的 event 將包含相同的主體, 但后面的標題將添加 timestamp => 1350611220000

Flume 屬性

屬性名稱 默認 描述
flume.called.from.service - 如果指定了此屬性,則 Flume agent 將繼續輪詢配置文件,即使在預期位置找不到配置文件也是如此。否則,如果配置在預期位置不存在,Flume agent 將終止。設置此屬性時不需要屬性值(例如,只需指定 - Dflume.called.from.service 即可)

屬性: flume.called.from.service

Flume 每隔 30 秒定期輪詢一次指定配置文件的更改. 如果首次輪詢現有文件, 或者自上次輪詢以來現有文件的修改日期發生更改, Flume agent 將從配置文件加載新配置. 重命名或移動文件不會更改其修改時間. 當 Flume agent 輪詢一個不存在的文件時, 會發生以下兩種情況之一:

當 agent 首次輪詢不存在的配置文件時, agent 將根據 flume.called.from.service 屬性執行操作. 如果設置了屬性, 則 agent 將繼續輪詢(始終在同一時間 - 每 30 秒). 如果未設置該屬性, 則 agent 會立即終止

當 agent 輪詢一個不存在的配置文件並且這不是第一次輪詢文件時, agent 不會對此輪詢周期進行配置更改. agent 繼續輪詢而不是終止.

配置過濾器

Flume 提供了一種工具, 用於以配置過濾器的形式將敏感或生成的數據注入配置. 配置密鑰可以設置為配置屬性的值, 它將由配置過濾器替換為它所代表的值

配置過濾器的常見用法

格式類似於 Java 表達式語言, 但它目前不是一個完全有效的 EL 表達式解析器, 只是一種看起來像它的格式

<agent_name>
.configfilters =
<filter_name>
<agent_name>
.configfilters.
<filter_name>
.type =
<filter_type>
<agent_name>
.sources.
<source_name>
.parameter = ${
<filter_name>
['
<key_for_sensitive_or_generated_data>
']}
<agent_name>
.sinks.
<sink_name>
.parameter = ${
<filter_name>
['
<key_for_sensitive_or_generated_data>
']}
<agent_name>
.
<component_type>
.
<component_name>
.parameter = ${
<filter_name>
['
<key_for_sensitive_or_generated_data>
']} #or
<agent_name>
.
<component_type>
.
<component_name>
.parameter = ${
<filter_name>
["
<key_for_sensitive_or_generated_data>
"]} #or
<agent_name>
.
<component_type>
.
<component_name>
.parameter = ${
<filter_name>
[
<key_for_sensitive_or_generated_data>
]} #or
<agent_name>
.
<component_type>
.
<component_name>
.parameter = some_constant_data${
<filter_name>
[
<key_for_sensitive_or_generated_data>
]}

  

環境變量配置過濾器

屬性名稱 默認 描述
type - 組件 type 名稱必須是 env

示例, 要在配置中隱藏密碼, 請設置其值, 如以下示例所示

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = env
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${
f1['my_keystore_password']
}

  

這里 a1.source.r1.keystorePassword 配置屬性將獲取 my_keystore_password 環境變量的值. 設置環境變量的一種方法是運行如下所示的 flume agent:

$ my_keystore_password = Secret123 bin / flume -ng agent --conf conf --conf-file example.conf ...

外部處理配置過濾器(External Process Config Filter)

屬性名稱 默認 描述
type - 組件 type 名稱必須是外部的
command - 將執行以獲取給定鍵的值的命令。該命令將被調用為:

  並且預期返回單行值,退出代碼為 0。

charset UTF-8 返回字符串的字符集。

示例, 要在配置中隱藏密碼, 請設置其值, 如以下示例所示:

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/passwordResolver.sh
a1.configfilters.f1.charset = UTF-8
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${
f1['my_keystore_password']
}

  

在此示例中, flume 將運行以下命令以獲取值

$ /usr/bin/passwordResolver.sh my_keystore_password

該 passwordResolver.sh 將返回 Secret123 與退出代碼 0.

示例, 要生成滾動文件 sink 的目錄的一部分, 請設置其值, 如以下示例所示:

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/generateUniqId.sh
a1.configfilters.f1.charset = UTF-8
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume/agent_${
f1['agent_name']
}

  

在此示例中, flume 將運行以下命令以獲取值

$ /usr/bin/generateUniqId.sh agent_name

該 generateUniqId.sh 將返回 1234 與退出代碼 0.

Hadoop 憑據存儲配置過濾器

此功能的類路徑上需要一個 hadoop-common 庫(2.6 + 版本). 如果安裝了 hadoop, 則 agent 會自動將其添加到類路徑中

屬性名稱 默認 描述
type - 組件 type 名稱必須是 hadoop
credential.provider.path - 提供者路徑。請參閱 hadoop 文檔_here:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html#Configuring_the_Provider_Path
credstore.java-keystore-provider.password-file - 如果文件用於存儲密碼,則為密碼文件的名稱。該文件必須在類路徑上。可以使用 HADOOP_CREDSTORE_PASSWORD 環境變量設置提供程序密碼,也可以將其設置為空。

示例, 要在配置中隱藏密碼, 請設置其值, 如以下示例所示:

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = hadoop
a1.configfilters.f1.credential.provider.path = jceks://file/<path_to_jceks file>
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${
f1['my_keystore_password']
}

  

Log4j 追加 (Log4J Appender)

將 Log4jevent 附加到 flume agent 的 avrosource. 使用此 appender 的客戶端必須在類路徑中包含 flume-ng-sdk(例如, flume-ng-sdk-1.9.0.jar). 必需屬性以粗體顯示

屬性名稱 默認 描述
hostname - 使用 avrosource 運行遠程 Flume agent 的 hostname。
port - 遠程 Flume agent 的 avrosource 正在偵聽的端口。
UnsafeMode false 如果為 true,則 appender 將不會在發送 event 失敗時拋出異常。
AvroReflectionEnabled false 使用 Avro Reflection 來序列化 Log4jevent 。(當用戶記錄字符串時不要使用)
AvroSchemaUrl - 可從中檢索 Avro 架構的 URL。

示例 log4j.properties 文件:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

  

默認情況下, 通過調用 toString()或使用 Log4j 布局 (如果指定) 將每個 event 轉換為字符串.

如果 event 是 org.apache.avro.generic.GenericRecord,org.apache.avro.specific.SpecificRecord 的實例 , 或者屬性 AvroReflectionEnabled 設置為 true, 則將使用 Avro 序列化序列化 event .

使用 Avro 架構序列化每個 event 效率很低, 因此最好提供一個架構 URL, 下游 sink(通常是 HDFSsink)可以從該架構 URL 檢索架構.如果未指定 AvroSchemaUrl, 則架構將作為 Flume 標頭包含在內.

示例 log4j.properties 文件配置為使用 Avro 序列化:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc
# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

  

負載均衡 Log4j 追加 (Load Balancing Log4J Appender)

將 Log4jevent 追加到 flume agent 的 avrosource 列表中. 使用此 appender 的客戶端必須在類路徑中包含 flume-ng-sdk(例如, flume-ng-sdk-1.9.0.jar). 該 appender 支持循環和隨機方案, 用於執行負載平衡. 它還支持可配置的退避超時, 以便從主機集臨時刪除向下 agent . 必需屬性以粗體顯示

屬性名稱 默認 描述
Hosts - 一個以空格分隔的 host:port 列表,Flume(通過 AvroSource)正在偵聽 event
Selector ROUND_ROBIN 選擇機制。必須是 ROUND_ROBIN,RANDOM 或自定義 FQDN 到繼承自 LoadBalancingSelector 的類。
MaxBackoff - 一個 long 值,表示負載平衡客戶端將從無法使用 event 的節點退回的最長時間(以毫秒為單位)。默認為無退避
UnsafeMode false 如果為 true,則 appender 將不會在發送 event 失敗時拋出異常。
AvroReflectionEnabled false 使用 Avro Reflection 來序列化 Log4jevent 。
AvroSchemaUrl - 可從中檢索 Avro 架構的 URL。

使用默認值配置的示例 log4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

  

使用 RANDOM 負載平衡配置的示例 log4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM
# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

 

使用退避配置的示例 log4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000
# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

  

安全

HDFSsink,HBasesink,Thriftsource,Thriftsink 和 Kite Datasetsink 都支持 Kerberos 身份驗證. 請參閱相應的部分以配置與 Kerberos 相關的選項.

Flume agent 將作為單個主體對 kerberos KDC 進行身份驗證, 這將由需要進行 kerberos 身份驗證的不同組件使用. 為 Thriftsource,Thriftsink,HDFSsink,HBasesink 和 DataSetsink 配置的主體和 keytab 應該相同, 否則組件將無法啟動.

監控

Flume 中的監控仍在進行中. 變化可能經常發生. 幾個 Flume 組件向 JMX 平台 MBean 服務器報告度量標准. 可以使用 Jconsole 查詢這些指標.

可用的組件指標

下表顯示了可用於組件的度量標准. 每個組件僅維護一組度量, 由 "x" 表示, 未維護的值顯示默認值, 即 0. 這些表告訴您可以在何處獲得有意義的數據. 度量標准的名稱應該足夠描述, 有關更多信息, 您必須深入了解組件的 source 代碼.

Sources1

  Avro EXEC HTTP JMS Kafka MultiportSyslogTCP Scribe
AppendAcceptedCount X            
AppendBatchAcceptedCount X   X X      
AppendBatchReceivedCount X   X X      
AppendReceivedCount X            
ChannelWriteFail X   X X X X X
EventAcceptedCount X X X X X X X
EventReadFail     X X X X X
EventReceivedCount X X X X X X X
GenericProcessingFail     X     X  
KafkaCommitTimer         X    
KafkaEmptyCount         X    
KafkaEventGetTimer         X    
OpenConnectionCount X            

Sources 2

  SequenceGenerator SpoolDirectory SyslogTcp SyslogUDP Taildir Thrift
AppendAcceptedCount           X
AppendBatchAcceptedCount X X     X X
AppendBatchReceivedCount   X     X X
AppendReceivedCount           X
ChannelWriteFail X X X X X X
EventAcceptedCount X X X X X X
EventReadFail   X X X X  
EventReceivedCount   X X X X X
GenericProcessingFail   X     X  
KafkaCommitTimer            
KafkaEmptyCount            
KafkaEventGetTimer            
OpenConnectionCount            

Sinks 1

  Avro/Thrift AsyncHBase ElasticSearch HBase 的 HBase2
BatchCompleteCount X X X X X
BatchEmptyCount X X X X X
BatchUnderflowCount X X X X X
ChannelReadFail X       X
ConnectionClosedCount X X X X X
ConnectionCreatedCount X X X X X
ConnectionFailedCount X X X X X
EventDrainAttemptCount X X X X X
EventDrainSuccessCount X X X X X
EventWriteFail X       X
KafkaEventSendTimer          
RollbackCount          

Sinks 2

  HDFSEvent Hive HTTP Kafka Morphline RollingFile
BatchCompleteCount X X     X  
BatchEmptyCount X X   X X  
BatchUnderflowCount X X   X X  
ChannelReadFail X X X X X X
ConnectionClosedCount X X       X
ConnectionCreatedCount X X       X
ConnectionFailedCount X X       X
EventDrainAttemptCount X X X   X X
EventDrainSuccessCount X X X X X X
EventWriteFail X X X X X X
KafkaEventSendTimer       X    
RollbackCount       X    

Channels

  File Kafka Memory PseudoTxnMemory SpillableMemory
ChannelCapacity X   X   X
Channelize X   X X X
CheckpointBackupWriteErrorCount X        
CheckpointWriteErrorCount X        
EventPutAttemptCount X X X X X
EventPutErrorCount X        
EventPutSuccessCount X X X X X
EventTakeAttemptCount X X X X X
EventTakeErrorCount X        
EventTakeSuccessCount X X X X X
KafkaCommitTimer   X      
KafkaEventGetTimer   X      
KafkaEventSendTimer   X      
Open X        
RollbackCounter   X      
Unhealthy X        

JMX 報告

可以通過使用 flume-env.sh 在 JAVA_OPTS 環境變量中指定 JMX 參數來啟用 JMX 報告, 如

export JAVA_OPTS ="- Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port = 5445 -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false"

注意: 上面的示例禁用安全性. 要啟用安全性, 請參閱

Ganglia 報告

Flume 還可以將這些指標報告給 Ganglia 3 或 Ganglia 3.1 元節點. 要向 Ganglia 報告指標, 必須使用此支持啟動水槽 agent . 必須通過傳遞以下參數來啟動 Flume agent , 作為以 flume.monitoring 為前綴的系統屬性., 可以在 flume-env.sh 中指定:

屬性名稱 默認 描述
type - 組件 type 名稱必須是 ganglia
hosts - 逗號分隔的 hostname 列表: Ganglia 服務器的端口
pollFrequency 60 連續向 Ganglia 服務器報告之間的時間(以秒為單位)
isGanglia3 false Ganglia 服務器版本為 3. 默認情況下,Flume 以 Ganglia 3.1 格式發送

我們可以通過 Ganglia 支持啟動 Flume, 如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON 報告

Flume 還可以以 JSON 格式報告指標. 為了以 JSON 格式啟用報告, Flume 在可配置端口上托管 Web 服務器. Flume 以以下 JSON 格式報告指標:

{
"typeName1.componentName1" : {
"metric1" : "metricValue1", "metric2" : "metricValue2"
},
"typeName2.componentName2" : {
"metric3" : "metricValue3", "metric4" : "metricValue4"
}
}

  

這是一個例子:

{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"468086",
"ChannelSize":"233428",
"StartTime":"1344882233070",
"EventTakeSuccessCount":"458200",
"ChannelCapacity":"600000",
"EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
"Type":"CHANNEL",
"StopTime":"0",
"EventPutAttemptCount":"22948908",
"ChannelSize":"5",
"StartTime":"1344882209413",
"EventTakeSuccessCount":"22948900",
"ChannelCapacity":"100",
"EventTakeAttemptCount":"22948908"}
}

  

屬性名稱 默認 描述
type - 組件 type 名稱必須是 http
port 41414 啟動服務器的端口。

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

然后, 可以在 http://<hostname>:<port>/metrics 網頁上獲得度量標准. 自定義組件可以報告上面 Ganglia 部分中提到的指標.

自定義報告

可以通過編寫執行報告的服務器向其他系統報告指標. 任何報告類都必須實現 org.apache.flume.instrumentation.MonitorService 接口 . 這樣的類可以與 GangliaServer 用於報告的方式相同. 他們可以輪詢平台 mbean 服務器以輪詢 mbeans 以獲取指標. 例如, 如果可以使用名為 HTTPReporting 的 HTTP 監視服務, 如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332

屬性名稱 默認 描述
type - 組件 type 名稱必須是 FQCN

從自定義組件報告指標

任何自定義 flume 組件都應繼承自 org.apache.flume.instrumentation.MonitoredCounterGroup 類. 然后, 該類應為其公開的每個度量標准提供 getter 方法. 請參閱下面的代碼. MonitoredCounterGroup 需要一個屬性列表, 其度量由此類公開. 截至目前, 此類僅支持將度量標准公開為長值.

public class SinkCounter extends MonitoredCounterGroup implements
SinkCounterMBean {
private static final String COUNTER_CONNECTION_CREATED =
"sink.connection.creation.count";
private static final String COUNTER_CONNECTION_CLOSED =
"sink.connection.closed.count";
private static final String COUNTER_CONNECTION_FAILED =
"sink.connection.failed.count";
private static final String COUNTER_BATCH_EMPTY =
"sink.batch.empty";
private static final String COUNTER_BATCH_UNDERFLOW =
"sink.batch.underflow";
private static final String COUNTER_BATCH_COMPLETE =
"sink.batch.complete";
private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
"sink.event.drain.attempt";
private static final String COUNTER_EVENT_DRAIN_SUCCESS =
"sink.event.drain.sucess";
private static final String[] ATTRIBUTES = {
COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
};
public SinkCounter(String name) {
super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
}
@Override
public long getConnectionCreatedCount() {
return get(COUNTER_CONNECTION_CREATED);
}
public long incrementConnectionCreatedCount() {
return increment(COUNTER_CONNECTION_CREATED);
}
}

  

工具

文件 channel 完整性工具

文件 channel 完整性工具可驗證文件 channel 中各個 event 的完整性, 並刪除損壞的 event

這些工具可以按如下方式運行:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

其中 datadir 是要驗證的數據目錄的逗號分隔列表.

以下是可用選項

選項名稱 描述
h/help 顯示幫助
l/dataDirs 以逗號分隔的工具必須驗證的數據目錄列表

event 驗證工具

event 驗證器工具可用於以特定於應用程序的方式驗證文件 channelevent . 該工具在每個 event 上應用用戶提供程序驗證登錄, 並刪除未向邏輯確認的 event

這些工具可以按如下方式運行:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

其中 datadir 是要驗證的數據目錄的逗號分隔列表.

以下是可用選項

選項名稱 描述
h/help 顯示幫助
l/dataDirs 以逗號分隔的工具必須驗證的數據目錄列表
e/eventValidator 完全合格的 event 驗證器實現名稱。jar 必須在 Flume 類路徑上

Event 驗證器實現必須實現 EventValidator 接口. 建議不要從實現中拋出任何異常, 因為它們被視為無效 event . 其他參數可以通過 - D 選項傳遞給 EventValitor 實現.

讓我們看一個基於簡單大小的 event 驗證器的示例, 它將拒絕大於指定的最大大小的 event

public static class MyEventValidator implements EventValidator {
private int value = 0;
private MyEventValidator(int val) {
value = val;
}
@Override
public boolean validateEvent(Event event) {
return event.getBody() <= value;
}
public static class Builder implements EventValidator.Builder {
private int sizeValidator = 0;
@Override
public EventValidator build() {
return new DummyEventVerifier(sizeValidator);
}
@Override
public void configure(Context context) {
binaryValidator = context.getInteger("maxSize");
}
}
}

  

拓撲設計注意事項

Flume 非常靈活, 允許大量可能的部署方案. 如果您計划在大型生產部署中使用 Flume, 那么花一些時間考慮如何根據 Flume 拓撲來表達您的問題是明智的. 本節介紹一些注意事項

Flume 是否適合您的問題?

如果您需要將文本日志數據提取到 Hadoop / HDFS 中, 那么 Flume 最適合您的問題, 完全停止. 對於其他用例, 以下是一些指導原則:

Flume 旨在通過相對穩定, 可能復雜的拓撲來傳輸和攝取定期生成的 event 數據."event 數據" 的概念定義非常廣泛. 對 Flume 來說, event 只是一個普通的字節. event 的大小有一些限制 - 例如, 它不能大於您可以存儲在內存中或單個機器上的磁盤上 - 但實際上, 水槽 event 可以是從文本日志條目到圖像文件的所有內容. event 的關鍵屬性是它們以連續的流式方式生成. 如果您的數據沒有定期生成(即您嘗試將大量數據批量加載到 Hadoop 集群中), 那么 Flume 仍然可以正常工作, 但對您的情況來說可能有點過頭了. Flume 喜歡相對穩定的拓撲結構. 您的拓撲不需要是不可變的, 因為 Flume 可以處理拓撲中的更改而不會丟失數據, 並且還可以容忍由於故障轉移或配置而定期重新配置. 如果您每天都要更改拓撲結構, 那么它可能無法正常工作, 因為重新配置需要一些思考和開銷.

Flume 中的流量可靠性

Flume 流的可靠性取決於幾個因素. 通過調整這些因素, 您可以使用 Flume 實現各種可靠性選項.

您使用什么 type 的 channel .Flume 具有持久的 channel(將數據保存到磁盤的 channel)和非持久 channel(如果機器出現故障將丟失數據). 持久 channel 使用基於磁盤的存儲, 存儲在此類 channel 中的數據將在機器重啟或非磁盤相關故障中持續存在.

您的 channel 是否已為工作量充分配置. Flume 中的 channel 充當各種跳躍的緩沖區. 這些緩沖區具有固定容量, 一旦該容量已滿, 您將在流中的早期點創建背壓. 如果此壓力傳播到流量 source,Flume 將變得不可用並可能丟失數據.

您是否使用冗余拓撲. Flume 讓你在冗余拓撲中復制流. 這可以提供非常容易的容錯 source, 並且可以克服磁盤或機器故障.

考慮 Flume 拓撲結構中可靠性的最佳方法是考慮各種故障情況及其結果. 如果磁盤發生故障會怎么樣? 如果機器出現故障會怎樣? 如果您的終端 sink(例如 HDFS)停機一段時間並且您有背壓, 會發生什么? 可能的設計空間巨大, 但您需要提出的基本問題只是極少數.

Flow 拓撲設計

設計 Flume 拓撲的第一步是枚舉數據的所有 source 和目標(終端 sink). 這些將定義拓撲的邊緣點. 下一個考慮因素是是否引入中間聚合層或 event 路由. 如果要從大量 source 中收集數據, 則聚合數據以簡化終端 sink 的提取可能會有所幫助. 聚合層還可以通過充當緩沖區來消除 source 的突發性或 sink 的不可用性. 如果要在不同位置之間路由數據, 您可能還希望在不同點分割流: 這會創建可能本身包含聚合點的子拓撲.

調整 Flume 部署的大小

一旦了解了拓撲的外觀, 下一個問題就是需要多少硬件和網絡容量. 首先, 量化您生成的數據量. 這並不總是一項簡單的任務! 大多數數據流是突發性的 (例如, 由於晝夜模式) 並且可能是不可預測的. 一個好的起點是考慮每個拓撲層中的最大吞吐量, 包括每秒 event 數和每秒字節數. 一旦知道給定層的所需吞吐量, 就可以計算該層所需節點數的下限. 要確定可達到的吞吐量, 最好使用合成或采樣 event 數據在硬件上試驗 Flume. 通常, 基於磁盤的 channel 應該獲得 10 的 MB / s, 而基於內存的 channel 應該達到 100 的 MB / s 或更多. 但性能會有很大差異, 具體取決於硬件和操作環境.

調整聚合吞吐量可以為每層提供所需節點數量的下限. 有幾個原因需要額外的節點, 例如增加冗余和更好地吸收負載中的突發.

故障排除

處理 agent 失敗

如果 Flume agent 程序關閉, 則該 agent 程序上托管的所有流程都將中止. 重新啟動 agent 后, 將恢復流程. 使用文件 channel 或其他穩定 channel 的流將從中斷處繼續處理 event . 如果無法在同一硬件上重新啟動 agent , 則可以選擇將數據庫遷移到另一個硬件並設置新的 Flume agent , 該 agent 可以繼續處理 db 中保存的 event . 可以利用數據庫 HA 期貨將 Flume agent 移動到另一個主機.

兼容性

HDFS

目前 Flume 支持 HDFS 0.20.2 和 0.23.

組件摘要

Component Interface Type Alias Implementation Class
org.apache.flume.Channel memory org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel jdbc org.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channel file org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel org.example.MyChannel
org.apache.flume.Source avro org.apache.flume.source.AvroSource
org.apache.flume.Source netcat org.apache.flume.source.NetcatSource
org.apache.flume.Source seq org.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Source exec org.apache.flume.source.ExecSource
org.apache.flume.Source syslogtcp org.apache.flume.source.SyslogTcpSource
org.apache.flume.Source multiport_syslogtcp org.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Source syslogudp org.apache.flume.source.SyslogUDPSource
org.apache.flume.Source spooldir org.apache.flume.source.SpoolDirectorySource
org.apache.flume.Source http org.apache.flume.source.http.HTTPSource
org.apache.flume.Source thrift org.apache.flume.source.ThriftSource
org.apache.flume.Source jms org.apache.flume.source.jms.JMSSource
org.apache.flume.Source org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Source org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Source org.example.MySource
org.apache.flume.Sink null org.apache.flume.sink.NullSink
org.apache.flume.Sink logger org.apache.flume.sink.LoggerSink
org.apache.flume.Sink avro org.apache.flume.sink.AvroSink
org.apache.flume.Sink hdfs org.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sink hbase org.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sink hbase2 org.apache.flume.sink.hbase2.HBase2Sink
org.apache.flume.Sink asynchbase org.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sink elasticsearch org.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sink file_roll org.apache.flume.sink.RollingFileSink
org.apache.flume.Sink irc org.apache.flume.sink.irc.IRCSink
org.apache.flume.Sink thrift org.apache.flume.sink.ThriftSink
org.apache.flume.Sink org.example.MySink
org.apache.flume.ChannelSelector replicating org.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelector multiplexing org.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelector org.example.MyChannelSelector
org.apache.flume.SinkProcessor default org.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessor failover org.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessor load_balance org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.SinkProcessor  
org.apache.flume.interceptor.Interceptor timestamp org.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptor host org.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptor static org.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptor regex_filter org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptor regex_extractor org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.channel.file.encryption.KeyProvider$Builder jceksfile org.apache.flume.channel.file.encryption.JCEFileKeyProvider
org.apache.flume.channel.file.encryption.KeyProvider$Builder org.example.MyKeyProvider
org.apache.flume.channel.file.encryption.CipherProvider aesctrnopadding org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider
org.apache.flume.channel.file.encryption.CipherProvider org.example.MyCipherProvider
org.apache.flume.serialization.EventSerializer$Builder text org.apache.flume.serialization.BodyTextEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder avro_event org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder org.example.MyEventSerializer$Builder

別名約定

這些別名的約定在上面的組件特定示例中使用, 以使所有示例中的名稱保持簡短和一致.

Alias Name Alias Type
a agent
c channel
r source
k sink
g sink group
i interceptor
y key
h host
s serializer


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM