kafka產生的數據通過Flume存到HDFS中


試驗目標:

把kafka的生產者發出的數據流經由Flume放到HDFS來存儲。

試驗環境:

  java:1.8

  kafka:2.11

  flume:1.6

  hadoop:2.8.5

試驗流程:

1.進入zookeeper的bin目錄,啟動zookeeper

$ zkServer.sh start

2.配置Flume的conf文件

在flume下conf文件夾創建 flume.cof文件

agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=127.0.0.1:2181
agent.sources.kafkaSource.topic=flume-data

agent.sources.kafkaSource.kafka.consumer.timeout.ms=100
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=1000
agent.channels.memoryChannel.transactionCapacity=100
 
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://master:9000/usr/feiy/flume-data
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream

3.啟動hadoop分布式集群

$ start-all.sh

4.啟動kafka服務,並創建一個topic,讓flume來消費。

啟動kafka:

$ bin/kafka-server-start.sh -daemon ./config/server.properties  &

創建topic,主題名:flume-data

$ bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic flume-data

4.啟動flume,等待kafka傳輸消息

進入flume安裝目錄下的conf目錄,執行命令

$ bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent -Dflume.root.logger=INFO,console

5.向主kafka里面輸入數據

$ bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic flume-data

 

此時,你輸入的數據就會通過flume發送到HDFS里面

6.查看HDFS里面的文件

$ hadoop fs -ls /usr/feiy/flume-data

$ hadoop fs -cat /usr/feiy/flume-data/FlumeData.1551321145495

代碼試驗:

如果是用kafka代碼,獲取接口的數據,然后向flume里傳送,只需要將kafka中的代碼中的topic名字設置成服務器上的主題名即可:flume-data

 參考:https://blog.csdn.net/feinifi/article/details/73929015


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM