對於我這樣一個一直工作在.net平台上的developer來講,Hadoop,Spark,HBase等這些大數據名詞比較陌生,對於分布式計算,.net上也有類似的Parallel(我說的不是HDInsight), 這篇文章是我嘗試從.net上的Parallel類庫的角度去講述什么是spark。
我們先從C#的一個爛大街的例子(不是Helloworld),統計一篇文章單詞出現的頻率。
下面C#代碼是利用.net Parallel來寫的統計單詞出現頻率。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace WordCountDemo 8 { 9 using System.IO; 10 using System.Threading; 11 class Program 12 { 13 /// <summary> 14 /// 我們以計算一篇文章中單詞的個數為例子 15 /// (計算文章單詞個數的demo簡直就是各種大數據計算的HelloWorld)。 16 /// 17 /// WordCountFlow是數單詞程序 18 /// WordCountDetail對WordCountFlow函數每一行進行拆解並做了詳細解釋。 19 /// </summary> 20 /// <param name="args"></param> 21 static void Main(string[] args) 22 { 23 string filePath = @"D:\BigDataSoftware\spark-2.1.0-bin-hadoop2.7\README.md"; 24 25 WordCountFlow(filePath); 26 Console.WriteLine("----------------------"); 27 WordCountDetail(filePath); 28 } 29 30 /// <summary> 31 /// 數單詞的程序流程 32 /// </summary> 33 /// <param name="filePath"></param> 34 static void WordCountFlow(string filePath) 35 { 36 File.ReadAllLines(filePath).AsParallel() 37 .SelectMany(t => t.Split(' ')) 38 .Select(t => new { word = t, tag = 1 }) 39 .GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) }) 40 // 如果對Aggregate函數不熟悉,上面代碼等同於下行 41 //.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Sum(p => p.tag) }); 42 .ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId} ({t.word}-{t.count})")); 43 } 44 45 /// <summary> 46 /// 數單詞程序流程的詳細解釋 47 /// </summary> 48 /// <param name="filePath"></param> 49 static void WordCountDetail(string filePath) 50 { 51 // 讀取整篇文章,文章每一行將作為一個string存儲到數組lines 52 string[] lines = File.ReadAllLines(filePath); 53 // AsParallel()是Parallel類庫的核心方法,具體的意思是將string[] lines這個數組分割成幾個分區(Partition)。 54 // 假設這篇文章有500行,那么這個方法會會把string[500]-lines分解成 (string[120] partitionA), 55 // (string[180] partitionB), (string[150] partitionC),(...) 等幾個Partition 56 // .net runtime將當前程序的負載(主要是cpu使用情況)情況為依據的分區算法來確定到底要分成幾個Partition, 57 // 我們可以大概認為cpu有幾個邏輯核(不准確),就會被分解成幾個Partition。 58 // 后續的計算中.net runtime將會針對每一個partition申請一個單獨的線程來處理. 59 // 比如:partitionA由001號線程處理,partitionB由002號線程處理。。。 60 ParallelQuery<string> parallelLines = lines.AsParallel(); 61 // linesA,linesB,linesC...數組中存儲的每一行根據空格分割成單詞,結果仍然是存放在ParallelQuery<string>這種分塊的結構中 62 // 下面帶有****的注釋,如果對函數式編程沒有了解,可以直接忽略。 63 // ****如果對函數式編程有所了解,會知道lambda天生lazy的,如果下面這行代碼打個斷點,當debug到這行代碼的時候, 64 // ****鼠標移動到parallelWords上時,我們不會看到每一個單詞, 65 // ****runtime並沒有真正將每一行分解成單詞,這行代碼僅僅是一種計算邏輯。 66 ParallelQuery<string> parallelWords = parallelLines.SelectMany(t => t.Split(' ')); 67 // 將每一個單子加上標記1,這行代碼返回的類型為ParallelQuery<var>,var為runtime自動判斷,此處var的類型的實際應該為 68 // class 匿名類型 69 // { 70 // public word {get;set;} 71 // public tag {get;set} 72 //} 73 var wordparis = parallelWords.Select(t => new { word = t, tag = 1 }); 74 // 根據單詞進行分組,同一個分組中的單詞個數求和,類似於如下sql select word,count(tag) from wordparis group by word 75 // 注意,此處同樣的單詞可能分布在不同的分區中,比如英語中常見的"the",可能partitionA中有3個"the",partitionB中有2個“the", 76 // 但是partitionA和partitionB分別被不同的線程處理,如果runtime足夠聰明的話,他應該先計算partitionA的the的個數(the,3), 77 // 然后計算partitionB的the的個數(the,2),最后將整個partition合並並且重新分割(shuffle),在做后續的計算 78 // shuffle后partition的分區和之前partition里面的數據會不同。 79 // 此處wordcountParis的類型為 80 // class 匿名類型 81 // { 82 // public word {get;set;} 83 // public count {get;set} 84 //} 85 var wordcountParis = wordparis.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) }); 86 // 打印結果。由於線程執行的亂序,可以看到輸出的partitionId也是亂序。 87 wordcountParis.ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId} ({t.word}-{t.count})")); 88 } 89 } 90 }
程序運行結果
通過上面的c#的例子,我們看到parallel如何將一篇文章分解成多個Partition來並且在不同Partition上進行並行計算的,在計算過程中,可能需要"shuffle",需要對原來的Partition進行重新洗牌。
我們假設,如果這個程序運行在集群上,這些Partition分布在不同的機器上,這樣就可以利用多台機器的力量而非一台機器多個線程的力量去做計算了,yeah!,你猜對了,這就是spark,下面的scala的wordCountFlow函數是在spark上統計單詞出現頻率的函數,與c#的WordCountFlow一樣,也是五行代碼,並且這五行代碼的邏輯也完全相同。只不過spark將數據分布在不同的機器上,並且讓機器進行計算,當然,如你所想,某些情況下需要shuffle,不同機器上的數據將會被匯聚並重新分割成新的分區。雖然Spark中的partition和net parallel中的partition並不完全對應(spark中的一台機器上可能有多個paratition) ,shuffle也是spark的專用詞匯,但基本的原理是類似的。
package wordCountExample import org.apache.spark.{SparkConf, SparkContext, TaskContext} /** * Created by StevenChennet on 2017/3/10. */ object WordCount { def main(args: Array[String]): Unit = { // 文件路徑 val filePath="D:\\BigDataSoftware\\spark-2.1.0-bin-hadoop2.7\\README.md" wordCountFlow(filePath) } def wordCountFlow(filePath:String ):Unit={ // sparkContext對象使用一個SparkConf對象來構造 // SparkConf主要進行一些設置,比如說local【*】表示盡量開啟更多線程並行處理 // SparkContext是spark執行任務的核心對象 // 下面五行代碼與C#的WordCountFlow五行代碼一一對應 new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath) .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() } (${t._1}}-${t._2}})")) } }
據友情提醒,上面的Scala代碼的lambda太難看了,我轉換一下方式
new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath) .flatMap(line=>line.split(" ")) .map(word=>(word,1)) .reduceByKey((a,b)=>a+b) .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() } (${t._1}}-${t._2}})")) }
程序運行結果
在net parallel中,如果某個線程在計算過程中崩潰了,那可能導致整個程序都crash掉,如果是集群運算,因為一台宕機而讓整個集群崩潰可不是一個好決策,spark可以在計算之前先對要計算的內容持久化,如果一台機器crash,可以將這台機器的計算任務拉到另外一台機器上進行重新計算。