akka-grpc - 應用案例


  上期說道:http/2還屬於一種不算普及的技術協議,可能目前只適合用於內部系統集成,現在開始大面積介入可能為時尚早。不過有些項目需求不等人,需要使用這項技術,所以研究了一下akka-grpc,寫了一篇介紹。本想到此為止,繼續其它項目。想想這樣做法有點不負責任,像是草草收場。畢竟用akka-grpc做了些事情,想想還是再寫這篇跟大家分享使用kka-grpc的過程。

我說過,了解akka-grpc的主要目的還是在protobuf的應用上。這是一種高效率的序列化協議。剛好,公司有這么個項目,是一個圖像處理平台:把很多圖片拍攝終端的圖像傳上平台進行商品識別、OCR等圖像處理。由於終端數量多、圖像處理又特別消耗內存、CPU等計算資源、又要求快速響應,所以第一考慮就是使用akka-cluster把圖像處理任務分割到多個節點上並行處理。這里就需要仔細考慮圖片在終端到平台、然后集群節點與點actor間的傳輸效率了。如何在akka系統里使用protobuf格式的數據正是本篇討論和示范的目的。

akka-grpc應用一般從IDL文件里消息類型和服務函數的定義開始,如下面這個.proto文件示范:

syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file // useful when extending traits or using custom types // import: "io.ontherocks.hellogrpc.RockingMessage" // code to put at the top of generated file // works only with `single_file: true` //preamble: "sealed trait SomeSealedTrait"
}; package com.datatech.pos.abs; message UCredential { string userid = 1; string password = 2; } message JWToken { string jwt = 1; } message Picture { int32 num = 1; bytes blob = 2; } message Capture { string ean = 1; bytes cover1 = 2; bytes cover2 = 3; } message Book { string ean = 1; string ver = 2; string isbn = 3; string title = 4; string publisher = 5; double price = 6; bytes cover1 = 7; bytes cover2 = 8; } message QueryResult { int32 sts = 1; string msg         = 2; Book bookinfo = 3; } service Services { rpc GetAuthToken(UCredential) returns (JWToken) {}; rpc SavePicture(Picture) returns (QueryResult) {}; rpc GetPicture(Picture) returns (Picture) {}; // rpc SaveCapture(Capture) returns (QueryResult) {}; // rpc GetCapture(Capture) returns (Capture) {}; // rpc GetBookInfo(Capture) returns (QueryResult) {};
}

因為這次示范針對的是protobuf的使用,所以就揀了SavePicture,GetPicture這兩項服務函數。JWToken只是用戶身份憑證,集群分片shard-entityId是以用戶憑證為基礎的,所以平台需要通過JWT進行跨節點任務指派以實現分布式圖像處理運算。

下面就要在編譯器插件自動產生的基礎服務接口代碼基礎上進行具體的服務功能實現。這部分主要是對接口函數的實現(oveerride):

class gRPCServices(trace: Boolean, system: ActorSystem, sharding: ClusterSharding)( implicit waitResponseTimeout: Timeout, authenticator: AuthBase) extends ServicesPowerApi with LogSupport { implicit val ec = system.dispatcher log.stepOn = trace override def getAuthToken(request: UCredential, meta: Metadata): Future[JWToken] = { val entityRef = sharding.entityRefFor(Authenticator.EntityKey, UUID.randomUUID.toString) val jwtResp = for { ui <- entityRef.ask[Authenticator.Response](Authenticator.GetUserInfo(request.userid, _)) .map { case Authenticator.UserInfo(info) => info case _ => Map[String, Any]() } jwt <- entityRef.ask[Authenticator.Response](Authenticator.GetToken(ui, _)) } yield jwt jwtResp.map { case Authenticator.JWToken(jwt) =>
        if (jwt.nonEmpty) JWToken(jwt) else throw new Exception("身份驗證失敗!無法提供憑證。") case _ => throw new Exception("身份驗證失敗!無法提供憑證。") } } override def savePicture(in: Picture, metadata: Metadata): Future[QueryResult] = { val jwt = getJwt(metadata).getOrElse("") val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","","")) val (shopId, posId, termId, impurl,devId) = ids val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId") val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.SaveImage(in, _)) .map { case ImgProcessor.ValidImgPro(img) => QueryResult(sts = 0, msg = "picture saved.") case ImgProcessor.FailedImgPro(msg) => QueryResult(sts = -1, msg = msg) } futResp } override def getPicture(in: Picture, metadata: Metadata): Future[Picture] = { val jwt = getJwt(metadata).getOrElse("") val ids = authenticator.shopIdFromJwt(jwt).getOrElse(("","","","","")) val (shopId, posId, termId, impurl,devId) = ids val entityRef = sharding.entityRefFor(ImgProcessor.EntityKey, s"$shopId:$posId") val futResp = entityRef.ask[ImgProcessor.Response](ImgProcessor.GetImage(in.num, _)) .map { case ImgProcessor.ValidImgPro(img) => img case ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY) } futResp } def getJwt(metadata: Metadata): Option[String] = { metadata.getText("bearer") } }

由於是通過PowerApi模式產生的接口代碼,所以接口函數都帶有MetaData參數,代表HttpRequest header集合。可以看到:服務函數實現都是通過entityRef,一個分片調度器分配到集群某個節點ImgProcessor.EntityKey類型的entity-actor上進行的。shopId:posId就是代表為某用戶構建的entityId,這個是通過用戶在Request中提供的MetaData參數中jwt解析得出的。

可以看到,具體服務提供是通過集群的分片實現的。下面是這個分片的代碼示范:

      log.step(s"initializing sharding for ${ImgProcessor.EntityKey} ...")(MachineId("","")) val imgEntityType = Entity(ImgProcessor.EntityKey) { entityContext => ImgProcessor(entityContext.shard,mgoHosts,entityContext.entityId,trace,keepAlive) }.withStopMessage(ImgProcessor.StopWorker) sharding.init(imgEntityType)

上面imgEntityType就是shard-entity類型,其實就是按用戶提供的jwt在任意集群節點上實時構建的一個opencv圖像處理器。下面是這個entity-actor的示范代碼:

object ImgProcessor extends LogSupport { sealed trait Command extends CborSerializable case class SaveImage(img: Picture, replyTo: ActorRef[Response]) extends Command case class GetImage(imgnum: Int,replyTo: ActorRef[Response]) extends Command sealed trait Response extends CborSerializable case class ValidImgPro(img: Picture) extends Response case class FailedImgPro(msg: String) extends Response def apply(shard: ActorRef[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration): Behavior[Command] = { val (shopId,posId) = entityId.split(':').toList match { case sid::pid::Nil  => (sid,pid) } implicit val loc = Messages.MachineId(shopId,posId) log.stepOn = trace Behaviors.setup[Command] { ctx =>
      implicit val ec = ctx.executionContext ctx.setReceiveTimeout(keepAlive, Idle) Behaviors.withTimers[Command] { timer => Behaviors.receiveMessage[Command] { case SaveImage(img, replyTo) => log.step(s"ImgProcessor: SaveImage(${img.num})") implicit val client = mongoClient(mgoHosts) maybeMgoClient = Some(client) ctx.pipeToSelf(savePicture(img)) { case Success(_) => { replyTo ! ValidImgPro(img) Done(loc.shopid, loc.posid, s"saved image #${img.num}.") } case Failure(err) => log.error(s"ImgProcessor: SaveImage Error: ${err.getMessage}") replyTo ! FailedImgPro(err.getMessage) Done(loc.shopid, loc.posid, s"SaveImage with error: ${err.getMessage}") } Behaviors.same case GetImage(imgnum, replyTo) => ... } }

整個圖片傳輸是通過actor的消息實現的。akka消息支持多種序列化格式,包括protobuf, 在配置文件.conf里定義:

akka { loglevel = INFO actor { provider = cluster serializers { jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { "com.datatech.pos.abs.CborSerializable" = jackson-cbor "scalapb.GeneratedMessage" = proto } } }

grpc server 基本上是個標准模塊,不同的只是service參數:

class gRPCServer(host: String, port: Int) extends LogSupport { def runServer(system: ActorSystem[_], service: gRPCServices): Future[Http.ServerBinding] = { implicit val classic = system.toClassic implicit val ec: ExecutionContext = system.executionContext // Create service handlers
    val serviceHandler: HttpRequest => Future[HttpResponse] = ServicesPowerApiHandler(service) // Bind service handler servers to localhost:8080/8081
    val binding = Http().bindAndHandleAsync( serviceHandler, interface = host, port = port, connectionContext = HttpConnectionContext()) // report successful binding
    binding.foreach { binding => println(s"******* startup gRPC-server on: port = $port *******") } binding //#server
 } }

下面是客戶端測試代碼:

object gRPCTestClient { def main(args: Array[String]): Unit = { val config_onenode = ConfigFactory.load("onenode") implicit val sys = ActorSystem("grpc-client", config_onenode) implicit val ec = sys.dispatcher val clientSettings = GrpcClientSettings.fromConfig(Services.name) // val clientSettings = GrpcClientSettings.connectToServiceAt("192.168.11.189", 50052);
    implicit val client = ServicesClient(clientSettings) val futJwt = client.getAuthToken(UCredential("9013", "123456")) val jwt = Await.result(futJwt, 5.seconds).jwt println(s"got jwt: ${jwt}") scala.io.StdIn.readLine() val bytes = FileStreaming.FileToByteArray("books/59c10d099b26e.jpg") val mat = bytesToMat(bytes) show(mat,"sent picture") scala.io.StdIn.readLine() val picture = Picture(111,marshal(bytes)) val futQR = client.savePicture().addHeader("Bearer", jwt).invoke(Picture(111,marshal(bytes))) futQR.onComplete { case Success(qr) => println(s"Saving Success: ${qr.msg}") case Failure(err) => println(s"Saving Error: ${err.getMessage}") } scala.io.StdIn.readLine() val futPic = client.getPicture().addHeader("Bearer", jwt).invoke(Picture(111,ByteString.EMPTY)) futPic.onComplete { case Success(pic) => val image = bytesToMat(unmarshal(pic.blob)) show(image, s"picture:${pic.num}") case Failure(err) => println(s"Reading Error: ${err.getMessage}") } scala.io.StdIn.readLine() sys.terminate() } }

基本流程是:先通過getAuthToken獲取jwt;在調用服務時通過addHeader("bearer",jwt)把jwt隨着函數調用一起提交給服務端。

客戶端設置可以在配置文件中定義:

akka { loglevel = INFO grpc.client { "com.datatech.pos.abs.Services" { host = 192.168.11.189 port = 52051
      override-authority = foo.test.google.fr use-tls = false } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM