Spark中的多任務處理
Spark的一個非常常見的用例是並行運行許多作業。 構建作業DAG后,Spark將這些任務分配到多個Executor上並行處理。
但這並不能幫助我們在同一個Spark應用程序中同時運行兩個完全獨立的作業,例如同時從多個數據源讀取數據並將它們寫到對應的存儲,或同時處理多個文件等。
每個spark應用程序都需要一個SparkSession(Context)來配置和執行操作。 SparkSession對象是線程安全的,可以根據需要傳遞給你的Spark應用程序。
一個順序作業的例子
假設我們有一個spark 2.x應用程序,負責將幾個數據寫入到HDFS中。
import org.apache.spark.sql.SparkSession
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
val df = spark.sparkContext.parallelize(1 to 100).toDF
doFancyDistinct(df, "hdfs:///dis.parquet")
doFancySum(df, "hdfs:///sum.parquet")
}
def doFancyDistinct(df: DataFrame, outPath: String) = df.distinct.write.parquet(outPath)
def doFancySum(df: DataFrame, outPath: String) = df.agg(sum("value")).write.parquet(outPath)
}
這個程序看起來沒有什么問題,Spark將按順序執行兩個動作。但這兩個動作是獨立, 我們可以同時執行它們。
一個有缺陷的並發作業的例子
如果你快速的在網上搜索一下 “scala異步編程”,你就會被引到Scala Future這個解決方案中。
例如以下為一個並行處理RDD的例子:
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
val n: Int = 2
val files: Array[String] = ['/tmp/test1.csv','/tmp/test2.csv']
val rdds = files.map(f => pipeline(f, n))
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
df.rdd.foreach(_ => ()) // Force computation
df
}
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
我們只要根據搜索到的文檔中提供的例子修改一下,就會得到以下類似內容:
import org.apache.spark.sql.SparkSession
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
val df = spark.sparkContext.parallelize(1 to 100).toDF
val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
val taskB = doFancySum(df, "hdfs:///sum.parquet")
// Now wait for the tasks to finish before exiting the app
Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
}
def doFancyDistinct(df: DataFrame, outPath: String) = Future { df.distinct.write.parquet(outPath) }
def doFancySum(df: DataFrame, outPath: String) = Future { df.agg(sum("value")).write.parquet(outPath) }
}
ExecutionContext是用於管理並行操作的Context。 實際的線程模型可以由開發者明確提供,也可以使用全局默認值(這是一個 ForkJoinPool ),就像我們在上面的代碼中使用的一樣:
import scala.concurrent.ExecutionContext.Implicits.global
使用Global execution context 的問題在於它並不知道我們是在群集上啟動Spark作業。 默認情況下,Global execution context 提供與運行代碼的系統中的處理器相同數量的線程。 在我們的Spark應用程序中,它將與Driver上的處理器相同數量的線程。
一個優化過的並發作業的例子
我們需要控制我們的線程策略,更一般化地編寫我們的程序,以便可以在不同的線程模型中重用它們。
例如以下是我們從重寫的函數,它將允許我們精確控制execution context 來管理調用函數時提供的線程數。 例子中添加的隱式參數將允許調用的代碼指定運行函數時使用哪個ExecutionContext。
def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.distinct.write.parquet(outPath)
}
現在讓我們提出一個比默認的Global execution context更好的策略。我們希望能夠指定我們想要的並行度。
import org.apache.spark.sql.SparkSession
import import java.util.concurrent.Executors
import scala.concurrent._
import scala.concurrent.duration._
object FancyApp {
def appMain(args: Array[String]) = {
// configure spark
val spark = SparkSession
.builder
.appName("parjobs")
.getOrCreate()
// Set number of threads via a configuration property
val pool = Executors.newFixedThreadPool(5)
// create the implicit ExecutionContext based on our thread pool
implicit val xc = ExecutionContext.fromExecutorService(pool)
val df = spark.sparkContext.parallelize(1 to 100).toDF
val taskA = doFancyDistinct(df, "hdfs:///dis.parquet")
val taskB = doFancySum(df, "hdfs:///sum.parquet")
// Now wait for the tasks to finish before exiting the app
Await.result(Future.sequence(Seq(taskA,taskB)), Duration(1, MINUTES))
}
def doFancyDistinct(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.distinct.write.parquet(outPath)
}
def doFancySum(df: DataFrame, outPath: String)(implicit xc: ExecutionContext) = Future {
df.agg(sum("value")).write.parquet(outPath)
}
}
在這個例子中,我們定義了Execution context變量xc,含有五個線程。
參考資料
Spark Parallel Job Execution
How to run concurrent jobs(actions) in Apache Spark using single spark context
Processing multiple files as independent RDD’s in parallel
