泛函編程(36)-泛函Stream IO:IO數據源-IO Source & Sink


  上期我們討論了IO處理過程:Process[I,O]。我們說Process就像電視信號盒子一樣有輸入端和輸出端兩頭。Process之間可以用一個Process的輸出端與另一個Process的輸入端連接起來形成一串具備多項數據處理功能的完整IO過程。但合成的IO過程兩頭輸入端則需要接到一個數據源,而另外一端則可能會接到一個數據接收設備如文件、顯示屏等。我們在這篇簡單地先介紹一下IO數據源Source和IO數據接收端Sink。

我們先用一個獨立的數據類型來代表數據源Source進行簡單的示范說明,這個類型與Process類型沒有任何關系:

 1 import ProcessLib._  2 object SourceSink {  3 trait Source[O] {  //以下helper function都是把Source當作O類的List處理
 4   def |>[O2](p: Process[O,O2]): Source[O2]   //粘接一個Process p. 向其輸入O
 5   def filter(f: O => Boolean): Source[O] = this |> Process.filter(f) //向p輸入O
 6   def map[O2](f: O => O2): Source[O2] = this |> Process.lift(f)  7   def take(n: Int): Source[O] = this |> Process.take(n)  //截取前n個O
 8   def takeWhile(f: O => Boolean): Source[O] = this |> Process.takeWhile(f)  9   def drop(n: Int): Source[O] = this |> Process.drop(n) //跳過前n個O
10   def dropWhile(f: O => Boolean): Source[O] = this |> Process.dropWhile(f) 11 }

從以上trait可以看到:Source的工作原理就是把一個Process的輸入黏貼到Source的輸出端。我們可以用這個 |> 把一串Process粘到Source的輸出,如:Src.proc1.proc2.proc3。不過我們得先把proc1,proc2,proc3定義成Source組件函數,因為Source是一個完全獨立的類型。

我們再來看看一個Source特殊案例:

1 case class ResourceR[R,I,O](   //Source的一個只讀資源案例
2  acquire: IO[R],   //資源使用門戶 resource handle
3  release: R => IO[Unit], //完成使用資源后的清理函數
4  step: R => IO[Option[I]], //資源內容讀取函數
5  trans: Process[I,O]  //輸出方式
6  ) extends Source[O] { 7      def |>[O2](p: Process[O,O2]): Source[O2] =  //實現抽象函數
8        ResourceR(acquire,release,step,trans |> p) //每次輸入都產生一個ResourceR.它的trans與p進行管道對接
9  }

這是個只讀的數據源。我們看到所有的動作都被包嵌在IO類型里,這樣可以把副作用的產生延后到一些Source Interpreter來運算。這里我們只要用最簡單的IO來說明就可以了:

 1 trait IO[A] { self =>
 2  def run: A  3     def map[B](f: A => B): IO[B] =
 4       new IO[B] { def run = f(self.run) }  5     def flatMap[B](f: A => IO[B]): IO[B] =
 6       new IO[B] { def run = f(self.run).run }  7 }  8 object IO {  9     def unit[A](a: => A): IO[A] = new IO[A] { def run = a } 10     def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f 11     def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
12 }

這個IO類型我們在前面的討論里曾經練習過。

現在我們來看看一個文件讀取的ResourceR例子:

 1 object Source {  2 import java.io._  3     def lines(fileName: String): Source[String] =  //從文件fileName里讀取String
 4       ResourceR(   //創建一個Source的實例
 5         IO {io.Source.fromFile(fileName) },  //資源
 6         (src: io.Source) => IO { src.close },  //清理
 7         (src: io.Source) => IO {    //讀取
 8             lazy val iterator = src.getLines  9             if (iterator.hasNext) Some(iterator.next) else None //讀完返回None
10  }, 11         Process.passUnchanged) //Process[I,I],讀什么輸入什么
12 }

現在我們可以這樣寫一段程序了:

1  Source.lines("input.txt").count.exists{_ >= 40000 } 2                                                   //> res0: ch15.SourceSink.Source[Boolean] = ResourceR(ch15.SourceSink$IO$$anon$ 3                                                   //| 3@762efe5d,<function1>,<function1>,Await(<function1>))

噢,記住把count和exists放到Source trait里:

1     def exists(f: O => Boolean): Source[Boolean] = this |> Process.exists(f) 2     def count: Source[Int] = this |> Process.count

上面的表達式可以說還只是IO過程的描述。實際副作用產生是在interpreter里:

 1     def collect: IO[IndexedSeq[O]] = {  //讀取數據源返回IO[IndexedSeq[O]], 用IO.run來實際運算
 2          def tryOr[A](a: => A)(cleanup: IO[Unit]): A =  //運算表達式a, 出現異常立即清理現場
 3            try a catch {case e: Exception => cleanup.run; throw e}  4          @annotation.tailrec  //這是個尾遞歸算法,根據trans狀態
 5          def go(acc: IndexedSeq[O], cleanup: IO[Unit], step: IO[Option[I]], trans: Process[I,O]): IndexedSeq[O] =
 6  trans match {  7                case Halt() => cleanup.run; acc  //停止狀態,清理現場
 8                case Emit(out,next) => go(tryOr(out +: acc)(cleanup), cleanup, step, next) //積累acc
 9                case Await(iproc) => tryOr(step.run)(cleanup) match { 10                    case None => cleanup.run; acc  //讀完了清理現場
11                    case si => go(acc,cleanup,step,iproc(si))  //讀入元素作為Process輸入來改變Process狀態
12  } 13  } 14          acquire map {res => go(IndexedSeq(),release(res),step(res),trans)} //開始讀取
15      }

注意:無論讀取完成或中途失敗退出都會導致現場清理以防止資源漏出。可以推斷這個interpreter還是很安全的。

與Source同樣,我們還是用一個獨立的類型Sink來代表數據接收端進行簡單說明:

1 trait Sink[I] { 2      def <|[I2](p: Process[I2,I]): Sink[I2] //p的輸出接到Sink的輸入
3      def filter(f: I => Boolean): Sink[I] = this <| Process.filter(f)  //從p接收I
4      def map[I2](f: I2 => I): Sink[I2] = this <| Process.lift(f) //將接收的I2變成I
5      def take(n: Int): Sink[I] = this <| Process.take(n)  //從p接收前n個I
6      def takeWhile(f: I => Boolean): Sink[I] = this <| Process.takeWhile(f) 7      def drop(n: Int): Sink[I] = this <| Process.drop(n) //過濾掉首n個I
8      def dropWhile(f: I => Boolean): Sink[I] = this <| Process.dropWhile(f) 9  }

這和Source trait及其相似。注意和Process連接是反向的:由p指向Sink。

同樣,一個只寫的資源實例如下:

1 case class ResourceW[R,I,I2](  //只寫資源
2    acquire: IO[R],   //資源使用門戶, resource handle
3    release: R => IO[Unit],  //清理函數
4    rcvf: R => (I2 => IO[Unit]), //接收方式
5    trans: Process[I,I2]  //處理過程
6    ) extends Sink[I] { 7        def <|[I2](p: Process[I2,I]): Sink[I2] =
8          ResourceW(acquire,release,rcvf,p |> trans)    //制造一個ResourceW實例,由p到trans
9    }

這個也和ResourceR相似。還是與Process連接方式是反方向的:由p到trans。

以下是一個向文件寫入的Sink組件:

 1 object Sink {  2  import java.io._  3      def file(fileName: String, append: Boolean = false): Sink[String] = //結果是Sink[String]。必須用interpreter來運算
 4        ResourceW(   //是一個ResourceW實例
 5        IO {new FileWriter(fileName,append) }, //創建FileWriter
 6        (w: FileWriter) => IO {w.close},  //釋放FileWriter
 7        (w: FileWriter) => (s: String) => IO {w.write(s)},  //寫入
 8        Process.passUnchanged    //不處理寫入數據
 9  ) 10  }

在學習過程中發現,獨立於Process類型的Source,Sink類型使IO算法的表達式類型的集成很困難。這也限制了組件的功能。我們無法實現泛函編程簡潔高雅的表達形式。在下面的討論中我們會集中精力分析具備數據源功能的Process,希望在表達方式上能有所進步。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM