Flume+Kafka整合


腳本生產數據---->flume采集數據----->kafka消費數據------->storm集群處理數據

 

日志文件使用log4j生成,滾動生成!

 

當前正在寫入的文件在滿足一定的數量閾值之后,需要重命名!!! 

 

flume+Kafka整合步驟及相關配置:(先安裝好zookeeper集群和Kafka集群)

配置flume:

1、下載flume

2、解壓flume安裝包

  cd  /export/servers/

  tar  -zxvf  apache-flume-1.6.0-bin.tar.gz

  ln  -s  apache-flume-1.6.0-bin  flume

3、創建flume配置文件

  cd  /export/servers/flume/conf/

  mkdir  myconf

  vi  exec.conf

  輸入一下內容:

  a1.sources=r1

  a1.channels=c1

  a1.sinks=k1

 

  a1.sources.r1.type=exec

  a1.sources.r1.command=tail -F /export/data/flume_sources/click_log/1.log

  a1.sources.r1.channels=c1

 

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=10000

  a1.channels.c1.transactionCapacity=100

 

  a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

  a1.sinks.k1.topic=test

  a1.sinks.k1.brokerList=kafka01:9092

  a1.sinks.k1.requiredAcks=1

  a1.sinks.k1.batchSize=20

  a1.sinks.k1.channel=c1

4、准備目標數據的目錄

  mkdir  -p  /export/data/flume_sources/click_log

5、通過腳本創建目標文件並生產數據

  for((i=0;i<=50000;i++));

  do  echo "message-" + $i >> /export/data/flume_sources/click_log/1.log;

  done

注:腳本名稱為click_log_out.sh,需要使用root用戶賦權,chmod  +x  click_log_out.sh

6、開始打通所有流程

  一:啟動Kafka集群

    kafka-server-start.sh  /export/servers/kafka/config/server.properties

  二:創建一個topic並開啟consumer

    kafka-console-consumer.sh  --topic=test  --zookeeper  zk01:2181

  三:執行數據生產的腳本

    sh  click_log_out.sh

  四:啟動flume客戶端

    ./bin/flume_ng  agent  -n  a1  -c  conf  -f  conf/myconf/exec.conf  -Dflume.root.logger=INFO,console

  五:在第三步啟動的kafka consumer窗口查看效果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM