spark streaming連接kafka引發"partition.assignment.strategy"異常處理


  服務器運行環境:spark 2.4.4 + scall 2.11.12 + kafka 2.2.2

  由於業務相對簡單,kafka只有固定topics,所以一直使用下面腳本執行實時流計算

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 --py-files /data/service/xxx.zip /data/service/xxx.py

  代碼中使用pyspark.streaming.kafka的KafkaUtils來創建spark streaming與kafka的連接,運行了好長時間都沒有出現過問題

  隨着新業務接入,在新功能中kafka需要使用動態topics方式,要用到正則表達式,查了KafkaUtils源碼和相關資料,發現它不支持動態topics方式,需要使用spark-streaming-kafka-0-10才能支持

  查看文檔http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html 與 http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 后,使用結構化流structured-streaming來實現

  實現代碼:

import sys

from pyspark.sql import SparkSession

def process_row(row):
    # Write row to storage
    pass

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    ds = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)")

    ds.printSchema()
    query = ds.writeStream.foreach(process_row).start()
    query.awaitTermination()

  執行提交任務命令

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 /data/service/demo.py master:9092 subscribePattern event.log.*

  提交后一直報下面錯誤

org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value

  查了好多資料,都說需要添加參數,配置Kafka分區分配策略,並將readStream修改為:

    ds = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option("kafka.partition.assignment.strategy", "range")\
        .option(subscribeType, topics)\
        .load()

  再次運行異常信息改為無法連接kafka了,弄了整整一天人都快崩潰了還沒搞定

  還好最終查找https://xbuba.com/questions/44959483,大牛提示說有可能是kafka0.8版本的jar與kafka0.10的jar沖突原因造成的

  使用命令查找

find / -name 'spark-streaming-kafka*'
find / -name 'spark-sql-kafka*'

  發現在/root/.ivy2/cache/org.apache.spark/ 目錄下面存在spark-streaming-kafka-0-8_2.11 與 spark-sql-kafka-0-10_2.11 文件夾和相關的jar文件

  將spark-streaming-kafka-0-8_2.11刪除后執行代碼就正常運行了

  由於老腳本用的還是kafka0.8,為了兼容兩個版本能同時運行,需要將/root/.ivy2/cache/org.apache.spark/ 目錄下面kafka0.8與kafka0.10兩個版本的jar全部清除

  然后登錄https://repo1.maven.org/maven2/org/apache/spark/ 下載spark-streaming-kafka-0-8與spark-sql-kafka-0-10對應的jar下來,並將提交命令spark-submit的參數改為:

spark-submit --jars /data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar --py-files /data/service/xxx.zip /data/service/xxx.py
spark-submit --jars /data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar /data/service/demo.py master:9092 subscribePattern event.log.*

  修改后兩個腳本運行都沒有問題(PS:老腳本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包來啟動,執行后直接暴錯,提示說要改為org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM