Spark Streaming實時處理應用


1 框架一覽

  事件處理的架構圖如下所示。

locality

2 優化總結

  當我們第一次部署整個方案時,kafkaflume組件都執行得非常好,但是spark streaming應用需要花費4-8分鍾來處理單個batch。這個延遲的原因有兩點,一是我們使用DataFrame來強化數據,而強化數據需要從hive中讀取大量的數據; 二是我們的參數配置不理想。

  為了優化我們的處理時間,我們從兩方面着手改進:第一,緩存合適的數據和分區;第二,改變配置參數優化spark應用。運行spark應用的spark-submit命令如下所示。通過參數優化和代碼改進,我們顯著減少了處理時間,處理時間從4-8分鍾降到了低於25秒。

/opt/app/dev/spark-1.5.2/bin/spark-submit \
 --jars  \
/opt/cloudera/parcels/CDH/jars/zkclient-0.3.jar,/opt/cloudera/parcels/CDH/jars/kafka_2.10-0.8.1.1.jar,\
/opt/app/dev/jars/datanucleus-core-3.2.2.jar,/opt/app/dev/jars/datanucleus-api-jdo-3.2.1.jar,/opt/app/dev/jars/datanucleus-rdbms-3.2.1.jar \
--files /opt/app/dev/spark-1.5.2/conf/hive-site.xml,/opt/app/dev/jars/log4j-eir.properties \
--queue spark_service_pool \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \ --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.sql.tungsten.enabled=false" \ --conf "spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory" \ --conf "spark.eventLog.enabled=true" \ --conf "spark.sql.codegen=false" \ --conf "spark.sql.unsafe.enabled=false" \ --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.streaming.backpressure.enabled=true" \ --conf "spark.locality.wait=1s" \ --conf "spark.streaming.blockInterval=1500ms" \ --conf "spark.shuffle.consolidateFiles=true" \ --driver-memory 10G \ --executor-memory 8G \ --executor-cores 20 \ --num-executors 20 \ --class com.bigdata.streaming.OurApp \ /opt/app/dev/jars/OurStreamingApplication.jar external_props.conf

  下面我們將詳細介紹這些改變的參數。

2.1 driver選項

  這里需要注意的是,driver運行在spark on yarn的集群模式下。因為spark streaming應用是一個長期運行的任務,生成的日志文件會很大。為了解決這個問題,我們限制了寫入日志的消息的條數, 並且用RollingFileAppender限制了它們的大小。我們也關閉了spark.ui.showConsoleProgress選項來禁用控制台日志消息。

  通過測試,我們的driver因為永久代空間填滿而頻繁發生內存耗盡(永久代空間是類、方法等存儲的地方,不會被重新分配)。將永久代空間的大小升高到6G可以解決這個問題。

spark.driver.extraJavaOptions=-XX:MaxPermSize=6G

2.2 垃圾回收

  因為我們的spark streaming應用程序是一個長期運行的進程,在處理一段時間之后,我們注意到GC暫停時間過長,我們想在后台減少或者保持這個時間。調整UseConcMarkSweepGC參數是一個技巧。

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \

2.3 禁用Tungsten

  Tungstenspark執行引擎主要的改進。但是它的第一個版本是有問題的,所以我們暫時禁用它。

spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false

2.4 啟用反壓

  Spark Streaming在批處理時間大於批間隔時間時會出現問題。換一句話說,就是spark讀取數據的速度慢於kafka數據到達的速度。如果按照這個吞吐量執行過長的時間,它會造成不穩定的情況。 即接收executor的內存溢出。設置下面的參數解決這個問題。

spark.streaming.backpressure.enabled=true

2.5 調整本地化和塊配置

  下面的兩個參數是互補的。一個決定了數據本地化到task或者executor等待的時間,另外一個被spark streaming receiver使用對數據進行組塊。塊越大越好,但是如果數據沒有本地化到executor,它將會通過網絡移動到 任務執行的地方。我們必須在這兩個參數間找到一個好的平衡,因為我們不想數據塊太大,並且也不想等待本地化太長時間。我們希望所有的任務都在幾秒內完成。

  因此,我們改變本地化選項從3s到1s,我們也改變塊間隔為1.5s。

--conf "spark.locality.wait=1s" \ --conf "spark.streaming.blockInterval=1500ms" \

2.6 合並臨時文件

  在ext4文件系統中,推薦開啟這個功能。因為這會產生更少的臨時文件。

--conf "spark.shuffle.consolidateFiles=true" \

2.7 開啟executor配置

  在你配置kafka Dstream時,你能夠指定並發消費線程的數量。然而,kafka Dstream的消費者會運行在相同的spark driver節點上面。因此,為了從多台機器上面並行消費kafka topic, 我們必須實例化多個Dstream。雖然可以在處理之前合並相應的RDD,但是運行多個應用程序實例,把它們都作為相同kafka consumer group的一部分。

  為了達到這個目的,我們設置20個executor,並且每個executor有20個核。

--executor-memory 8G
--executor-cores 20
--num-executors 20

2.8 緩存方法

  使用RDD之前緩存RDD,但是記住在下次迭代之前從緩存中刪除它。緩存那些需要使用多次的數據非常有用。然而,不要使分區數目過大。保持分區數目較低可以減少,最小化調度延遲。下面的公式是我們使用的分區數的計算公式。

# of executors * # of cores = # of partitions


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM