泛函編程(32)-泛函IO:IO Monad


    由於泛函編程非常重視函數組合(function composition),任何帶有副作用(side effect)的函數都無法實現函數組合,所以必須把包含外界影響(effectful)副作用不純代碼(impure code)函數中的純代碼部分(pure code)抽離出來形成獨立的另一個純函數。我們通過代碼抽離把不純代碼逐步抽離向外推並在程序里形成一個純代碼核心(pure core)。這樣我們就可以順利地在這個純代碼核心中實現函數組合。IO Monad就是泛函編程處理副作用代碼的一種手段。我們先用個例子來示范副作用抽離:

1 case class Player(name: String, score: Int) 2 def printWinner(p: Player): Unit = 
3     println(p.name + " is the winner!") 4 def declareWinner(p1: Player, p2: Player): Unit =
5   if (p1.score > p2.score ) printWinner(p1) 6   else printWinner(p2)

很明顯,declareWinner是個包含副作用的函數。它做了兩件事:先對比兩個Player的分數然后打印分數較大的Player。這里打印可以說是一項帶有外作用的函數,我們試着把它分離出來:

1 def printWinner(p: Player): Unit = 
2     println(p.name + " is the winner!") 3 def winner(p1: Player, p2: Player): Player =
4   if (p1.score > p2.score) p1 5   else p2 6 def declareWinner(p1: Player, p2: Player): Unit =
7   printWinner(winner(p1, p2))

我們把分數比較代碼winner分離了出來。我們還可以繼續分解:printWinner也可以被認為做了兩件事:先合成了一條信息,然后打印信息:

1 def winnerMsg(p: Player): String =
2   p.name + " is the winner!"
3 def printWinner(p: Player): Unit = 
4  println(winnerMsg(p)) 5 def winner(p1: Player, p2: Player): Player =
6   if (p1.score > p2.score) p1 7   else p2 8 def declareWinner(p1: Player, p2: Player): Unit =
9   printWinner(winner(p1, p2))

這個例子看起來好像有些幼稚,但它示范了泛函編程的函數分解原理:我們並沒有改變程序的功能,只是對程序代碼進行了分解。把程序分解成更細的函數。實際上任何一個包含副作用的函數內都會有純函數等待我們去分解。用一種代數關系表達就是:任何一個函數 A => B 都可以被分解成:

1、一個純函數:A => D, 這里D只是一個功能描述表達式

2、一個帶副作用的非純函數: D => B, 它可以被視為D的解譯器(interpreter),把描述解譯成有副作用的指令

在泛函編程中我們會持續進行這種函數分解(factoring),把含有副作用的代碼分解提取出來向外推形成一個副作用代碼層。這個非純函數形成的代碼層內部則是經分解形成的純代碼核心。最后我們到達了那些表面上已經無可分解的非純函數如:println,它的類型是String => Unit, 接下去我們應該怎么辦呢?

實際上通過增加一個新的數據類型IO我們甚至可以對println進行分解:

 1 trait IO {def run: Unit }  2 def printLine(line: String) : IO = new IO {  3     def run = println(line)  4 }  5 def printWinner(p: Player): IO = 
 6  printLine(winnerMsg(p))  7 case class Player(name: String, score: Int)  8 def winnerMsg(p: Player): String =
 9   p.name + " is the winner!"
10 def winner(p1: Player, p2: Player): Player =
11   if (p1.score > p2.score) p1 12   else p2 13 def declareWinner(p1: Player, p2: Player): Unit =
14   printWinner(winner(p1, p2))

現在函數printWinner已經變成了純函數,它返回了一個IO值:這個IO值只是對一個副作用的描述但並沒有運行它。只有這個IO類型的解譯器(interpreter)在運算這個IO值時才會真正產生相應的副作用。

這里涉及到一些大的概念:編寫IO程序和運算IO值是相互分離的過程(separation of concern)。我們的IO程序用一系列表達式描述了要求的IO功能,而IO interpreter實現這些功能的方式可以是多樣的,包括:外設讀寫,文件、數據庫讀寫及並行讀取等等。如何實現這些IO功能與IO程序編寫無任何關系。

現在,有了這個IO類型,我們可以放心地用函數組合的泛函編程方式圍繞着這個IO類型來編寫IO程序,因為我們知道通過這個IO類型我們把副作用的產生推延到IO程序之外的IO解譯器里,而IO編程與解譯器是兩個各自獨立的程序。

