大數據之flume數據采集


Flume是一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸的系統。

它可以采集文件,socket數據包等各種形式源數據,又可以將采集到的數據輸出到HDFS、hbase、hive、kafka等眾多外部存儲系統中。

一、flume結構

 

 

 

Flume分布式系統中最核心的角色是agent,每一個agent相當於一個數據傳遞員,內部有三個組件:

Source: 采集源,用於跟數據源對接,以獲取數據;

Channel : angent內部的數據傳輸通道,用於從source將數據傳遞到sink。

Sink::下沉地,采集數據的傳送目的,用於往下一級agent傳遞數據或者往最終存儲系統傳遞數據;

數據在flume內部以Event的封裝形式存在。

flume的事務控制機制:

1、source到channel
2、channel到sink

二、Flume多個agent串聯

 

 

三、Flume安裝使用(未安裝)

1、上傳安裝包,解壓

2、執行腳本,模擬日志生產

while true; do echo 111111111111111111111111_$RANDOM >> access.log; sleep 0.2; done

案例一、采集端口數據

1、增加netcat-logger.conf

# Name the components on this agent
#給那三個組件取個名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#類型, 從網絡端口接收數據,在本機啟動, 所以localhost, type=spoolDir采集目錄源,目錄里有就采
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
#下沉的時候是一批一批的, 下沉的時候是一個個eventChannel參數解釋:
#capacity:默認該通道中最大的可以存儲的event數量
#trasactionCapacity:每次最大可以從source中拿到或者送到sink中的event數量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、啟動

$ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

3、傳入數據:

$ telnet localhost 44444

案例二、采集文件夾數據

1、增加spooldir-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#監聽目錄,spoolDir指定目錄, fileHeader要不要給文件夾前墜名
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumespool
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、啟動

bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console

3、傳入數據:

往/home/hadoop/flumeSpool放文件

案例三:采集文件數據(方式一)

exec source 適用於監控一個實時追加的文件,沒有偏移量,會出現數據丟失情況;

1、增加tail-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# exec 指的是命令
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
# F根據文件名追中, f根據文件的nodeid追蹤,即使換了文件名,也能跟蹤到
a1.sources.r1.command = tail -F /home/hadoop/log/test.log

#下沉目標
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
# 指定目錄, flum幫做目的替換
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
#文件的命名, 前綴
a1.sinks.k1.hdfs.filePrefix = events-

#10 分鍾就改目錄
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#文件滾動之前的等待時間(秒)
a1.sinks.k1.hdfs.rollInterval = 120
#文件滾動的大小限制(bytes)
a1.sinks.k1.hdfs.rollSize = 268435456
#寫入多少個event數據后滾動文件(事件個數)
a1.sinks.k1.hdfs.rollCount = 20

#1000個事件就往里面寫入
a1.sinks.k1.hdfs.batchSize = 1000

#用本地時間格式化目錄
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#下沉后, 生成的文件類型,默認是Sequencefile,可用DataStream,則為普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

2、啟動命令

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

案例四:采集文件數據(方式二)

taildir Source 既能夠實現斷點續傳,又可以保證數據不丟失,還能夠進行實時監控。為了防止00:00的時候,今日的數據寫到明日,在sink處增加攔截器,給數據一個時間戳,不使用節點機器上時間。

tail dir 是根據通配符監視多個文件,即使文件改了名,也不會重復采集,它是根據偏移量進行跟蹤的;

1、增加tail-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups=g1
a1.sources.r1.filegroups.g1=  /logdata/a.*
a1.sources.r1.fileHeader = true

# 加入攔截器
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp
a1.sources.s1.interceptors.i1.headerName= timestamp

#下沉目標
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
# 指定目錄, flum幫做目的替換
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
#文件的命名, 前綴
a1.sinks.k1.hdfs.filePrefix = events-

#10 分鍾就改目錄
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#文件滾動之前的等待時間(秒)
a1.sinks.k1.hdfs.rollInterval = 120
#文件滾動的大小限制(bytes)
a1.sinks.k1.hdfs.rollSize = 268435456
#寫入多少個event數據后滾動文件(事件個數)
a1.sinks.k1.hdfs.rollCount = 20

#1000個事件就往里面寫入
a1.sinks.k1.hdfs.batchSize = 1000

#用本地時間格式化目錄
a1.sinks.k1.hdfs.useLocalTimeStamp = false

#下沉后, 生成的文件類型,默認是Sequencefile,可用DataStream,則為普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

2、啟動命令

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1

 

案例五、flume級聯操作

使用前景:復雜的網絡或者日志服務器特別多,每台服務器流量不多,需要進行匯集;

