大數據技術之Flume


第1章 概述

1.1 Flume定義

FlumeCloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統Flume基於流式架構,靈活簡單。

1.2 Flume組成架構

Flume組成架構如圖1-1,圖1-2所示:

圖1-1 Flume組成架構

圖1-2 Flume組成架構詳解

下面我們來詳細介紹一下Flume架構中的組件。

1.2.1 Agent

Agent是一個JVM進程,它以事件的形式將數據從源頭送至目的,是Flume數據傳輸的基本單元

Agent主要有3個部分組成,Source、Channel、Sink。

1.2.2 Source

Source是負責接收數據到Flume Agent的組件。Source組件可以處理各種類型、各種格式的日志數據,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy

1.2.3 Channel

Channel是位於SourceSink之間的緩沖區。因此,Channel允許SourceSink運作在不同的速率上。Channel是線程安全的,可以同時處理幾個Source的寫入操作和幾個Sink的讀取操作。

Flume自帶兩種ChannelMemory ChannelFile Channel

Memory Channel是內存中的隊列Memory Channel在不需要關心數據丟失的情景下適用。如果需要關心數據丟失,那么Memory Channel就不應該使用,因為程序死亡、機器宕機或者重啟都會導致數據丟失。

File Channel將所有事件寫到磁盤。因此在程序關閉或機器宕機的情況下不會丟失數據。

1.2.4 Sink

Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被發送到另一個Flume Agent

Sink是完全事務性的。在從Channel批量刪除數據之前,每個SinkChannel啟動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume AgentSink就利用Channel提交事務。事務一旦被提交,該Channel從自己的內部緩沖區刪除事件。

Sink組件目的地包括hdfsloggeravrothriftipcfilenullHBasesolr、自定義。

1.2.5 Event

傳輸單元,Flume數據傳輸的基本單元,以事件的形式將數據從源頭送至目的地。

1.3 Flume拓撲結構

Flume的拓撲結構如圖1-3、1-4、1-5和1-6所示:

圖1-3 Flume Agent連接

圖1-4 單source,多channel、sink

圖1-5 Flume負載均衡

圖1-6 Flume Agent聚合

1.4 Flume Agent內部原理

第2章 快速入門

2.1 Flume安裝地址

1) Flume官網地址

http://flume.apache.org/

2)文檔查看地址

http://flume.apache.org/FlumeUserGuide.html

3)下載地址

http://archive.apache.org/dist/flume/

2.2 安裝部署

1)將apache-flume-1.7.0-bin.tar.gz上傳到linux的/opt/software目錄下

2)解壓apache-flume-1.7.0-bin.tar.gz到/opt/module/目錄下

[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/

3)修改apache-flume-1.7.0-bin的名稱為flume

