flume+kafka實現實時流式日志處理


環境搭建

一. 搭建kafka運行環境
1.安裝zookeeper :
配置環境變量ZOOKEEPER_HOME 修改zoo.cfg dataDir=./zookeeper-3.4.14/data
2.運行zookeeper:
cmd: zkserver
注:不能安裝最新版 會報錯 改為 zookeeper-3.4.14 之后報錯消失
3.安裝kafka:
修改config/server.properties log.dirs=/tmp/kafka-logs
4.運行kafka:
// bin/kafka-server-start.sh -daemon config/server.properties >kafka.log 2>&1 &
D:\kafka_2.12-2.6\kafka_2.12-2.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

如果需要創建多個broker,創建對應的server2.properties,

然后 執行   .\bin\windows\kafka-server-start.bat .\config\server2.properties 就可以了

這里的節點broker我理解是存儲數據的小倉庫,如果創建了3個倉庫,其中一個被隨機選舉為leader負責數據的讀寫,而其他兩個節點作為slave 負責數據的備份,當leader掛掉時,生產者和消費者仍然正常執行,不受影響。這件事情就是由zookeeper來控制的,所以serverx.properties中需要配置zookeeper的服務器IP和端口

5.創建Topic:
D:\kafka_2.12-2.6\kafka_2.12-2.6.0\bin\windows> kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
提示:Created topic test. 則創建成功
錯誤:找不到或無法加載主類 kafka.admin.TopicCommand
原因:kafka的安裝包為xx-src 這種是源碼,需要編譯后才能運行,編譯需要gradle 和scala 比較麻煩 所以重新下載編譯好的安裝包 比較方便
6.開啟一個生產者:
D:\kafka_2.12-2.6\kafka_2.12-2.6.0\bin\windows> kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic test
7.開啟一個消費者:
D:\kafka_2.12-2.6\kafka_2.12-2.6.0\bin\windows> kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
8.實現通信:
在消費者cmd中輸入word 能夠在消費者cmd中出現,完成kafka通信,到現在為止有四個窗口:
1)zookeeper 127.0.0.1:2181
2)kafka 127.0.0.1:9092
3)消費者
4)生產者
9.關閉kafka:
kafka-server-stop.sh

二. 搭建flume運行環境:
1.安裝flume:
增加配置文件:/conf/example.conf
2.運行flume:
// D:\apache-flume-1.9.0-bin\bin> flume-ng agent -n a1 -c ../conf/ -f kafka_spool.conf -d flume.root.logger=INFO,console
D:\apache-flume-1.9.0-bin\bin> flume-ng agent --conf ../conf --conf-file ../conf/kafka_spool.conf --name a1 -property flume.root.logger=INFO,console
當控制台輸出:/127.0.0.1:44444 表示flume進程正常啟動了
3.conf文件:
kafka_netcat.conf :檢測端口變化 nc localhost 44444

kafka_spool.conf :檢測文件夾中的文件變化,執行后的文件名發生變化 xx.log -> xx.log.COMPLETED

#example.conf: A single-node flume configuration
#test kafka sink with spooldir source
 
#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
#Describe/configue the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
#這里是需要監控的文件夾路徑 a1.sources.r1.spoolDir = /usr/flume/logs a1.sources.r1.fileHeader = true #Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink #設置kafka的主題topic a1.sinks.k1.topic = kafka_spooldir_test #設置消費者編碼為UTF-8 a1.sinks.k1.custom.encoding=UTF-8 #綁定kafka主機以及端口號 a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave3:9092 #設置kafka序列化方式 a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder #use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

 

kafka_exec.conf :通過執行本地shell檢測某文件新增內容

flume官方文檔:http://flume.apache.org/FlumeUserGuide.html

kafka官方文檔中文版:https://blog.csdn.net/ld326/article/details/78118441

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM