Flume:source和sink


Flume – 初識flume、source和sink

目錄
基本概念
常用源 Source
常用sink

 

基本概念

 什么叫flume?
  分布式,可靠的大量日志收集、聚合和移動工具。

 events
  事件,是一行數據的字節數據,是flume發送文件的基本單位。

 flume配置文件
  重命名flume-env.sh.template為flume-env.sh,並添加[export JAVA_HOME=/soft/jdk]

 flume的Agent
  source //從哪兒讀數據。 負責監控並收集數據。相對於channel是生產者。
  channel //數據通道。 通道,相當於數據緩沖區。
  sink //將數據傳送往哪兒。 沉槽,負責將數據放置在指定位置。相對於channel是消費者。

 flume如何使用
  在flume的conf文件下,創建conf后綴的文件,使用flume命令啟動

 flume命令
  啟動:flume-ng agent -f /soft/flume/conf/example.conf -n a1

常用源 Source

 執行源:Exec Sour //通過linux命令作為source。缺點:失敗后數據會丟失,不能保證數據的完整性。
  #定義源:exec
  a1.source.r1.type = exec
  a1.source.r1.command = tail -F /home/centos/1.txt
 滾動目錄源:Spooling Directory Source //監控目錄,如果目錄下有新文件產生,機會將其消費
  #定義源:spoodir
  a1.source.r1.type = spooldir
  #指定監控目錄
  a1.source.r1.spoolDir = /home/centos/log
 指定類型的文件:tailDir source #監控目錄中指定類型的文件,並監控其消費偏移量;
 通過~/.flume/taildir_position.json監控並實時記錄文件偏移量,可通過a1.sources.r1.positionFile配置進行修改
  #定義源:TAILDIR
  a1.source.r1.type = TAILDIR
  #指定監控文件組
  a1.source.r1.filegroups = g1
  #指定g1組中包含的文件
  a1.source.r1.filegroups.g1 = /home/centos/log/.*log
 順序數字源:Sequence Generator Source //產生順序數字的源,用作測試
  #定義源:seq
  a1.source.r1.type = seq
  #定義一次RPC產生的批次數量
  a1.source.r1.batchSize = 1024
 壓力源:Stress Source //測試集群壓力,用作負載測試
  #定義源:stress
  a1.source.r1.type = org.apache.flume.source.StressSource
  #一個event產生的數據量
  a1.source.r1.size = 1073741824

常用sink

 日志&控制台:logger sink
  a1.sinks.k1.type = logger
 存儲在本地文件:File Roll Sink
  #設置滾動文件sink
  a1.sinks.k1.type = file_roll
  #指定文件位置。若文件不存在會報錯
  a1.sinks.k1.directory = /home/centos/log2
  #設置滾動周期間隔,0即不滾動;默認30s。
  a1.sinks.k1.sink.rollInterval = 0
 寫入到hdfsL:HDFS Sink //默認SequenceFile,可以通過hdfs.fileType指定(SequenceFile, DataStream or CompressedStream)
  #指定類型
  a1.sinks.k1.type = hdfs
  #指定路徑,不用單獨創建文件夾
  a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H
  #時間相關的配置,必須指定時間戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #實例化文件的前綴
  a1.sinks.k1.hdfs.filePrefix = events-
  #滾動間隔,0為不滾動
  a1.sinks.k1.hdfs.rollInterval = 0
  #滾動大小;默認1024
  a1.sinks.k1.hdfs.rollSize = 1024
  #指定數據類型;默認為 sequenceFile
  a1.sinks.k1.hdfs.fileType = CompressedStream
  #指定壓縮編解碼器
  a1.sinks.k1.hdfs.codeC = gzip
 寫入到Hbase:hbase sink //需要創建表,無法指定rowkey和col
  #設置類型
  a1.sinks.k1.type = hbase
  a1.sinks.k1.table = ns1:flume
  a1.sinks.k1.columnFaminly = f1
 寫入到Hbase:regexhbase sink //需要創建表,可以手動指定rowKey和col
  #設置正則hbase類型
  a1.sinks.k1.type = hbase
  a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
  #手動指定rowkey和列,[ROW_KEY]必須些,且大寫
  a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
  #指定正則,與col對應
  a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
  #指定rowkey索引
  a1.sinks.k1.serializer.rowKeyIndex = 0
  a1.sinks.k1.table = ns1:flume
  a1.sinks.k1.coluFamily = f1
 寫入到Hive:hive sink //需要啟動hive的事務性
  # 設置hive sink
  a1.sinks.k1.type = hive
  # 需要啟動hive的metastore:hive --service metastore //metastore源數據倉庫
  a1.sinks.k1.hive.metastore = thrift://s101:9083
  a1.sinks.k1.hive.database = default
  # 需要創建事務表
  a1.sinks.k1.hive.table = tx2
  # 指定列和字段的映射
  a1.sinks.k1.serializer = DELIMITED
  # 指定輸入的格式,必須是雙引號
  a1.sinks.k1.serializer.delimiter = "\t"
  # 指定hive存儲文件展現方式,必須是單引號
  a1.sinks.k1.serializer.serdeSeparator = '\t'
  a1.sinks.k1.serializer.fieldnames =id,name,age
 寫入到hive補充
  1、首先將/soft/hive/hcatalog/share/hcatalog中的所有jar拷貝到hive的lib庫中
    cp /soft/hive/hcatalog/share/hcatalog/* /soft/hive/lib/
  2、啟動hive的metastore
    hive --service metastore
  3、啟動hive並創建事務表
    SET hive.support.concurrency = true;
    SET hive.enforce.bucketing = true;
    SET hive.exec.dynamic.partition.mode = nonstrict;
    SET hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
    SET hive.compactor.initiator.on = true;
    SET hive.compactor.worker.threads = 1;
    create table tx2(id int ,name string, age int ) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');
  4、啟動flume,並使用以上的配置文件
    flume-ng agent -f k_hive.conf -n a1
  5、輸入數據驗證
    1 tom 18

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM