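akka-stream is built on the Actor model, so it inherits the Actor model's resilience: whatever goes wrong, there is a uniform overall exception-handling strategy and a concrete way to apply it. The official akka-stream documentation explains all of this in detail, with examples. We have no better ideas or examples to offer in this discussion, so what follows is mostly a close translation with some analysis. akka-stream offers the following practical ways to handle exceptions: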
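1. recover: an operator that, when upstream fails, emits one final element and then completes the stream gracefully
2. recoverWithRetries: also an operator; when upstream fails, it switches to a backup Source as the new upstream and keeps running
3. Backoff restart strategy: provided by the RestartSource, RestartFlow and RestartSink wrappers, which restart the wrapped component with a gradually increasing delay
4. Supervision strategy: an attribute of stream components that tells a failing Stage how to handle the exception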
The code examples below show how to use them:
1. recover: the signature of Flow[T].recover is as follows:
/**
 * Recover allows to send last element on failure and gracefully complete the stream
 * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
 * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
 *
 * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
 *
 * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' upstream completes or upstream failed with exception pf can handle
 *
 * '''Cancels when''' downstream cancels
 *
 */
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = via(Recover(pf))
Here is a usage example:
Source(0 to 10).map { n =>
  if (n < 5) n.toString
  else throw new Exception("Boooommm!")
}.recover {
  case e: Exception => s"truncate stream: ${e.getMessage}"
}.runWith(Sink.foreach(println))
Output:
0
1
2
3
4
truncate stream: Boooommm!
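The partial function passed to recover only rescues the failures it matches; any other exception still fails the stream. The following is a minimal sketch of that distinction, not code from the official documentation. The names RecoverSketch and run are hypothetical, and the elements are collected with Sink.seq so that the result can be inspected as a value.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object RecoverSketch {
  // Runs the failing stream with the given handler and returns either
  // the collected elements or the failure that ended the stream.
  def run(handle: PartialFunction[Throwable, String]): Try[Seq[String]] = {
    implicit val sys: ActorSystem = ActorSystem("recoverSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val done = Source(0 to 10)
        .map(n => if (n < 5) n.toString else throw new IllegalStateException("Boooommm!"))
        .recover(handle)
        .runWith(Sink.seq)
      Try(Await.result(done, 5.seconds))
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = {
    // The handler matches: the stream completes with one extra, final element.
    println(run { case e: IllegalStateException => s"recovered: ${e.getMessage}" })
    // The handler does not match: the stream still ends in a Failure.
    println(run { case _: ArithmeticException => "never used" })
  }
}
```

Matching on a specific exception type, rather than on Exception in general, keeps unexpected failures visible instead of silently turning them into a normal completion.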
2. recoverWithRetries: here is its signature:
/**
 * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
 * a failure has been recovered up to `attempts` number of times so that each time there is a failure
 * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
 * attempt to recover at all.
 *
 * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
 *
 * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
 * This stage can recover the failure signal, but not the skipped elements, which will be dropped.
 *
 * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
 *
 * '''Emits when''' element is available from the upstream or upstream is failed and element is available
 * from alternative Source
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' upstream completes or upstream failed with exception pf can handle
 *
 * '''Cancels when''' downstream cancels
 *
 * @param attempts Maximum number of retries or -1 to retry indefinitely
 * @param pf Receives the failure cause and returns the new Source to be materialized if any
 * @throws IllegalArgumentException if `attempts` is a negative number other than -1
 *
 */
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
  via(new RecoverWith(attempts, pf))
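attempts is the maximum number of recovery attempts when failures occur. 0 means no recovery is attempted and the stream simply fails; a negative value means unlimited attempts (the scaladoc above names -1 for this). Here is an example: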
val backupSource = Source(List("five", "six", "seven", "eight", "nine"))

Source(0 to 10).map { n =>
  if (n < 5) n.toString
  else throw new RuntimeException("Boooommm!")
}.recoverWithRetries(attempts = 1, {
  case e: RuntimeException => backupSource
}).runWith(Sink.foreach(println))
Output:
0
1
2
3
4
five
six
seven
eight
nine
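To make the effect of attempts concrete, here is a small sketch that runs the same kind of stream once with attempts = 1 and once with attempts = 0. RetriesSketch and run are hypothetical names chosen for this illustration, and the backup Source is shortened to two elements. With 0 the backup Source is never used, so the stream fails.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, for illustration only.
object RetriesSketch {
  // Runs the failing stream with the given number of recovery attempts.
  def run(attempts: Int): Try[Seq[String]] = {
    implicit val sys: ActorSystem = ActorSystem("retriesSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val backup = Source(List("five", "six"))
    try {
      val done = Source(0 to 10)
        .map(n => if (n < 5) n.toString else throw new RuntimeException("Boooommm!"))
        .recoverWithRetries(attempts, { case _: RuntimeException => backup })
        .runWith(Sink.seq)
      Try(Await.result(done, 5.seconds))
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(run(attempts = 1)) // one recovery allowed: the backup Source takes over
    println(run(attempts = 0)) // no recovery: the result is a Failure
  }
}
```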
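3. Backoff-Restart-Strategy: akka-stream ships RestartSource, RestartFlow and RestartSink to apply a backoff restart strategy to Source, Flow and Sink nodes. Each restart is delayed longer than the previous one, which keeps multiple processes from competing for the same resource at the same moment. These three types are defined as follows: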
/**
 * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Source]] can necessarily guarantee it will, for
 * example, for [[Source]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartSource ensures that the graph can continue running while the [[Source]] restarts.
 */
object RestartSource {

  /**
   * Wrap the given [[Source]] with a [[Source]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Source]] will never emit a complete or failure, since the completion or failure of the wrapped [[Source]]
   * is always handled by restarting it. The wrapped [[Source]] can however be cancelled by cancelling this [[Source]].
   * When that happens, the wrapped [[Source]], if currently running will be cancelled, and it will not be restarted.
   * This can be triggered simply by the downstream cancelling, or externally by introducing a [[KillSwitch]] right
   * after this [[Source]] in the graph.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param sourceFactory A factory for producing the [[Source]] to wrap.
   */
  def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sourceFactory: () ⇒ Source[T, _]): Source[T, NotUsed] = {
    Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
 * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
 */
object RestartFlow {

  /**
   * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Flow]] will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or
   * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
   * signals sent to this [[Flow]] however will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
   * will be allowed to terminate without being restarted.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
   * and any in transit messages will be lost. During backoff, this [[Flow]] will backpressure.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param flowFactory A factory for producing the [[Flow]] to wrap.
   */
  def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
    Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
 * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
 */
object RestartSink {

  /**
   * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or complete using an exponential
   * backoff.
   *
   * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
   * The wrapped [[Sink]] can however be completed by feeding a completion or error into this [[Sink]]. When that
   * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
   * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
   * graph.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure, however any elements already
   * sent may have been lost.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the child actor will
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param sinkFactory A factory for producing the [[Sink]] to wrap.
   */
  def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
    Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor))
  }
}
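The scaladoc describes minBackoff, maxBackoff and randomFactor only in words. The sketch below shows, in plain Scala, how such parameters are commonly combined into a restart delay: double the delay on every restart, cap it at maxBackoff, then add random jitter. This is an assumption about the general shape of exponential backoff, not Akka's actual implementation, and BackoffSketch, delay, demoSchedule and rnd are hypothetical names.

```scala
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only; not Akka's code.
object BackoffSketch {
  // Delay before restart number `restartCount` (0-based).
  // `rnd` stands in for a random draw from [0, 1), so results are reproducible.
  def delay(restartCount: Int,
            minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration,
            randomFactor: Double,
            rnd: Double): FiniteDuration = {
    val exponential = minBackoff.toMillis * math.pow(2, restartCount) // doubles each time
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)  // never above the cap
    val jittered = capped * (1.0 + rnd * randomFactor)                // add up to randomFactor extra
    jittered.toLong.millis
  }

  // Delays in milliseconds for 3.seconds / 30.seconds / 0.2 with the jitter
  // switched off (rnd = 0.0).
  def demoSchedule: Seq[Long] =
    (0 to 5).map(n => delay(n, 3.seconds, 30.seconds, 0.2, rnd = 0.0).toMillis)

  def main(args: Array[String]): Unit =
    demoSchedule.zipWithIndex.foreach { case (ms, n) => println(s"restart $n: $ms ms") }
}
```

With the jitter switched off, the delays grow as 3 s, 6 s, 12 s, 24 s and then stay at the 30 s cap. The random factor exists so that many clients restarting at once do not all retry at the same instant.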
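Note the sourceFactory, flowFactory and sinkFactory parameters of these withBackoff methods: they are what actually builds the wrapped component. Let's put together a made-up stream out of RestartSource, RestartFlow and RestartSink: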
val backoffSource = RestartSource.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () => Source(List("FileA", "FileB", "FileC")) }

val backoffFlow = RestartFlow.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () => Flow[String].map(_.toUpperCase()) }

val backoffSink = RestartSink.withBackoff(
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () => Sink.foreach(println) }

backoffSource.via(backoffFlow).to(backoffSink).run()
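In real applications, of course, each of these components may hold some resource, such as a database or a network service. The output looks like this: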
FILEA
FILEB
FILEC
FILEA
FILEB
FILEC
FILEA
FILEB
FILEC
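This stream repeats in a loop, because RestartSource restarts the wrapped Source whenever it completes. The only way to stop it is manually, through a KillSwitch: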
val killSwitch = backoffSource.viaMat(KillSwitches.single)(Keep.right)
  .viaMat(backoffFlow)(Keep.left)
  .toMat(backoffSink)(Keep.left)
  .run()

Thread.sleep(1000)
killSwitch.shutdown()
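The RestartSource scaladoc also says that the wrapped Source stops being restarted once downstream cancels. As a hedged sketch of that, the example below wraps a made-up flaky resource that fails on its first two materializations and bounds the stream with take(3). RestartSketch, run and the attempts counter are hypothetical names, and the short backoff values are chosen only to keep the demo fast.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object RestartSketch {
  // Returns the collected elements and how many times the factory was called.
  def run(): (Seq[String], Int) = {
    implicit val sys: ActorSystem = ActorSystem("restartSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val attempts = new AtomicInteger(0)
    try {
      val done = RestartSource.withBackoff(
        minBackoff = 100.millis,
        maxBackoff = 1.second,
        randomFactor = 0.0
      ) { () =>
        // Simulated flaky resource: the first two attempts fail immediately.
        if (attempts.incrementAndGet() < 3) Source.failed[String](new RuntimeException("not ready"))
        else Source(List("FileA", "FileB", "FileC"))
      }
        .take(3) // downstream cancels after three elements, which ends the restart loop
        .runWith(Sink.seq)
      (Await.result(done, 10.seconds), attempts.get())
    } finally sys.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (elements, factoryCalls) = run()
    println(s"elements = $elements, factory calls = $factoryCalls")
  }
}
```

Because the factory is invoked again on every restart, it is the natural place to reopen whatever connection or resource the wrapped component needs.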
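4. Supervision strategy: this approach is inspired by the Actor supervision model and is implemented on some of akka-stream's Stages. For some Stages it may not be appropriate at all, for example Stages that connect to external systems, because whatever caused the failure is likely to cause it again. For a stream that hits an exception, the supervision strategy offers three ways to proceed:
Stop: terminate the stream and return the exception
Resume: skip the current element and keep running
Restart: restart the Stage, skipping the current element and clearing any internal state
akka-stream's default is Stop: the stream is terminated immediately and the exception is returned.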
The supervision strategy can be set through ActorMaterializerSettings(...).withSupervisionStrategy(...) or through Flow[T].withAttributes(ActorAttributes.supervisionStrategy(...)). The following example sets it through ActorMaterializerSettings:
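// Resume on ArithmeticException, stop on anything else
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}

implicit val mat2 = ActorMaterializer(
  ActorMaterializerSettings(sys).withSupervisionStrategy(decider)
    .withInputBuffer(initialSize = 16, maxSize = 16)
)

// ArithmeticException: the decider answers Resume, so element 3 is skipped
Source(1 to 5).map { n =>
  if (n != 3) n.toString
  else throw new ArithmeticException("no 3 please!")
}.runWith(Sink.foreach(println))

Thread.sleep(1000)
println("")
Thread.sleep(1000)

// Any other exception: the decider answers Stop, so the stream ends at element 5
Source(1 to 5).map { n =>
  if (n != 5) n.toString
  else throw new Exception("no 5 please!")
}.runWith(Sink.foreach(println))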
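The two streams above demonstrate the Resume and Stop strategies respectively: the first skips element 3 and carries on, the second stops when it reaches element 5. The output: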
1
2
4
5
1
2
3
4
In the next example the SupervisionStrategy is set in the Attributes of a Flow component:
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}

val flow = Flow[Int]
  .scan(0) { (acc, elem) =>
    if (elem < 0) throw new IllegalArgumentException("negative not allowed")
    else acc + elem
  }.withAttributes(ActorAttributes.supervisionStrategy(decider))

Source(List(1, 3, -1, 5, 7)).via(flow)
  .runWith(Sink.foreach(println))
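This example handles the exception with Restart. The output below confirms that Restart clears the internal state: the running sum starts again from zero at the point where the exception occurred: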
0
1
4
0
5
12
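For comparison, the same scan flow can be run with Resume instead of Restart. The sketch below takes the directive as a parameter so that both variants can be run side by side. SupervisionSketch, run and onNegative are hypothetical names, and the elements are collected with Sink.seq rather than printed. The expectation, based on the definitions of Resume and Restart given earlier, is that Resume drops the offending element but keeps the running sum, whereas Restart starts the sum again from zero.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object SupervisionSketch {
  // Runs the scan flow, answering `onNegative` when a negative element is rejected.
  def run(onNegative: Supervision.Directive): Seq[Int] = {
    implicit val sys: ActorSystem = ActorSystem("supervisionSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val decider: Supervision.Decider = {
      case _: IllegalArgumentException => onNegative
      case _                           => Supervision.Stop
    }

    val flow = Flow[Int]
      .scan(0) { (acc, elem) =>
        if (elem < 0) throw new IllegalArgumentException("negative not allowed")
        else acc + elem
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))

    try Await.result(Source(List(1, 3, -1, 5, 7)).via(flow).runWith(Sink.seq), 5.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit = {
    println("Resume:  " + run(Supervision.Resume))
    println("Restart: " + run(Supervision.Restart))
  }
}
```

Which directive fits depends on whether the accumulated state is still trustworthy after the failure: keep it with Resume, discard it with Restart.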
That's it. Here is the complete source code for this demonstration:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object ExceptionHandling extends App {

  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  /*
  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new Exception("Boooommm!")
  }.recover {
    case e: Exception => s"truncate stream: ${e.getMessage}"
  }.runWith(Sink.foreach(println))
  */

  /*
  val backupSource = Source(List("five", "six", "seven", "eight", "nine"))

  Source(0 to 10).map { n =>
    if (n < 5) n.toString
    else throw new RuntimeException("Boooommm!")
  }.recoverWithRetries(attempts = 0, {
    case e: RuntimeException => backupSource
  }).runWith(Sink.foreach(println))

  val backoffSource = RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () => Source(List("FileA", "FileB", "FileC")) }

  val backoffFlow = RestartFlow.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () => Flow[String].map(_.toUpperCase()) }

  val backoffSink = RestartSink.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () => Sink.foreach(println) }

  //backoffSource.via(backoffFlow).to(backoffSink).run()

  val killSwitch = backoffSource.viaMat(KillSwitches.single)(Keep.right)
    .viaMat(backoffFlow)(Keep.left)
    .toMat(backoffSink)(Keep.left)
    .run()

  Thread.sleep(1000)
  killSwitch.shutdown()
  */

  /*
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _ => Supervision.Stop
  }

  implicit val mat2 = ActorMaterializer(
    ActorMaterializerSettings(sys).withSupervisionStrategy(decider)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  Source(1 to 5).map { n =>
    if (n != 3) n.toString
    else throw new ArithmeticException("no 3 please!")
  }.runWith(Sink.foreach(println))

  Thread.sleep(1000)
  println("")
  Thread.sleep(1000)

  Source(1 to 5).map { n =>
    if (n != 5) n.toString
    else throw new Exception("no 5 please!")
  }.runWith(Sink.foreach(println))
  */

  val decider: Supervision.Decider = {
    case _: IllegalArgumentException => Supervision.Restart
    case _ => Supervision.Stop
  }

  val flow = Flow[Int]
    .scan(0) { (acc, elem) =>
      if (elem < 0) throw new IllegalArgumentException("negative not allowed")
      else acc + elem
    }.withAttributes(ActorAttributes.supervisionStrategy(decider))

  Source(List(1, 3, -1, 5, 7)).via(flow)
    .runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}