關於Spark中的多任務並發處理(Concurrency)


 

 

 

Spark中的多任務處理

Spark的一個非常常見的用例是並行運行許多作業。 構建作業DAG后,Spark將這些任務分配到多個Executor上並行處理。
但這並不能幫助我們在同一個Spark應用程序中同時運行兩個完全獨立的作業,例如同時從多個數據源讀取數據並將它們寫到對應的存儲,或同時處理多個文件等。

每個spark應用程序都需要一個SparkSession(Context)來配置和執行操作。 SparkSession對象是線程安全的,可以根據需要傳遞給你的Spark應用程序。

一個順序作業的例子

假設我們有一個spark 2.x應用程序,負責將幾個數據寫入到HDFS中。

import org.apache.spark.sql.SparkSession

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    doFancyDistinct(df, "hdfs:///dis.parquet")
    doFancySum(df, "hdfs:///sum.parquet")
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
  
  def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)

}

這個程序看起來沒有什么問題,Spark將按順序執行兩個動作。但這兩個動作是獨立, 我們可以同時執行它們。

一個有缺陷的並發作業的例子

如果你快速的在網上搜索一下 “scala異步編程”,你就會被引到Scala Future這個解決方案中。
例如以下為一個並行處理RDD的例子:


import scala.concurrent._
import ExecutionContext.Implicits.global

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}
val n: Int = 2 
val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']

val rdds = files.map(f => pipeline(f, n))

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

我們只要根據搜索到的文檔中提供的例子修改一下,就會得到以下類似內容:

import org.apache.spark.sql.SparkSession
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }

  def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
}

ExecutionContext是用於管理並行操作的Context。 實際的線程模型可以由開發者明確提供,也可以使用全局默認值(這是一個 ForkJoinPool ),就像我們在上面的代碼中使用的一樣:

import scala.concurrent.ExecutionContext.Implicits.global

使用Global execution context 的問題在於它並不知道我們是在群集上啟動Spark作業。 默認情況下,Global execution context 提供與運行代碼的系統中的處理器相同數量的線程。 在我們的Spark應用程序中,它將與Driver上的處理器相同數量的線程。

一個優化過的並發作業的例子

我們需要控制我們的線程策略,更一般化地編寫我們的程序,以便可以在不同的線程模型中重用它們。

例如以下是我們從重寫的函數,它將允許我們精確控制execution context 來管理調用函數時提供的線程數。 例子中添加的隱式參數將允許調用的代碼指定運行函數時使用哪個ExecutionContext。

def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
  df.distinct.write.parquet(outPath)
}

現在讓我們提出一個比默認的Global execution context更好的策略。我們希望能夠指定我們想要的並行度。

import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._

object FancyApp {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
        .builder
        .appName("parjobs")
        .getOrCreate()

    // Set number of threads via a configuration property
    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)
    val df = spark.sparkContext.parallelize(1 to 100).toDF
    val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
    val taskB = doFancySum(df, "hdfs:///sum.parquet")
    // Now wait for the tasks to finish before exiting the app
    Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
  }

  def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.distinct.write.parquet(outPath)
  }

  def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
    df.agg(sum("value")).write.parquet(outPath) 
  }
}

在這個例子中,我們定義了Execution context變量xc,含有五個線程。

參考資料

Spark Parallel Job Execution
How to run concurrent jobs(actions) in Apache Spark using single spark context
Processing multiple files as independent RDD’s in parallel

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM