Multi-threaded Concurrency in Spark


Multi-task Processing in Spark

A very common Spark use case is running many tasks in parallel: once a job's DAG has been built, Spark distributes its tasks across multiple executors and processes them concurrently.
This, however, does not help us run two completely independent jobs at the same time within a single Spark application, for example reading from several data sources and writing each to its own store simultaneously, or processing multiple files at once.

Every Spark application needs a SparkSession (and its underlying SparkContext) to configure and execute operations. The SparkSession object is thread-safe and can be passed around your Spark application as needed.
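For instance, here is a minimal sketch (the object and column names are illustrative, not from the original) of the same session safely triggering actions from two plain JVM threads:

import org.apache.spark.sql.SparkSession

object SharedSessionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("shared-session").getOrCreate()
    import spark.implicits._

    // Each action submitted from its own thread becomes an independent Spark job
    val t1 = new Thread(() => spark.range(100).selectExpr("sum(id)").show())
    val t2 = new Thread(() => println((1 to 50).toDF("n").distinct.count()))
    t1.start(); t2.start()
    t1.join(); t2.join()
    spark.stop()
  }
}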

Sequential execution example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()
    import spark.implicits._  // needed for toDF

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    doFancyDistinct(df, "hdfs:///dis.parquet")
    doFancySum(df, "hdfs:///sum.parquet")
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)

  def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)
}
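Here the two writes run back to back: doFancySum does not start until doFancyDistinct has finished writing its Parquet output, even though the two jobs are completely independent of each other.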

Optimized example

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.sum
import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()
    import spark.implicits._  // needed for toDF

    // Fixed-size thread pool; five threads here, though the size could come from configuration
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)
    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA, taskB)), Duration(1, MINUTES))
    // Shut the pool down so its non-daemon threads do not keep the JVM alive
    xc.shutdown()
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath)
  }
}
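Note that Await.result throws a TimeoutException if the two writes take longer than the one-minute limit, so pick a timeout that matches your workload (or use Duration.Inf to wait indefinitely).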

Example using Java's ExecutorCompletionService (from Scala)

    import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

    // Submit one verification job per branch; each task returns its branch_id on completion
    val executors = Executors.newFixedThreadPool(threadPoolNum)
    val completionService = new ExecutorCompletionService[String](executors)
    for ((branch_id, dataList) <- summary) {
      logInfo(s"************** applicationId is ${applicationId} about Multi-threading starting: file is ${branch_id}")
      completionService.submit(new Callable[String] {
        override def call(): String = {
          new VerificationTest(spark, branch_id, dataList, separator).runJob()
          branch_id
        }
      })
    }
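The fragment above only submits the tasks. A minimal sketch of waiting for them (assuming one task was submitted per entry in `summary`, as above) would drain the completion queue and then release the pool:

    // take() blocks until some submitted task finishes; get() returns its branch_id
    for (_ <- 1 to summary.size) {
      val finishedBranch = completionService.take().get()
      logInfo(s"branch ${finishedBranch} finished")
    }
    executors.shutdown()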

 

