In real-world applications akka-stream usually has to be integrated with other external systems to form a complete application. Those external systems may be other akka-based systems or something else entirely, so akka-stream must provide functions and methods for exchanging data with different kinds of systems. In this post we look at several common ways of doing such data exchange.
akka-stream provides the mapAsync + ask pattern, which lets a running stream reach out to an Actor and exchange data with it. This is one way of integrating akka-stream with Actors. And speaking of Actor integration: if we can hand the complex, resource-hungry computations inside a stream over to Actors, we can take full advantage of actor-model features such as routing, clustering and supervision to get distributed, efficient and safe processing. Here is the definition of mapAsync:
/**
 * Transform this stream by applying the given function to each of the elements
 * as they pass through this processing step. The function returns a `Future` and the
 * value of that future will be emitted downstream. The number of Futures
 * that shall run in parallel is given as the first argument to ``mapAsync``.
 * These Futures may complete in any order, but the elements that
 * are emitted downstream are in the same order as received from upstream.
 *
 * If the function `f` throws an exception or if the `Future` is completed
 * with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
 * the stream will be completed with failure.
 *
 * If the function `f` throws an exception or if the `Future` is completed
 * with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
 * [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
 *
 * The function `f` is always invoked on the elements in the order they arrive.
 *
 * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
 *
 * '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
 *
 * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
 * backpressures or the first future is not completed
 *
 * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
 *
 * '''Cancels when''' downstream cancels
 *
 * @see [[#mapAsyncUnordered]]
 */
def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T] =
  via(MapAsync(parallelism, f))
mapAsync runs a function f: Out => Future[T] concurrently in up to parallelism Futures. Now let's look at the signature of ask:
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = internalAsk(message, timeout, sender)
which, combined with mapTo, gives us exactly the T => Future[T] shape mapAsync expects. So we can talk to an Actor from inside a stream like this:
stream.mapAsync(parallelism = 5)(elem => (ref ? elem).mapTo[String])
In the example above every stream element is sent to the ActorRef with ref ? elem and processed in a Future; when the Actor finishes it returns a result of type Future[String]. Note that mapAsync uses this returned Future to implement stream backpressure, which means the target Actor must reply; otherwise the stream simply hangs. Let's first demonstrate a direct application of mapAsync:
import akka.actor._
import akka.pattern._
import akka.stream._
import akka.stream.scaladsl._
import akka.routing._
import scala.concurrent.duration._
import akka.util.Timeout

object StorageActor {
  case class Query(rec: Int, qry: String)                 //simulated database-write query
  class StorageActor extends Actor with ActorLogging {    //simulated storage actor
    override def receive: Receive = {
      case Query(num, qry) =>
        val reply = s"${self.path} is saving: [$qry]"
        sender() ! reply                                   //must reply to mapAsync to release backpressure
        reply
    }
  }
  def props = Props(new StorageActor)
}

object MapAsyncDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val storageActor = sys.actorOf(StorageActor.props, "dbWriter")

  implicit val timeout = Timeout(3 seconds)

  Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
    .mapAsync(parallelism = 3) { n =>
      (storageActor ? StorageActor.Query(n, s"Record#$n")).mapTo[String]
    }.runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}
In this example parallelism = 3, and StorageActor includes its own path in the reply so we can see which actor instance is doing the work:
akka://demoSys/user/dbWriter is saving: [Record#1]
akka://demoSys/user/dbWriter is saving: [Record#2]
akka://demoSys/user/dbWriter is saving: [Record#3]
akka://demoSys/user/dbWriter is saving: [Record#4]
akka://demoSys/user/dbWriter is saving: [Record#5]
akka://demoSys/user/dbWriter is saving: [Record#6]
...
As you can see, mapAsync only ever talks to a single Actor, so parallelism = 3 merely means the work runs in up to three concurrent Futures. To really exploit the actor model we can use a router to spread the work across several actors. Routers come in two flavours, pool and group: a pool router constructs its own routees, while a group router uses actors that already exist. For this test we have to use a group router, because if we want to apply our own supervision to the routees, a pool router would defeat the purpose: it automatically replaces a terminated routee with a new one, thereby bypassing our supervision strategy (a pool-router sketch follows the group example below, for comparison). First, let's add routing on top of StorageActor:
val numOfActors = 3
val routees: List[ActorRef] = List.fill(numOfActors)(     //create 3 StorageActor instances
  sys.actorOf(StorageActor.props))
val routeePaths: List[String] = routees.map { ref => "/user/" + ref.path.name }
val storageActorPool = sys.actorOf(
  RoundRobinGroup(routeePaths).props()
    .withDispatcher("akka.pool-dispatcher")
  , "storageActorPool"
)
implicit val timeout = Timeout(3 seconds)
Source(Stream.from(1)).delay(1.second, DelayOverflowStrategy.backpressure)
  .mapAsync(parallelism = 1) { n =>
    (storageActorPool ? StorageActor.Query(n, s"Record#$n")).mapTo[String]
  }.runWith(Sink.foreach(println))
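For comparison, a pool-type router would build and own its routees itself. A minimal sketch, not used in this demo (the name storageWriterPool is just illustrative), since a pool would respawn terminated routees and so sidestep the supervision behaviour we want to observe later:

//hypothetical pool-router variant, for illustration only:
//the pool itself creates numOfActors StorageActor routees
val pooledWriters = sys.actorOf(
  RoundRobinPool(numOfActors).props(StorageActor.props),
  "storageWriterPool")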
Here RoundRobinGroup does the load-balancing for us. Also note parallelism = 1 above: we no longer need multiple Futures. The output now looks like this:
akka://demoSys/user/$a is saving: [Record#1]
akka://demoSys/user/$b is saving: [Record#2]
akka://demoSys/user/$c is saving: [Record#3]
akka://demoSys/user/$a is saving: [Record#4]
akka://demoSys/user/$b is saving: [Record#5]
akka://demoSys/user/$c is saving: [Record#6]
akka://demoSys/user/$a is saving: [Record#7]
Now the work is clearly running in parallel on the three actors $a, $b and $c. Since we are simulating parallel database writes, let's also give each routee an incremental-delay restart strategy using BackoffSupervisor:
object StorageActor {
  case class Query(rec: Int, qry: String)                    //simulated database-write query
  class DbException(cause: String) extends Exception(cause)  //custom write exception

  class StorageActor extends Actor with ActorLogging {       //storage actor
    override def receive: Receive = {
      case Query(num, qry) =>
        var res: String = ""
        try {
          res = saveToDB(num, qry)
        } catch {
          case e: Exception => Error(num, qry)                //simulated write failure
        }
        sender() ! res
      case _ =>
    }
    def saveToDB(num: Int, qry: String): String = {           //simulated write operation
      val msg = s"${self.path} is saving: [$qry#$num]"
      if (num % 3 == 0) Error(num, qry)                       //simulated exception
      else {
        log.info(s"${self.path} is saving: [$qry#$num]")
        s"${self.path} is saving: [$qry#$num]"
      }
    }
    def Error(num: Int, qry: String): String = {
      val msg = s"${self.path} is saving: [$qry#$num]"
      sender() ! msg
      throw new DbException(s"$msg blew up, boooooom!!!")
    }
    //lifecycle callbacks to trace restart behaviour
    //Backoff.onStop goes through the restart process
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
      super.preRestart(reason, message)
    }
    override def postRestart(reason: Throwable): Unit = {
      log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
      super.postRestart(reason)
    }
    override def postStop(): Unit = {
      log.info(s"Stopped ${self.path.name}!")
      super.postStop()
    }
    //Backoff.onFailure doesn't go through the restart process
    override def preStart(): Unit = {
      log.info(s"PreStarting ${self.path.name} ...")
      super.preStart()
    }
  }
  def props = Props(new StorageActor)
}

object StorageActorGuardian {        //StorageActor wrapped in a supervision strategy
  def props: Props = {               //supervision strategy plus StorageActor construction
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: StorageActor.DbException => SupervisorStrategy.Restart
    }
    val options = Backoff.onStop(StorageActor.props, "dbWriter", 100 millis, 500 millis, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

object IntegrateDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val numOfActors = 3
  val routees: List[ActorRef] = List.fill(numOfActors)(
    sys.actorOf(StorageActorGuardian.props))
  val routeePaths: List[String] = routees.map { ref => "/user/" + ref.path.name }   //collect the ActorPaths

  val storageActorPool = sys.actorOf(
    RoundRobinGroup(routeePaths).props()
      .withDispatcher("akka.pool-dispatcher")
    , "storageActorPool"
  )

  implicit val timeout = Timeout(3 seconds)

  Source(Stream.from(1)).delay(3.second, DelayOverflowStrategy.backpressure)
    .mapAsync(parallelism = 1) { n =>
      (storageActorPool ? StorageActor.Query(n, s"Record")).mapTo[String]
    }.runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}
We also added a simulated failure and the StorageActor lifecycle callbacks so we can trace how SupervisorStrategy.Restart behaves when an exception occurs. Test runs confirm that Backoff.onFailure does not trigger the Restart path at all; it goes straight to preStart, whereas Backoff.onStop does go through the restart process. Backoff.onFailure is triggered when the actor terminates because of an exception, while Backoff.onStop is triggered whenever the target actor stops, for whatever reason (a sketch of the onFailure options follows the code below). Also note that in the example above the actor skips the stream element that caused the exception, so we must re-send that element to ourselves in preRestart:
//tracing restarts: Backoff.onStop goes through the restart process
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
  message match {
    case Some(Query(n, qry)) => self ! Query(n + 101, qry)   //re-send the failed message to ourselves; n+101 removes the failure condition
    case _ => log.info(s"Exception message: None")
  }
  super.preRestart(reason, message)
}
If we do not need any result back from the actor we delegate the work to, we can try Sink.actorRefWithAck instead:
/**
 * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
 * First element is always `onInitMessage`, then stream is waiting for acknowledgement message
 * `ackMessage` from the given actor which means that it is ready to process
 * elements. It also requires `ackMessage` message after each stream element
 * to make backpressure work.
 *
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure - result of `onFailureMessage(throwable)`
 * function will be sent to the destination actor.
 */
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
                       onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] =
  Sink.fromGraph(new ActorRefBackpressureSinkStage(ref, onInitMessage, ackMessage, onCompleteMessage, onFailureMessage))
Here the ActorRef only ever sends back backpressure-related signals, while actorRefWithAck itself returns Sink[T, NotUsed]; in other words it builds a Sink. actorRefWithAck uses three messages to communicate with the target actor (a minimal handshake sketch follows the list):
1. onInitMessage: the first message the stream sends to the ActorRef, signalling that data exchange can begin
2. ackMessage: the message the ActorRef sends back to the stream to confirm it is ready for the next element; it is also the signal that releases backpressure
3. onCompleteMessage: sent by the stream to the ActorRef to announce that all stream elements have been delivered
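To make the handshake concrete, here is a minimal sketch of an actor that implements just these three messages and nothing else (the EchoSink name and the "start"/"ack"/"done" values are illustrative, not part of the demo below):

//hypothetical minimal sink actor for Sink.actorRefWithAck; identifiers are illustrative
class EchoSink extends Actor with ActorLogging {
  override def receive: Receive = {
    case "start" =>                 //onInitMessage: handshake, reply ack to start the flow
      sender() ! "ack"
    case "done" =>                  //onCompleteMessage: stream finished, release resources
      log.info("stream completed")
    case elem =>                    //a stream element: process it, then ack to request the next one
      log.info(s"received: $elem")
      sender() ! "ack"
  }
}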
We have to modify the StorageActor from the previous example so that it speaks this actorRefWithAck protocol with the target actor:
object StorageActor {
  val onInitMessage = "start"
  val onCompleteMessage = "done"
  val ackMessage = "ack"

  case class Query(rec: Int, qry: String)                    //simulated database-write query
  class DbException(cause: String) extends Exception(cause)  //custom write exception

  class StorageActor extends Actor with ActorLogging {       //storage actor
    override def receive: Receive = {
      case `onInitMessage` => sender() ! ackMessage
      case Query(num, qry) =>
        var res: String = ""
        try {
          res = saveToDB(num, qry)
        } catch {
          case e: Exception => Error(num, qry)                //simulated write failure
        }
        sender() ! ackMessage
      case `onCompleteMessage` =>                             //clean up resources here
      case _ =>
    }
    def saveToDB(num: Int, qry: String): String = {           //simulated write operation
      val msg = s"${self.path} is saving: [$qry#$num]"
      if (num % 5 == 0) Error(num, qry)                       //simulated exception
      else {
        log.info(s"${self.path} is saving: [$qry#$num]")
        s"${self.path} is saving: [$qry#$num]"
      }
    }
    def Error(num: Int, qry: String) = {
      val msg = s"${self.path} is saving: [$qry#$num]"
      sender() ! ackMessage
      throw new DbException(s"$msg blew up, boooooom!!!")
    }
    //lifecycle callbacks to trace restart behaviour
    //Backoff.onStop goes through the restart process
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
      message match {
        case Some(Query(n, qry)) => self ! Query(n + 101, qry)  //re-send the failed message to ourselves; n+101 removes the failure condition
        case _ => log.info(s"Exception message: None")
      }
      super.preRestart(reason, message)
    }
    override def postRestart(reason: Throwable): Unit = {
      log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
      super.postRestart(reason)
    }
    override def postStop(): Unit = {
      log.info(s"Stopped ${self.path.name}!")
      super.postStop()
    }
    //Backoff.onFailure doesn't go through the restart process
    override def preStart(): Unit = {
      log.info(s"PreStarting ${self.path.name} ...")
      super.preStart()
    }
  }
  def props = Props(new StorageActor)
}
The StorageActor class now handles the three actorRefWithAck protocol messages onInitMessage, ackMessage and onCompleteMessage. It only ever replies with the backpressure message ackMessage; it never sends back any computation result. Note that in preRestart we again fix up the element that caused the exception and re-send it to ourselves. Sink.actorRefWithAck is used like this:
Source(Stream.from(1)).map(n => Query(n, s"Record"))
  .delay(3.second, DelayOverflowStrategy.backpressure)
  .runWith(Sink.actorRefWithAck(
    storageActorPool, onInitMessage, ackMessage, onCompleteMessage))
The complete runnable code:
object SinkActorRefWithAck extends App {
  import StorageActor._
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val storageActor = sys.actorOf(StorageActor.props, "storageActor")

  val numOfActors = 3
  val routees: List[ActorRef] = List.fill(numOfActors)(
    sys.actorOf(StorageActorGuardian.props))
  val routeePaths: List[String] = routees.map { ref => "/user/" + ref.path.name }

  val storageActorPool = sys.actorOf(
    RoundRobinGroup(routeePaths).props()
      .withDispatcher("akka.pool-dispatcher")
    , "storageActorPool"
  )

  Source(Stream.from(1)).map(n => Query(n, s"Record"))
    .delay(3.second, DelayOverflowStrategy.backpressure)
    .runWith(Sink.actorRefWithAck(
      storageActorPool, onInitMessage, ackMessage, onCompleteMessage))

  scala.io.StdIn.readLine()
  sys.terminate()
}
If an external system feeds data into a stream, we can treat that external system as the stream's Source. akka-stream provides Source.queue, which builds a Source that an external system can use to push data into a running stream. The signature of Source.queue is:
/**
 * Creates a `Source` that is materialized as an [[akka.stream.scaladsl.SourceQueue]].
 * You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
 * otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
 * if downstream is terminated.
 *
 * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
 * there is no space available in the buffer.
 *
 * Acknowledgement mechanism is available.
 * [[akka.stream.scaladsl.SourceQueue.offer]] returns `Future[QueueOfferResult]` which completes with
 * `QueueOfferResult.Enqueued` if element was added to buffer or sent downstream. It completes with
 * `QueueOfferResult.Dropped` if element was dropped. Can also complete with `QueueOfferResult.Failure` -
 * when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed.
 *
 * The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
 * call when buffer is full.
 *
 * You can watch accessibility of stream with [[akka.stream.scaladsl.SourceQueue.watchCompletion]].
 * It returns future that completes with success when stream is completed or fail when stream is failed.
 *
 * The buffer can be disabled by using `bufferSize` of 0 and then received message will wait
 * for downstream demand unless there is another message waiting for downstream demand, in that case
 * offer result will be completed according to the overflow strategy.
 *
 * @param bufferSize size of buffer in element count
 * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
 */
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
  Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))
Source.queue builds a Source of type Source[T, SourceQueueWithComplete[T]]. SourceQueueWithComplete is defined as:
/**
 * This trait adds completion support to [[SourceQueue]].
 */
trait SourceQueueWithComplete[T] extends SourceQueue[T] {
  /**
   * Complete the stream normally. Use `watchCompletion` to be notified of this
   * operation’s success.
   */
  def complete(): Unit

  /**
   * Complete the stream with a failure. Use `watchCompletion` to be notified of this
   * operation’s success.
   */
  def fail(ex: Throwable): Unit

  /**
   * Method returns a [[Future]] that will be completed if the stream completes,
   * or will be failed when the stage faces an internal failure or the the [[SourceQueueWithComplete.fail]] method is invoked.
   */
  def watchCompletion(): Future[Done]
}
It extends SourceQueue with a few abstract methods, mainly for sending termination signals to the target stream: complete and fail. watchCompletion is a monitoring method; the Future it returns completes when the SourceQueue is closed manually or when the stream terminates for some other reason. SourceQueue itself is defined as:
/**
 * This trait allows to have the queue as a data source for some stream.
 */
trait SourceQueue[T] {
  /**
   * Method offers next element to a stream and returns future that:
   * - completes with `Enqueued` if element is consumed by a stream
   * - completes with `Dropped` when stream dropped offered element
   * - completes with `QueueClosed` when stream is completed during future is active
   * - completes with `Failure(f)` when failure to enqueue element from upstream
   * - fails when stream is completed or you cannot call offer in this moment because of implementation rules
   * (like for backpressure mode and full buffer you need to wait for last offer call Future completion)
   *
   * @param elem element to send to a stream
   */
  def offer(elem: T): Future[QueueOfferResult]

  /**
   * Method returns a [[Future]] that will be completed if the stream completes,
   * or will be failed when the stage faces an internal failure.
   */
  def watchCompletion(): Future[Done]
}
This trait provides the two basic SourceQueue operations: offer(elem: T) and watchCompletion(). Let's demonstrate how a SourceQueue is used: a Calculator actor simulates an external system; we first build a SourceQueue with Source.queue, connect it downstream into a complete stream, and then hand the queue to the Calculator so it can push data into the running stream. Along the way we exercise the basic SourceQueue operations. The Calculator actor below plays the role of an external SourceQueue client:
object Calculator {
  trait Operations
  case class Add(op1: Int, op2: Int) extends Operations
  case class DisplayError(err: Exception) extends Operations
  case object Stop extends Operations
  case class ProduceError(err: Exception) extends Operations

  def props(inputQueue: SourceQueueWithComplete[String]) = Props(new Calculator(inputQueue))
}

class Calculator(inputQueue: SourceQueueWithComplete[String]) extends Actor with ActorLogging {
  import Calculator._
  import context.dispatcher

  override def receive: Receive = {
    case Add(op1, op2) =>
      val msg = s"$op1 + $op2 = ${op1 + op2}"
      inputQueue.offer(msg)   //.mapTo[String]
        .recover { case e: Exception => DisplayError(e) }
        .pipeTo(self)
    case QueueOfferResult.Enqueued =>
      log.info("QueueOfferResult.Enqueued")
    case QueueOfferResult.Dropped =>
    case QueueOfferResult.Failure(cause) =>
    case QueueOfferResult.QueueClosed =>
      log.info("QueueOfferResult.QueueClosed")
    case Stop => inputQueue.complete()
    case ProduceError(e) => inputQueue.fail(e)
  }
}
Calculator pushes its results into the stream through the inputQueue it was given, and the stream prints them. In receive we demonstrate offer together with piping every possible QueueOfferResult back to ourselves via pipeTo. Note that we cannot use mapTo[String] here: offer returns Future[QueueOfferResult], not Future[String], so the cast would fail. The Source.queue[String] type parameter only fixes the type of elem that offer accepts. The inputQueue is built like this:
val source: Source[String, SourceQueueWithComplete[String]] =
  Source.queue[String](bufferSize = 16, overflowStrategy = OverflowStrategy.backpressure)
val inputQueue: SourceQueueWithComplete[String] =
  source.toMat(Sink.foreach(println))(Keep.left).run()

inputQueue.watchCompletion().onComplete {
  case Success(result) => println(s"Calculator ends with: $result")
  case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
}
We added watchCompletion to watch for the SourceQueue's termination signal. Notice also that this SourceQueue instance source supports backpressure via OverflowStrategy.backpressure. Here is how the whole example is run:
object SourceQueueDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source: Source[String, SourceQueueWithComplete[String]] =
    Source.queue[String](bufferSize = 16, overflowStrategy = OverflowStrategy.backpressure)
  val inputQueue: SourceQueueWithComplete[String] =
    source.toMat(Sink.foreach(println))(Keep.left).run()

  inputQueue.watchCompletion().onComplete {
    case Success(result) => println(s"Calculator ends with: $result")
    case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
  }

  val calc = sys.actorOf(Calculator.props(inputQueue), "calculator")

  import Calculator._
  calc ! Add(3, 5)
  scala.io.StdIn.readLine
  calc ! Add(39, 1)
  scala.io.StdIn.readLine
  calc ! ProduceError(new Exception("Boooooommm!"))
  scala.io.StdIn.readLine
  calc ! Add(1, 1)
  scala.io.StdIn.readLine
  sys.terminate()
}
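A SourceQueue does not have to be driven from an actor. Here is a minimal sketch of pushing an element directly and inspecting the QueueOfferResult; it assumes the implicit dispatcher and the inputQueue built in SourceQueueDemo, and the push name is purely illustrative:

//hedged sketch: offering elements directly and reading the QueueOfferResult
def push(msg: String): Unit =
  inputQueue.offer(msg).onComplete {
    case Success(QueueOfferResult.Enqueued)    => println(s"enqueued: $msg")
    case Success(QueueOfferResult.Dropped)     => println(s"dropped: $msg")
    case Success(QueueOfferResult.QueueClosed) => println("queue already closed")
    case Success(QueueOfferResult.Failure(e))  => println(s"stream failed: ${e.getMessage}")
    case Failure(e)                            => println(s"offer failed: ${e.getMessage}")
  }

push("2 + 2 = 4")
inputQueue.complete()   //signal normal completion to the downstream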
In this post we looked at how akka-stream can be connected to and integrated with external systems, focusing on approaches that preserve Reactive-Streams backpressure.
The full source code of the examples follows:
MapAsyncDemo.scala:
import akka.actor._
import akka.pattern._
import akka.stream._
import akka.stream.scaladsl._
import akka.routing._
import scala.concurrent.duration._
import akka.util.Timeout

object StorageActor {
  case class Query(rec: Int, qry: String)                    //simulated database-write query
  class DbException(cause: String) extends Exception(cause)  //custom write exception

  class StorageActor extends Actor with ActorLogging {       //storage actor
    override def receive: Receive = {
      case Query(num, qry) =>
        var res: String = ""
        try {
          res = saveToDB(num, qry)
        } catch {
          case e: Exception => Error(num, qry)                //simulated write failure
        }
        sender() ! res
      case _ =>
    }
    def saveToDB(num: Int, qry: String): String = {           //simulated write operation
      val msg = s"${self.path} is saving: [$qry#$num]"
      if (num % 5 == 0) Error(num, qry)                       //simulated exception
      else {
        log.info(s"${self.path} is saving: [$qry#$num]")
        s"${self.path} is saving: [$qry#$num]"
      }
    }
    def Error(num: Int, qry: String): String = {
      val msg = s"${self.path} is saving: [$qry#$num]"
      sender() ! msg                                          //reply anyway to release backpressure
      throw new DbException(s"$msg blew up, boooooom!!!")
    }
    //lifecycle callbacks to trace restart behaviour
    //Backoff.onStop goes through the restart process
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
      message match {
        case Some(Query(n, qry)) => self ! Query(n + 101, qry)  //re-send the failed message to ourselves; n+101 removes the failure condition
        case _ => log.info(s"Exception message: None")
      }
      super.preRestart(reason, message)
    }
    override def postRestart(reason: Throwable): Unit = {
      log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
      super.postRestart(reason)
    }
    override def postStop(): Unit = {
      log.info(s"Stopped ${self.path.name}!")
      super.postStop()
    }
    //Backoff.onFailure doesn't go through the restart process
    override def preStart(): Unit = {
      log.info(s"PreStarting ${self.path.name} ...")
      super.preStart()
    }
  }
  def props = Props(new StorageActor)
}

object StorageActorGuardian {        //StorageActor wrapped in a supervision strategy
  def props: Props = {               //supervision strategy plus StorageActor construction
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: StorageActor.DbException => SupervisorStrategy.Restart
    }
    val options = Backoff.onStop(StorageActor.props, "dbWriter", 100 millis, 500 millis, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

object IntegrateDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val numOfActors = 3
  val routees: List[ActorRef] = List.fill(numOfActors)(
    sys.actorOf(StorageActorGuardian.props))
  val routeePaths: List[String] = routees.map { ref => "/user/" + ref.path.name }

  val storageActorPool = sys.actorOf(
    RoundRobinGroup(routeePaths).props()
      .withDispatcher("akka.pool-dispatcher")
    , "storageActorPool"
  )

  implicit val timeout = Timeout(3 seconds)

  Source(Stream.from(1)).delay(3.second, DelayOverflowStrategy.backpressure)
    .mapAsync(parallelism = 1) { n =>
      (storageActorPool ? StorageActor.Query(n, s"Record")).mapTo[String]
    }.runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}
SinkActorRefAckDemo.scala:
package sinkactorrefack

import akka.actor._
import akka.pattern._
import akka.stream._
import akka.stream.scaladsl._
import akka.routing._
import scala.concurrent.duration._

object StorageActor {
  val onInitMessage = "start"
  val onCompleteMessage = "done"
  val ackMessage = "ack"

  case class Query(rec: Int, qry: String)                    //simulated database-write query
  class DbException(cause: String) extends Exception(cause)  //custom write exception

  class StorageActor extends Actor with ActorLogging {       //storage actor
    override def receive: Receive = {
      case `onInitMessage` => sender() ! ackMessage
      case Query(num, qry) =>
        var res: String = ""
        try {
          res = saveToDB(num, qry)
        } catch {
          case e: Exception => Error(num, qry)                //simulated write failure
        }
        sender() ! ackMessage
      case `onCompleteMessage` =>                             //clean up resources here
      case _ =>
    }
    def saveToDB(num: Int, qry: String): String = {           //simulated write operation
      val msg = s"${self.path} is saving: [$qry#$num]"
      if (num == 3) Error(num, qry)                           //simulated exception
      else {
        log.info(s"${self.path} is saving: [$qry#$num]")
        s"${self.path} is saving: [$qry#$num]"
      }
    }
    def Error(num: Int, qry: String) = {
      val msg = s"${self.path} is saving: [$qry#$num]"
      sender() ! ackMessage
      throw new DbException(s"$msg blew up, boooooom!!!")
    }
    //lifecycle callbacks to trace restart behaviour
    //Backoff.onStop goes through the restart process
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
      log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
      message match {
        case Some(Query(n, qry)) => self ! Query(n + 101, qry)  //re-send the failed message to ourselves; n+101 removes the failure condition
        case _ => log.info(s"Exception message: None")
      }
      super.preRestart(reason, message)
    }
    override def postRestart(reason: Throwable): Unit = {
      log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
      super.postRestart(reason)
    }
    override def postStop(): Unit = {
      log.info(s"Stopped ${self.path.name}!")
      super.postStop()
    }
    //Backoff.onFailure doesn't go through the restart process
    override def preStart(): Unit = {
      log.info(s"PreStarting ${self.path.name} ...")
      super.preStart()
    }
  }
  def props = Props(new StorageActor)
}

object StorageActorGuardian {        //StorageActor wrapped in a supervision strategy
  def props: Props = {               //supervision strategy plus StorageActor construction
    def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
      case _: StorageActor.DbException => SupervisorStrategy.Restart
    }
    val options = Backoff.onStop(StorageActor.props, "dbWriter", 100 millis, 500 millis, 0.0)
      .withManualReset
      .withSupervisorStrategy(
        OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 second)(
          decider.orElse(SupervisorStrategy.defaultDecider)
        )
      )
    BackoffSupervisor.props(options)
  }
}

object SinkActorRefWithAck extends App {
  import StorageActor._
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val storageActor = sys.actorOf(StorageActor.props, "storageActor")

  val numOfActors = 3
  val routees: List[ActorRef] = List.fill(numOfActors)(
    sys.actorOf(StorageActorGuardian.props))
  val routeePaths: List[String] = routees.map { ref => "/user/" + ref.path.name }

  val storageActorPool = sys.actorOf(
    RoundRobinGroup(routeePaths).props()
      .withDispatcher("akka.pool-dispatcher")
    , "storageActorPool"
  )

  Source(Stream.from(1)).map(n => Query(n, s"Record"))
    .delay(3.second, DelayOverflowStrategy.backpressure)
    .runWith(Sink.actorRefWithAck(
      storageActorPool, onInitMessage, ackMessage, onCompleteMessage))

  scala.io.StdIn.readLine()
  sys.terminate()
}
SourceQueueDemo.scala:
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent._
import scala.util._
import akka.pattern._

object Calculator {
  trait Operations
  case class Add(op1: Int, op2: Int) extends Operations
  case class DisplayError(err: Exception) extends Operations
  case object Stop extends Operations
  case class ProduceError(err: Exception) extends Operations

  def props(inputQueue: SourceQueueWithComplete[String]) = Props(new Calculator(inputQueue))
}

class Calculator(inputQueue: SourceQueueWithComplete[String]) extends Actor with ActorLogging {
  import Calculator._
  import context.dispatcher

  override def receive: Receive = {
    case Add(op1, op2) =>
      val msg = s"$op1 + $op2 = ${op1 + op2}"
      inputQueue.offer(msg)   //offer returns Future[QueueOfferResult], so no mapTo[String] here
        .recover { case e: Exception => DisplayError(e) }
        .pipeTo(self)
    case QueueOfferResult.Enqueued =>
      log.info("QueueOfferResult.Enqueued")
    case QueueOfferResult.Dropped =>
    case QueueOfferResult.Failure(cause) =>
    case QueueOfferResult.QueueClosed =>
      log.info("QueueOfferResult.QueueClosed")
    case Stop => inputQueue.complete()
    case ProduceError(e) => inputQueue.fail(e)
  }
}

object SourceQueueDemo extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val source: Source[String, SourceQueueWithComplete[String]] =
    Source.queue[String](bufferSize = 16, overflowStrategy = OverflowStrategy.backpressure)
  val inputQueue: SourceQueueWithComplete[String] =
    source.toMat(Sink.foreach(println))(Keep.left).run()

  inputQueue.watchCompletion().onComplete {
    case Success(result) => println(s"Calculator ends with: $result")
    case Failure(cause)  => println(s"Calculator ends with exception: ${cause.getMessage}")
  }

  val calc = sys.actorOf(Calculator.props(inputQueue), "calculator")

  import Calculator._
  calc ! Add(3, 5)
  scala.io.StdIn.readLine
  calc ! Add(39, 1)
  scala.io.StdIn.readLine
  calc ! ProduceError(new Exception("Boooooommm!"))
  scala.io.StdIn.readLine
  calc ! Add(1, 1)
  scala.io.StdIn.readLine
  sys.terminate()
}