Flume篇---Flume安裝配置與相關使用


一.前述

Copy過來一段介紹Apache Flume 是一個從可以收集例如日志,事件等數據資源,並將這些數量龐大的數據從各項數據資源中集中起來存儲的工具/服務,或者數集中機制。flume具有高可用,分布式,配置工具,其設計的原理也是基於將數據流,如日志數據從各種網站服務器上匯集起來存儲到HDFS,HBase等集中存儲器中。官網:http://flume.apache.org/FlumeUserGuide.html

二.架構

1.基本架構

介紹:

Source:(相當於一個來源)

   從數據發生器接收數據,並將接收的數據以Flume的event格式傳遞給一個或者多個通道channal,Flume提供多種數據接收的方式,比如Avro,Thrift,twitter1%等

Channel:(相當於一個中轉)

 channal是一種短暫的存儲容器,它將從source處接收到的event格式的數據緩存起來,直到它們被sinks消費掉,它在source和sink間起着一共橋梁的作用,channal是一個完整的事務,這一點保證了數據在收發的時候的一致性. 並且它可以和任意數量的source和sink鏈接. 支持的類型有: JDBC channel , File System channel , Memort channel等.

sink:(相當於最后的寫出)

  sink將數據存儲到集中存儲器比如Hbase和HDFS,它從channals消費數據(events)並將其傳遞給目標地. 目標地可能是另一個sink,也可能HDFS,HBase.

2.延伸架構

  2.1利用AVRO中轉

2.2一般多個來源時可以配置這樣

ps:

Avro([ævrə])是Hadoop的一個子項目,由Hadoop的創始人Doug Cutting(也是Lucene,Nutch等項目的創始人) 牽頭開發 Avro是一個數據序列化系統,設計用於支持大批量數據交換的應用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數據;動態語言友好,Avro提供的機制使動態語言可以方便地處理Avro數據。
三。具體實施
3.1 安裝
1、上傳
2、解壓
3、修改conf/flume-env.sh  文件中的JDK目錄
 注意:JAVA_OPTS 配置  如果我們傳輸文件過大 報內存溢出時 需要修改這個配置項
4、驗證安裝是否成功  ./flume-ng version
5、配置環境變量
    export FLUME_HOME=/home/apache-flume-1.6.0-bin


3.2 Source、Channel、Sink有哪些類型
    Flume Source
    Source類型                   | 說明
    Avro Source                 | 支持Avro協議(實際上是Avro RPC),內置支持
    Thrift Source               | 支持Thrift協議,內置支持
    Exec Source                 | 基於Unix的command在標准輸出上生產數據
    JMS Source                   | 從JMS系統(消息、主題)中讀取數據
    Spooling Directory Source | 監控指定目錄內數據變更
    Twitter 1% firehose Source|    通過API持續下載Twitter數據,試驗性質
    Netcat Source               | 監控某個端口,將流經端口的每一個文本行數據作為Event輸入
    Sequence Generator Source | 序列生成器數據源,生產序列數據
    Syslog Sources               | 讀取syslog數據,產生Event,支持UDP和TCP兩種協議
    HTTP Source                 | 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式
    Legacy Sources               | 兼容老的Flume OG中Source(0.9.x版本)

    Flume Channel
    Channel類型       說明
    Memory Channel                | Event數據存儲在內存中
    JDBC Channel                  | Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby
    File Channel                  | Event數據存儲在磁盤文件中
    Spillable Memory Channel   | Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件
    Pseudo Transaction Channel | 測試用途
    Custom Channel                | 自定義Channel實現

    Flume Sink
    Sink類型     說明
    HDFS Sink             | 數據寫入HDFS
    Logger Sink           | 數據寫入日志文件
    Avro Sink             | 數據被轉換成Avro Event,然后發送到配置的RPC端口上
    Thrift Sink           | 數據被轉換成Thrift Event,然后發送到配置的RPC端口上
    IRC Sink              | 數據在IRC上進行回放
    File Roll Sink         | 存儲數據到本地文件系統
    Null Sink             | 丟棄到所有數據
    HBase Sink             | 數據寫入HBase數據庫
    Morphline Solr Sink | 數據發送到Solr搜索服務器(集群)
    ElasticSearch Sink     | 數據發送到Elastic Search搜索服務器(集群)
    Kite Dataset Sink     | 寫數據到Kite Dataset,試驗性質的
    Custom Sink           | 自定義Sink實現



