Flume 中文入門手冊


原文:https://cwiki.apache.org/confluence/display/FLUME/Getting+Started

什么是 Flume NG?

Flume NG 旨在比起 Flume OG 變得明顯更簡單。更小。更easy部署。在這樣的情況下,我們不提交Flume NG 到 Flume OG 的后向兼容。當前。我們期待來自感興趣測試Flume NG 正確性、易用性和與別的系統集成的可能性的人的反饋。

變了什么?

Flume NG (下一代)的實現中盡管保持了非常多原來的概念,但 與 Flume OG (原版) 還是有非常大的差別。假設你熟悉 Flume, h這些可能是你想知道的。

  • 你仍會有 sources 和sinks ,他們還做相同的事情. 他們由 channels 連接.
    • Channels 可插入式的、命令持久的。

        Flume NG ships with an in-memory channel for fast, but non-durable event delivery and a file-based channel for durable event delivery. ?

  • 沒有很多其它的邏輯或物理的節點。我們能夠把全部的物理節點叫做 agents,agents 能夠執行0到多個 sources 和 sinks。

  • 沒有 master 和 ZooKeeper 的依賴了。此時, Flume 執行於一個簡單的基於文件配置的系統。
  • 一切都是插件,一些面向終於用戶的,一些面向工具和系統開發人員的。可插入組件包含 channels, sources, sinks, interceptors, sink processors, 和 event serializers.

獲得 Flume NG

Flume在下載頁面上有源代碼包和二進制文件可用。假設你並不打算為Flume 創建 補丁,二進制文件可能是開始的最好方式。

從源代碼中創建

要從源代碼中創建,你須要git, Sun JDK 1.6, Apache Maven 3.x, 大約 90MB 的本地硬盤空間和網絡連接。

1. 簽出源代碼

$ git clone https: //git-wip-us.apache.org/repos/asf/flume.git flume
$ cd flume
$ git checkout trunk

2. 編譯項目

Apache Flume 的創建須要比默認配置很多其它的內存。

我們推薦設置Maven的例如以下選項:

export MAVEN_OPTS= "-Xms512m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=512m"
# 創建代碼和運行測試 (注意: 用 mvn install, 不是 mvn  package , 由於我們每天都部署 Jenkins SNAPSHOT jars , 並且Flume 是一個多模塊的項目)
$ mvn install
# ...或者不運行測試的安裝
$ mvn install -DskipTests

(請注意為編譯成功 Flume 要求 Google Protocol Buffers 編譯器在path 中。你能夠依照這里的步驟下載安裝它。 here.)

這些在 flume-ng-dist/target 中生成兩種包.他們是:

  • apache-flume-ng-dist-1.4.0-SNAPSHOT-bin.tar.gz - Flume 的二進制版, 待執行
  • apache-flume-ng-dist-1.4.0-SNAPSHOT-src.tar.gz - 僅有源代碼的 Flume 公布版

假設你是一個用戶,僅僅想要執行 Flume, 你可能想要的是 -bin 版本號。復制一個、解壓之,你就准備好用了。

$ cp flume-ng-dist/target/apache-flume- 1.4 . 0 -SNAPSHOT-bin.tar.gz .
$ tar -zxvf apache-flume- 1.4 . 0 -SNAPSHOT-bin.tar.gz
$ cd apache-flume- 1.4 . 0 -SNAPSHOT-bin

3.基於工作模板創建你的屬性文件(或從頭創建一個) 

$ cp conf/flume-conf.properties.template conf/flume.conf

4. (可選) 基於模板創建你的 flume-env.sh 文件(或從頭創建一個)。

flume-ng 可運行文件通過在命令行中指定--conf/-c 在conf 文件夾中尋找一個名為 "flume-env.sh" 的文件。 一個使用 flume-env.sh 的樣例是在開發你自己的如sources 和 sinks的 Flume NG組件時通過 JAVA_OPTS 指定debugging 或 profiling 選項。

$ cp conf/flume-env.sh.template conf/flume-env.sh

5. 配置和執行Flume NG

在你配置完 Flume NG (見下),你能夠用 bin/flume-ng 運行它. 這個腳本有一些參數和模式。

配置

Flume 用一個基於配置格式的 Java 屬性文件。

當執行一個 agent時。須要你通過 -f <file> 選項(見上)的方式告訴 Flume 哪個文件要用。

這個文件可放在不論什么地方,可是從傳統-和在未來-conf文件夾才是正確放置配置文件的地方。

讓我們開始一個簡單的樣例. 復制粘貼這些到 conf/flume.conf:

# 在  agent1上 定義一個 叫做ch1 內存channel
agent1.channels.ch1.type = memory
 
# 在 agent1 上定義一個叫做avro-source1 的 Avro source 並告訴它
# 綁定到  0.0 . 0.0 : 41414 . 把它和 channel ch1 連接起來.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind =  0.0 . 0.0
agent1.sources.avro-source1.port =  41414
 
# 定義一個 logger sink ,記錄它收到的全部事件
# 把它和在同一 channel 上的別的終端相連
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
 
# 最后,既然我們已經定義了全部的組件,告訴agent1 我們想要激活 哪一個
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

這是樣例創建了一個內存channel(如,一個不可信或“最小效果”的傳輸),一個 Avro RPC source。和一個連接他們的日志sink. Avro source 接收的不論什么事件 被路由給 channel ch1並發送給日志sink。須要注意的是定義組件是配置 Flume 的第一半,他們必須被通過列在 <agent>.channels, <agent>.sources, (和 sections. Multiple sources, sinks, 和 channels 也可能被列入,按空格分隔)激活。

要看很多其它細節,請看 org.apache.flume.conf.properties.PropertiesFileConfigurationProvider 類的 文檔。.

這是一列此時已實現了的 sources, sinks, 和 channels。每一個插件有其自身的選項並須要配置屬性,所以 看文檔(如今)。

組件

類型

描寫敘述

實現類

Channel

memory 

內存中,快,非持久事件傳輸

MemoryChannel

Channel

file

一個 reading, writing, mapping, 和 manipulating 一個文件 的 channel

FileChannel

Channel

jdbc

JDBC-based, durable event transport (Derby-based)

JDBCChannel

Channel

recoverablememory

一個用本地文件系統做存儲的非持久 channel 實現

RecoverableMemoryChannel

Channel

org.apache.flume.channel.PseudoTxnMemoryChannel

主要用作測試,不是生產用的

PseudoTxnMemoryChannel

Channel

(custom type as FQCN)

你自己的 Channel 實現

(custom FQCN)

Source

avro

Avro Netty RPC event source

AvroSource

Source

exec

Execute a long-lived Unix process and read from stdout

ExecSource

Source

netcat

Netcat style TCP event source

NetcatSource

Source

seq

Monotonically incrementing sequence generator event source

SequenceGeneratorSource

Source

org.apache.flume.source.StressSource

主要用作測試,不是生產用的。Serves as a continuous source of events where each event has the same payload. The payload consists of some number of bytes (specified by size property, defaults to 500) where each byte has the signed value Byte.MAX_VALUE (0x7F, or 127). 

org.apache.flume.source.StressSource

Source 

syslogtcp 

 

SyslogTcpSource 

Source

syslogudp

 

SyslogUDPSource

Source

org.apache.flume.source.avroLegacy.AvroLegacySource

 

AvroLegacySource 

Source

org.apache.flume.source.thriftLegacy.ThriftLegacySource

 

ThriftLegacySource 

Source

org.apache.flume.source.scribe.ScribeSource

 

ScribeSource 

Source 

(custom type as FQCN) 

你自己的 Source 實現

(custom FQCN) 

Sink

hdfs

Writes all events received to HDFS (with support for rolling, bucketing, HDFS-200 append, and more)

HDFSEventSink

Sink 

org.apache.flume.sink.hbase.HBaseSink

A simple sink that reads events from a channel and writes them to HBase.

org.apache.flume.sink.hbase.HBaseSink

Sink 

org.apache.flume.sink.hbase.AsyncHBaseSink

 

org.apache.flume.sink.hbase.AsyncHBaseSink

Sink

logger

Log events at INFO level via configured logging subsystem (log4j by default)

LoggerSink

Sink

avro

Sink that invokes a pre-defined Avro protocol method for all events it receives (when paired with an avro source, forms tiered collection)

AvroSink

Sink 

file_roll 

 

RollingFileSink 

Sink 

irc 

 

IRCSink 

Sink

null 

/dev/null for Flume - blackhole all events received

NullSink

Sink 

(custom type as FQCN) 

你自己的 Sink 實現

(custom FQCN) 

ChannelSelector 

replicating 

 

ReplicatingChannelSelector

ChannelSelector 

multiplexing 

 

MultiplexingChannelSelector

ChannelSelector 

(custom type) 

你自己的 ChannelSelector 實現

(custom FQCN) 

SinkProcessor 

default 

 

DefaultSinkProcessor

SinkProcessor 

failover 

 

FailoverSinkProcessor

SinkProcessor 

load_balance 

多sink時提供平衡加載的能力

LoadBalancingSinkProcessor

SinkProcessor 

(custom type as FQCN) 

你自己的 SinkProcessor 實現

(custom FQCN) 

Interceptor$Builder

host

 

HostInterceptor$Builder

Interceptor$Builder

timestamp

TimestampInterceptor

TimestampInterceptor$Builder

Interceptor$Builder

static 

 

StaticInterceptor$Builder

Interceptor$Builder

regex_filter 

 

RegexFilteringInterceptor$Builder

Interceptor$Builder

(custom type as FQCN)

你自己的 Interceptor$Builder 實現

(custom FQCN)

EventSerializer$Builder 

text 

 

BodyTextEventSerializer$Builder 

EventSerializer$Builder

avro_event 

 

FlumeEventAvroEventSerializer$Builder 

EventSerializer

org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

 

SimpleHbaseEventSerializer

EventSerializer

org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

 

SimpleAsyncHbaseEventSerializer

EventSerializer

org.apache.flume.sink.hbase.RegexHbaseEventSerializer

 

RegexHbaseEventSerializer

HbaseEventSerializer

Custom implementation of serializer for HBaseSink. 
(custom type as FQCN)

你自己的 HbaseEventSerializer 實現

(custom FQCN)

AsyncHbaseEventSerializer

Custom implementation of serializer for AsyncHbase sink. 
(custom type as FQCN)

你自己的 AsyncHbaseEventSerializer 實現

(custom FQCN)

EventSerializer$Builder

Custom implementation of serializer for all sinks except for HBaseSink and AsyncHBaseSink. 
(custom type as FQCN)

你自己的 EventSerializer$Builder 實現

(custom FQCN)

flume-ng 讓你執行一個有利於測試和實驗的 Flume NG agent 或一個 Avro client 。

無論如何,你須要指定一個命令(如。 agent 或 avro-client)  和一個 conf 文件夾  (--conf <conf dir>).。

全部別的選項都在命令行指定。

用上面的 flume.conf 啟動flume server:

bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1

注意,agent 名稱是以 -n agent1 指定必須與 -f conf/flume.conf 中給定的名字匹配

你的輸出應該像這樣:

$ bin/flume-ng agent --conf conf/ -f conf/flume.conf -n agent1
2012 - 03 - 16  16 : 36 : 11 , 918  (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java: 58 )] Starting lifecycle supervisor  1
2012 - 03 - 16  16 : 36 : 11 , 921  (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java: 54 )] Flume node starting - agent1
2012 - 03 - 16  16 : 36 : 11 , 926  (lifecycleSupervisor- 1 - 0 ) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java: 110 )] Node manager starting
2012 - 03 - 16  16 : 36 : 11 , 928  (lifecycleSupervisor- 1 - 0 ) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java: 58 )] Starting lifecycle supervisor  10
2012 - 03 - 16  16 : 36 : 11 , 929  (lifecycleSupervisor- 1 - 0 ) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java: 114 )] Node manager started
2012 - 03 - 16  16 : 36 : 11 , 926  (lifecycleSupervisor- 1 - 1 ) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java: 67 )] Configuration provider starting
2012 - 03 - 16  16 : 36 : 11 , 930  (lifecycleSupervisor- 1 - 1 ) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java: 87 )] Configuration provider started
2012 - 03 - 16  16 : 36 : 11 , 930  (conf-file-poller- 0 ) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java: 189 )] Checking file:conf/flume.conf  for  changes
2012 - 03 - 16  16 : 36 : 11 , 931  (conf-file-poller- 0 ) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java: 196 )] Reloading configuration file:conf/flume.conf
2012 - 03 - 16  16 : 36 : 11 , 936  (conf-file-poller- 0 ) [DEBUG - org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java: 225 )] Starting validation of configuration  for  agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1=ComponentConfiguration[avro-source1]
   CONFIG: {port= 41414 , channels=ch1, type=avro, bind= 0.0 . 0.0 }
   RUNNER:   ComponentConfiguration[runner]
     CONFIG: {}
 
 
}
CHANNELS: {ch1=ComponentConfiguration[ch1]
   CONFIG: {type=memory}
 
}
SINKS: {log-sink1=ComponentConfiguration[log-sink1]
   CONFIG: {type=logger, channel=ch1}
   RUNNER:   ComponentConfiguration[runner]
     CONFIG: {}
 
 
}
2012 - 03 - 16  16 : 36 : 11 , 936  (conf-file-poller- 0 ) [INFO - org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java: 119 )] Post-validation flume configuration contains configuation   for  agents: [agent1]
2012 - 03 - 16  16 : 36 : 11 , 937  (conf-file-poller- 0 ) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java: 67 )] Creating instance of channel ch1 type memory
2012 - 03 - 16  16 : 36 : 11 , 944  (conf-file-poller- 0 ) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java: 73 )] Creating instance of source avro-source1, type avro
2012 - 03 - 16  16 : 36 : 11 , 957  (conf-file-poller- 0 ) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java: 69 )] Creating instance of sink log-sink1 typelogger
2012 - 03 - 16  16 : 36 : 11 , 963  (conf-file-poller- 0 ) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.onNodeConfigurationChanged(DefaultLogicalNodeManager.java: 52 )] Node configuration change:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:AvroSource: { bindAddress: 0.0 . 0.0  port: 41414  } }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor @79f6f296  counterGroup:{ name: null  counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel @43b09468 } }
2012 - 03 - 16  16 : 36 : 11 , 974  (lifecycleSupervisor- 1 - 1 ) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java: 122 )] Avro source starting:AvroSource: { bindAddress: 0.0 . 0.0  port: 41414  }
2012 - 03 - 16  16 : 36 : 11 , 975  (Thread- 1 ) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java: 123 )] Polling sink runner starting
2012 - 03 - 16  16 : 36 : 12 , 352  (lifecycleSupervisor- 1 - 1 ) [DEBUG - org.apache.flume.source.AvroSource.start(AvroSource.java: 132 )] Avro source started

flume-ng global 選項

選項

描寫敘述

--conf,-c <conf>

在 <conf> 文件夾使用配置

--classpath,-C <cp>

追加到 classpath

--dryrun,-d 

不真正啟動 Flume,僅僅打印命令

-Dproperty=value 

設置一個JDK 系統的合適值

flume-ng agent 選項

給定 agent 命令,一個 Flume NG agent 將被一個給定的配置文件(必須) 啟動。

選項

描寫敘述

--conf-file,-f <file>

聲明你要執行哪一個配置文件 (必須)

--name,-n <agentname>

聲明我們要執行的 agent 的名字(必須)

flume-ng avro-client 選項

從標准輸入執行一個 Avro client,發送文件或數據給一個 Flume NG Avro Source正在監聽的指定的主機和port。

選項

描寫敘述

--host,-H <hostname>

指定 Flume agent 的主機名 (可能是本機)

--port,-p <port>

指定 Avro source 監聽的port號

--filename,-F <filename>

發送 <filename> 的每一行給 Flume (可選)

--headerFile,-F <file>

頭文件的每一行包括 鍵/值對

 Avro client把每一行(以 \n\r, 或 \r\n 結尾) 都當作一個事件。對Flume 來說 avro-client 命令就是 cat。比如,以下為每個linux用戶創建一個事件並將其發送到本機的41414port上的 Flume 的 avro source 上。

在一個新窗體中鍵入 :

$ bin/flume-ng avro-client --conf conf -H localhost -p  41414  -F /etc/passwd -Dflume.root.logger=DEBUG,console

你應該看到像這樣 :

2012 - 03 - 16  16 : 39 : 17 , 124  (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java: 175 )] Finished
2012 - 03 - 16  16 : 39 : 17 , 127  (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java: 178 )] Closing reader
2012 - 03 - 16  16 : 39 : 17 , 127  (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java: 183 )] Closing transceiver
2012 - 03 - 16  16 : 39 : 17 , 129  (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java: 73 )] Exiting

在你的第一個窗體,即server執行的那個:

2012 - 03 - 16  16 : 39 : 16 , 738  (New I/O server boss # 1  ([id:  0x49e808ca , / 0 : 0 : 0 : 0 : 0 : 0 : 0 : 0 : 41414 ])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 1
27.0 . 0.1 : 39577  => / 127.0 . 0.1 : 41414 ] OPEN
2012 - 03 - 16  16 : 39 : 16 , 742  (New I/O server worker # 1 - 1 ) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 127.0 . 0.1 : 39577  => / 127.0 . 0.1 : 41414 ] BOU
ND: / 127.0 . 0.1 : 41414
2012 - 03 - 16  16 : 39 : 16 , 742  (New I/O server worker # 1 - 1 ) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 127.0 . 0.1 : 39577  => / 127.0 . 0.1 : 41414 ] CON
NECTED: / 127.0 . 0.1 : 39577
2012 - 03 - 16  16 : 39 : 17 , 129  (New I/O server worker # 1 - 1 ) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 127.0 . 0.1 : 39577  :> / 127.0 . 0.1 : 41414 ] DISCONNECTED
2012 - 03 - 16  16 : 39 : 17 , 129  (New I/O server worker # 1 - 1 ) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 127.0 . 0.1 : 39577  :> / 127.0 . 0.1 : 41414 ] UNBOUND
2012 - 03 - 16  16 : 39 : 17 , 129  (New I/O server worker # 1 - 1 ) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java: 123 )] [id:  0x0b92a848 , / 127.0 . 0.1 : 39577  :> / 127.0 . 0.1 : 41414 ] CLOSED
2012 - 03 - 16  16 : 39 : 17 , 302  (Thread- 1 ) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java: 68 )] Event: { headers:{} body:[B @5c1ae90c  }
2012 - 03 - 16  16 : 39 : 17 , 302  (Thread- 1 ) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java: 68 )] Event: { headers:{} body:[B @6aba4211  }
2012 - 03 - 16  16 : 39 : 17 , 302  (Thread- 1 ) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java: 68 )] Event: { headers:{} body:[B @6a47a0d4  }
2012 - 03 - 16  16 : 39 : 17 , 302  (Thread- 1 ) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java: 68 )] Event: { headers:{} body:[B @48ff4cf  }
...

祝賀你 !

你正在執行 Apache Flume !


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM