ScalaPB(1): using protobuf in akka


    任何類型的實例作為消息在兩端獨立系統的機器之間進行傳遞時必須經過序列化/反序列化serialize/deserialize處理過程。假設以下場景:在一個網絡里有兩台連接的服務器,它們分別部署了獨立的akka系統。如果我們需要在這兩台服務器的akka系統之間進行消息交換的話,所有消息都必須經過序列化/反序列化處理。akka系統對於用戶自定義消息類型的默認序列化處理是以java-object serialization 方式進行的。我們上次提過:由於java-object-serialization會把一個java-object的類型信息、實例值、它所包含的其它類型描述信息等都寫入序列化的結果里,所以會占據較大空間,傳輸數據的效率相對就低了。protobuf是binary格式的,基本只包括實例值,所以數據傳輸效率較高。下面我們就介紹如何在akka系統中使用protobuf序列化。在akka中使用自定義序列化方法包括下面的這些步驟:

1、在.proto文件中對消息類型進行IDL定義

2、用ScalaPB編譯IDL文件並產生scala源代碼。這些源代碼中包括了涉及的消息類型及它們的操作方法

3、在akka程序模塊中import產生的classes,然后直接調用這些類型和方法

4、按akka要求編寫序列化方法

5、在akka的.conf文件里actor.serializers段落中定義akka的默認serializer

下面的build.sbt文件里描述了程序結構:

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

local和remote是兩個分開的項目。我們會在這兩個項目里分別部署akka系統。注意依賴項中的scalapb.runtime。PB.targets指明了產生源代碼的路徑。我們還需要在project/scalapb.sbt中指定scalaPB插件: 

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

我們首先在.proto文件里定義消息:

syntax = "proto3"; // Brought in from scalapb-runtime
import "scalapb/scalapb.proto"; import "google/protobuf/wrappers.proto"; package learn.proto; message Added { int32 nbr1 = 1; int32 nbr2 = 2; } message Subtracted { int32 nbr1 = 1; int32 nbr2 = 2; } message AddedResult { int32 nbr1 = 1; int32 nbr2 = 2; int32 result = 3; } message SubtractedResult { int32 nbr1 = 1; int32 nbr2 = 2; int32 result = 3; }

現在我們先在remote項目里定義一個Calculator actor:

package akka.protobuf.calculator import akka.actor._ import com.typesafe.config.ConfigFactory import learn.proto.messages._ class Calculator extends Actor with ActorLogging { override def receive: Receive = { case Added(a,b) => log.info("Calculating %d + %d".format(a, b)) sender() ! AddedResult(a,b,a+b) case Subtracted(a,b) => log.info("Calculating %d - %d".format(a, b)) sender() ! SubtractedResult(a,b,a-b) } } object Calculator { def props = Props(new Calculator) } object CalculatorStarter extends App { val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552") .withFallback(ConfigFactory.load()) val calcSystem = ActorSystem("calcSystem",config) calcSystem.actorOf(Calculator.props,"calculator") println("press any key to end program ...") scala.io.StdIn.readLine() calcSystem.terminate() }

運行CalculatorStarter產生一個calculator actor:  akka.tcp://calcSystem@127.0.0.1:2552/user/calculator

下面我們在local項目里從端口2551上部署另一個akka系統,然后調用端口2552上部署akka系統的calculator actor:

package akka.protobuf.calcservice import akka.actor._ import learn.proto.messages._ import scala.concurrent.duration._ class CalcRunner(path: String) extends Actor with ActorLogging { sendIdentifyRequest() def sendIdentifyRequest(): Unit = { context.actorSelection(path) ! Identify(path) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout) } def receive = identifying def identifying : Receive = { case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) => log.info("Remote calculator started!") context.watch(calcRef) context.become(calculating(calcRef)) case ActorIdentity(_,None) => log.info("Remote calculator not found!") case ReceiveTimeout => sendIdentifyRequest() case s @ _ => log.info(s"Remote calculator not ready. [$s]") } def calculating(calculator: ActorRef) : Receive = { case (op : Added) => calculator ! op case (op : Subtracted) => calculator ! op case AddedResult(a,b,r)  => log.info(s"$a + $b = $r") case SubtractedResult(a,b,r) => log.info(s"$a - $b = $r") case Terminated(calculator) => log.info("Remote calculator terminated, restarting ...") sendIdentifyRequest() context.become(identifying) case ReceiveTimeout => //nothing
 } } object CalcRunner { def props(path: String) = Props(new CalcRunner(path)) }

這個CalcRunner是一個actor,在程序里首先通過向remote項目中的calculator-actor傳送Identify消息以取得具體的ActorRef。然后用這個ActorRef與calculator-actor進行交互。這其中Identify是akka預定消息類型,其它消息都是ScalaPB從.proto文件中產生的。下面是local項目的運算程序:

 

package akka.protobuf.demo import akka.actor._ import akka.util.Timeout import com.typesafe.config.ConfigFactory import akka.protobuf.calcservice._ import scala.concurrent.duration._ import scala.util._ import learn.proto.messages._ object Main extends App { val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551") .withFallback(ConfigFactory.load()) val calcSystem = ActorSystem("calcSystem",config) val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator" val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner") println("Calculator started ...") import calcSystem.dispatcher calcSystem.scheduler.schedule(1.second, 1.second) { if (Random.nextInt(100) % 2 == 0) calculator ! Added(Random.nextInt(100), Random.nextInt(100)) else calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100)) } scala.io.StdIn.readLine() }

 

配置文件application.conf:

 

akka { actor { provider = remote } remote { netty.tcp { hostname = "127.0.0.1" } } }

 

先運行remote然后local。注意下面出現的提示:

 

[akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer 

下面是protobuf類型的序列化方法:

package akka.protobuf.serializer import akka.serialization.SerializerWithStringManifest import learn.proto.messages._ class ProtobufSerializer extends SerializerWithStringManifest{ def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName final val AddedManifest = classOf[Added].getName final val SubtractedManifest = classOf[Subtracted].getName final val AddedResultManifest = classOf[AddedResult].getName final val SubtractedResultManifest = classOf[SubtractedResult].getName override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { println("inside fromBinary"+manifest) manifest match { case AddedManifest => Added.parseFrom(bytes) case SubtractedManifest => Subtracted.parseFrom(bytes) case AddedResultManifest => AddedResult.parseFrom(bytes) case SubtractedResultManifest => SubtractedResult.parseFrom(bytes) } } override def toBinary(o: AnyRef): Array[Byte] = { println("inside toBinary ") o match { case a: Added => a.toByteArray case s :Subtracted => s.toByteArray case aR: AddedResult => aR.toByteArray case sR: SubtractedResult => sR.toByteArray } } }

然后我們需要在application.conf中告訴akka系統使用這些方法:

 actor { serializers { proto = "akka.protobuf.serializer.ProtobufSerializer" } serialization-bindings { "java.io.Serializable" = none "com.google.protobuf.Message" = proto "learn.proto.messages.Added" = proto "learn.proto.messages.AddedResult" = proto "learn.proto.messages.Subtracted" = proto "learn.proto.messages.SubtractedResult" = proto } }

現在再重新運行:

[INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] Remote calculator started!
inside toBinary inside fromBinarylearn.proto.messages.AddedResult [INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 18 + 38 = 56
inside toBinary inside fromBinarylearn.proto.messages.AddedResult [INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 22 + 74 = 96

系統使用了自定義的ProtobufferSerializer。

下面是本次示范的完整源代碼:

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

lazy val commonSettings = Seq(
  name := "AkkaProtobufDemo",
  version := "1.0",
  scalaVersion := "2.12.6",
)

lazy val local = (project in file("."))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
    ),
    name := "akka-protobuf-demo"
  )

lazy val remote = (project in file("remote"))
  .settings(commonSettings)
  .settings(
    libraryDependencies ++= Seq(
      "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
    ),
    name := "remote-system"
  ).dependsOn(local)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

resources/application.conf

akka { actor { provider = remote } remote { netty.tcp { hostname = "127.0.0.1" } } actor { serializers { proto = "akka.protobuf.serializer.ProtobufSerializer" } serialization-bindings { "java.io.Serializable" = none "com.google.protobuf.Message" = proto "learn.proto.messages.Added" = proto "learn.proto.messages.AddedResult" = proto "learn.proto.messages.Subtracted" = proto "learn.proto.messages.SubtractedResult" = proto } } }

main/protobuf/messages.proto

syntax = "proto3";

// Brought in from scalapb-runtime
import "scalapb/scalapb.proto";
import "google/protobuf/wrappers.proto";

package learn.proto;

message Added {

    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message Subtracted {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
}

message AddedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

message SubtractedResult {
    int32 nbr1 = 1;
    int32 nbr2 = 2;
    int32 result = 3;
}

remote/Calculator.scala

package akka.protobuf.calculator import akka.actor._ import com.typesafe.config.ConfigFactory import learn.proto.messages._ class Calculator extends Actor with ActorLogging { override def receive: Receive = { case Added(a,b) => log.info("Calculating %d + %d".format(a, b)) sender() ! AddedResult(a,b,a+b) case Subtracted(a,b) => log.info("Calculating %d - %d".format(a, b)) sender() ! SubtractedResult(a,b,a-b) } } object Calculator { def props = Props(new Calculator) } object CalculatorStarter extends App { val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2552") .withFallback(ConfigFactory.load()) val calcSystem = ActorSystem("calcSystem",config) calcSystem.actorOf(Calculator.props,"calculator") println("press any key to end program ...") scala.io.StdIn.readLine() calcSystem.terminate() }

CalcService.scala

package akka.protobuf.calcservice import akka.actor._ import learn.proto.messages._ import scala.concurrent.duration._ class CalcRunner(path: String) extends Actor with ActorLogging { sendIdentifyRequest() def sendIdentifyRequest(): Unit = { context.actorSelection(path) ! Identify(path) import context.dispatcher context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout) } def receive = identifying def identifying : Receive = { case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) => log.info("Remote calculator started!") context.watch(calcRef) context.become(calculating(calcRef)) case ActorIdentity(_,None) => log.info("Remote calculator not found!") case ReceiveTimeout => sendIdentifyRequest() case s @ _ => log.info(s"Remote calculator not ready. [$s]") } def calculating(calculator: ActorRef) : Receive = { case (op : Added) => calculator ! op case (op : Subtracted) => calculator ! op case AddedResult(a,b,r)  => log.info(s"$a + $b = $r") case SubtractedResult(a,b,r) => log.info(s"$a - $b = $r") case Terminated(calculator) => log.info("Remote calculator terminated, restarting ...") sendIdentifyRequest() context.become(identifying) case ReceiveTimeout => //nothing
 } } object CalcRunner { def props(path: String) = Props(new CalcRunner(path)) }

Main.scala

package akka.protobuf.demo import akka.actor._ import akka.util.Timeout import com.typesafe.config.ConfigFactory import akka.protobuf.calcservice._ import scala.concurrent.duration._ import scala.util._ import learn.proto.messages._ object Main extends App { val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551") .withFallback(ConfigFactory.load()) val calcSystem = ActorSystem("calcSystem",config) val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator" val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner") println("Calculator started ...") import calcSystem.dispatcher calcSystem.scheduler.schedule(1.second, 1.second) { if (Random.nextInt(100) % 2 == 0) calculator ! Added(Random.nextInt(100), Random.nextInt(100)) else calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100)) } scala.io.StdIn.readLine() }

ProtobufferSerializer.scala

package akka.protobuf.serializer import akka.serialization.SerializerWithStringManifest import learn.proto.messages._ class ProtobufSerializer extends SerializerWithStringManifest{ def identifier: Int = 101110116

  override def manifest(o: AnyRef): String = o.getClass.getName final val AddedManifest = classOf[Added].getName final val SubtractedManifest = classOf[Subtracted].getName final val AddedResultManifest = classOf[AddedResult].getName final val SubtractedResultManifest = classOf[SubtractedResult].getName override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { println("inside fromBinary"+manifest) manifest match { case AddedManifest => Added.parseFrom(bytes) case SubtractedManifest => Subtracted.parseFrom(bytes) case AddedResultManifest => AddedResult.parseFrom(bytes) case SubtractedResultManifest => SubtractedResult.parseFrom(bytes) } } override def toBinary(o: AnyRef): Array[Byte] = { println("inside toBinary ") o match { case a: Added => a.toByteArray case s :Subtracted => s.toByteArray case aR: AddedResult => aR.toByteArray case sR: SubtractedResult => sR.toByteArray } } }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM