PySpark and Kafka: the difference between createDirectStream and createStream


from pyspark.streaming.kafka import KafkaUtils

# Receiver-based stream: connects through ZooKeeper
kafkaStream = KafkaUtils.createStream(streamingContext,
    [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])


from pyspark.streaming.kafka import KafkaUtils

# Direct stream: connects to the Kafka brokers themselves
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

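The two calls differ in their parameters. createStream takes the ZK quorum, that is, the ZooKeeper address on port 2181, while createDirectStream takes the address of the Kafka broker process on port 9092.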
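To make the parameter difference concrete, here is a minimal sketch that fills in both calls with the addresses used in this post (localhost:2181, localhost:9092, topic test). The helper names and the consumer group id "example-group" are hypothetical, chosen only for illustration, and the code assumes Spark 2.x with the spark-streaming-kafka-0-8 assembly jar available (the pyspark.streaming.kafka module was removed in Spark 3.0).

```python
# Sketch only: assumes Spark 2.x with the spark-streaming-kafka-0-8 assembly jar.

ZK_QUORUM = "localhost:2181"   # ZooKeeper address, used by createStream
BROKERS = "localhost:9092"     # Kafka broker address, used by createDirectStream
TOPIC = "test"
GROUP_ID = "example-group"     # hypothetical consumer group id


def receiver_stream_args(zk_quorum=ZK_QUORUM, group_id=GROUP_ID, topic=TOPIC):
    """Arguments for KafkaUtils.createStream, after the StreamingContext."""
    # The last argument maps each topic to the number of partitions to consume.
    return (zk_quorum, group_id, {topic: 1})


def direct_stream_args(brokers=BROKERS, topic=TOPIC):
    """Arguments for KafkaUtils.createDirectStream, after the StreamingContext."""
    # Topics are given as a list; the broker list goes into the Kafka params dict.
    return ([topic], {"metadata.broker.list": brokers})


def build_streams(ssc):
    """Create both kinds of stream on an existing StreamingContext."""
    # Imported here so the helpers above can be inspected without Spark installed.
    from pyspark.streaming.kafka import KafkaUtils

    receiver_stream = KafkaUtils.createStream(ssc, *receiver_stream_args())
    direct_stream = KafkaUtils.createDirectStream(ssc, *direct_stream_args())
    return receiver_stream, direct_stream
```

Note that createStream expects the topics as a dict of topic name to partition count, whereas createDirectStream expects a plain list of topic names.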

In this setup, the Kafka process has PID 9300 and occupies port 9092.

QuorumPeerMain is the corresponding ZooKeeper instance. Its PID is 6379 and it listens on port 2181.
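If it is unclear which service is listening where, a small TCP probe can confirm it before submitting the job. This is only a sketch using the standard library; the function name is_listening is hypothetical, and a successful connection shows that something accepts connections on the port, not that it is actually ZooKeeper or Kafka.

```python
import socket


def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Ports taken from this post: 2181 for ZooKeeper, 9092 for the Kafka broker.
    for name, port in (("ZooKeeper", 2181), ("Kafka broker", 9092)):
        state = "listening" if is_listening("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port}: {state}")
```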

 

 

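So when running the official examples, the arguments differ accordingly.

One, the direct version, is given the Kafka broker address: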

./bin/spark-submit --jars ~/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar examples/src/main/python/streaming/direct_kafka_wordcount.py localhost:9092 test

The other, which uses createStream, is given the ZooKeeper address:

./bin/spark-submit --jars ~/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test

 

References:

https://spark.apache.org/docs/1.6.1/streaming-kafka-integration.html

http://zhangfengzhe.blog.51cto.com/8855103/1556650

 