案例1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example
    
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

啟動flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 指定配置目錄
flume-ng agent -n a1 -f op5 -Dflume.root.logger=INFO,console 不用指定配置目錄,將上訴source,channel,sink的文件起名為a1,同時指定這個文件在哪

安裝telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:默認該通道中最大的可以存儲的event數量是100,
  trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100
  keep-alive:event添加到通道中或者移出的允許時間
  byte**:即event的字節量的限制,只包括eventbody



案例2、兩個flume做集群(第一個agent的sink作為第二個agent的source)

    node01服務器中,配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 44444

    # Describe the sink
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    node02服務器中,安裝Flume(步驟略)
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 60000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    先啟動node02的Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
    
    再啟動node01的Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    打開telnet 測試  node02控制台輸出結果


案例3、Exec Source(監聽一個文件)
        http://flume.apache.org/FlumeUserGuide.html#exec-source
        
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/flume.exec.log

    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
    
    創建空文件演示 touch flume.exec.log
    循環添加數據
    for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
        
案例4、Spooling Directory Source(監聽一個目錄)
        http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

    啟動Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

    拷貝文件演示
    mkdir logs
    cp flume.exec.log logs/


案例5、hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    ***只修改上一個spool sink的配置代碼塊 a1.sinks.k1.type = logger
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
    
    ##每隔60s或者文件大小超過10M的時候產生新文件
    # hdfs有多少條消息時新建文件,0不基於消息個數
    a1.sinks.k1.hdfs.rollCount=0
    # hdfs創建多長時間新建文件,0不基於時間
    a1.sinks.k1.hdfs.rollInterval=60
    # hdfs多大時新建文件,0不基於文件大小
    a1.sinks.k1.hdfs.rollSize=10240
    # 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件
    a1.sinks.k1.hdfs.idleTimeout=3
    
    a1.sinks.k1.hdfs.fileType=DataStream
   #時間參數一定要帶上 true
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    ## 每五分鍾生成一目錄:
    # 是否啟用時間上的”舍棄”,這里的”舍棄”,類似於”四舍五入”,后面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式
    a1.sinks.k1.hdfs.round=true
    # 時間上進行“舍棄”的值;
    a1.sinks.k1.hdfs.roundValue=5
    # 時間上進行”舍棄”的單位,包含:second,minute,hour
    a1.sinks.k1.hdfs.roundUnit=minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1(將source,channel,sink關聯)
    ############################################################
    創建HDFS目錄
    hadoop fs -mkdir /flume
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

    查看hdfs文件
    hadoop fs -ls /flume/...
    hadoop fs -get /flume/...


http://flume.apache.org/

安裝
1、上傳
2、解壓
3、修改conf/flume-env.sh  文件中的JDK目錄
 注意:JAVA_OPTS 配置  如果我們傳輸文件過大 報內存溢出時 需要修改這個配置項
4、驗證安裝是否成功  ./flume-ng version
5、配置環境變量
    export FLUME_HOME=/home/apache-flume-1.6.0-bin


Source、Channel、Sink有哪些類型
    Flume Source
    Source類型                   | 說明
    Avro Source                 | 支持Avro協議(實際上是Avro RPC),內置支持
    Thrift Source               | 支持Thrift協議,內置支持
    Exec Source                 | 基於Unix的command在標准輸出上生產數據
    JMS Source                   | 從JMS系統(消息、主題)中讀取數據
    Spooling Directory Source | 監控指定目錄內數據變更
    Twitter 1% firehose Source|    通過API持續下載Twitter數據,試驗性質
    Netcat Source               | 監控某個端口,將流經端口的每一個文本行數據作為Event輸入
    Sequence Generator Source | 序列生成器數據源,生產序列數據
    Syslog Sources               | 讀取syslog數據,產生Event,支持UDP和TCP兩種協議
    HTTP Source                 | 基於HTTP POST或GET方式的數據源,支持JSON、BLOB表示形式
    Legacy Sources               | 兼容老的Flume OG中Source(0.9.x版本)

    Flume Channel
    Channel類型       說明
    Memory Channel                | Event數據存儲在內存中
    JDBC Channel                  | Event數據存儲在持久化存儲中,當前Flume Channel內置支持Derby
    File Channel                  | Event數據存儲在磁盤文件中
    Spillable Memory Channel   | Event數據存儲在內存中和磁盤上,當內存隊列滿了,會持久化到磁盤文件
    Pseudo Transaction Channel | 測試用途
    Custom Channel                | 自定義Channel實現

    Flume Sink
    Sink類型     說明
    HDFS Sink             | 數據寫入HDFS
    Logger Sink           | 數據寫入日志文件
    Avro Sink             | 數據被轉換成Avro Event,然后發送到配置的RPC端口上
    Thrift Sink           | 數據被轉換成Thrift Event,然后發送到配置的RPC端口上
    IRC Sink              | 數據在IRC上進行回放
    File Roll Sink         | 存儲數據到本地文件系統
    Null Sink             | 丟棄到所有數據
    HBase Sink             | 數據寫入HBase數據庫
    Morphline Solr Sink | 數據發送到Solr搜索服務器(集群)
    ElasticSearch Sink     | 數據發送到Elastic Search搜索服務器(集群)
    Kite Dataset Sink     | 寫數據到Kite Dataset,試驗性質的
    Custom Sink           | 自定義Sink實現



案例1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example
    
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

啟動flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

安裝telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:默認該通道中最大的可以存儲的event數量是100,
  trasactionCapacity:每次最大可以source中拿到或者送到sink中的event數量也是100
  keep-alive:event添加到通道中或者移出的允許時間
  byte**:即event的字節量的限制,只包括eventbody



案例2、兩個flume做集群

    node01服務器中,配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 44444

    # Describe the sink
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    node02服務器中,安裝Flume(步驟略)
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 60000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    先啟動node02的Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
    
    再啟動node01的Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    打開telnet 測試  node02控制台輸出結果


案例3、Exec Source
        http://flume.apache.org/FlumeUserGuide.html#exec-source
        
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/flume.exec.log

    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
    
    創建空文件演示 touch flume.exec.log
    循環添加數據
    for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
        
案例4、Spooling Directory Source
        http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

    啟動Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

    拷貝文件演示
    mkdir logs
    cp flume.exec.log logs/


案例5、hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    ***只修改上一個spool sink的配置代碼塊 a1.sinks.k1.type = logger
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
    
    ##每隔60s或者文件大小超過10M的時候產生新文件
    # hdfs有多少條消息時新建文件,0不基於消息個數
    a1.sinks.k1.hdfs.rollCount=0
    # hdfs創建多長時間新建文件,0不基於時間
    a1.sinks.k1.hdfs.rollInterval=60
    # hdfs多大時新建文件,0不基於文件大小
    a1.sinks.k1.hdfs.rollSize=10240
    # 當目前被打開的臨時文件在該參數指定的時間(秒)內,沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件
    a1.sinks.k1.hdfs.idleTimeout=3
    
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    ## 每五分鍾生成一個目錄:
    # 是否啟用時間上的”舍棄”,這里的”舍棄”,類似於”四舍五入”,后面再介紹。如果啟用,則會影響除了%t的其他所有時間表達式
    a1.sinks.k1.hdfs.round=true
    # 時間上進行“舍棄”的值;
    a1.sinks.k1.hdfs.roundValue=5
    # 時間上進行”舍棄”的單位,包含:second,minute,hour
    a1.sinks.k1.hdfs.roundUnit=minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    創建HDFS目錄
    hadoop fs -mkdir /flume
    
    啟動Flume
    flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

    查看hdfs文件
    hadoop fs -ls /flume/...
    hadoop fs -get /flume/...

作業:
1、flume如何收集java請求數據
2、項目當中如何來做? 日志存放/log/目錄下 以yyyyMMdd為子目錄 分別存放每天的數據





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM