Flume (Part 1): Startup and Basic Usage


Basic Architecture

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume source to receive events from a Thrift sink, a Flume Thrift RPC client, or Thrift clients written in any language generated from the Flume Thrift protocol.

When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it is consumed by a Flume sink. The file channel is one example: it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via the Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.

Flume's role in the setup described here is to read data from the server's local disk in real time and write it into HDFS.

Agent

An Agent is a JVM process that moves data from a source to a destination in the form of events.

An Agent has three main components: Source, Channel, and Sink.

Source

The Source receives data into the Agent.

Sink

The Sink continuously polls the Channel and moves its events to a storage system, an indexing system, or another Flume Agent.

Channel

The Channel is a buffer between the Source and the Sink; it absorbs mismatches between the rate at which the Source produces data and the rate at which the Sink consumes it.

Channels are thread-safe.

Flume ships with a Memory Channel, a File Channel, and a Kafka Channel. The memory channel is fast but loses buffered events if the agent process dies; the file channel persists events to local disk and survives restarts.
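
For durability, the memory channel used in the examples below can be swapped for a file channel. A minimal sketch, assuming the agent and channel names used throughout this post and hypothetical local paths:

## durable channel backed by the local filesystem
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/flume/checkpoint
a1.channels.c1.dataDirs = /usr/local/flume/data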

Event

The Event is the basic unit of data transfer in Flume. An event consists of an optional set of string headers and a byte-array body.

Installation & Deployment

Download

Download the 1.7.0 binary package.
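
A minimal way to fetch and unpack it from the Apache archive (the URL and install location below are assumptions; adjust to your environment):

wget https://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/
cd /usr/local/apache-flume-1.7.0-bin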

Modify the Configuration

Set the JDK path in the flume-env.sh configuration.
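
For example (the JDK path below is an assumption; point JAVA_HOME at your own installation):

cp conf/flume-env.sh.template conf/flume-env.sh
# then, inside conf/flume-env.sh:
export JAVA_HOME=/usr/local/jdk1.8.0_144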

Create job/flume-netcat-logger.conf with the following content:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
## maximum number of events the channel can hold
a1.channels.c1.capacity = 1000
## maximum number of events per transaction (per source put / sink take)
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
## channel-to-sink is a one-to-many relationship: a sink binds exactly one
## channel, while one channel can feed multiple sinks (see the sketch after this block)
a1.sinks.k1.channel = c1
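
To illustrate that one-to-many note, a minimal sketch (the second sink name k2 is hypothetical) of one channel drained by two sinks. Note that sinks sharing a channel compete for events, so each event is delivered to only one of them; replicating events requires multiple channels on the source side:

a1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1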

Start and Run

Start Flume (--conf is the configuration directory, --conf-file the job file, --name the agent name; the -D option logs events to the console at INFO level):

bin/flume-ng agent --conf conf --conf-file job/flume-netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

Use netcat to send data to the listening port:

nc localhost 44444
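
Each line typed into the nc session becomes one event; the netcat source acknowledges every line with OK:

$ nc localhost 44444
hello
OK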

Result
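
On the agent's console, the logger sink should print something like the following for the line above (the exact format can vary between Flume versions):

Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }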

Monitoring the Hive Log and Uploading to HDFS

Create job/file-flume-logger.conf (the name used by the start command below) with the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe / configure the source
## exec source: run a shell command and turn its output lines into events
a1.sources.r1.type = exec
## tail -F keeps following the log file across rotations
a1.sources.r1.command = tail -F /usr/local/logs/hive/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = logs-
## round timestamps down to 1-hour buckets for the %Y%m%d/%H escapes
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
## use the agent's local time instead of a timestamp event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.batchSize = 1000
## DataStream writes plain text instead of the default SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream
## roll to a new file every 30 s or at ~128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run: bin/flume-ng agent -c conf/ -f job/file-flume-logger.conf -n a1 (-c, -f, and -n are short forms of --conf, --conf-file, and --name).
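
The HDFS sink needs the Hadoop client jars on Flume's classpath and a reachable NameNode at hdfs://localhost:9000. Once the agent is running and the Hive log is being appended to, the written files (laid out by the %Y%m%d/%H path pattern above) can be inspected with, for example:

hdfs dfs -ls -R /flume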

Sending Data to Kafka Through Flume

Use netcat to feed a listening port and send the data through Flume to Kafka. Create a job file (the name job/flume-netcat-kafka.conf used in the start command below is an assumption) with the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe / configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
## Kafka topic to publish events to
a1.sinks.k1.kafka.topic = payTopic
a1.sinks.k1.kafka.bootstrap.servers = 127.0.0.1:9092
## number of events to batch into one producer request
a1.sinks.k1.kafka.flumeBatchSize = 20
## kafka.producer.* properties are passed straight to the Kafka producer
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
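
Start the agent the same way as before, then watch the topic with Kafka's console consumer (run from the Kafka installation directory; this assumes a broker is already running on 127.0.0.1:9092) while typing lines into nc localhost 44444:

bin/flume-ng agent -c conf/ -f job/flume-netcat-kafka.conf -n a1
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic payTopic --from-beginning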

Result

References

Flume official site
Flume official developer guide
Flume official user guide
尚硅谷 (Atguigu) big data course: Flume
FlumeUserGuide

