Use Flume to collect monitored data from a Flume agent on another machine and forward it to Kafka on the local machine for consumption.
Environment: Flume installed on the slave; Flume + Kafka installed on the master (two VMs are used here; three or more also work).
master IP 192.168.83.128, slave IP 192.168.83.129
We watch the file test.log for changes, collect the new lines, send them to the Flume agent on the master, and from there into Kafka for consumption.
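Data flow: test.log → slave Flume (exec source → avro sink) → master Flume (avro source → Kafka sink) → Kafka topic flume_kafka → console consumer.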
1. Configure the slave: edit the example.conf file in Flume's conf directory (create it if it does not exist).
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# Tail the test.log file under the data directory
a1.sources.r1.command = tail -F /home/qq/pp/data/test.log
a1.sources.r1.channels = c1

# Describe the sink
# An avro sink acts as a data sender
a1.sinks.k1.type = avro
# Send events to the master machine (this must be the master's IP)
a1.sinks.k1.hostname = 192.168.83.128
# Port 44444, where the master's avro source listens
a1.sinks.k1.port = 44444
a1.sinks.k1.batch-size = 2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. On the master, configure example.conf in flume/conf (pay particular attention to the Kafka sink settings).
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# An avro source acts as a receiver service
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
#a1.sinks.k1.type = logger
# Use a Kafka sink so the collected data can be consumed from Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.brokerList = 192.168.83.128:9092,192.168.83.129:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Write strings to the monitored file (the loop below writes continuously, so there is no need to edit test.log by hand).
[root@s1 ~]# cd /home/qq/pp/data
[root@s1 data]# while true
> do
>   echo "toms" >> test.log
>   sleep 1
> done
4. Check that the loop above is actually writing:
#cd /home/qq/pp/data
#tail -f test.log
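If the loop is running, tail prints a new line roughly once per second:

toms
toms
toms
...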
5. Start the Flume agent on the master (the message receiver). From the Flume installation directory, run:
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
Startup messages will now be printed to the console.
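To confirm the avro source is actually listening, you can check port 44444 on the master (a sanity check, assuming net-tools is installed):

# netstat -tlnp | grep 44444

A java process bound to 0.0.0.0:44444 should show up.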
6. Start the Flume agent on the slave. Again from the Flume installation directory, run:
bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
7. On the master, change into the Kafka installation directory.
1) Start ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2) Start the Kafka server
bin/kafka-server-start.sh -daemon config/server.properties
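To confirm both daemons came up, jps (shipped with the JDK) should list the ZooKeeper and Kafka processes (the PIDs below are just examples):

# jps
3072 QuorumPeerMain
3341 Kafka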
3) Create the topic
bin/kafka-topics.sh --create --topic flume_kafka --zookeeper 192.168.83.129:2181,192.168.83.128:2181 --partitions 2 --replication-factor 1
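To verify the topic exists, list the topics registered in ZooKeeper (same addresses as in the create command); flume_kafka should appear in the output:

bin/kafka-topics.sh --list --zookeeper 192.168.83.129:2181,192.168.83.128:2181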
4) Start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.83.128:9092,192.168.83.129:9092 --topic flume_kafka --from-beginning
5) The consumer window then prints the lines being written to test.log.
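Because of --from-beginning, the consumer first replays everything already written to the topic and then keeps printing new lines as they arrive:

toms
toms
toms
...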
8. Now run the Eclipse example (https://www.cnblogs.com/51python/p/10908660.html); remember to change the IP and topic.
If it fails to start, check whether the problem is the Kafka setup (https://www.cnblogs.com/51python/p/10919330.html, step 1, VM deployment).
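If the consumer shows nothing, it can help to take Flume out of the picture and test Kafka on its own with the console producer (same broker list as the consumer); anything typed here should appear in the consumer window:

bin/kafka-console-producer.sh --broker-list 192.168.83.128:9092,192.168.83.129:9092 --topic flume_kafka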
Once started, it prints the consumed records (this second test wrote "hello world" rather than "toms"; it is only an example).
OK! The whole pipeline finally runs end to end!
References:
https://blog.csdn.net/luozhonghua2014/article/details/80369469?utm_source=blogxgwz5
https://blog.csdn.net/wxgxgp/article/details/85701844
https://blog.csdn.net/tototuzuoquan/article/details/73203241