上期我們討論了IO處理過程:Process[I,O]。我們說Process就像電視信號盒子一樣有輸入端和輸出端兩頭。Process之間可以用一個Process的輸出端與另一個Process的輸入端連接起來形成一串具備多項數據處理功能的完整IO過程。但合成的IO過程兩頭輸入端則需要接到一個數據源,而另外一端則可能會接到一個數據接收設備如文件、顯示屏等。我們在這篇簡單地先介紹一下IO數據源Source和IO數據接收端Sink。
我們先用一個獨立的數據類型來代表數據源Source進行簡單的示范說明,這個類型與Process類型沒有任何關系:
1 import ProcessLib._ 2 object SourceSink { 3 trait Source[O] { //以下helper function都是把Source當作O類的List處理
4 def |>[O2](p: Process[O,O2]): Source[O2] //粘接一個Process p. 向其輸入O
5 def filter(f: O => Boolean): Source[O] = this |> Process.filter(f) //向p輸入O
6 def map[O2](f: O => O2): Source[O2] = this |> Process.lift(f) 7 def take(n: Int): Source[O] = this |> Process.take(n) //截取前n個O
8 def takeWhile(f: O => Boolean): Source[O] = this |> Process.takeWhile(f) 9 def drop(n: Int): Source[O] = this |> Process.drop(n) //跳過前n個O
10 def dropWhile(f: O => Boolean): Source[O] = this |> Process.dropWhile(f) 11 }
從以上trait可以看到:Source的工作原理就是把一個Process的輸入黏貼到Source的輸出端。我們可以用這個 |> 把一串Process粘到Source的輸出,如:Src.proc1.proc2.proc3。不過我們得先把proc1,proc2,proc3定義成Source組件函數,因為Source是一個完全獨立的類型。
我們再來看看一個Source特殊案例:
1 case class ResourceR[R,I,O]( //Source的一個只讀資源案例
2 acquire: IO[R], //資源使用門戶 resource handle
3 release: R => IO[Unit], //完成使用資源后的清理函數
4 step: R => IO[Option[I]], //資源內容讀取函數
5 trans: Process[I,O] //輸出方式
6 ) extends Source[O] { 7 def |>[O2](p: Process[O,O2]): Source[O2] = //實現抽象函數
8 ResourceR(acquire,release,step,trans |> p) //每次輸入都產生一個ResourceR.它的trans與p進行管道對接
9 }
這是個只讀的數據源。我們看到所有的動作都被包嵌在IO類型里,這樣可以把副作用的產生延后到一些Source Interpreter來運算。這里我們只要用最簡單的IO來說明就可以了:
1 trait IO[A] { self =>
2 def run: A 3 def map[B](f: A => B): IO[B] =
4 new IO[B] { def run = f(self.run) } 5 def flatMap[B](f: A => IO[B]): IO[B] =
6 new IO[B] { def run = f(self.run).run } 7 } 8 object IO { 9 def unit[A](a: => A): IO[A] = new IO[A] { def run = a } 10 def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f 11 def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
12 }
這個IO類型我們在前面的討論里曾經練習過。
現在我們來看看一個文件讀取的ResourceR例子:
1 object Source { 2 import java.io._ 3 def lines(fileName: String): Source[String] = //從文件fileName里讀取String
4 ResourceR( //創建一個Source的實例
5 IO {io.Source.fromFile(fileName) }, //資源
6 (src: io.Source) => IO { src.close }, //清理
7 (src: io.Source) => IO { //讀取
8 lazy val iterator = src.getLines 9 if (iterator.hasNext) Some(iterator.next) else None //讀完返回None
10 }, 11 Process.passUnchanged) //Process[I,I],讀什么輸入什么
12 }
現在我們可以這樣寫一段程序了:
1 Source.lines("input.txt").count.exists{_ >= 40000 } 2 //> res0: ch15.SourceSink.Source[Boolean] = ResourceR(ch15.SourceSink$IO$$anon$ 3 //| 3@762efe5d,<function1>,<function1>,Await(<function1>))
噢,記住把count和exists放到Source trait里:
1 def exists(f: O => Boolean): Source[Boolean] = this |> Process.exists(f) 2 def count: Source[Int] = this |> Process.count
上面的表達式可以說還只是IO過程的描述。實際副作用產生是在interpreter里:
1 def collect: IO[IndexedSeq[O]] = { //讀取數據源返回IO[IndexedSeq[O]], 用IO.run來實際運算
2 def tryOr[A](a: => A)(cleanup: IO[Unit]): A = //運算表達式a, 出現異常立即清理現場
3 try a catch {case e: Exception => cleanup.run; throw e} 4 @annotation.tailrec //這是個尾遞歸算法,根據trans狀態
5 def go(acc: IndexedSeq[O], cleanup: IO[Unit], step: IO[Option[I]], trans: Process[I,O]): IndexedSeq[O] =
6 trans match { 7 case Halt() => cleanup.run; acc //停止狀態,清理現場
8 case Emit(out,next) => go(tryOr(out +: acc)(cleanup), cleanup, step, next) //積累acc
9 case Await(iproc) => tryOr(step.run)(cleanup) match { 10 case None => cleanup.run; acc //讀完了清理現場
11 case si => go(acc,cleanup,step,iproc(si)) //讀入元素作為Process輸入來改變Process狀態
12 } 13 } 14 acquire map {res => go(IndexedSeq(),release(res),step(res),trans)} //開始讀取
15 }
注意:無論讀取完成或中途失敗退出都會導致現場清理以防止資源漏出。可以推斷這個interpreter還是很安全的。
與Source同樣,我們還是用一個獨立的類型Sink來代表數據接收端進行簡單說明:
1 trait Sink[I] { 2 def <|[I2](p: Process[I2,I]): Sink[I2] //p的輸出接到Sink的輸入
3 def filter(f: I => Boolean): Sink[I] = this <| Process.filter(f) //從p接收I
4 def map[I2](f: I2 => I): Sink[I2] = this <| Process.lift(f) //將接收的I2變成I
5 def take(n: Int): Sink[I] = this <| Process.take(n) //從p接收前n個I
6 def takeWhile(f: I => Boolean): Sink[I] = this <| Process.takeWhile(f) 7 def drop(n: Int): Sink[I] = this <| Process.drop(n) //過濾掉首n個I
8 def dropWhile(f: I => Boolean): Sink[I] = this <| Process.dropWhile(f) 9 }
這和Source trait及其相似。注意和Process連接是反向的:由p指向Sink。
同樣,一個只寫的資源實例如下:
1 case class ResourceW[R,I,I2]( //只寫資源
2 acquire: IO[R], //資源使用門戶, resource handle
3 release: R => IO[Unit], //清理函數
4 rcvf: R => (I2 => IO[Unit]), //接收方式
5 trans: Process[I,I2] //處理過程
6 ) extends Sink[I] { 7 def <|[I2](p: Process[I2,I]): Sink[I2] =
8 ResourceW(acquire,release,rcvf,p |> trans) //制造一個ResourceW實例,由p到trans
9 }
這個也和ResourceR相似。還是與Process連接方式是反方向的:由p到trans。
以下是一個向文件寫入的Sink組件:
1 object Sink { 2 import java.io._ 3 def file(fileName: String, append: Boolean = false): Sink[String] = //結果是Sink[String]。必須用interpreter來運算
4 ResourceW( //是一個ResourceW實例
5 IO {new FileWriter(fileName,append) }, //創建FileWriter
6 (w: FileWriter) => IO {w.close}, //釋放FileWriter
7 (w: FileWriter) => (s: String) => IO {w.write(s)}, //寫入
8 Process.passUnchanged //不處理寫入數據
9 ) 10 }
在學習過程中發現,獨立於Process類型的Source,Sink類型使IO算法的表達式類型的集成很困難。這也限制了組件的功能。我們無法實現泛函編程簡潔高雅的表達形式。在下面的討論中我們會集中精力分析具備數據源功能的Process,希望在表達方式上能有所進步。