[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin flume

  1. 將flume/conf下的flume-env.sh.template文件修改為flume-env.sh,並配置flume-env.sh文件

    [atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh

    [atguigu@hadoop102 conf]$ vi flume-env.sh

    export JAVA_HOME=/opt/module/jdk1.8.0_144

    第3章 企業開發案例

    3.1 監控端口數據官方案例

    1)案例需求:首先,Flume監控本機44444端口,然后通過telnet工具向本機44444端口發送消息,最后Flume將監聽的數據實時顯示在控制台。

    2)需求分析:

    3)實現步驟:

    1.安裝telnet工具

    將rpm軟件包(xinetd-2.3.14-40.el6.x86_64.rpm、telnet-0.17-48.el6.x86_64.rpm和telnet-server-0.17-48.el6.x86_64.rpm)拷入/opt/software文件夾下面。執行RPM軟件包安裝命令:

    [atguigu@hadoop102 software]$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm

    [atguigu@hadoop102 software]$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm

    [atguigu@hadoop102 software]$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm

    2.判斷44444端口是否被占用

    [atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444

    功能描述:netstat命令是一個監控TCP/IP網絡的非常有用的工具,它可以顯示路由表、實際的網絡連接以及每一個網絡接口設備的狀態信息。

    基本語法:netstat [選項]

    選項參數:

        -t或--tcp:顯示TCP傳輸協議的連線狀況;

    -u或--udp:顯示UDP傳輸協議的連線狀況;

        -n或--numeric:直接使用ip地址,而不通過域名服務器;

        -l或--listening:顯示監控中的服務器的Socket;

        -p或--programs:顯示正在使用Socket的程序識別碼和程序名稱;

    3創建Flume Agent配置文件flume-telnet-logger.conf

    在flume目錄下創建job文件夾並進入job文件夾。

    [atguigu@hadoop102 flume]$ mkdir job

    [atguigu@hadoop102 flume]$ cd job/

    在job文件夾下創建Flume Agent配置文件flume-telnet-logger.conf。

    [atguigu@hadoop102 job]$ touch flume-telnet-logger.conf

    在flume-telnet-logger.conf文件中添加如下內容。

    [atguigu@hadoop102 job]$ vim flume-telnet-logger.conf

    添加內容如下:

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

     

    # Describe the sink

    a1.sinks.k1.type = logger

     

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    注:配置文件來源於官方手冊http://flume.apache.org/FlumeUserGuide.html

    4. 先開啟flume監聽端口

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console

    參數說明:

        --conf conf/ :表示配置文件存儲在conf/目錄

        --name a1    :表示給agent起名為a1

        --conf-file job/flume-telnet.conf :flume本次啟動讀取的配置文件是在job文件夾下的flume-telnet.conf文件。

        -Dflume.root.logger==INFO,console :-D表示flume運行時動態修改flume.root.logger參數屬性值,並將控制台日志打印級別設置為INFO級別。日志級別包括:log、info、warn、error。

    5.使用telnet工具向本機的44444端口發送內容

    [atguigu@hadoop102 ~]$ telnet localhost 44444

    6.在Flume監聽頁面觀察接收數據情況

    3.2 實時讀取本地文件到HDFS案例

    1)案例需求:實時監控Hive日志,並上傳到HDFS中

    2)需求分析:

    3)實現步驟:

  2. Flume 要想將數據輸出到 HDFS ,必須持有 Hadoop 相關 jar

    將commons-configuration-1.6.jar、

    hadoop-auth-2.7.2.jar、

    hadoop-common-2.7.2.jar、

    hadoop-hdfs-2.7.2.jar、

    commons-io-2.4.jar、

    htrace-core-3.1.0-incubating.jar

    拷貝到/opt/module/flume/lib文件夾下。

  3. 創建 flume-file-hdfs.conf 文件

    創建文件

    [atguigu@hadoop102 job]$ touch flume-file-hdfs.conf

    注:要想讀取Linux系統中的文件,就得按照Linux命令的規則執行命令。由於Hive日志在Linux系統中所以讀取文件的類型選擇:exec即execute執行的意思。表示執行Linux命令來讀取文件。

    [atguigu@hadoop102 job]$ vim flume-file-hdfs.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r2

    a2.sinks = k2

    a2.channels = c2

     

    # Describe/configure the source

    a2.sources.r2.type = exec

    a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

    a2.sources.r2.shell = /bin/bash -c

     

    # Describe the sink

    a2.sinks.k2.type = hdfs

    a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H

    #上傳文件的前綴

    a2.sinks.k2.hdfs.filePrefix = logs-

    #是否按照時間滾動文件夾

    a2.sinks.k2.hdfs.round = true

    #多少時間單位創建一個新的文件夾

    a2.sinks.k2.hdfs.roundValue = 1

    #重新定義時間單位

    a2.sinks.k2.hdfs.roundUnit = hour

    #是否使用本地時間戳

    a2.sinks.k2.hdfs.useLocalTimeStamp = true

    #積攢多少個EventflushHDFS一次

    a2.sinks.k2.hdfs.batchSize = 1000

    #設置文件類型,可支持壓縮

    a2.sinks.k2.hdfs.fileType = DataStream

    #多久生成一個新的文件

    a2.sinks.k2.hdfs.rollInterval = 600

    #設置每個文件的滾動大小

    a2.sinks.k2.hdfs.rollSize = 134217700

    #文件的滾動與Event數量無關

    a2.sinks.k2.hdfs.rollCount = 0

    #最小冗余數

    a2.sinks.k2.hdfs.minBlockReplicas = 1

     

    # Use a channel which buffers events in memory

    a2.channels.c2.type = memory

    a2.channels.c2.capacity = 1000

    a2.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r2.channels = c2

    a2.sinks.k2.channel = c2

     

  4. 執行監控配置

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

    1. 開啟 Hadoop Hive 並操作 Hive 產生日志

      [atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh

      [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

       

      [atguigu@hadoop102 hive]$ bin/hive

      hive (default)>

    2. HDFS 上查看文件。

    3.3 實時讀取目錄文件到HDFS案例

    1)案例需求:使用Flume監聽整個目錄的文件

    2)需求分析:

    3)實現步驟:

    1.創建配置文件flume-dir-hdfs.conf

    創建一個文件

    [atguigu@hadoop102 job]$ touch flume-dir-hdfs.conf

    打開文件

    [atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf

    添加如下內容

    a3.sources = r3

    a3.sinks = k3

    a3.channels = c3

     

    # Describe/configure the source

    a3.sources.r3.type = spooldir

    a3.sources.r3.spoolDir = /opt/module/flume/upload

    a3.sources.r3.fileSuffix = .COMPLETED

    a3.sources.r3.fileHeader = true

    #忽略所有以.tmp結尾的文件,不上傳

    a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

     

    # Describe the sink

    a3.sinks.k3.type = hdfs

    a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H

    #上傳文件的前綴

    a3.sinks.k3.hdfs.filePrefix = upload-

    #是否按照時間滾動文件夾

    a3.sinks.k3.hdfs.round = true

    #多少時間單位創建一個新的文件夾

    a3.sinks.k3.hdfs.roundValue = 1

    #重新定義時間單位

    a3.sinks.k3.hdfs.roundUnit = hour

    #是否使用本地時間戳

    a3.sinks.k3.hdfs.useLocalTimeStamp = true

    #積攢多少個EventflushHDFS一次

    a3.sinks.k3.hdfs.batchSize = 100

    #設置文件類型,可支持壓縮

    a3.sinks.k3.hdfs.fileType = DataStream

    #多久生成一個新的文件

    a3.sinks.k3.hdfs.rollInterval = 600

    #設置每個文件的滾動大小大概是128M

    a3.sinks.k3.hdfs.rollSize = 134217700

    #文件的滾動與Event數量無關

    a3.sinks.k3.hdfs.rollCount = 0

    #最小冗余數

    a3.sinks.k3.hdfs.minBlockReplicas = 1

     

    # Use a channel which buffers events in memory

    a3.channels.c3.type = memory

    a3.channels.c3.capacity = 1000

    a3.channels.c3.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r3.channels = c3

    a3.sinks.k3.channel = c3

     

    2. 啟動監控文件夾命令

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

    說明: 在使用Spooling Directory Source時

    1. 不要在監控目錄中創建並持續修改文件
    2. 上傳完成的文件會以.COMPLETED結尾
    3. 被監控文件夾每500毫秒掃描一次文件變動

    3. upload文件夾中添加文件

    在/opt/module/flume目錄下創建upload文件夾

    [atguigu@hadoop102 flume]$ mkdir upload

    向upload文件夾中添加文件

    [atguigu@hadoop102 upload]$ touch atguigu.txt

    [atguigu@hadoop102 upload]$ touch atguigu.tmp

    [atguigu@hadoop102 upload]$ touch atguigu.log

    4. 查看HDFS上的數據

    5. 等待1s,再次查詢upload文件夾

    [atguigu@hadoop102 upload]$ ll

    總用量 0

    -rw-rw-r--. 1 atguigu atguigu 0 5 20 22:31 atguigu.log.COMPLETED

    -rw-rw-r--. 1 atguigu atguigu 0 5 20 22:31 atguigu.tmp

    -rw-rw-r--. 1 atguigu atguigu 0 5 20 22:31 atguigu.txt.COMPLETED

    3.4 單數據源多出口案例(選擇器)

        單Source多Channel、Sink如圖7-2所示。

        

    圖7-2 單Source多Channel、Sink

    1)案例需求:使用Flume-1監控文件變動,Flume-1將變動內容傳遞給Flume-2,Flume-2負責存儲到HDFS。同時Flume-1將變動內容傳遞給Flume-3,Flume-3負責輸出到Local FileSystem。

    2)需求分析:

    3)實現步驟:

    0.准備工作

        在/opt/module/flume/job目錄下創建group1文件夾

    [atguigu@hadoop102 job]$ cd group1/

    在/opt/module/datas/目錄下創建flume3文件夾

    [atguigu@hadoop102 datas]$ mkdir flume3

    1.創建flume-file-flume.conf

    配置1個接收日志文件的source和兩個channel、兩個sink,分別輸送給flume-flume-hdfs和flume-flume-dir。

    創建配置文件並打開

    [atguigu@hadoop102 group1]$ touch flume-file-flume.conf

    [atguigu@hadoop102 group1]$ vim flume-file-flume.conf

    添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1 k2

    a1.channels = c1 c2

    # 將數據流復制給所有channel

    a1.sources.r1.selector.type = replicating

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

    a1.sources.r1.shell = /bin/bash -c

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop102

    a1.sinks.k1.port = 4141

     

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop102

    a1.sinks.k2.port = 4142

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    a1.channels.c2.type = memory

    a1.channels.c2.capacity = 1000

    a1.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1 c2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c2

    :Avro是由Hadoop創始人Doug Cutting創建的一種語言無關的數據序列化和RPC框架。

    注:RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。

    2.創建flume-flume-hdfs.conf

    配置上級Flume輸出的Source,輸出是到HDFS的Sink。

    創建配置文件並打開

    [atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf

    [atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = avro

    a2.sources.r1.bind = hadoop102

    a2.sources.r1.port = 4141

     

    # Describe the sink

    a2.sinks.k1.type = hdfs

    a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H

    #上傳文件的前綴

    a2.sinks.k1.hdfs.filePrefix = flume2-

    #是否按照時間滾動文件夾

    a2.sinks.k1.hdfs.round = true

    #多少時間單位創建一個新的文件夾

    a2.sinks.k1.hdfs.roundValue = 1

    #重新定義時間單位

    a2.sinks.k1.hdfs.roundUnit = hour

    #是否使用本地時間戳

    a2.sinks.k1.hdfs.useLocalTimeStamp = true

    #積攢多少個EventflushHDFS一次

    a2.sinks.k1.hdfs.batchSize = 100

    #設置文件類型,可支持壓縮

    a2.sinks.k1.hdfs.fileType = DataStream

    #多久生成一個新的文件

    a2.sinks.k1.hdfs.rollInterval = 600

    #設置每個文件的滾動大小大概是128M

    a2.sinks.k1.hdfs.rollSize = 134217700

    #文件的滾動與Event數量無關

    a2.sinks.k1.hdfs.rollCount = 0

    #最小冗余數

    a2.sinks.k1.hdfs.minBlockReplicas = 1

     

    # Describe the channel

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

    3.創建flume-flume-dir.conf

    配置上級Flume輸出的Source,輸出是到本地目錄的Sink。

    創建配置文件並打開

    [atguigu@hadoop102 group1]$ touch flume-flume-dir.conf

    [atguigu@hadoop102 group1]$ vim flume-flume-dir.conf

    添加如下內容

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c2

     

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = hadoop102

    a3.sources.r1.port = 4142

     

    # Describe the sink

    a3.sinks.k1.type = file_roll

    a3.sinks.k1.sink.directory = /opt/module/datas/flume3

     

    # Describe the channel

    a3.channels.c2.type = memory

    a3.channels.c2.capacity = 1000

    a3.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c2

    a3.sinks.k1.channel = c2

    提示:輸出的本地目錄必須是已經存在的目錄,如果該目錄不存在,並不會創建新的目錄。

    4.執行配置文件

    分別開啟對應配置文件:flume-flume-dir,flume-flume-hdfs,flume-file-flume。

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

     

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

     

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

    5.啟動HadoopHive

    [atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh

    [atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

     

    [atguigu@hadoop102 hive]$ bin/hive

    hive (default)>

    6.檢查HDFS上數據

    7檢查/opt/module/datas/flume3目錄中數據

    [atguigu@hadoop102 flume3]$ ll

    總用量 8

    -rw-rw-r--. 1 atguigu atguigu 5942 5 22 00:09 1526918887550-3

    3.5 單數據源多出口案例(Sink)

    單Source、Channel多Sink(負載均衡)如圖7-3所示。

        

    圖7-3 單Source、Channel多Sink

    1)案例需求:使用Flume-1監控文件變動,Flume-1將變動內容傳遞給Flume-2,Flume-2負責存儲到HDFS。同時Flume-1將變動內容傳遞給Flume-3,Flume-3也負責存儲到HDFS

    2)需求分析:

    3)實現步驟:

    0.准備工作

        在/opt/module/flume/job目錄下創建group2文件夾

    [atguigu@hadoop102 job]$ cd group2/

    1.創建flume-netcat-flume.conf

    配置1個接收日志文件的source和1個channel、兩個sink,分別輸送給flume-flume-console1和flume-flume-console2。

    創建配置文件並打開

    [atguigu@hadoop102 group2]$ touch flume-netcat-flume.conf

    [atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf

    添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.channels = c1

    a1.sinkgroups = g1

    a1.sinks = k1 k2

     

    # Describe/configure the source

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

     

    a1.sinkgroups.g1.processor.type = load_balance

    a1.sinkgroups.g1.processor.backoff = true

    a1.sinkgroups.g1.processor.selector = round_robin

    a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop102

    a1.sinks.k1.port = 4141

     

    a1.sinks.k2.type = avro

    a1.sinks.k2.hostname = hadoop102

    a1.sinks.k2.port = 4142

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinkgroups.g1.sinks = k1 k2

    a1.sinks.k1.channel = c1

    a1.sinks.k2.channel = c1

    :Avro是由Hadoop創始人Doug Cutting創建的一種語言無關的數據序列化和RPC框架。

    注:RPC(Remote Procedure Call)—遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。

    2.創建flume-flume-console1.conf

    配置上級Flume輸出的Source,輸出是到本地控制台。

    創建配置文件並打開

    [atguigu@hadoop102 group2]$ touch flume-flume-console1.conf

    [atguigu@hadoop102 group2]$ vim flume-flume-console1.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = avro

    a2.sources.r1.bind = hadoop102

    a2.sources.r1.port = 4141

     

    # Describe the sink

    a2.sinks.k1.type = logger

     

    # Describe the channel

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

    3.創建flume-flume-console2.conf

    配置上級Flume輸出的Source,輸出是到本地控制台。

    創建配置文件並打開

    [atguigu@hadoop102 group2]$ touch flume-flume-console2.conf

    [atguigu@hadoop102 group2]$ vim flume-flume-console2.conf

    添加如下內容

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c2

     

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = hadoop102

    a3.sources.r1.port = 4142

     

    # Describe the sink

    a3.sinks.k1.type = logger

     

    # Describe the channel

    a3.channels.c2.type = memory

    a3.channels.c2.capacity = 1000

    a3.channels.c2.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c2

    a3.sinks.k1.channel = c2

    4.執行配置文件

    分別開啟對應配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume。

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

     

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

     

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

    5. 使用telnet工具向本機的44444端口發送內容

    $ telnet localhost 44444

    6. 查看Flume2Flume3的控制台打印日志

    3.6 多數據源匯總案例

        多Source匯總數據到單Flume如圖7-4所示。

    圖7-4多Flume匯總數據到單Flume

    1. 案例需求:

      hadoop103上的Flume-1監控文件/opt/module/group.log,

      hadoop102上的Flume-2監控某一個端口的數據流,

      Flume-1與Flume-2將數據發送給hadoop104上的Flume-3,Flume-3將最終數據打印到控制台。    

    2)需求分析:

    3)實現步驟:

    0.准備工作

    分發Flume

    [atguigu@hadoop102 module]$ xsync flume

        在hadoop102、hadoop103以及hadoop104的/opt/module/flume/job目錄下創建一個group3文件夾。

    [atguigu@hadoop102 job]$ mkdir group3

    [atguigu@hadoop103 job]$ mkdir group3

    [atguigu@hadoop104 job]$ mkdir group3

    1.創建flume1-logger-flume.conf

    配置Source用於監控hive.log文件,配置Sink輸出數據到下一級Flume。

    在hadoop103上創建配置文件並打開

    [atguigu@hadoop103 group3]$ touch flume1-logger-flume.conf

    [atguigu@hadoop103 group3]$ vim flume1-logger-flume.conf

    添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /opt/module/group.log

    a1.sources.r1.shell = /bin/bash -c

     

    # Describe the sink

    a1.sinks.k1.type = avro

    a1.sinks.k1.hostname = hadoop104

    a1.sinks.k1.port = 4141

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    2.創建flume2-netcat-flume.conf

    配置Source監控端口44444數據流,配置Sink數據到下一級Flume:

    在hadoop102上創建配置文件並打開

    [atguigu@hadoop102 group3]$ touch flume2-netcat-flume.conf

    [atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf

    添加如下內容

    # Name the components on this agent

    a2.sources = r1

    a2.sinks = k1

    a2.channels = c1

     

    # Describe/configure the source

    a2.sources.r1.type = netcat

    a2.sources.r1.bind = hadoop102

    a2.sources.r1.port = 44444

     

    # Describe the sink

    a2.sinks.k1.type = avro

    a2.sinks.k1.hostname = hadoop104

    a2.sinks.k1.port = 4141

     

    # Use a channel which buffers events in memory

    a2.channels.c1.type = memory

    a2.channels.c1.capacity = 1000

    a2.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a2.sources.r1.channels = c1

    a2.sinks.k1.channel = c1

    3.創建flume3-flume-logger.conf

    配置source用於接收flume1與flume2發送過來的數據流,最終合並后sink到控制台。

    在hadoop104上創建配置文件並打開

    [atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf

    [atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf

    添加如下內容

    # Name the components on this agent

    a3.sources = r1

    a3.sinks = k1

    a3.channels = c1

     

    # Describe/configure the source

    a3.sources.r1.type = avro

    a3.sources.r1.bind = hadoop104

    a3.sources.r1.port = 4141

     

    # Describe the sink

    # Describe the sink

    a3.sinks.k1.type = logger

     

    # Describe the channel

    a3.channels.c1.type = memory

    a3.channels.c1.capacity = 1000

    a3.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a3.sources.r1.channels = c1

    a3.sinks.k1.channel = c1

    4.執行配置文件

    分別開啟對應配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。

    [atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

     

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

     

    [atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

    5.在hadoop103上向/opt/module目錄下的group.log追加內容

    [atguigu@hadoop103 module]$ echo 'hello' > group.log

    6.在hadoop102上向44444端口發送數據

    [atguigu@hadoop102 flume]$ telnet hadoop102 44444

    7.檢查hadoop104上數據

    第4章 Flume監控之Ganglia

    4.1 Ganglia的安裝與部署

    1) 安裝httpd服務與php

    [atguigu@hadoop102 flume]$ sudo yum -y install httpd php

    2) 安裝其他依賴

    [atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool rrdtool-devel

    [atguigu@hadoop102 flume]$ sudo yum -y install apr-devel

    3) 安裝ganglia

    [atguigu@hadoop102 flume]$ sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

    [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad

    [atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web

    [atguigu@hadoop102 flume]$ sudo yum install -y ganglia-gmond

    4) 修改配置文件/etc/httpd/conf.d/ganglia.conf

    [atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf

    修改為紅顏色的配置:

    # Ganglia monitoring system php web frontend

    Alias /ganglia /usr/share/ganglia

    <Location /ganglia>

    Order deny,allow

    Deny from all

    Allow from all

    # Allow from 127.0.0.1

    # Allow from ::1

    # Allow from .example.com

    </Location>

    5) 修改配置文件/etc/ganglia/gmetad.conf

    [atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf

    修改為:

    data_source "hadoop102" 192.168.1.102

    6) 修改配置文件/etc/ganglia/gmond.conf

    [atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf

    修改為:

    cluster {

    name = "hadoop102"

    owner = "unspecified"

    latlong = "unspecified"

    url = "unspecified"

    }

    udp_send_channel {

    #bind_hostname = yes # Highly recommended, soon to be default.

    # This option tells gmond to use a source address

    # that resolves to the machine's hostname. Without

    # this, the metrics may appear to come from any

    # interface and the DNS names associated with

    # those IPs will be used to create the RRDs.

    # mcast_join = 239.2.11.71

    host = 192.168.1.102

    port = 8649

    ttl = 1

    }

    udp_recv_channel {

    # mcast_join = 239.2.11.71

    port = 8649

    bind = 192.168.1.102

    retry_bind = true

    # Size of the UDP buffer. If you are handling lots of metrics you really

    # should bump it up to e.g. 10MB or even higher.

    # buffer = 10485760

    }

    7) 修改配置文件/etc/selinux/config

    [atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config

    修改為:

    # This file controls the state of SELinux on the system.

    # SELINUX= can take one of these three values:

    # enforcing - SELinux security policy is enforced.

    # permissive - SELinux prints warnings instead of enforcing.

    # disabled - No SELinux policy is loaded.

    SELINUX=disabled

    # SELINUXTYPE= can take one of these two values:

    # targeted - Targeted processes are protected,

    # mls - Multi Level Security protection.

    SELINUXTYPE=targeted

    尖叫提示:selinux本次生效關閉必須重啟,如果此時不想重啟,可以臨時生效之:

    [atguigu@hadoop102 flume]$ sudo setenforce 0

    5) 啟動ganglia

    [atguigu@hadoop102 flume]$ sudo service httpd start

    [atguigu@hadoop102 flume]$ sudo service gmetad start

    [atguigu@hadoop102 flume]$ sudo service gmond start

    6) 打開網頁瀏覽ganglia頁面

    http://192.168.1.102/ganglia

    尖叫提示:如果完成以上操作依然出現權限不足錯誤,請修改/var/lib/ganglia目錄的權限:

    [atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia

    4.2 操作Flume測試監控

    1) 修改/opt/module/flume/conf目錄下的flume-env.sh配置:

    JAVA_OPTS="-Dflume.monitoring.type=ganglia

    -Dflume.monitoring.hosts=192.168.1.102:8649

    -Xms100m

    -Xmx200m"

    2) 啟動Flume任務

    [atguigu@hadoop102 flume]$ bin/flume-ng agent \

    --conf conf/ \

    --name a1 \

    --conf-file job/flume-telnet-logger.conf \

    -Dflume.root.logger==INFO,console \

    -Dflume.monitoring.type=ganglia \

    -Dflume.monitoring.hosts=192.168.1.102:8649

    3) 發送數據觀察ganglia監測圖

    [atguigu@hadoop102 flume]$ telnet localhost 44444

    樣式如圖:

    圖例說明:

    字段(圖表名稱)

    字段含義

    EventPutAttemptCount

    source嘗試寫入channel的事件總數量

    EventPutSuccessCount

    成功寫入channel且提交的事件總數量

    EventTakeAttemptCount

    sink嘗試從channel拉取事件的總數量。這不意味着每次事件都被返回,因為sink拉取的時候channel可能沒有任何數據。

    EventTakeSuccessCount

    sink成功讀取的事件的總數量

    StartTime

    channel啟動的時間(毫秒)

    StopTime

    channel停止的時間(毫秒)

    ChannelSize

    目前channel中事件的總數量

    ChannelFillPercentage

    channel占用百分比

    ChannelCapacity

    channel的容量

    第5章 Flume高級之自定義MySQLSource

    5.1 自定義Source說明

    Source是負責接收數據到Flume Agent的組件。Source組件可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source類型已經很多,但是有時候並不能滿足實際開發當中的需求,此時我們就需要根據實際需求自定義某些Source。

    如:實時監控MySQL,從MySQL中獲取數據傳輸到HDFS或者其他存儲框架,所以此時需要我們自己實現MySQLSource。

    官方也提供了自定義source的接口:

    官網說明:https://flume.apache.org/FlumeDeveloperGuide.html#source

    5.3 自定義MySQLSource組成

    圖6-1 自定義MySQLSource組成

    5.2 自定義MySQLSource步驟

    根據官方說明自定義MySqlSource需要繼承AbstractSource類並實現Configurable和PollableSource接口。

    實現相應方法:

    getBackOffSleepIncrement()//暫不用

    getMaxBackOffSleepInterval()//暫不用

    configure(Context context)//初始化context

    process()//獲取數據(從MySql獲取數據,業務處理比較復雜,所以我們定義一個專門的類——SQLSourceHelper來處理跟MySql的交互),封裝成Event並寫入Channel,這個方法被循環調用

    stop()//關閉相關的資源

    5.4 代碼實現

    5.4.1 導入Pom依賴

    <dependencies>

    <dependency>

    <groupId>org.apache.flume</groupId>

    <artifactId>flume-ng-core</artifactId>

    <version>1.7.0</version>

    </dependency>

    <dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>5.1.27</version>

    </dependency>

    </dependencies>

    5.4.2 添加配置信息

    在ClassPath下添加jdbc.properties和log4j. properties

    jdbc.properties:

    dbDriver=com.mysql.jdbc.Driver

    dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
    dbUser=root
    dbPassword=000000

    log4j. properties:

    #--------console-----------

    log4j.rootLogger=info,myconsole,myfile
    log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
    log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
    #log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n

    #log4j.rootLogger=error,myfile
    log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.myfile.File=/tmp/flume.log
    log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n

    5.4.3 SQLSourceHelper

    1)屬性說明:

    屬性

    說明(括號中為默認值)

    runQueryDelay    

    查詢時間間隔(10000)

    batchSize

    緩存大小(100)

    startFrom

    查詢語句開始id(0)

    currentIndex

    查詢語句當前id,每次查詢之前需要查元數據表

    recordSixe

    查詢返回條數

    table

    監控的表名

    columnsToSelect

    查詢字段(*)

    customQuery

    用戶傳入的查詢語句

    query

    查詢語句

    defaultCharsetResultSet

    編碼格式(UTF-8)

    2)方法說明:

    方法

    說明

    SQLSourceHelper(Context context)

    構造方法,初始化屬性及獲取JDBC連接

    InitConnection(String url, String user, String pw)

    獲取JDBC連接

    checkMandatoryProperties()

    校驗相關屬性是否設置(實際開發中可增加內容)

    buildQuery()

    根據實際情況構建sql語句,返回值String

    executeQuery()

    執行sql語句的查詢操作,返回值List<List<Object>>

    getAllRows(List<List<Object>> queryResult)

    將查詢結果轉換為String,方便后續操作

    updateOffset2DB(int size)

    根據每次查詢結果將offset寫入元數據表

    execSql(String sql)

    具體執行sql語句方法

    getStatusDBIndex(int startFrom)

    獲取元數據表中的offset

    queryOne(String sql)

    獲取元數據表中的offset實際sql語句執行方法

    close()

    關閉資源

    3)代碼分析

    4)代碼實現:

    package com.atguigu.source;

     

    import org.apache.flume.Context;

    import org.apache.flume.conf.ConfigurationException;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

     

    import java.io.IOException;

    import java.sql.*;

    import java.text.ParseException;

    import java.util.ArrayList;

    import java.util.List;

    import java.util.Properties;

     

    public class SQLSourceHelper {

     

    private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

     

    private int runQueryDelay, //兩次查詢的時間間隔

    startFrom, //開始id

    currentIndex,     //當前id

    recordSixe = 0, //每次查詢返回結果的條數

    maxRow; //每次查詢的最大條數

     

    private String table, //要操作的表

    columnsToSelect, //用戶傳入的查詢的列

    customQuery, //用戶傳入的查詢語句

    query, //構建的查詢語句

    defaultCharsetResultSet;//編碼集

     

    //上下文,用來獲取配置文件

    private Context context;

     

    //為定義的變量賦值(默認值),可在flume任務的配置文件中修改

    private static final int DEFAULT_QUERY_DELAY = 10000;

    private static final int DEFAULT_START_VALUE = 0;

    private static final int DEFAULT_MAX_ROWS = 2000;

    private static final String DEFAULT_COLUMNS_SELECT = "*";

    private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

     

    private static Connection conn = null;

    private static PreparedStatement ps = null;

    private static String connectionURL, connectionUserName, connectionPassword;

     

    //加載靜態資源

    static {

     

    Properties p = new Properties();

     

    try {

    p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));

    connectionURL = p.getProperty("dbUrl");

    connectionUserName = p.getProperty("dbUser");

    connectionPassword = p.getProperty("dbPassword");

    Class.forName(p.getProperty("dbDriver"));

     

    } catch (IOException | ClassNotFoundException e) {

    LOG.error(e.toString());

    }

    }

     

    //獲取JDBC連接

    private static Connection InitConnection(String url, String user, String pw) {

    try {

     

    Connection conn = DriverManager.getConnection(url, user, pw);

     

    if (conn == null)

    throw new SQLException();

     

    return conn;

     

    } catch (SQLException e) {

    e.printStackTrace();

    }

     

    return null;

    }

     

    //構造方法

    SQLSourceHelper(Context context) throws ParseException {

     

    //初始化上下文

    this.context = context;

     

    //有默認值參數:獲取flume任務配置文件中的參數,讀不到的采用默認值

    this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);

     

    this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);

     

    this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);

     

    this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

     

    //無默認值參數:獲取flume任務配置文件中的參數

    this.table = context.getString("table");

    this.customQuery = context.getString("custom.query");

     

    connectionURL = context.getString("connection.url");

     

    connectionUserName = context.getString("connection.user");

     

    connectionPassword = context.getString("connection.password");

     

    conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

     

    //校驗相應的配置信息,如果沒有默認值的參數也沒賦值,拋出異常

    checkMandatoryProperties();

     

    //獲取當前的id

    currentIndex = getStatusDBIndex(startFrom);

     

    //構建查詢語句

    query = buildQuery();

    }

     

    //校驗相應的配置信息(表,查詢語句以及數據庫連接的參數)

    private void checkMandatoryProperties() {

     

    if (table == null) {

    throw new ConfigurationException("property table not set");

    }

     

    if (connectionURL == null) {

    throw new ConfigurationException("connection.url property not set");

    }

     

    if (connectionUserName == null) {

    throw new ConfigurationException("connection.user property not set");

    }

     

    if (connectionPassword == null) {

    throw new ConfigurationException("connection.password property not set");

    }

    }

     

    //構建sql語句

    private String buildQuery() {

     

    String sql = "";

     

    //獲取當前id

    currentIndex = getStatusDBIndex(startFrom);

    LOG.info(currentIndex + "");

     

    if (customQuery == null) {

    sql = "SELECT " + columnsToSelect + " FROM " + table;

    } else {

    sql = customQuery;

    }

     

    StringBuilder execSql = new StringBuilder(sql);

     

    //id作為offset

    if (!sql.contains("where")) {

    execSql.append(" where ");

    execSql.append("id").append(">").append(currentIndex);

     

    return execSql.toString();

    } else {

    int length = execSql.toString().length();

     

    return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;

    }

    }

     

    //執行查詢

    List<List<Object>> executeQuery() {

     

    try {

    //每次執行查詢時都要重新生成sql,因為id不同

    customQuery = buildQuery();

     

    //存放結果的集合

    List<List<Object>> results = new ArrayList<>();

     

    if (ps == null) {

    //

    ps = conn.prepareStatement(customQuery);

    }

     

    ResultSet result = ps.executeQuery(customQuery);

     

    while (result.next()) {

     

    //存放一條數據的集合(多個列)

    List<Object> row = new ArrayList<>();

     

    //將返回結果放入集合

    for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {

    row.add(result.getObject(i));

    }

     

    results.add(row);

    }

     

    LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());

     

    return results;

    } catch (SQLException e) {

    LOG.error(e.toString());

     

    // 重新連接

    conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

     

    }

     

    return null;

    }

     

    //將結果集轉化為字符串,每一條數據是一個list集合,將每一個小的list集合轉化為字符串

    List<String> getAllRows(List<List<Object>> queryResult) {

     

    List<String> allRows = new ArrayList<>();

     

    if (queryResult == null || queryResult.isEmpty())

    return allRows;

     

    StringBuilder row = new StringBuilder();

     

    for (List<Object> rawRow : queryResult) {

     

    Object value = null;

     

    for (Object aRawRow : rawRow) {

     

    value = aRawRow;

     

    if (value == null) {

    row.append(",");

    } else {

    row.append(aRawRow.toString()).append(",");

    }

    }

     

    allRows.add(row.toString());

    row = new StringBuilder();

    }

     

    return allRows;

    }

     

    //更新offset元數據狀態,每次返回結果集后調用。必須記錄每次查詢的offset值,為程序中斷續跑數據時使用,以idoffset

    void updateOffset2DB(int size) {

    //source_tab做為KEY,如果不存在則插入,存在則更新(每個源表對應一條記錄)

    String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('"

    + this.table

    + "','" + (recordSixe += size)

    + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";

     

    LOG.info("updateStatus Sql:" + sql);

     

    execSql(sql);

    }

     

    //執行sql語句

    private void execSql(String sql) {

     

    try {

    ps = conn.prepareStatement(sql);

     

    LOG.info("exec::" + sql);

     

    ps.execute();

    } catch (SQLException e) {

    e.printStackTrace();

    }

    }

     

    //獲取當前idoffset

    private Integer getStatusDBIndex(int startFrom) {

     

    //flume_meta表中查詢出當前的id是多少

    String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");

     

    if (dbIndex != null) {

    return Integer.parseInt(dbIndex);

    }

     

    //如果沒有數據,則說明是第一次查詢或者數據表中還沒有存入數據,返回最初傳入的值

    return startFrom;

    }

     

    //查詢一條數據的執行語句(當前id)

    private String queryOne(String sql) {

     

    ResultSet result = null;

     

    try {

    ps = conn.prepareStatement(sql);

    result = ps.executeQuery();

     

    while (result.next()) {

    return result.getString(1);

    }

    } catch (SQLException e) {

    e.printStackTrace();

    }

     

    return null;

    }

     

    //關閉相關資源

    void close() {

     

    try {

    ps.close();

    conn.close();

    } catch (SQLException e) {

    e.printStackTrace();

    }

    }

     

    int getCurrentIndex() {

    return currentIndex;

    }

     

    void setCurrentIndex(int newValue) {

    currentIndex = newValue;

    }

     

    int getRunQueryDelay() {

    return runQueryDelay;

    }

     

    String getQuery() {

    return query;

    }

     

    String getConnectionURL() {

    return connectionURL;

    }

     

    private boolean isCustomQuerySet() {

    return (customQuery != null);

    }

     

    Context getContext() {

    return context;

    }

     

    public String getConnectionUserName() {

    return connectionUserName;

    }

     

    public String getConnectionPassword() {

    return connectionPassword;

    }

     

    String getDefaultCharsetResultSet() {

    return defaultCharsetResultSet;

    }

    }

    5.4.4 MySQLSource

    代碼實現:

    package com.atguigu.source;

     

    import org.apache.flume.Context;

    import org.apache.flume.Event;

    import org.apache.flume.EventDeliveryException;

    import org.apache.flume.PollableSource;

    import org.apache.flume.conf.Configurable;

    import org.apache.flume.event.SimpleEvent;

    import org.apache.flume.source.AbstractSource;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

     

    import java.text.ParseException;

    import java.util.ArrayList;

    import java.util.HashMap;

    import java.util.List;

     

    public class SQLSource extends AbstractSource implements Configurable, PollableSource {

     

    //打印日志

    private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

     

    //定義sqlHelper

    private SQLSourceHelper sqlSourceHelper;

     

     

    @Override

    public long getBackOffSleepIncrement() {

    return 0;

    }

     

    @Override

    public long getMaxBackOffSleepInterval() {

    return 0;

    }

     

    @Override

    public void configure(Context context) {

     

    try {

    //初始化

    sqlSourceHelper = new SQLSourceHelper(context);

    } catch (ParseException e) {

    e.printStackTrace();

    }

    }

     

    @Override

    public Status process() throws EventDeliveryException {

     

    try {

    //查詢數據表

    List<List<Object>> result = sqlSourceHelper.executeQuery();

     

    //存放event的集合

    List<Event> events = new ArrayList<>();

     

    //存放event頭集合

    HashMap<String, String> header = new HashMap<>();

     

    //如果有返回數據,則將數據封裝為event

    if (!result.isEmpty()) {

     

    List<String> allRows = sqlSourceHelper.getAllRows(result);

     

    Event event = null;

     

    for (String row : allRows) {

    event = new SimpleEvent();

    event.setBody(row.getBytes());

    event.setHeaders(header);

    events.add(event);

    }

     

    //event寫入channel

    this.getChannelProcessor().processEventBatch(events);

     

    //更新數據表中的offset信息

    sqlSourceHelper.updateOffset2DB(result.size());

    }

     

    //等待時長

    Thread.sleep(sqlSourceHelper.getRunQueryDelay());

     

    return Status.READY;

    } catch (InterruptedException e) {

    LOG.error("Error procesing row", e);

     

    return Status.BACKOFF;

    }

    }

     

    @Override

    public synchronized void stop() {

     

    LOG.info("Stopping sql source {} ...", getName());

     

    try {

    //關閉資源

    sqlSourceHelper.close();

    } finally {

    super.stop();

    }

    }

    }

    5.5 測試

    5.5.1 Jar包准備

    1) 將MySql驅動包放入Flume的lib目錄下

    [atguigu@hadoop102 flume]$ cp \

    /opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \

    /opt/module/flume/lib/

    2) 打包項目並將Jar包放入Flume的lib目錄下

    5.5.2 配置文件准備

    1)創建配置文件並打開

    [atguigu@hadoop102 job]$ touch mysql.conf

    [atguigu@hadoop102 job]$ vim mysql.conf

    2)添加如下內容

    # Name the components on this agent

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

     

    # Describe/configure the source

    a1.sources.r1.type = com.atguigu.source.SQLSource

    a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource

    a1.sources.r1.connection.user = root

    a1.sources.r1.connection.password = 000000

    a1.sources.r1.table = student

    a1.sources.r1.columns.to.select = *

    #a1.sources.r1.incremental.column.name = id

    #a1.sources.r1.incremental.value = 0

    a1.sources.r1.run.query.delay=5000

     

    # Describe the sink

    a1.sinks.k1.type = logger

     

    # Describe the channel

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 1000

    a1.channels.c1.transactionCapacity = 100

     

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    5.5.3 MySql表准備

    1) 創建MySqlSource數據庫

    CREATE DATABASE mysqlsource

    2) 在MySqlSource數據庫下創建數據表Student和元數據表Flume_meta

    CREATE TABLE `student` (

    `id` int(11) NOT NULL AUTO_INCREMENT,

    `name` varchar(255) NOT NULL,

    PRIMARY KEY (`id`)

    );

    CREATE TABLE `flume_meta` (

    `source_tab` varchar(255) NOT NULL,

    `currentIndex` varchar(255) NOT NULL,

    PRIMARY KEY (`source_tab`)

    );

    1. 向數據表中添加數據

      1 zhangsan

      2 lisi

      3 wangwu

      4 zhaoliu

      1. 測試並查看結果

    2. 任務執行

      [atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 \

      --conf-file job/mysql.conf -Dflume.root.logger=INFO,console

    3. 結果展示,如圖6-2所示:

    圖6-2 結果展示

    第6章 知識擴展

    6.1 常見正則表達式語法

    元字符

    描述

    ^

    匹配輸入字符串的開始位置。如果設置了RegExp對象的Multiline屬性,^也匹配"\n""\r"之后的位置。

    $

    匹配輸入字符串的結束位置。如果設置了RegExp對象的Multiline屬性,$也匹配"\n""\r"之前的位置。

    *

    匹配前面的子表達式任意次。例如,zo*能匹配"z""zo"以及"zoo"*等價於{0,}

    +

    匹配前面的子表達式一次或多次(大於等於1次)。例如,"zo+"能匹配"zo"以及"zoo",但不能匹配"z"+等價於{1,}

    [a-z]

    字符范圍。匹配指定范圍內的任意字符。例如,"[a-z]"可以匹配"a""z"范圍內的任意小寫字母字符。

    注意:只有連字符在字符組內部時,並且出現在兩個字符之間時,才能表示字符的范圍; 如果出字符組的開頭,則只能表示連字符本身.

    6.2 練習

    案例需求:

    1)flume-1監控hive.log日志,flume-1的數據傳送給flume-2,flume-2將數據追加到本地文件,同時將數據傳輸到flume-3。

    2)flume-4監控本地另一個自己創建的文件any.txt,並將數據傳送給flume-3。

    3)flume-3將匯總數據寫入到HDFS。

    請先畫出結構圖,再開始編寫任務腳本。

    第7章 企業真實面試題(重點)

    7.1 你是如何實現Flume數據傳輸的監控的

    使用第三方框架Ganglia實時監控Flume。

    7.2 FlumeSourceSinkChannel的作用?你們Source是什么類型?

        1、作用

    (1)Source組件是專門用來收集數據的,可以處理各種類型、各種格式的日志數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy

    (2)Channel組件對采集到的數據進行緩存,可以存放在Memory或File中。

    (3)Sink組件是用於把數據發送到目的地的組件,目的地包括Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定義。

    2、我公司采用的Source類型為:

    (1)監控后台日志:exec

    (2)監控后台產生日志的端口:netcat

    Exec spooldir

    7.3 FlumeChannel Selectors

    7.4 Flume參數調優

    1. Source

    增加Source個(使用Tair Dir Source時可增加FileGroups個數)可以增大Source的讀取數據的能力。例如:當某一個目錄產生的文件過多時需要將這個文件目錄拆分成多個文件目錄,同時配置好多個Source 以保證Source有足夠的能力獲取到新產生的數據。

    batchSize參數決定Source一次批量運輸到Channel的event條數,適當調大這個參數可以提高Source搬運Event到Channel時的性能。

    2. Channel 

    type 選擇memory時Channel的性能最好,但是如果Flume進程意外掛掉可能會丟失數據。type選擇file時Channel的容錯性更好,但是性能上會比memory channel差。

    使用file Channel時dataDirs配置多個不同盤下的目錄可以提高性能。

    Capacity 參數決定Channel可容納最大的event條數。transactionCapacity 參數決定每次Source往channel里面寫的最大event條數和每次Sink從channel里面讀的最大event條數。transactionCapacity需要大於Source和Sink的batchSize參數。

    3. Sink 

    增加Sink的個數可以增加Sink消費event的能力。Sink也不是越多越好夠用就行,過多的Sink會占用系統資源,造成系統資源不必要的浪費。

    batchSize參數決定Sink一次批量從Channel讀取的event條數,適當調大這個參數可以提高Sink從Channel搬出event的性能。

    7.5 Flume的事務機制

    Flume的事務機制(類似數據庫的事務機制):Flume使用兩個獨立的事務分別負責從Soucrce到Channel,以及從Channel到Sink的事件傳遞。比如spooling directory source 為文件的每一行創建一個事件,一旦事務中所有的事件全部傳遞到Channel且提交成功,那么Soucrce就將該文件標記為完成。同理,事務以類似的方式處理從Channel到Sink的傳遞過程,如果因為某種原因使得事件無法記錄,那么事務將會回滾。且所有的事件都會保持到Channel中,等待重新傳遞。

    7.6 Flume采集數據會丟失嗎?

    不會,Channel存儲可以存儲在File中,數據傳輸自身有事務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM