kafka+pyspark


安裝kafka

kafka 三部分 server producer consumer

pyspark 監控

一、環境部署

1.導入對應版本的spark-streaming-kafka-*-*.jar

2.相應jar追加到SPARK_DIST_CLASSPATH

二、kafka+spark測試

1.啟動kafka的server和producer

2.代碼

from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)

zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

3.啟動 開始監控生產者 即時計算詞頻數

4.注意各個版本匹配問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM