Original: https://cwiki.apache.org/confluence/display/FLUME/Getting+Started
What is Flume NG?
Flume NG aims to be significantly simpler, smaller, and easier to deploy than Flume OG. In doing so, we do not commit to backward compatibility with Flume OG at this time. We are currently soliciting feedback from those interested in testing Flume NG for correctness, ease of use, and the possibility of integration with other systems.
What's changed?
Flume NG (Next Generation) retains many of the original concepts, but its implementation differs significantly from Flume OG (Original Generation). If you are familiar with Flume, here is what you may want to know.
- You still have sources and sinks, and they still do the same thing. They are now connected by channels.
- Channels are pluggable and dictate durability. Flume NG ships with an in-memory channel for fast but non-durable event delivery, and a file-based channel for durable event delivery.
- There is no longer a distinction between logical and physical nodes. All physical nodes are called agents, and an agent can run zero or more sources and sinks.
- There is no master, and no ZooKeeper dependency. At this point, Flume runs with a simple file-based configuration system.
- Just about everything is a plugin: some face end users, some face tool and system developers. Pluggable components include channels, sources, sinks, interceptors, sink processors, and event serializers.
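The durability trade-off between the two bundled channels can be sketched in a few lines of Python. This is a conceptual illustration only, not Flume's API: the class names and on-disk format below are invented for the example.

```python
import os
import tempfile
from collections import deque

class MemoryChannel:
    """In-memory channel: fast, but queued events are lost if the process dies."""
    def __init__(self):
        self._queue = deque()

    def put(self, event: bytes) -> None:
        self._queue.append(event)

    def take(self) -> bytes:
        return self._queue.popleft()

class FileChannel:
    """File-backed channel: every put is forced to disk before it is
    acknowledged, so queued events survive a process restart."""
    def __init__(self, path: str):
        self._path = path
        self._read_offset = 0  # a restart resets this, replaying unread events

    def put(self, event: bytes) -> None:
        with open(self._path, "ab") as f:
            f.write(event + b"\n")
            f.flush()
            os.fsync(f.fileno())  # durable before the put is acknowledged

    def take(self) -> bytes:
        with open(self._path, "rb") as f:
            f.seek(self._read_offset)
            line = f.readline()
            self._read_offset = f.tell()
        return line.rstrip(b"\n")

# Demo: a "restarted" file channel still sees the event; a memory channel would not.
fd, path = tempfile.mkstemp()
os.close(fd)
FileChannel(path).put(b"event-1")
print(FileChannel(path).take())
# → b'event-1'
os.remove(path)
```

A real durable channel additionally has transactions, checkpoints, and compaction; the sketch only shows why an fsync'd append survives a crash while an in-memory deque does not.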
Getting Flume NG
Flume is available as a source tarball and a binary on the download page. If you are not planning on creating patches for Flume, the binary is likely the best way to get started.
Building from source
To build from source, you need git, the Sun JDK 1.6, Apache Maven 3.x, about 90MB of local disk space, and a network connection.
1. Check out the source
$ git clone https://git-wip-us.apache.org/repos/asf/flume.git flume
$ cd flume
$ git checkout trunk
2. Compile the project
Building Apache Flume requires more memory than the default Maven configuration allows. We recommend setting the following Maven options:
export MAVEN_OPTS="-Xms512m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=512m"

# Build the code and run the tests (note: use mvn install, not mvn package,
# because we deploy Jenkins SNAPSHOT jars daily, and Flume is a multi-module project)
$ mvn install
# ...or build without running the tests
$ mvn install -DskipTests
(Please note that Flume requires the Google Protocol Buffers compiler to be on the path for the build to succeed. You can download and install it by following the steps here.)
This produces two types of packages in flume-ng-dist/target. They are:
- apache-flume-ng-dist-1.4.0-SNAPSHOT-bin.tar.gz - A binary distribution of Flume, ready to run
- apache-flume-ng-dist-1.4.0-SNAPSHOT-src.tar.gz - A source-only distribution of Flume
If you are a user and just want to run Flume, you probably want the -bin version. Copy one out, decompress it, and you are ready to go.
$ cp flume-ng-dist/target/apache-flume-1.4.0-SNAPSHOT-bin.tar.gz .
$ tar -zxvf apache-flume-1.4.0-SNAPSHOT-bin.tar.gz
$ cd apache-flume-1.4.0-SNAPSHOT-bin
3. Create your properties file based on the working template (or create one from scratch)
$ cp conf/flume-conf.properties.template conf/flume.conf
4. (Optional) Create your flume-env.sh file based on the template (or create one from scratch). The flume-ng executable looks for a file named "flume-env.sh" in the conf directory specified with the --conf/-c command-line option. One use case for flume-env.sh is to specify debugging or profiling options via JAVA_OPTS when developing your own Flume NG components such as sources and sinks.
$ cp conf/flume-env.sh.template conf/flume-env.sh
5. Configure and run Flume NG
After you have configured Flume NG (see below), you can run it with the bin/flume-ng executable. This script has a number of arguments and modes.
Configuration
Flume uses a Java property file based configuration format. When running an agent, you tell Flume which file to use via the -f <file> option (see above). The file can be placed anywhere, but traditionally, and going forward, the conf directory is the right place for configuration files.
Let's start with a basic example. Copy and paste this into conf/flume.conf:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
This example creates a memory channel (i.e. an unreliable or "best effort" transport), an Avro RPC source, and a logger sink, and connects them together. Any events received by the Avro source are routed to channel ch1 and delivered to the logger sink. It's important to note that defining components is only the first half of configuring Flume; they must be activated by listing them in the <agent>.channels, <agent>.sources, and <agent>.sinks sections. Multiple sources, sinks, and channels may be listed, separated by spaces.
For full details, please see the javadoc for the org.apache.flume.conf.properties.PropertiesFileConfigurationProvider class.
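The define-then-activate rule can be illustrated with a short Python sketch. This is not Flume's parser, and the function name active_components is invented for the example; it reads properties-style text and reports which of the defined components the agent actually activates.

```python
def active_components(conf_text: str, agent: str) -> dict:
    """Return the channels/sources/sinks activated for `agent` via the
    <agent>.channels, <agent>.sources, and <agent>.sinks properties."""
    props = {}
    for line in conf_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    # Activation lists are space-separated; a missing key means nothing is active.
    return {kind: props.get(f"{agent}.{kind}", "").split()
            for kind in ("channels", "sources", "sinks")}

conf = """
agent1.channels.ch1.type = memory
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
"""
print(active_components(conf, "agent1"))
# → {'channels': ['ch1'], 'sources': ['avro-source1'], 'sinks': ['log-sink1']}
```

If the last three lines of the config were omitted, every list would come back empty, which is why a component that is defined but not listed never starts.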
Here is a list of the sources, sinks, and channels implemented at this time. Each plugin has its own options and required configuration properties, so please see the javadocs (for now).
Component | Type | Description | Implementation class
---|---|---|---
Channel | memory | In-memory, fast, non-durable event transport | MemoryChannel
Channel | file | A channel for reading, writing, mapping, and manipulating a file | FileChannel
Channel | jdbc | JDBC-based, durable event transport (Derby-based) | JDBCChannel
Channel | recoverablememory | A durable channel implementation that uses the local file system for its storage | RecoverableMemoryChannel
Channel | org.apache.flume.channel.PseudoTxnMemoryChannel | Mainly for testing purposes. Not for production use. | PseudoTxnMemoryChannel
Channel | (custom type as FQCN) | Your own Channel implementation | (custom FQCN)
Source | avro | Avro Netty RPC event source | AvroSource
Source | exec | Execute a long-lived Unix process and read from stdout | ExecSource
Source | netcat | Netcat style TCP event source | NetcatSource
Source | seq | Monotonically incrementing sequence generator event source | SequenceGeneratorSource
Source | org.apache.flume.source.StressSource | Mainly for testing purposes. Not for production use. Serves as a continuous source of events where each event has the same payload. The payload consists of some number of bytes (specified by the size property, defaults to 500) where each byte has the signed value Byte.MAX_VALUE (0x7F, or 127). | org.apache.flume.source.StressSource
Source | syslogtcp | | SyslogTcpSource
Source | syslogudp | | SyslogUDPSource
Source | org.apache.flume.source.avroLegacy.AvroLegacySource | | AvroLegacySource
Source | org.apache.flume.source.thriftLegacy.ThriftLegacySource | | ThriftLegacySource
Source | org.apache.flume.source.scribe.ScribeSource | | ScribeSource
Source | (custom type as FQCN) | Your own Source implementation | (custom FQCN)
Sink | hdfs | Writes all events received to HDFS (with support for rolling, bucketing, HDFS-200 append, and more) | HDFSEventSink
Sink | org.apache.flume.sink.hbase.HBaseSink | A simple sink that reads events from a channel and writes them to HBase. | org.apache.flume.sink.hbase.HBaseSink
Sink | org.apache.flume.sink.hbase.AsyncHBaseSink | | org.apache.flume.sink.hbase.AsyncHBaseSink
Sink | logger | Log events at INFO level via configured logging subsystem (log4j by default) | LoggerSink
Sink | avro | Sink that invokes a pre-defined Avro protocol method for all events it receives (when paired with an avro source, forms tiered collection) | AvroSink
Sink | file_roll | | RollingFileSink
Sink | irc | | IRCSink
Sink | null | /dev/null for Flume - blackhole all events received | NullSink
Sink | (custom type as FQCN) | Your own Sink implementation | (custom FQCN)
ChannelSelector | replicating | | ReplicatingChannelSelector
ChannelSelector | multiplexing | | MultiplexingChannelSelector
ChannelSelector | (custom type) | Your own ChannelSelector implementation | (custom FQCN)
SinkProcessor | default | | DefaultSinkProcessor
SinkProcessor | failover | | FailoverSinkProcessor
SinkProcessor | load_balance | Provides the ability to load-balance flow over multiple sinks | LoadBalancingSinkProcessor
SinkProcessor | (custom type as FQCN) | Your own SinkProcessor implementation | (custom FQCN)
Interceptor$Builder | host | | HostInterceptor$Builder
Interceptor$Builder | timestamp | TimestampInterceptor | TimestampInterceptor$Builder
Interceptor$Builder | static | | StaticInterceptor$Builder
Interceptor$Builder | regex_filter | | RegexFilteringInterceptor$Builder
Interceptor$Builder | (custom type as FQCN) | Your own Interceptor$Builder implementation | (custom FQCN)
EventSerializer$Builder | text | | BodyTextEventSerializer$Builder
EventSerializer$Builder | avro_event | | FlumeEventAvroEventSerializer$Builder
EventSerializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | | SimpleHbaseEventSerializer
EventSerializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | | SimpleAsyncHbaseEventSerializer
EventSerializer | org.apache.flume.sink.hbase.RegexHbaseEventSerializer | | RegexHbaseEventSerializer
HbaseEventSerializer | Custom implementation of serializer for HBaseSink. | Your own HbaseEventSerializer implementation | (custom FQCN)
AsyncHbaseEventSerializer | Custom implementation of serializer for AsyncHbase sink. | Your own AsyncHbaseEventSerializer implementation | (custom FQCN)
EventSerializer$Builder | Custom implementation of serializer for all sinks except for HBaseSink and AsyncHBaseSink. | Your own EventSerializer$Builder implementation | (custom FQCN)
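As the table shows, the type value works in two modes: a short alias (memory, avro, logger, ...) for a bundled component, or a fully-qualified class name for custom or unaliased components. The lookup can be pictured with a hypothetical Python sketch; the alias map below covers only three rows of the table, and this is not Flume's actual factory code.

```python
# Hypothetical alias map; the authoritative list of aliases is the table above.
CHANNEL_ALIASES = {
    "memory": "MemoryChannel",
    "file": "FileChannel",
    "jdbc": "JDBCChannel",
}

def resolve_channel_type(type_value: str) -> str:
    """A known short alias resolves to its implementation class; any other
    value is treated as the FQCN of a custom Channel implementation."""
    return CHANNEL_ALIASES.get(type_value, type_value)

print(resolve_channel_type("memory"))
# → MemoryChannel
print(resolve_channel_type("com.example.MyChannel"))
# → com.example.MyChannel
```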
flume-ng lets you run a Flume NG agent or an Avro client, which is useful for testing and experiments. In either case, you must specify a command (e.g. agent or avro-client) and a conf directory (--conf <conf dir>). All other options are specified on the command line.
Start the Flume server with the flume.conf above:
bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
Note that the agent name, specified with -n agent1, must match an agent name given in -f conf/flume.conf.
Your output should look something like this:
$ bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1
2012-03-16 16:36:11,918 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 1
2012-03-16 16:36:11,921 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1
2012-03-16 16:36:11,926 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:110)] Node manager starting
2012-03-16 16:36:11,928 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 10
2012-03-16 16:36:11,929 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:114)] Node manager started
2012-03-16 16:36:11,926 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting
2012-03-16 16:36:11,930 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:87)] Configuration provider started
2012-03-16 16:36:11,930 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
2012-03-16 16:36:11,931 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:196)] Reloading configuration file:conf/flume.conf
2012-03-16 16:36:11,936 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:225)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1=ComponentConfiguration[avro-source1]
  CONFIG: {port=41414, channels=ch1, type=avro, bind=0.0.0.0}
  RUNNER: ComponentConfiguration[runner]
    CONFIG: {}
}
CHANNELS: {ch1=ComponentConfiguration[ch1]
  CONFIG: {type=memory}
}
SINKS: {log-sink1=ComponentConfiguration[log-sink1]
  CONFIG: {type=logger, channel=ch1}
  RUNNER: ComponentConfiguration[runner]
    CONFIG: {}
}
2012-03-16 16:36:11,936 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)] Post-validation flume configuration contains configuation for agents: [agent1]
2012-03-16 16:36:11,937 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:67)] Creating instance of channel ch1 type memory
2012-03-16 16:36:11,944 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:73)] Creating instance of source avro-source1, type avro
2012-03-16 16:36:11,957 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:69)] Creating instance of sink log-sink1 typelogger
2012-03-16 16:36:11,963 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java:52)] Node configuration change:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:AvroSource: { bindAddress:0.0.0.0 port:41414 } }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@79f6f296 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel@43b09468 } }
2012-03-16 16:36:11,974 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:122)] Avro source starting:AvroSource: { bindAddress:0.0.0.0 port:41414 }
2012-03-16 16:36:11,975 (Thread-1) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:123)] Polling sink runner starting
2012-03-16 16:36:12,352 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.source.AvroSource.start(AvroSource.java:132)] Avro source started
flume-ng global options
Option | Description
---|---
--conf,-c <conf> | Use configs in the <conf> directory
--classpath,-C <cp> | Append to the classpath
--dryrun,-d | Do not actually start Flume, just print the command
-Dproperty=value | Sets a JDK system property value
flume-ng agent options
Given the agent command, a Flume NG agent is started with a given configuration file (required).
Option | Description
---|---
--conf-file,-f <file> | Indicate which configuration file you want to run with (required)
--name,-n <agentname> | Indicate the name of the agent we want to run (required)
flume-ng avro-client options
Runs an Avro client that sends either a file or data from standard input to a specified hostname and port where a Flume NG Avro source is listening.
Option | Description
---|---
--host,-H <hostname> | Specify the hostname of the Flume agent (likely localhost)
--port,-p <port> | Specify the port on which the Avro source is listening
--filename,-F <filename> | Send each line of <filename> to Flume (optional)
--headerFile,-R <file> | Header file with one key/value pair per line
The Avro client treats each line (terminated by \n, \r, or \r\n) as a separate event. Think of the avro-client command as cat for Flume. For instance, the following creates one event per Linux user and sends them to Flume's Avro source on localhost port 41414.
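The line-splitting rule above can be sketched in Python. This illustrates the rule only, not the avro-client implementation:

```python
import re

def lines_to_events(data: str) -> list:
    """Split input into events, one per line terminated by \\n, \\r, or \\r\\n.
    A trailing unterminated line still counts as an event."""
    events = re.split(r"\r\n|\r|\n", data)
    if events and events[-1] == "":
        events.pop()  # drop the empty piece after the final terminator
    return events

print(lines_to_events("root:x:0:0\r\ndaemon:x:1:1\nbin:x:2:2\r"))
# → ['root:x:0:0', 'daemon:x:1:1', 'bin:x:2:2']
```

Note that \r\n must be tried before \r and \n in the pattern, or a Windows line ending would produce a spurious empty event.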
In a new terminal window, type:
$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
You should see something like this:
2012-03-16 16:39:17,124 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:175)] Finished
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:178)] Closing reader
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:183)] Closing transceiver
2012-03-16 16:39:17,129 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:73)] Exiting
And in your first terminal window, the one running the server:
2012-03-16 16:39:16,738 (New I/O server boss #1 ([id: 0x49e808ca, /0:0:0:0:0:0:0:0:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] OPEN
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:39577
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] DISCONNECTED
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] UNBOUND
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] CLOSED
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@5c1ae90c }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6aba4211 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6a47a0d4 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@48ff4cf }
...
Congratulations! You are now running Apache Flume!