Flume的安裝及使用


Flume的安裝及使用

Flume的安裝

1、上傳至虛擬機,並解壓

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/

在環境變量中增加如下命令,可以使用 soft 快速切換到 /usr/local/soft

alias soft='cd /usr/local/soft/'

2、重命名目錄,並配置環境變量

mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile

3、查看flume版本

flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]# 

4、測試flume

  • 監控一個目錄,將數據打印出來

    • 配置文件
    # 首先先給agent起一個名字 叫a1
    # 分別給source channel sink取名字
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # 分別對source、channel、sink進行配置
    
    # 配置source
    # 將source的類型指定為 spooldir 用於監聽一個目錄下文件的變化
    # 因為每個組件可能會出現相同的屬性名稱,所以在對每個組件進行配置的時候 
    # 需要加上 agent的名字.sources.組件的名字.屬性 = 屬性值
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.fileSuffix = .ok
    a1.sources.r1.fileHeader = true
    
    # 給r1這個souces配置一個攔截器並取名為 i1
    a1.sources.r1.interceptors = i1
    # 將攔截器i1的類型設置為timestamp 會將處理數據的時間以毫秒的格式插入event的header中
    # a1.sources.r1.interceptors.i1.type = timestamp
    # 將攔截器i1的類型設置為regex_filter 會根據正則表達式過濾數據
    a1.sources.r1.interceptors.i1.type = regex_filter
    # 配置正則表達式
    a1.sources.r1.interceptors.i1.regex = \\d{3,6}
    # excludeEvents = true 表示將匹配到的過濾,未匹配到的放行
    a1.sources.r1.interceptors.i1.excludeEvents = true
    
    # 配置sink
    # 使用logger作為sink組件,可以將收集到數據直接打印到控制台
    a1.sinks.k1.type = logger
    
    # 配置channel
    # 將channel的類型設置為memory,表示將event緩存在內存中
    a1.channels.c1.type = memory
    
    # 組裝
    # 將sources的channels屬性指定為c1
    a1.sources.r1.channels = c1
    
    # 將sinks的channel屬性指定為c1
    a1.sinks.k1.channel = c1
    
    • 啟動agent
    flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
    
    • 新建/root/data目錄
    mkdir /root/data
    
    • 在/root/data/目錄下新建文件,輸入內容,觀察flume進程打印的日志
    # 隨意在a.txt中加入一些內容
    vim /root/data/a.txt
    

5、flume的使用

  • spoolingToHDFS.conf

    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定spooldir的屬性
    a.sources.r1.type = spooldir 
    a.sources.r1.spoolDir = /root/data 
    a.sources.r1.fileHeader = true 
    a.sources.r1.interceptors = i1 
    a.sources.r1.interceptors.i1.type = timestamp
    #指定sink的類型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir1
    # 指定文件名前綴
    a.sinks.k1.hdfs.filePrefix = student
    # 指定達到多少數據量寫一次文件 單位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少條寫一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件類型為 流 來什么輸出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件輸出格式 為text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后綴
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 在 /root/data/目錄下准備數據
    The Zen of Python, by Tim Peters
    
    Beautiful is better than ugly.
    Explicit is better than implicit.
    Simple is better than complex.
    Complex is better than complicated.
    Flat is better than nested.
    Sparse is better than dense.
    Readability counts.
    Special cases aren't special enough to break the rules.
    Although practicality beats purity.
    Errors should never pass silently.
    Unless explicitly silenced.
    In the face of ambiguity, refuse the temptation to guess.
    There should be one-- and preferably only one --obvious way to do it.
    Although that way may not be obvious at first unless you're Dutch.
    Now is better than never.
    Although never is often better than *right* now.
    If the implementation is hard to explain, it's a bad idea.
    If the implementation is easy to explain, it may be a good idea.
    Namespaces are one honking great idea -- let's do more of those!
    
    • 啟動agent
    flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
    
  • hbaseLogToHDFS

    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定exec的屬性
    a.sources.r1.type = exec 
    a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
    #指定sink的類型
    a.sinks.k1.type = hdfs
    a.sinks.k1.hdfs.path = /flume/data/dir2
    # 指定文件名前綴
    a.sinks.k1.hdfs.filePrefix = hbaselog
    # 指定達到多少數據量寫一次文件 單位:bytes
    a.sinks.k1.hdfs.rollSize = 102400
    # 指定多少條寫一次文件
    a.sinks.k1.hdfs.rollCount = 1000
    # 指定文件類型為 流 來什么輸出什么
    a.sinks.k1.hdfs.fileType = DataStream
    # 指定文件輸出格式 為text
    a.sinks.k1.hdfs.writeFormat = text
    # 指定文件名后綴
    a.sinks.k1.hdfs.fileSuffix = .txt
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
  • hbaselogToHBase

    • 在hbase中創建log表
    create 'log','cf1'
    
    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定exec的屬性
    a.sources.r1.type = exec 
    a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
    #指定sink的類型
    a.sinks.k1.type = hbase
    a.sinks.k1.table = log
    a.sinks.k1.columnFamily = cf1
    
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 100000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
  • netcatLogger

    監聽telnet端口

    • 安裝telnet
    yum install telnet
    
    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定netcat的屬性
    a.sources.r1.type = netcat 
    a.sources.r1.bind = 0.0.0.0 
    a.sources.r1.port = 8888 
    
    #指定sink的類型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 啟動

      • 先啟動agent
      flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 在啟動telnet
      telnet master 8888
      
  • httpToLogger

    • 配置文件
    # a表示給agent命名為a
    # 給source組件命名為r1
    a.sources = r1
    # 給sink組件命名為k1
    a.sinks = k1 
    # 給channel組件命名為c1
    a.channels = c1
    #指定http的屬性
    a.sources.r1.type = http
    a.sources.r1.port = 6666 
    
    #指定sink的類型
    a.sinks.k1.type = logger
    #指定channel
    a.channels.c1.type = memory 
    a.channels.c1.capacity = 1000
    # 表示sink每次會從channel里取多少數據
    a.channels.c1.transactionCapacity = 100
    # 組裝
    a.sources.r1.channels = c1 
    a.sinks.k1.channel = c1
    
    • 啟動

      • 先啟動agent
      flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
      
      • 再使用curl發起一個http請求
      curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://master:666
      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM