試驗目標:
把kafka的生產者發出的數據流經由Flume放到HDFS來存儲。
試驗環境:
java:1.8
kafka:2.11
flume:1.6
hadoop:2.8.5
試驗流程:
1.進入zookeeper的bin目錄,啟動zookeeper
$ zkServer.sh start
2.配置Flume的conf文件
在flume下conf文件夾創建 flume.cof文件
agent.sources = kafkaSource agent.channels = memoryChannel agent.sinks = hdfsSink agent.sources.kafkaSource.channels = memoryChannel agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.zookeeperConnect=127.0.0.1:2181 agent.sources.kafkaSource.topic=flume-data agent.sources.kafkaSource.kafka.consumer.timeout.ms=100 agent.channels.memoryChannel.type=memory agent.channels.memoryChannel.capacity=1000 agent.channels.memoryChannel.transactionCapacity=100 agent.sinks.hdfsSink.type=hdfs agent.sinks.hdfsSink.channel = memoryChannel agent.sinks.hdfsSink.hdfs.path=hdfs://master:9000/usr/feiy/flume-data agent.sinks.hdfsSink.hdfs.writeFormat=Text agent.sinks.hdfsSink.hdfs.fileType=DataStream
3.啟動hadoop分布式集群
$ start-all.sh
4.啟動kafka服務,並創建一個topic,讓flume來消費。
啟動kafka:
$ bin/kafka-server-start.sh -daemon ./config/server.properties &
創建topic,主題名:flume-data
$ bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic flume-data
4.啟動flume,等待kafka傳輸消息
進入flume安裝目錄下的conf目錄,執行命令
$ bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name agent -Dflume.root.logger=INFO,console
5.向主kafka里面輸入數據
$ bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic flume-data
此時,你輸入的數據就會通過flume發送到HDFS里面
6.查看HDFS里面的文件
$ hadoop fs -ls /usr/feiy/flume-data
$ hadoop fs -cat /usr/feiy/flume-data/FlumeData.1551321145495
代碼試驗:
如果是用kafka代碼,獲取接口的數據,然后向flume里傳送,只需要將kafka中的代碼中的topic名字設置成服務器上的主題名即可:flume-data
參考:https://blog.csdn.net/feinifi/article/details/73929015