需要寫兩個配置文件,分別放在兩個機器上,一個當發送者,一個當收集者(Kafka為例)

1、編寫tail-avro-.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = false

a1.channels.c1.type = file

a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = doitedu02
a1.sinks.k1.port = 4444

2、編寫avro-fakfa.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu02
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100

a1.channels.c1.type = file

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = doitedu17
a1.sinks.k1.kafka.producer.acks = 1

2、先啟動avro-fakfa.conf,再啟動tail-avro-.conf

bin/flume-ng agent -c conf -f conf/avro-fakfa.conf -n al -Dflume.root.logger=INFO,console

bin/flume-ng agent -c conf -f conf/tail-avro-.conf -n a1

3、kafka基本命令

## topic查看
bin/kafka-topics.sh --list --zookeeper doitedu01:2181

## topic創建
bin/kafka-topics.sh --create --topic topic2 --partitions 2 --replication-factor 2 --zookeeper doitedu01:2181

## 啟動一個控制台生產者來生產數據
bin/kafka-console-producer.sh --broker-list doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic topic2
>hello tom

## 啟動一個控制台消費者來消費數據
bin/kafka-console-consumer.sh --bootstrap-server doitedu01:9092,doitedu02:9092,doitedu03:9092 --topic topic2 --from-beginning

  

案例六:flume選擇器

一個 source 可以對接多個 channel,那么,source 的數據如何在多個 channel 之間傳遞,就由 selector 來控制,配置應該掛載到 source 組件

1、復制選擇器

一個連hdfs, 一個連kafka

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

a1.sources.r1.channels = c1 c2
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = false
a1.sources.r1.selector.type = replicating

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.headerName = timestamp

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = doitedu17
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://doitedu01:8020/flumedata/%Y-%m-%d/%H
a1.sinks.k2.hdfs.filePrefix = doitedu-log-
a1.sinks.k2.hdfs.fileSuffix = .log
a1.sinks.k2.hdfs.rollSize = 268435456
a1.sinks.k2.hdfs.rollInterval = 120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = snappy
a1.sinks.k2.hdfs.useLocalTimeStamp = false

  

2、多路選擇器

一個source里數據,可能有不同種類數據,需要使用攔截器,對數據進行區分,然后使用多路選擇器插入到不同的channel里,一個寫到kakfa,一個寫到hdfs。

2.1 攔截器,並打包放到flume的lib下

package cn.doitedu.yiee.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class MultiplexingInterceptor implements Interceptor {
    private Integer flagfield = 0;
    private Integer timestampfield = 0;

    public MultiplexingInterceptor(Integer flagfield,Integer timestampfield) {
        this.flagfield = flagfield;
        this.timestampfield = timestampfield;
    }

    /**
     * 攔截器構造實例后的初始化工作
     */
    public void initialize() {

    }

    // 日志格式:
    // u01,ev1,mall,1568738583468
    public Event intercept(Event event) {
        // 根據event的數據內容,以及參數中指定的標記字段,來產生不同的header值
        byte[] body = event.getBody();
        String line = new String(body);

        String[] split = line.split(",");

        // 切出業務標記,並添加到header
        event.getHeaders().put("flag",split[flagfield]);
        // 切出行為(事件)時間戳,並添加到header
        event.getHeaders().put("timestamp",split[timestampfield]);

        return event;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }


    /**
     * 攔截器銷毀之前的一些清理工作
     */
    public void close() {

    }

    public static class MultiplexingInterceptorBuilder implements Interceptor.Builder{

        Integer flagfield = 0;
        Integer timestampfield = 0;
        /**
         * 用戶構建一個攔截器實例
         * @return
         */
        public Interceptor build() {

            return new MultiplexingInterceptor(flagfield,timestampfield);
        }

        /**
         * 獲取參數的入口
         * @param context
         */
        public void configure(Context context) {
            flagfield = context.getInteger("flagfield");
            timestampfield = context.getInteger("timestampfield");

        }
    }
}

  2.2 模擬日志生成腳本

while true
		do
	if [ $(($RANDOM % 2)) -eq 0 ]
		then
		echo "u$RANDOM,e1,waimai,`date +%s`000" >> a.log
	else
		echo "u$RANDOM,e1,mall,`date +%s`000" >> a.log
		fi
		sleep 0.2
	done

  2.3變成配置文件

	1.sources = r1
	a1.channels = c1 c2
	a1.sinks = k1 k2
	
	a1.sources.r1.channels = c1 c2
	a1.sources.r1.type = TAILDIR
	a1.sources.r1.filegroups = g1
	a1.sources.r1.filegroups.g1 = /logdata/a.*
	a1.sources.r1.fileHeader = false
	
	a1.sources.r1.interceptors = i1
	a1.sources.r1.interceptors.i1.type = cn.doitedu.yiee.flume.MultiplexingInterceptor$MultiplexingInterceptorBuilder
	a1.sources.r1.interceptors.i1.flagfield = 2
	a1.sources.r1.interceptors.i1.timestampfield = 3
	
	a1.sources.r1.selector.type = multiplexing
	a1.sources.r1.selector.header = flag
	a1.sources.r1.selector.mapping.mall = c1
	a1.sources.r1.selector.mapping.waimai = c2
	a1.sources.r1.selector.default = c2
	
	
	a1.channels.c1.type = memory
	a1.channels.c1.capacity = 2000
	a1.channels.c1.transactionCapacity = 1000
	
	a1.channels.c2.type = memory
	a1.channels.c2.capacity = 2000
	a1.channels.c2.transactionCapacity = 1000
	
	
	a1.sinks.k1.channel = c1
	a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
	a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
	a1.sinks.k1.kafka.topic = mall
	a1.sinks.k1.kafka.producer.acks = 1
	
	
	a1.sinks.k2.channel = c2
	a1.sinks.k2.type = hdfs
	a1.sinks.k2.hdfs.path = hdfs://doitedu01:8020/waimai/%Y-%m-%d/%H
	a1.sinks.k2.hdfs.filePrefix = doitedu-log-
	a1.sinks.k2.hdfs.fileSuffix = .log
	a1.sinks.k2.hdfs.rollSize = 268435456
	a1.sinks.k2.hdfs.rollInterval = 120
	a1.sinks.k2.hdfs.rollCount = 0
	a1.sinks.k2.hdfs.batchSize = 1000
	a1.sinks.k2.hdfs.fileType = DataStream
	a1.sinks.k2.hdfs.useLocalTimeStamp = false

  

案例七:自動失敗切換

多個sink連接一個channel,默認不需要專門去配置的, 相當於負載均衡,或者failover sink processor 自動失敗,需要將多個 sink 創建成 group。正常情況下,只運行一個sink,只有當它失敗后,才切換到別的sink上。

 

 

默認是走蘭色的線,若是蘭色的機器掛掉,就走綠色的線;

1、級聯高可用配置第一級

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sources.r1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = g1
a1.sources.r1.filegroups.g1 = /logdata/a.*
a1.sources.r1.fileHeader = false


a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000


a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = doitedu02
a1.sinks.k1.port = 4444


a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = doitedu03
a1.sinks.k2.port = 4444


a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 200
a1.sinkgroups.g1.processor.priority.k2 = 100
a1.sinkgroups.g1.processor.maxpenalty = 5000

2、級聯高可用配置第2級(節點1)

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu02
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100


a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = failover
a1.sinks.k1.kafka.producer.acks = 1

3、級聯高可用配置第2級(節點2)

a1.sources = r1
a1.sinks = k1
a1.channels = c1


a1.sources.r1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = doitedu03
a1.sources.r1.port = 4444
a1.sources.r1.batchSize = 100


a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = doitedu01:9092,doitedu02:9092,doitedu03:9092
a1.sinks.k1.kafka.topic = failover
a1.sinks.k1.kafka.producer.acks = 1

  

四、flume監控

flume 在運行時,狀態是否正常,吞吐量是否正常,可以使用ganglia 進行展現:

-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890

Ganglia 是一個通用的集群運維監控系統;
它在各台需要監控狀態信息的機器上安裝“探針”,然后這些“探針”會收集所在機器上的各種狀態
信息(cpu 負載,內存負載,磁盤 IO 負載,網絡 IO 負載,以及各類應用軟件的狀態信息),然后匯
聚到它的中心匯聚點,並提供 web 頁面進行圖形可視化查看

 

 

五、監控flume進程、自動拉起

 

#!/bin/bash

export FLUME_HOME=/opt/apps/flume-1.9.0
while true
do
pc=`ps -ef | grep flume | grep -v "grep" | wc -l`

if [[ $pc -lt 1 ]]
then
  echo "detected no flume process.... preparing to launch flume agent...... "
  ${FLUME_HOME}/bin/flume-ng agent -n a1 -c ${FLUME_HOME}/conf/ -f ${FLUME_HOME}/agentconf/failover.properties 1>/dev/null 2>&1 &
else
  echo "detected flume process number is : $pc "
fi

sleep 1

done

 

更多java、大數據學習面試資料,請掃碼關注我的公眾號:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM