服務器運行環境:spark 2.4.4 + scall 2.11.12 + kafka 2.2.2
由於業務相對簡單,kafka只有固定topics,所以一直使用下面腳本執行實時流計算
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service/xxx.zip /data/service/xxx.py
代碼中使用pyspark.streaming.kafka的KafkaUtils來創建spark streaming與kafka的連接,運行了好長時間都沒有出現過問題
隨着新業務接入,在新功能中kafka需要使用動態topics方式,要用到正則表達式,查了KafkaUtils源碼和相關資料,發現它不支持動態topics方式,需要使用spark-streaming-kafka-0-10才能支持
查看文檔http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html 與 http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 后,使用結構化流structured-streaming來實現
實現代碼:
import sys from pyspark.sql import SparkSession def process_row(row): # Write row to storage pass if __name__ == "__main__": if len(sys.argv) != 4: print(""" Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics> """, file=sys.stderr) sys.exit(-1) bootstrapServers = sys.argv[1] subscribeType = sys.argv[2] topics = sys.argv[3] spark = SparkSession\ .builder\ .appName("StructuredKafkaWordCount")\ .getOrCreate() # Create DataSet representing the stream of input lines from kafka ds = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option(subscribeType, topics)\ .load()\ .selectExpr("CAST(value AS STRING)") ds.printSchema() query = ds.writeStream.foreach(process_row).start() query.awaitTermination()
執行提交任務命令
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 /data/service/demo.py master:9092 subscribePattern event.log.*
提交后一直報下面錯誤
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value
查了好多資料,都說需要添加參數,配置Kafka分區分配策略,並將readStream修改為:
ds = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option("kafka.partition.assignment.strategy", "range")\ .option(subscribeType, topics)\ .load()
再次運行異常信息改為無法連接kafka了,弄了整整一天人都快崩潰了還沒搞定
還好最終查找https://xbuba.com/questions/44959483,大牛提示說有可能是kafka0.8版本的jar與kafka0.10的jar沖突原因造成的
使用命令查找
find / -name 'spark-streaming-kafka*' find / -name 'spark-sql-kafka*'
發現在/root/.ivy2/cache/org.apache.spark/ 目錄下面存在spark-streaming-kafka-0-8_2.11 與 spark-sql-kafka-0-10_2.11 文件夾和相關的jar文件
將spark-streaming-kafka-0-8_2.11刪除后執行代碼就正常運行了
由於老腳本用的還是kafka0.8,為了兼容兩個版本能同時運行,需要將/root/.ivy2/cache/org.apache.spark/ 目錄下面kafka0.8與kafka0.10兩個版本的jar全部清除
然后登錄https://repo1.maven.org/maven2/org/apache/spark/ 下載spark-streaming-kafka-0-8與spark-sql-kafka-0-10對應的jar下來,並將提交命令spark-submit的參數改為:
spark-submit --jars /data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar --py-files /data/service/xxx.zip /data/service/xxx.py spark-submit --jars /data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar /data/service/demo.py master:9092 subscribePattern event.log.*
修改后兩個腳本運行都沒有問題(PS:老腳本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包來啟動,執行后直接暴錯,提示說要改為org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)