泛函模式的IO編程就是把IO功能表達和IO副作用產生分開設計:IO功能描述使用基於IO Monad的Monadic編程語言,充分利用函數組合進行。而產生副作用的IO實現則推延到獨立的Interpreter部分。當然,Interpreter也有可能繼續分解,把產生副作用代碼再抽離向外推延,這樣我們還可以對Interpreter的純代碼核心進行函數組合。

我們上面的簡版IO類型只代表輸出類型(output type)。Input類型需要一個存放輸入值的變量。在泛函編程模式里變量是用類型參數代表的:

1 trait IO[+A] { self =>
2  def run: A 3     def map[B](f: A => B): IO[B] =
4       new IO[B] { def run = f(self.run)} 5     def flatMap[B](f: A => IO[B]): IO[B] =
6       new IO[B] {def run = f(self.run).run} 7 }

我們用run來對IO值進行計算。在上面我們已經實現了map和flatMap函數,所以這個IO類型就是個Monad。看下面:

1 object IO extends Monad[IO] { 2     def unit[A](a: A) = new IO[A] {def run = a} 3     def flatMap[A,B](ma: IO[A])(f: A => IO[B]) = ma flatMap f 4     def map[A,B](ma: IO[A])(f: A => B) = ma map f 5     def apply[A](a: A) = unit(a)  //IO構建器,可以實現 IO {...}
6 }

既然IO類型是個Monad類型,那么我們就可以使用monadic語言編程了:

1 def ReadLine: IO[String] = IO { readLine } 2 def PrintLine(msg: String): IO[Unit] = IO { println(msg) } 3 def fahrenheitToCelsius(f: Double): Double =
4   (f -32) * 5.0 / 9.0
5 def converter: IO[Unit] = for { 6     _ <- PrintLine("Enter a temperature in degrees fahrenheit:") 7     d <- ReadLine.map(_.toDouble) 8     _ <- PrintLine(fahrenheitToCelsius(d).toString) 9 } yield ()

我們再來看看這個IO類型:IO[A] { def run: A },從類型款式來看我們只知道IO[A]類型值是個延后值,因為A值是通過調用函數run取得的。實際情況是run在運算A值時run函數里的純代碼向程序外的環境提請一些運算要求如輸入(readLine),然后把結果傳遞到另外一些純代碼;然后這些純代碼有可能又向外提請。我們根本無法確定副作用是在那個環節產生的。如此可以確定,這個IO類型無法完整地表達IO運算。我們必需對IO類型進行重新定義:

1 trait IO[A] {def run: A} 2 case class Pure[+A](a: A) extends IO[A] 3 case class Request[Extenal[_],I,A](expr: Extenal[I], cont: I => IO[A]) extends IO[A]

這個IO類型把純代碼與副作用代碼分開兩種IO運算狀態:IO運算可以是一個純函數值,或者是一個外部副作用運算請求。這個External類型定義了外部副作用運算方式,它決定了我們程序能獲得什么樣的外部副作用運算。這個External[I]就像一個表達式,但只能用外部運算IO的程序來運算它。cont函數是個接續函數,它決定了獲取External[I]運算結果后接着該做些什么。

現在我們可以明確分辨一個運算中的純函數和副作用函數。但是我們還無法控制External類型的行為。External[I]可以代表一個簡單的推延值,如下:

1 trait Runnable[A] { def run: A } 2 object Delay { 3     def apply[A](a: A) = new Runnable[A] { def run = a} 4 } 5 Delay {println("SIDE EFFECTS!!!")}

如上所示,任何副作用都可以被放入Delay。如果我們希望更好控制使用外界影響,可以把External的選項作為IO的類參數:

1 trait IO[F[_],+A] {} 2 case class Pure[F[_],+A](get: A) extends IO[F,A] 3 case class Request[F[_],I,+A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

我們只是把External換成了F,然后把它放進IO類型參數。現在我們可以通過定義不同的F類型來獲取不同的副作用,例如:

1 trait Console[A] 2 case object ReadLine extends Console[Option[String]] 3 case class PrintLine(msg: String) extends Console[Unit]

現在通過IO[Console,A]我們獲得了只能對鍵盤顯示屏進行讀寫的副作用。當然,我們還可以定義文件、數據庫、網絡讀寫這些IO能力的F類型。所以我們通過定義F來規范使用副作用。注意,即使在Console類型我們也無法獲知副作用是否的確產生,這部分是由F類型的Interpreter在運算IO程序時才確定的。這不又是Free Monad分開獨立考慮(separation of concern)的Interpreter部分嘛。這是我們可以把這部分延后分開考慮。

我們先看看如何計算這個IO類型的值:

 1 trait Run[F[_]] {  2  def apply[A](expr: F[A]): (A, Run[F])  3 }  4 object IO {  5     def run[F[_],A](R: Run[F])(io: IO[F,A]): A = io match {  6         case Pure(a) => a  7         case Request(expr, cont) => R(expr) match {  8           case (a,r2) => run(r2)(cont(a))  9  } 10  } 11 }

可以看出這個run函數是個遞歸算法:先計算F值然后再遞歸調用run運算所產生的IO值。

我們現在可以創建一個F類型的實例然后運算IO:

 1 trait Console[A]  2 case object ReadLine extends Console[Option[String]]  3 case class PrintLine(msg: String) extends Console[Unit]  4 
 5 object RunConsole extends Run[Console] {  6     def apply[A](c: Console[A]): (A, Run[Console]) = c match {  7         case ReadLine => {  8             val r = try Some(readLine) catch { case _ => None }  9  (r, RunConsole) 10  } 11         case PrintLine(m) => (println(m),RunConsole) 12  } 13 } 14 IO.run(RunConsole)(ioprg)

實際上這個IO類型是個Monad,因為我們可以實現它的unit和flatMap函數:

 1 trait IO[F[_],A] {  2  def unit(a: A) = Pure(a)  3  def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {  4      case Pure(a) => f(a)  5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
 6   case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)  7  }  8  def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))  9 } 10 case class Pure[F[_],A](get: A) extends IO[F,A] 11 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

 它的Monad實例如下:

1 def ioMonad[F[_]] = new Monad[({type l[x] = IO[F, x]})#l] { 2   def unit[A](a: A) = Pure(a) 3   def flatMap[A,B](fa:IO[F,A])(f: A => IO[F,B]): IO[F,B] = fa flatMap f 4   def map[A,B](fa: IO[F,A])(f: A => B): IO[F,B] = fa map f 5 
6 }

我們可以把這個IO類型的運算方式再概括一點:只要F類型是個Monad,那么我們就可以運算IO值:

 1 trait IO[F[_],A] {  2  def unit(a: A) = Pure(a)  3  def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {  4      case Pure(a) => f(a)  5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
 6   case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)  7  }  8  def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))  9  def runM[F[_],A](F: Monad[F])(io: IO[F,A]): F[A] = io match { 10       case Pure(a) => F.unit(a) 11 // case Request(expr, cont) => F.flatMap(expr)(cont andThen (_.runM(F)(io)))
12       case Request(expr, cont) => F.flatMap(expr)(x => cont(x).runM(F)(io)) 13  } 14 } 15 case class Pure[F[_],A](get: A) extends IO[F,A] 16 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A]

有了F類型的Monad實例,函數runM現在能運算IO類型的值了。

在以上的討論過程中我們得出了這樣的結論:F類型代表了IO類型的Interpreter,我們不需要知道它到底產生副作用與否或者怎樣產生。我們用F類型來把副作用的使用推延到F類型的實例。我們的IO程序只是對IO算法的描述。如何運算IO值包括如何使用副作用則在運算Interpreter時才體現。

作為IO算法,首先必須注意防止的就是遞歸算法產生的堆棧溢出問題。運算IO值的runM是個遞歸算法,那我們必須保證至少它是一個尾遞歸算法。當然,我們前面討論的Trampoline類型是最佳選擇。我們可以比較一下IO和Trampoline類型結構:

 1 trait IO[F[_],A] {  2  def unit(a: A) = Pure(a)  3  def flatMap[B](f: A => IO[F,B]): IO[F,B] = this match {  4      case Pure(a) => f(a)  5 // case Request(expr,cont) => Request(expr, cont andThen (_ flatMap f))
 6   case Request(expr,cont) => Request(expr, (x: Any) => cont(x) flatMap f)  7  }  8  def map[B](f: A => B): IO[F,B] = flatMap(a => Pure(f(a)))  9 } 10 case class Pure[F[_],A](get: A) extends IO[F,A] 11 case class Request[F[_],I,A](expr: F[I], cont: I => IO[F,A]) extends IO[F,A] 12 
13 trait Trampoline[A] { 14   def unit(a: A): Trampoline[A] = Done(a) 15   def flatMap[B](f: A => Trampoline[B]): Trampoline[B] = this match { 16     case Done(a) => f(a) 17     case More(k) => k() flatMap f 18  } 19   def map[B](f: A => B): Trampoline[B] = flatMap(a => Done(f(a))) 20 } 21 case class Done[A](a: A) extends Trampoline[A] 22 case class More[A](k: () => Trampoline[A]) extends Trampoline[A] 

 

它們有許多相似點。最主要的是它們都是循環遞歸結構,能實現以heap換stack目的。我們可以把Trampoline類型的算法引進到IO類型中,這樣就可以有效防止StackOverflow問題。實際上IO類型與Trampoline類型的深度抽象類型Free Monad更為相似:

 

 1 trait Free[F[_],A] {  2  private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]  3  def unit(a: A): Free[F,A] = Return(a)  4  def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {  5      case Return(a) => f(a)  6      case Suspend(k) => Suspend(F.map(k)(a => a flatMap f))  7      case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f))  8  }  9  def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a))) 10 } 11 case class Return[F[_],A](a: A) extends Free[F,A] 12 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A]

Free類型的FlatMap結構和IO類型的Request結構極其相像。我們在前面的討論中已經介紹了Free類型:首先它是一個為支持尾遞歸算法而設計的結構,是由一個Functor F產生的Monad。Free的功能由Monad和Interpreter兩部分組成:Monad部分使我們可以使用Monadic編程語言來描述一些算法,Interpreter就是F類型,必須是個Functor,它負責描述副作用行為。只有在運算算法時才真正產生副作用。我們可以直接使用Free類型代表IO運算:用Free的Monadic編程語言來描述IO算法,用Interpreter來描述IO效果,用Free的Trampoline運算機制實現尾遞歸運算。現在我們先看看完整的Free類型:

 1 trait Free[F[_],A] {  2  private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]  3  def unit(a: A): Free[F,A] = Return(a)  4  def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {  5      case Return(a) => f(a)  6      case Suspend(k) => Suspend(F.map(k)(a => a flatMap f))  7   case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f))  8  }  9  
10  def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a))) 11  def resume(implicit F: Functor[F]): Either[F[Free[F,A]],A] = this match { 12      case Return(a) => Right(a) 13      case Suspend(k) => Left(k) 14      case FlatMap(a,f) => a match { 15          case Return(b) => f(b).resume 16          case Suspend(k) => Left(F.map(k)(_ flatMap f)) 17          case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f)).resume 18  } 19  } 20  def liftF(fa: F[A])(implicit F: Functor[F]): Free[F,A] =
21  Suspend(F.map(fa)(Return(_))) 22 } 23 case class Return[F[_],A](a: A) extends Free[F,A] 24 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A] 

我們先用Free Monadic編程語言來描述IO算法:

 1 trait Console[A]  2 case class GetLine[A](next: A) extends Console[A]  3 case class PutLine[A](msg: String, next: A) extends Console[A]  4 implicit val consoleFunctor = new Functor[Console]{  5     def map[A,B](ca: Console[A])(f: A => B): Console[B] = ca match {  6         case GetLine(a) => GetLine(f(a))  7         case PutLine(m,a) => PutLine(m,f(a))  8  }  9 }                                                 //> consoleFunctor : ch13.ex3.Functor[ch13.ex3.Console] = ch13.ex3$$anonfun$ma 10                                                   //| in$1$$anon$1@53e25b76
11 type ConsoleIO[A] = Free[Console,A] 12 implicit def liftConsole[A](ca: Console[A]) = Free.liftF(ca) 13                                                   //> liftConsole: [A](ca: ch13.ex3.Console[A])ch13.ex3.Free[ch13.ex3.Console,A]
14 def putLine(msg: String) = PutLine(msg,())        //> putLine: (msg: String)ch13.ex3.PutLine[Unit]
15 def getLine = GetLine(())                         //> getLine: => ch13.ex3.GetLine[Unit]
16 val ioprg:ConsoleIO[Unit] = for { 17  _ <- putLine("What is your first name ?") 18  first <- getLine 19  _ <- putLine("What is your last name ?") 20  last <- getLine 21  _ <- putLine(s"Hello, $first $last !") 22 } yield()                                         //> ioprg : ch13.ex3.Free[ch13.ex3.Console,Unit] = Suspend(PutLine(What is you 23                                                   //| r first name ?,Suspend(GetLine(Suspend(PutLine(What is your last name ?,Sus 24                                                   //| pend(GetLine(Suspend(PutLine(Hello, () () !,Return(())))))))))))

現在我們用Monadic編程語言描述了一個IO程序,下一步就是運算這個IO程序從而獲得它的值。如何運算IO值是Interpreter的功能。這個過程可能會產生副作用。至於如何產生副作用,產生什么樣的副作用則由Interpreter程序描述。IO值運算過程就是一個由Monadic IO功能描述到IO影響產生方式Interpret語句的語言轉換(interpret,翻譯)。我們可以來看看這個運算函數:

 1 trait ~>[F[_],G[_]]{  2  def apply[A](fa: F[A]): G[A]  3 }  4 trait Free[F[_],A] {  5  private case class FlatMap[B](a: Free[F,A], f: A => Free[F,B]) extends Free[F,B]  6  def unit(a: A): Free[F,A] = Return(a)  7  def flatMap[B](f: A => Free[F,B])(implicit F: Functor[F]): Free[F,B] = this match {  8      case Return(a) => f(a)  9      case Suspend(k) => Suspend(F.map(k)(a => a flatMap f)) 10   case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f)) 11  } 12  
13  def map[B](f: A => B)(implicit F: Functor[F]): Free[F,B] = flatMap(a => Return(f(a))) 14  def resume(implicit F: Functor[F]): Either[F[Free[F,A]],A] = this match { 15      case Return(a) => Right(a) 16      case Suspend(k) => Left(k) 17      case FlatMap(a,f) => a match { 18          case Return(b) => f(b).resume 19          case Suspend(k) => Left(F.map(k)(_ flatMap f)) 20          case FlatMap(b,g) => FlatMap(b, g andThen (_ flatMap f)).resume 21  } 22  } 23  def foldMap[G[_]](f: (F ~> G))(implicit F: Functor[F], G: Monad[G]): G[A] = resume match { 24        case Right(a) => G.unit(a) 25        case Left(k) => G.flatMap(f(k))(_ foldMap f) 26  } 27 } 28 case class Return[F[_],A](a: A) extends Free[F,A] 29 case class Suspend[F[_],A](ffa: F[Free[F,A]]) extends Free[F,A] 30 object Free { 31  def liftF[F[_],A](fa: F[A])(implicit F: Functor[F]): Free[F,A] =
32  Suspend(F.map(fa)(Return(_))) 33 
34 }

這個foldMap就是一個IO程序運算函數。由於它是一個循環計算,所以通過resume函數引入Trampoline尾遞歸計算方式來保證避免StackOverflow問題發生。foldMap函數將IO描述語言F解譯成可能產生副作用的G語言。在解譯的過程中逐步用flatMap運行非純代碼。

我們可以用Free Monad的結構替代IO類型結構,這樣我們就可以用Monadic編程語言來描述IO程序。至於實際的IO副作用如何,我們只知道產生副作用的Interpret程序是個Monad,其它一無所知。

現在我們可以進入Interpreter編程了:

 1 type Id[A] = A  2 implicit val idMonad = new Monad[Id] {  3     def unit[A](a: A): A = a  4     def flatMap[A,B](fa: A)(f: A => B): B = f(fa)  5 }  6 object ConsoleEffect extends (Console ~> Id) {  7     def apply[A](c: Console[A]): A = c match {  8         case GetLine(n) => readLine ; n  9         case PutLine(m,n) => println(m); n 10  } 11 } 12 ioprg.foldMap(ConsoleEffect)

我們說過:運算IO值就是把IO程序語言逐句翻譯成產生副作用的Interpreter語言這個過程。在以上例子里我們采用了Id Monad作為Interpreter語言。Id Monad的flatMap不做任何事情,所以IO程序被直接對應到基本IO函數readLine, println上了。

我們也可以把副作用變成對List進行讀寫:

 1 case class InOutLog(inLog: List[String], outLog: List[String])  2 case class Logger[A](runLogger: InOutLog => (A, InOutLog))  3 object MockConsole extends (Console ~> Logger) {  4     def apply[A](c: Console[A]): Logger[A] = Logger[A](  5       s => (c, s) match {  6           case (GetLine(n), InOutLog(in,out)) => (in.head, InOutLog(in.tail,out))  7           case (PutLine(l,n), InOutLog(in,out)) => ((), InOutLog(in, l :: out))  8  }  9  ) 10 } 11 
12 val s = ioprg.foldMap(MockConsole) 13 s.runLogger(InOutLog(List("Tiger","Chan"),Nil))

只要我們在Interpret程序里把GetLine,PutLine對應到InLog,OutLog兩個List的讀寫。

如果我們需要采用無獨占(Non-blocking)讀寫副作用的話可以這樣改寫Interpreter:

 1 object NonBlockingIO extends(Console ~> Future) {  2     def apply[A](c: Console[A]): Future[A] = c match {  3         case GetLine(n) => Future.unit {  4             try Some(readLine) catch {case _: Exception => None}  5  }  6         case PutLine(n,l) => Future.unit{  7  println(l)  8  }  9  } 10 }

從以上的討論我們得出:IO類型可以用Free類型代替,這樣我們可以充分利用Free Monad的Monadic編程語言編寫IO程序,我們又可以分開考慮編寫可能產生IO副作用的Interpreter。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM