目標
實現一千萬個不重復整數的排序,可以一次性加載到 2G 的內存里。
本文適合於想要了解新語言 Scala 並發異步編程框架 Akka, Future 的筒鞋。 讀完本文后,將了解如何綜合使用 ForkJoin 框架、 Akka 模型、以及 Future 進行並發異步編程,還有一系列小的編程點。
任務拆分
首先要進行任務拆分。要實現一千萬個不重復整數的排序, 可以拆分為三個子任務:
(1) 生成一千萬的不重復整數並寫入文件 NumberGeneratorTask;
(2) 從文件讀取並檢測確實生成的是一千萬個不重復的整數 CheckUnduplicatedNumbersActor;
(3) 從文件讀取整數進行排序和排序檢測 BigfileSortActor。接下來逐一實現這些子任務。
入口如下。這里使用了 Akka 的框架及 ForkJoin 實例。其中啟動 NumberGeneratorTask 抽離到一個工具類 ForkJoinPoolStartup 來實現,更好地維護和復用, 比如啟動不同參數的 NumberGeneratorTask 。
從 system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor") 可以看出如何創建帶參數或不帶參數的 Actor 實例。注意到,如果任務流程是從 NumberGeneratorTask -> checkNumberTask -> bigfileSortTask , 那么,對應的Actor 順序正好是反過來:先創建任務流程最靠后的Actor,再創建流程中靠前的Actor ,因為靠前的Actor 需要持有流程中下一個Actor的引用以便向其發送消息。BigFileSortActor 持有 ActorSystem 實例引用 system , 便於在排序及檢測完成后終止整個 Actor 系統。
package scalastudy.concurrent.billionsort import akka.actor.{ActorSystem, Props} import scalastudy.concurrent.ForkJoinPoolStartup import scalastudy.concurrent.config.ActorSystemFactory import scalastudy.concurrent.billionsort.Constants._ /** * Created by shuqin on 16/5/18. */ object BillionNumberSort extends App { launch() def launch(): Unit = { ForkJoinPoolStartup.start(createActors(), poolWaitSecs) } def createActors():NumberGeneratorTask = { val system:ActorSystem = ActorSystemFactory.newInstance() val bigfileSortActor = system.actorOf(Props(new BigFileSortActor(numbers, system))) val checkNumberActor = system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor") val numGenTask = new NumberGeneratorTask(numbers, 0, rangeMaxNumber, checkNumberActor) return numGenTask } }
package scalastudy.concurrent import java.util.concurrent.{ForkJoinPool, TimeUnit} import scalastudy.concurrent.billionsort.NumberGeneratorTask /** * Created by shuqin on 17/4/27. */ object ForkJoinPoolStartup { def start(entranceTask:NumberGeneratorTask, waitSecs:Int):Unit = { val pool = new ForkJoinPool() pool.execute(entranceTask) pool.shutdown pool.awaitTermination(waitSecs, TimeUnit.SECONDS) pool.shutdownNow assert( pool.isTerminated == true ) } }
生成一千萬個不重復整數
ForkJoin的使用
顯然,這個子任務是可以采用 ForkJoin 來完成的。 ForkJoin 是分治思想的框架性實現, 將原問題分解為同樣性質的多個子問題,然后將子問題的解組合起來得到原問題的解。通常采用二分法。實現上,通常會采用遞歸結構, 注意遞歸不要太深。 actorName ! message 表示向名稱為 actorName 的Actor 實例發送 message 消息,message 可以是任意數據結構,字符串、列表、元組、對象等。這里發送了兩種類型: 整數列表 randInts.map(i=>i+start).toList 或 整數元組 (start, end) 。生成隨機無序整數使用了已有的Java類 RandomSelector 的方法,這表明了,Scala 可以輕易無縫地使用 Java 現有的代碼和庫。
NumberGeneratorTask 的實現如下:
package scalastudy.concurrent.billionsort import java.util.concurrent.RecursiveAction import akka.actor.ActorRef import zzz.study.algorithm.select.RandomSelector import scalastudy.concurrent.billionsort.Constants.threshold import scalastudy.concurrent.billionsort.Constants.debug /** * Created by shuqin on 16/5/19. * * 在 [start, end] 選出 num 個不重復的整數 * */ class NumberGeneratorTask(num:Int, start:Int, end:Int, checkNumberActor: ActorRef) extends RecursiveAction { override def compute(): Unit = { if (debug) { println("Select: " + num + " unduplicated numbers from [" + start + " " + end + ")"); } if (num <= threshold) { if (num > end - start+1) { checkNumberActor ! start.to(end).toList } else { val randInts = RandomSelector.selectMDisorderedRandInts2(num, end-start+1) checkNumberActor ! randInts.map(i=>i+start).toList } } else { val middle = start/2 + end/2 val leftTask = new NumberGeneratorTask(num/2, start, middle, checkNumberActor) val rightTask = new NumberGeneratorTask((num+1)/2, middle+1, end, checkNumberActor) if (debug) { println("Left: [" + start + "-" + middle + "," + num/2 + "]") println("Right: [" + (middle+1) + "-" + end + "," + (num+1)/2 + "]") } leftTask.fork rightTask.fork leftTask.join rightTask.join checkNumberActor ! (start, end) } } }
檢測生成的一千萬個整數不重復
Actor通信
Akka Actor 並發模型一個重要優勢在於為代表單任務的Actor提供了健壯可擴展的消息傳遞通信機制。繼承 Actor 之后,需要覆寫指定方法 override def receive: Receive ,使用靈活而強大的 case 語句(偏函數)來匹配消息的類型及消息的值,從而做不同的判斷和操作。
怎樣判斷整數生成任務完成從而可以開始檢測了呢?在 NumberGeneratorTask 生成最后一組整數時並回退到最開始的調用層時,就會發送 (0, Constants.rangeMaxNumber) 作為信號, 而 CheckUnduplicatedNumbersActor 則通過 case (0,Constants.rangeMaxNumber) 可以匹配到這一點。
Trait與Actor的分工
最開始,接收NumberGeneratorTask 傳來的消息進行處理以及檢測生成的整數不重復都寫在了 CheckUnduplicatedNumbersActor 這一個類里。后來想了想,覺得這個類混雜了不同的功能和職責,因此拆分成了兩個類:CheckUnduplicatedNumbersActor 和 CheckUnduplicatedNumbers 。 其中 CheckUnduplicatedNumbersActor 負責任務協作和計算調度, 而 CheckUnduplicatedNumbers 負責檢測生成的整數不重復的實際工作。職責明晰,各司其責,分開獨立發展。使用 with CheckUnduplicatedNumbers 語法,可以使得具體類混入 trait 的功能,實現多重能力繼承,既能利用多重繼承的優勢,又能避免多重字段繼承帶來的問題。
策略模式的使用
CheckUnduplicatedNumbers 使用了策略模式。對於一千萬個整數來說,內存占用 40M 左右, 2G 內存是裝滴下的, 若是十億個整數,那么就需要 4G,就不能一次性加載了。因此這里定義了個接口,並實現了一次性加載策略和位圖策略。可以使用位圖來檢測不重復的整數,甚至可以直接進行排序。可參考 《位圖排序(位圖技術應用)》。 BitMapStrategy 實現了使用位圖技術來對一千萬個不重復整數進行排序的策略。讀者感興趣可以實現多次加載策略,以應對內存不夠的情形。
此外,Source.fromFile(filename).getLines 這里返回的是迭代器, 如果內存不夠用的話,就必須使用這個方法,而不是 Source.fromFile(filename).getLines.toList , 后者會將所有行全部加載到內存中而導致 OutOfMemoryError .
package scalastudy.concurrent.billionsort import java.io.{File, PrintWriter} import akka.actor.{Actor, ActorRef} import scala.collection.immutable.List import scalastudy.concurrent.billionsort.Constants.filename /** * Created by shuqin on 16/5/19. */ class CheckUnduplicatedNumbersActor(val numbers:Int, bigfileSortActor: ActorRef) extends Actor with CheckUnduplicatedNumbers { val fwResult = new PrintWriter(new File(filename)) var count = 0 val useBigFileSort = true override def receive: Receive = { case numberList: List[Int] => fwResult.write(numberList.mkString(" ") + "\n"); count += numberList.length case (0, Constants.rangeMaxNumber) => println("Reach End.") println("Expected: " + numbers + " , Actual Received: " + count) assert(count == numbers) fwResult.flush fwResult.close checkUnduplicatedNumbers(filename, numbers) if (useBigFileSort) { bigfileSortActor ! filename } case _ => println("未知消息,請檢查原因 !") } }
package scalastudy.concurrent.billionsort import java.io.{File, PrintWriter} import zzz.study.datastructure.vector.EnhancedBigNBitsVector import scala.collection.mutable.Set import scala.io.Source import scalastudy.concurrent.billionsort.Constants.rangeMaxNumber /** * Created by shuqin on 17/4/26. */ trait CheckUnduplicatedNumbers { def checkUnduplicatedNumbers(filename:String, numbers:Int): Unit = { assert(new OnceLoadStrategy().checkUnduplicatedNumbersInFile(filename, numbers) == true) assert(new BitMapStrategy().checkUnduplicatedNumbersInFile(filename,numbers) == true) println("checkUnduplicatedNumbers passed.") } /** * 一次性加載所有數到內存, 適用於內存可以裝下所有數的情況 * 比如 10000000 個整數占用 40M 空間, 2G 內存是綽綽有余的, 但十億占用 4G 空間失效 */ class OnceLoadStrategy extends CheckUnduplicatedStrategy { def checkUnduplicatedNumbersInFile(filename:String, numbers:Int):Boolean = { var numbersInFile = 0 val unDupNumberSet = Set[Int]() Source.fromFile(filename).getLines. foreach { line => val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet numbersInFile += numbersInLine.size; unDupNumberSet ++= numbersInLine } println(s"Expected: ${numbers} , Actual In File: ${numbersInFile} ") println("Unduplicated numbers in File: " + unDupNumberSet.size) unDupNumberSet.size == numbers } } /** * 使用位圖技術來檢測不重復的數, 實際上還能用於排序 * N個數只要 4(N/32+1) = N/8 + 4 個字節 * 十億個數只要 125000004B = 125MB * 反過來, 內存 1G 的機器可以對 80億 的不重復數進行排序 */ class BitMapStrategy extends CheckUnduplicatedStrategy { val nbitsVector = new EnhancedBigNBitsVector(rangeMaxNumber) override def checkUnduplicatedNumbersInFile(filename: String, numbers:Int): Boolean = { Source.fromFile(filename).getLines. foreach { line => val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet numbersInLine.foreach { num => nbitsVector.setBit(num) } } val undupTotal = checkAndSort(filename) println(s"undupTotal: ${undupTotal}") assert(undupTotal == numbers) return true } def checkAndSort(filename: String): Integer = { val fwFinalResult = new PrintWriter(new File(s"${filename}.sorted.txt")) val sorted = nbitsVector.expr() var undupTotal = sorted.size() fwFinalResult.flush() fwFinalResult.close() return undupTotal } } trait CheckUnduplicatedStrategy { def checkUnduplicatedNumbersInFile(filename:String, numbers:Int):Boolean } }
大文件排序
Oh, 終於進入正題了。大文件排序當然采用歸並排序了。 在這個實現里,值得注意的是采用了 Future 全異步框架。
Future全異步框架
可以看到:
(1) def produceFuture(line:String): Future[List[List[Int]]] 將文件的每一行(包含 threshold 個整數)轉化為一個對行內整數排序的 Future, 可以在后續獲取結果; 對於一個文件,就是獲得了 futureTasks = List[Future[List[List[Int]]]] ; List[List[Int]] 是為了讓后面的 Reduce 語法上走得通。
(2) val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) 將 List[Future[List[List[Int]]]] 整合成一個 TotalFuture, 這個 TotalFuture 的結果是 futureTasks 里面的所有 Future 結果的連接; 每一個 Future 的結果是一個已排序的列表; 那么 TotalFuture 的結果是一個已排序列表的列表。List[Future[List[List[Int]]]] 看着是不是有點頭暈目眩?這生動地說明,要想玩轉編程,數據結構功底要扎實!
(3) 注意到下面這行代碼: 是將一個 Future A 轉化為另一個 Future B. 其中 B 的結果是基於 A. 在本例中,即是將已排序列表的列表合並為最終列表,但仍然返回的是 Future 而不是最終列表。為什么要這么寫, 而不是將 sortedListsFuture 的結果取出來再合並呢? 這是由於之前的所有動作都是異步的。 如果應用只是取排序的結果,那么也沒什么; 但如果應用要將 sortedListsFuture 的結果寫入文件呢? 進而還要做一下排序檢測? 那么, 就不得不在后面加入 TimeUnit.SECONDS.sleep(n) 的代碼, 讓主線程休息一會了(因為前面整個是異步的, 在 sortedListsFuture 還沒完成時,后面的代碼就會被執行了)! 而且你得不斷估計前面的排序/合並操作究竟大約需要多少時間從而不斷調整休眠的時間! 之前就是這樣實現的! 但這樣並不符合 Future 異步框架的初衷! 因此后面,我突然覺得要寫成全異步的, 也體驗到了寫成全異步應用的滋味~~ :) 要求確實是有點高,需要不斷從 Future 轉換成新的 Future ~~ 同時你也發現, Scala Future 也提供了一個幫助編寫全異步框架的 API ~~
sortedListsFuture map { value:List[List[Int]] => CollectionUtil.mergeKOrderedList(value) }
(4) 由於后面將排序結果寫入文件以及從文件檢測排序是否 OK 都是同步的,因此,可以在排序 Future 完成后執行。 注意到 Future 的非阻塞寫法: f.onComplete { case Success(result) => doWith(result) ; case Failure(ex) => doWith(ex) }
(5) 為了將列表鏈接起來,也試錯了好幾次: (x :: y :: Nil).flatten ; 如果寫成 reduce(_ :: _ :: Nil) 是會報錯的; 寫成 reduce(_.flatten :: _.flatten :: Nil) 最終會合並成兩個列表不符合預期。
package scalastudy.concurrent.billionsort import java.io.{File, PrintWriter} import java.util.concurrent.TimeUnit import akka.actor.{Actor, ActorSystem, Props} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} import scalastudy.utils.{CollectionUtil, DefaultFileUtil, PathConstants} /** * Created by shuqin on 16/5/20. */ class BigFileSortActor(numbers: Int, actorSystem: ActorSystem) extends Actor with SortChecker { override def receive: Receive = { case filename:String => println("Received File: " + filename) sortFile(filename) } def produceFuture(line:String): Future[List[List[Int]]] = { val origin = line.split("\\s+").map( s => Integer.parseInt(s)).toList Future { List(origin.sorted) } } def cat(x: List[List[Int]],y:List[List[Int]]): List[List[Int]] = { return (x :: y :: Nil).flatten } def obtainSortedFuture(futureTasks:List[Future[List[List[Int]]]]):Future[List[Int]] = { val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) sortedListsFuture map { value:List[List[Int]] => CollectionUtil.mergeKOrderedList(value) } } def sortFile(filename:String):Unit = { val futureTasks = DefaultFileUtil.readFileLines(filename).map(produceFuture(_)) println("task numbers: " + futureTasks.size) val allNumberSortedFuture = obtainSortedFuture(futureTasks) allNumberSortedFuture.onComplete { case Success(value:List[Int]) => println("sort finished.") writeSorted(value, filename) checkSorted(filename, numbers) println("sleep 3s and then begin to stop all.") TimeUnit.SECONDS.sleep(3) actorSystem.shutdown case Failure(ex) => println("Sort failed: " + ex.getMessage) } } def writeSorted(allNumberSorted: List[Int], filename: String): Unit = { val fwResult = new PrintWriter(new File(filename + ".sorted.txt")) fwResult.write(allNumberSorted.mkString("\n")) fwResult.flush fwResult.close } } object BigFileSortActorTest { def main(args:Array[String]):Unit = { val numbers = 10000000 val system = ActorSystem("BigFileSortActorTest") val bigFileSortActor = system.actorOf(Props(new BigFileSortActor(numbers, system)),name="bigFileSortActor") bigFileSortActor ! PathConstants.projPath + "/data/" + numbers +".txt" TimeUnit.SECONDS.sleep(640) system.shutdown } }
合並有序鏈表
合並有序鏈表是歸並排序的核心環節,也是歸並排序的性能關鍵之所在。
CollectionUtil 實現了一個二路有序列表合並和K路有序列表合並。 其中二路有序列表合並和K路有序列表均分別使用了三種方法來實現:一種是過程式的插入合並,一種是結合foldLeft 或 reduce 的函數式的實現,一種是更高效的實現。讀者可以體會三種的差異。過程式的合並時間效率尚可,但空間開銷比較大,大數據量時容易導致OOM, 函數式的做法時間效率不夠優,而更高效的實現盡可能結合兩者的優勢。
注意到,K路有序列表合並使用到了 klists.par.reduce(merge) , 將普通列表轉化為並行列表,以一定空間開銷換取可以並行地合並大量列表的時間效率。實際調試查看大量列表合並進度時,可以在 merge 函數的返回結果行之上加一行 println(result.size),查看合並后的列表大小。
Scala 的 List 是一個鏈表 (head::(tail::Nil)), 空列表可以用 List(), Nil 來表示; 將元素添加在列表頭部可使用 elem :: list , 在列表尾部添加元素使用 list :+ elem ; 列表鏈接使用 list1 ::: list2。默認的 List 是不可變的,意味着每次操作都會創建一個新的 List , 對於大有序列表合並的空間效率是不能接受的。因此,大有序列表的合並必須采用可變列表 ListBuffer . 至於如何做到 O(n+m), 還需要探索。
NOTE: 通過性能測試發現,merge 時間效率是最高的,但是當列表很大時會拋GC exceed limit 異常; mergeInplace 的性能次之,在列表長度到達 10000 時,性能開始急速下降幾十倍; mergeFunctional 的時間性能最差。看上去情況並不如預料。估計應該是使用 List 姿勢不對。后續打算專門開一篇文章討論合並有序列表的優化。
Scala 的集合性能參考: http://docs.scala-lang.org/overviews/collections/performance-characteristics.html
package scalastudy.utils import scala.collection.mutable import scala.collection.mutable.{ListBuffer, Map} import scala.math.pow /** * Created by lovesqcc on 16-4-2. */ object CollectionUtil { def main(args: Array[String]): Unit = { testSortByValue testAllMergeIsRight testPerf(merge) testPerf(mergeInplace) // testPerf(mergeFunctional) } def testSortByValue():Unit = { val map = Map("shuqin" -> 31, "yanni" -> 28) sortByValue(map).foreach { println } } def testAllMergeIsRight(): Unit = { testMerge(merge) testMerge(mergeFunctional) testMerge(mergeInplace) testMergeKOrderedList(mergeKOrderedList) testMergeKOrderedList(mergeKOrderedListFunctional) testMergeKOrderedList(mergeKOrderedListIneffective) } def testMerge(merge: (List[Int], List[Int]) => List[Int]):Unit = { assert(merge(Nil, Nil) == List()) assert(merge(List(), Nil) == List()) assert(merge(List(), List()) == List()) assert(merge(List(), List(1,3)) == List(1,3)) assert(merge(List(4,2), List()) == List(4,2)) assert(merge(List(4,2), Nil) == List(4,2)) assert(merge(List(2,4), List(1,3)) == List(1,2,3,4)) assert(merge(List(2,4), List(1,3,5)) == List(1,2,3,4,5)) assert(merge(List(2,4,6), List(1,3)) == List(1,2,3,4,6)) assert(merge(List(2,4,6), List(8,10)) == List(2,4,6,8,10)) println("test merge list passed.") } def testMergeKOrderedList(mergeKOrderedList: List[List[Int]] => List[Int]):Unit = { assert(mergeKOrderedList(Nil) == List()) assert(mergeKOrderedList(List()) == List()) assert(mergeKOrderedList(List(List())) == List()) assert(mergeKOrderedList(List(List(1,2))) == List(1,2)) assert(mergeKOrderedList(List(List(), List())) == List()) assert(mergeKOrderedList(List(List(), List(1,3))) == List(1,3)) assert(mergeKOrderedList(List(List(2,4), List())) == List(2,4)) assert(mergeKOrderedList(List(List(2,4), List(1,3))) == List(1,2,3,4)) assert(mergeKOrderedList(List(List(2,4), List(1,3,5))) == List(1,2,3,4,5)) assert(mergeKOrderedList(List(List(2,4,6), List(1,3))) == List(1,2,3,4,6)) assert(mergeKOrderedList(List(List(2,4,7), List(1,6), List(3,5))) == List(1,2,3,4,5,6,7)) assert(mergeKOrderedList(List(List(2,4,9), List(1,7), List(3,6), List(5,8))) == List(1,2,3,4,5,6,7,8,9)) println("test mergeKOrderedList passed.") } def testPerf(merge: (List[Int], List[Int]) => List[Int]):Unit = { val n = 10 val numbers = (1 to 7).map(pow(n,_).intValue) println(numbers) numbers.foreach { num => val methodName = merge.toString() val start = System.currentTimeMillis val xList = (1 to num).filter(_ % 2 == 0).toList val yList = (1 to num).filter(_ % 2 == 1).toList val merged = merge(xList, yList) val mergedSize = merged.size val end = System.currentTimeMillis val cost = end - start println(s"method=${methodName}, numbers=${num}, merged size: ${mergedSize}, merge cost: ${cost} ms") } } /** * 對指定 Map 按值排序 */ def sortByValue(m: Map[String,Int]): Map[String,Int] = { val sortedm = new mutable.LinkedHashMap[String,Int] m.toList.sortWith{case(kv1,kv2) => kv1._2 > kv2._2}.foreach { t => sortedm(t._1) = t._2 } return sortedm } /** * 合並兩個有序列表 * 將 yList 合並到 xList 上 * 結合了 mergeFunctional 和 mergeIneffective 的優勢 * 沒有空間開銷,時間復雜度為 O(n+m), n,m 分別是 xList, yList 的列表長度
* xList and yList should both be ListBuffer , and return ListBuffer
* * TODO not implemented */ def mergeInplace(xList: List[Int], yList: List[Int]): List[Int] = { (xList, yList) match { case (Nil, Nil) => List[Int]() case (Nil, _) => yList case (_, Nil) => xList case (hx :: xtail, hy :: ytail) => var result = List[Int]() var xListP = List[Int]() var yListP = List[Int]() if (hx > hy) { result = hy :: Nil xListP = xList yListP = ytail } else { result = hx:: Nil yListP = yList xListP = xtail } while (xListP != Nil && yListP != Nil) { if (xListP.head > yListP.head) { result = result :+ yListP.head yListP = yListP.tail } else { result = result :+ xListP.head xListP = xListP.tail } } if (xListP == Nil) { result = result ::: yListP } if (yListP == Nil) { result = result ::: xListP } // println("xsize=" + xList.size + ", ysize= " + yList.size + ", merged=" + result.size) result } } /** * 合並兩個有序列表 * * 由於每次插入 yList 元素到 xList 都要從頭遍歷,因此算法時間復雜度是 O(n*m) */ def mergeFunctional(xList: List[Int], yList: List[Int]): List[Int] = { (xList, yList) match { case (Nil, Nil) => List[Int]() case (Nil, _) => yList case (_, Nil) => xList case (hx :: xtail, hy :: ytail) => yList.foldLeft(xList)(insert) } } def insert(xList:List[Int], y:Int): List[Int] = { (xList, y) match { case (Nil, _) => y :: Nil case (hx :: xtail, _) => if (hx > y) { y :: xList } else { var result = hx :: Nil var pCurr = xtail while (pCurr != Nil && pCurr.head < y) { result = result :+ pCurr.head pCurr = pCurr.tail } (result :+ y) ::: pCurr } } } /** * 合並兩個有序列表 * 將 yList 與 xList 合並到一個全新的鏈表上 * 由於使用指針是漸進地合並,因此算法時間復雜度是 O(n+m) n,m 分別是 xList, yList 的列表長度 * 由於有列表復制操作,且是漸進地合並,因此算法空間復雜度也是 O(n+m) */ def merge(xList: List[Int], yList: List[Int]): List[Int] = { if (xList.isEmpty) { return yList } if (yList.isEmpty) { return xList } val result = ListBuffer[Int]() var xListC = xList var yListC = yList while (!xListC.isEmpty && !yListC.isEmpty ) { if (xListC.head < yListC.head) { result.append(xListC.head) xListC = xListC.tail } else { result.append(yListC.head) yListC = yListC.tail } } if (xListC.isEmpty) { result.appendAll(yListC) } if (yListC.isEmpty) { result.appendAll(xListC) } result.toList } /** * 合並k個有序列表 * 轉化為並行容器進行並行地合並,有空間開銷 */ def mergeKOrderedList(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } if (klists.size == 1) { return klists.head } klists.par.reduce(merge) } /** * 合並k個有序列表 * 使用函數式逐個地合並 */ def mergeKOrderedListFunctional(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } if (klists.size == 1) { return klists.head } klists.reduce(merge) } /** * 合並k個有序列表 * 使用插入逐個地合並 */ def mergeKOrderedListIneffective(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } var nlist = klists.size if (nlist == 1) { return klists.head } var klistp = klists; val kbuf = ListBuffer[List[Int]]() while (nlist > 1) { for (i <- 0 to nlist/2-1) { kbuf.insert(i, merge(klistp(2*i), klistp(2*i+1))) if (nlist%2 == 1) { kbuf.append(klistp(nlist-1)) } } nlist = nlist - nlist/2 klistp = kbuf.toList } kbuf.toList.head } }
排序后檢測
排序后檢測,既可以做成一個 Actor ,也可以做成一個 trait. 如果排序檢測本身在整個任務協作中占有一席之地,那么做成Actor比較合適;如果只是一個配合性的動作,那么做成 trait 會更直接。這里選擇作為一個trait, 而 BigFileSortActor 通過 with SortChecker 來借用它的排序檢測能力。
這里提供了 checkSort 的過程式實現和函數式實現,讀者可體會其中的差異。由於迭代器迭代一次后就變成空,因此迭代過程中要記錄迭代次數,來與指定的整數數目進行比較斷言。
package scalastudy.concurrent.billionsort import scala.io.Source /** * Created by shuqin on 17/4/25. */ trait SortChecker { /** * 每次比較列表的兩個數, 后一個不小於前一個 * NOTE: 使用迭代器模式 */ def checkSorted(filename:String, numbers:Int): Unit = { val numIterator = Source.fromFile(filename + ".sorted.txt").getLines().map(line => Integer.parseInt(line.trim)) checkSort(numIterator, numbers) println("test sorted passed.") } /** * 函數式實現 */ def checkSort(numIterator: Iterator[Int], numbers:Int):Unit = { var count = 1 numIterator.reduceLeft((prev,next) => { assert(prev <= next); count += 1 ; next; } ) assert(count == numbers) } /** * 過程式實現 */ def checkSortProcedural(numIterator: Iterator[Int], numbers:Int): Unit = { var last = 0 var count = 0 numIterator.foreach { num => assert(num >= last) last = num count += 1 } assert(count == numbers) } def checkSort(numList: List[Int], numbers:Int): Unit = { checkSort(numList.iterator, numbers) } }
輔助類
(1) Constants 包含了本示例所需要的常量,便於性能調優。
(2) 從 N 個數中選出不重復的 M 個數參見 RandomSelector 的實現。 算法出處:《編程珠璣》第十二章 取樣問題。
package scalastudy.concurrent.billionsort import scalastudy.utils.PathConstants /** * Created by shuqin on 17/4/27. */ object Constants { // 生成的整數中不超過的最大數 val rangeMaxNumber = 1000000000 // 在 [0, rangeMaxNumber] 生成 numbers 個不重復的整數 val numbers = 10000000 // 每次生成不超過 threshold 個不重復的整數數組; // 該值不能過小, 否則會因遞歸層次過深導致內存不足. val threshold = numbers / 10 // 存儲生成的不重復整數 val filename = PathConstants.projPath + s"/data/${numbers}.txt" // ForkJoin 池終止前的等待時間 val poolWaitSecs = 15 // Debug 選項 val debug = false }
package zzz.study.algorithm.select; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.TreeSet; public class RandomSelector { private RandomSelector() { } private static Random rand = new Random(47); /** * bigRandInt: 返回一個非常大的隨機整數,該整數的二進制位數不小於 bits */ public static int bigRandInt(int bits) { if (bits >= 32 || bits <= 0) { throw new IllegalArgumentException("參數 " + bits + " 錯誤,必須為小於 32 的正整數!"); } int baseNum = 1 << (bits - 1); return rand.nextInt(Integer.MAX_VALUE - baseNum) + baseNum; } /** * randRange: 生成給定范圍的隨機整數 * @param low 范圍下限 * @param high 范圍上限(不包含) * @return 給定范圍的隨機整數 */ public static int randRange(int low, int high) { if (high <= low) { throw new IllegalArgumentException("參數 [" + low + "," + high + "] 錯誤,第一個參數必須小於第二個參數!"); } return bigRandInt(30) % (high-low) + low; } /** * selectMOrderedRandInts : 從指定集合中隨機選擇指定數目的整數,並以有序輸出 * @param m 需要選取的整數數目 * @param n 指定整數集合 [0:n-1] * @return 隨機選取的有序整數列表 */ public static int[] selectMOrderedRandInts(int m, int n) { checkParams(m, n); int[] result = new int[m]; int remaining = n; int selector = m; for (int k=0, i=0; i < n; i++) { if ((bigRandInt(30) % remaining) < selector) { result[k++] = i; selector--; } remaining--; } return result; } /** * selectMOrderedRandInts2 : 從指定集合中隨機選擇指定數目的整數,並以有序輸出 * @param m 需要選取的整數數目 * @param n 指定整數集合 [0:n-1] * @return 隨機選取的有序整數列表 */ public static int[] selectMOrderedRandInts2(int m, int n) { checkParams(m, n); Set<Integer> holder = new TreeSet<Integer>(); while (holder.size() < m) { holder.add(bigRandInt(30) % n); } return collectionToArray(holder); } /** * selectMOrderedRandInts3 : 從指定集合中隨機選擇指定數目的整數,並以有序輸出 * @param m 需要選取的整數數目 * @param n 指定整數集合 [0:n-1] * @return 隨機選取的有序整數列表 */ public static int[] selectMOrderedRandInts3(int m, int n) { checkParams(m, n); int[] arr = selectMDisorderedRandInts3(m, n); Arrays.sort(arr); return arr; } /** * selectMDisorderedRandInts2: 從指定整數集合中隨機選擇指定數目的整數,並以無序輸出 * @param m 需要選取的整數數目 * @param n 指定整數集合 [0:n-1] * @return 隨機選取的無序整數列表 */ public static int[] selectMDisorderedRandInts2(int m, int n) { checkParams(m, n); Set<Integer> intSet = new HashSet<Integer>(); while (intSet.size() < m) { intSet.add(bigRandInt(30) % n); } return collectionToArray(intSet); } /** * selectMDisorderedRandInts3: 從指定整數集合中隨機選擇指定數目的整數,並以無序輸出 * @param m 需要選取的整數數目 * @param n 指定整數集合 [0:n-1] * @return 隨機選取的無序整數列表 */ public static int[] selectMDisorderedRandInts3(int m, int n) { checkParams(m, n); int[] arr = new int[n]; for (int i=0; i < n; i++) { arr[i] = i; } for (int k=0; k < m; k++) { int j = randRange(k, n); int tmp = arr[k]; arr[k] = arr[j]; arr[j] = tmp; } return Arrays.copyOf(arr, m); } public static void checkParams(int m, int n) { if (m > n || m <= 0 || n <= 0 ) { throw new IllegalArgumentException("參數 [" + m + "," + n + "] 錯誤,必須均為正整數,且第一個參數必須小於或等於第二個參數!"); } } /** * collectionToArray : 將指定整數集合轉化為整型數組列表 * @param collection 指定整數集合 * @return 要返回的整型數組列表,若給定集合為空,則返回 null */ public static int[] collectionToArray(Collection<Integer> collection) { if (collection == null || collection.size() == 0) { return null; } int[] result = new int[collection.size()]; int k = 0; for (Integer integer : collection) { result[k] = integer; k++; } return result; } /** * printArray: 打印數組的便利方法,每打印十個數換行 * @param arr 指定要打印的數組 */ public static void printArray(int[] arr) { for (int i=0; i < arr.length; i++) { System.out.printf("%d%c", arr[i], i%10==9 ? '\n' : ' '); } } }
小結
本來只是想寫一個 ForkJoin 的示例,但寫着寫着就加入了 akka, future 的元素, 是在解決問題的過程中逐漸引入的。我覺得這種學習的方式很好,就是在解決一個問題的過程中,可以綜合地探索和學習到很多不同的東西。傳統的學習講究"循序漸進"的方式,但是"跳躍式+快速試錯"也許是學習新技術的更好的方法。 :)
對於Scala並發異步編程,可以總結如下:
(1) ForkJoin 非常適合於數據並發或數據並行的計算,在分布式計算架構之上就演變成 Map-Reduce 計算模型了;
(2) Akka-Actor 並發模型非常適合於任務協作和通信的並發任務。多線程與鎖同步機制的問題就在於,線程之間沒有通信的通道,只好通過在內存區域開辟若干共享可變的狀態來協調線程之間的協作; 而 Actor 模型則為代表任務的Actor之間的通信和協作通過了消息傳遞機制。正應了那句話:通過通信來共享內存,而不是通過內存來共享通信。
(3) Scala Future API 提供了一個全異步的框架。不像 Java 那樣只能生成一個 Future 隨后取數據, Scala Future 可以通過各種計算操作映射成各種各樣的 Future, 而且可以級聯、組合這些 Future 得到新的 Future, 然后才從轉換后的最終 Future 中獲取結果,並且提供了非阻塞的處理結果的方式, 是靈活、可擴展的異步編程框架。
(4) 在執行某些容器的大量獨立操作時,可以采用並行計算。Scala 提供了並行容器的實現以及簡便的串行容器轉並行容器的方法,充分利用多核的能力做並行計算。