spark 調優 多線程並行處理任務


方式1:

1. 明確 Spark中Job 與 Streaming中 Job 的區別
1.1 Spark Core

一個 RDD DAG Graph 可以生成一個或多個 Job(Action操作)

一個Job可以認為就是會最終輸出一個結果RDD的一條由RDD組織而成的計算

Job在spark里應用里是一個被調度的單位
1.2 Streaming

一個 batch 的數據對應一個 DStreamGraph

而一個 DStreamGraph 包含一或多個關於 DStream 的輸出操作

每一個輸出對應於一個Job,一個 DStreamGraph 對應一個JobSet,里面包含一個或多個Job
2. Streaming Job的並行度

Job的並行度由兩個配置決定:

    spark.scheduler.mode(FIFO/FAIR)
    spark.streaming.concurrentJobs

一個 Batch 可能會有多個 Action 執行,比如注冊了多個 Kafka 數據流,每個Action都會產生一個Job

所以一個 Batch 有可能是一批 Job,也就是 JobSet 的概念

這些 Job 由 jobExecutor 依次提交執行

而 JobExecutor 是一個默認池子大小為1的線程池,所以只能執行完一個Job再執行另外一個Job

這里說的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 決定了向 Spark Core 提交Job的並行度

提交一個Job,必須等這個執行完了,才會提交第二個

假設我們把它設置為2,則會並發的把 Job 提交給 Spark Core

Spark 有自己的機制決定如何運行這兩個Job,這個機制其實就是FIFO或者FAIR(決定了資源的分配規則)

默認是 FIFO,也就是先進先出,把 concurrentJobs 設置為2,但是如果底層是FIFO,那么會優先執行先提交的Job

雖然如此,如果資源夠兩個job運行,還是會並行運行兩個Job     

Spark Streaming 不同Batch任務可以並行計算么    https://developer.aliyun.com/article/73004

 

conf.setMaster("local[4]")
    conf.set("spark.streaming.concurrentJobs", "3") //job 並行對
    conf.set("spark.scheduler.mode", "FIFO")   
    val sc = new StreamingContext(conf, Seconds(5))

你會發現,不同batch的job其實也可以並行運行的,這里需要有幾個條件:

  1. 有延時發生了,batch無法在本batch完成
  2. concurrentJobs > 1
  3. 如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源

Mode是FAIR則盡力保證你的Job是並行運行的,毫無疑問是可以並行的。 

 

 

 

方式2:

原文鏈接:https://blog.csdn.net/qq_21277411/java/article/details/86572944

場景1:

程序每次處理的數據量是波動的,比如周末比工作日多很多,晚八點比凌晨四點多很多。
一個spark程序處理的時間在1-2小時波動是OK的。而spark streaming程序不可以,如果每次處理的時間是1-10分鍾,就很蛋疼。
設置10分鍾吧,實際上10分鍾的也就那一段高峰時間,如果設置每次是1分鍾,很多時候會出現程序處理不過來,排隊過多的任務延遲更久,還可能出現程序崩潰的可能。
場景2:

程序需要處理的相似job數隨着業務的增長越來越多
我們知道spark的api里無相互依賴的stage是並行處理的,但是job之間是串行處理的。
spark程序通常是離線處理,比如T+1之類的延遲,時間變長是可以容忍的。而spark streaming是准實時的,如果業務增長導致延遲增加就很不合理。

spark雖然是串行執行job,但是是可以把job放到線程池里多線程執行的。如何在一個SparkContext中提交多個任務

 

DStream.foreachRDD{
      rdd =>
        //創建線程池
        val executors=Executors.newFixedThreadPool(rules.length)
        //將規則放入線程池
        for( ru <- rules){
          val task= executors.submit(new Callable[String] {
            override def call(): String ={
              //執行規則
              runRule(ru,spark)
            }
          })
        }
        //每次創建的線程池執行完所有規則后shutdown
        executors.shutdown()
    }

原文鏈接:https://blog.csdn.net/qq_21277411/java/article/details/86572944

注意點

1.最后需要executors.shutdown()。
如果是executors.shutdownNow()會發生未執行完的task強制關閉線程。
如果使用executors.awaitTermination()則會發生阻塞,不是我們想要的結果。
如果沒有這個shutdowm操作,程序會正常執行,但是長時間會產生大量無用的線程池,因為每次foreachRDD都會創建一個線程池。

2.可不可以將創建線程池放到foreachRDD外面?
不可以,這個關系到對於scala閉包到理解,經測試,第一次或者前幾次batch是正常的,后面的batch無線程可用。

3.線程池executor崩潰了就會導致數據丟失
原則上是這樣的,但是正常的代碼一般不會發生executor崩潰。至少我在使用的時候沒遇到過。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM