flume+flume+kafka消息傳遞+storm消費


通過flume收集其他機器上flume的監測數據,發送到本機的kafka進行消費。

環境:slave中安裝flume,master中安裝flume+kafka(這里用兩台虛擬機,也可以用三台以上)

masterIP 192.168.83.128    slaveIP 192.168.83.129

通過監控test.log文件的變化,收集變化信息發送到主機的flume中,再發送到kafka中進行消費

1、配置slave1在flume中配置conf目錄中的example.conf文件,沒有就創建一個

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
#監控文件夾下的test.log文件 a1.sources.r1.command
= tail -F /home/qq/pp/data/test.log a1.sources.r1.channels = c1 # Describe the sink ##sink端的avro是一個數據發送者 a1.sinks = k1 ##type設置成avro來設置發消息 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 ##下沉到master這台機器 a1.sinks.k1.hostname = 192.168.83.133 ##下沉到mini2中的44444 a1.sinks.k1.port = 44444 a1.sinks.k1.batch-size = 2 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

2、master上配置flume/conf里面的example.conf(標紅的注意下)

#me the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##source中的avro組件是一個接收者服務
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
#a1.sinks.k1.type = logger
#對於sink的配置描述 使用kafka做數據的消費 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.brokerList = 192.168.83.128:9092,192.168.83.129:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

3、向監聽文件寫入字符串(程序循環寫入,不用手動修改test.log文件了)

[root@s1 # cd /home/qq/pp/data
[root@s1 home/qq/pp/data# while true
> do
> echo "toms" >> test.log
> sleep 1
> done

4、查看上面的程序是否執行

#cd /home/qq/pp/data
#tail -f test.log

5、打開消息接收者master的flume

進入flume安裝目錄,執行如下語句

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

現在回打印出一些信息

6、啟動slave的flume

進入flume安裝目錄,執行如下語句

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

7、 進入master ---kafka安裝目錄

    1)啟動zookeeper

      bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    2)啟動kafka服務

      bin/kafka-server-start.sh -daemon config/server.properties 

    3)創建topic

kafka-topics.sh --create --topic flume_kafka  --zookeeper 192.168.83.129:2181,192.168.83.128:2181 --partitions 2 --replication-factor 1

    4)創建消費者

bin/kafka-console-consumer.sh --bootstrap-server 192.168.83.128:9092,192.168.83.129:9092 --topic flume_kafka --from-beginning

    5)然后就會看到消費之窗口打印寫入的信息,  

                    

8、此時啟動 eclipse實例(https://www.cnblogs.com/51python/p/10908660.html),注意修改ip以及topic

 如果啟動不成功看看是不是kafka設置問題(https://www.cnblogs.com/51python/p/10919330.html第一步虛擬機部署)

   啟動后會打印出結果(這是第二次測試不是用的toms而是hollo word測試的,此處只是一個實例)

 

 

 

 ok!一個流程終於走完了!

 

 

參考: 

https://blog.csdn.net/luozhonghua2014/article/details/80369469?utm_source=blogxgwz5

https://blog.csdn.net/wxgxgp/article/details/85701844

https://blog.csdn.net/tototuzuoquan/article/details/73203241

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM