ScalaPB(3): gRPC streaming


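Continuing from the unary gRPC services discussed in the previous installment, this post introduces gRPC streaming, which comes in three forms: server streaming, client streaming, and bidirectional streaming. We start by describing a server-streaming service in IDL in the .proto file: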

    /*
     * responding stream of increment results
     */
    service SumOneToMany {
      rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
    }

    message SumRequest {
      int32 toAdd = 1;
    }

    message SumResponse {
      int32 currentResult = 1;
    }

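The AddOneToMany function of the SumOneToMany service accepts a single SumRequest and returns a stream of SumResponse; it is that simple. Compiling the file generates SumOneToManyGrpc.scala, which provides the API for the RPC operations. Let's look at the Scala function that protoc produces from the service function described in IDL: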

    def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit

Calling the Scala function addOneToMany requires two arguments, a SumRequest and a StreamObserver[SumResponse], and the caller has to prepare both. The caller builds the StreamObserver before calling addOneToMany and passes it in; the server then delivers results back to the caller through it. gRPC implements data streaming through instances of the StreamObserver type. An instance can be constructed as follows:

    val responseObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }
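To make the callback contract concrete, here is a minimal sketch that runs without a gRPC dependency. It assumes a local stand-in trait, `Observer`, that mirrors the three methods of `io.grpc.stub.StreamObserver`; `RecordingObserver` and `ObserverContractDemo` are hypothetical names introduced only for this illustration. A stream delivers zero or more onNext calls, followed by a single terminal onCompleted or onError.

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks),
// declared locally so the sketch runs without gRPC on the classpath.
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical helper: records every callback instead of printing it.
final class RecordingObserver[T] extends Observer[T] {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  def onNext(value: T): Unit = { events += s"next:$value"; () }
  def onError(t: Throwable): Unit = { events += s"error:${t.getMessage}"; () }
  def onCompleted(): Unit = { events += "completed"; () }
}

object ObserverContractDemo {
  // Drives an observer the way a well-behaved producer would:
  // every value via onNext, then exactly one terminal callback.
  def emitAll[T](values: Seq[T], observer: Observer[T]): Unit = {
    values.foreach(v => observer.onNext(v))
    observer.onCompleted()
  }

  def main(args: Array[String]): Unit = {
    val observer = new RecordingObserver[Int]
    emitAll(Seq(1, 2, 3), observer)
    println(observer.events.mkString(", ")) // next:1, next:2, next:3, completed
  }
}
```

Recording the events rather than printing them makes the ordering easy to inspect: the terminal event is always the last entry.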

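The server sends results back to the client one at a time by calling onNext, and they arrive at the responseObserver that was built on the client side. Below is the implementation of SumOneToMany: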

    class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
      override def addOneToMany(request: SumRequest,
                                responseObserver: StreamObserver[SumResponse]): Unit = {
        val currentSum: AtomicInt = Atomic(0)
        // emit 1, 2, ..., request.toAdd, one response per number
        (1 to request.toAdd).foreach { _ =>
          responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
        }
        Thread.sleep(1000) // delay and then finish
        responseObserver.onCompleted()
      }
    }

This addOneToMany service function returns the numbers from 1 to request.toAdd to the caller one by one through responseObserver. The client calls the service as follows:

    // get async stub
    val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)

    // prepare stream observer
    val streamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done incrementing !!!")
      override def onNext(value: SumResponse): Unit =
        println(s"current value: ${value.currentResult}")
    }

    // call service with stream observer
    client.addOneToMany(SumRequest().withToAdd(6), streamObserver)
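Because the stub call returns immediately and the results arrive on another thread, the client needs some way to wait for the stream to end (the full client listing later in this post simply blocks on StdIn.readLine). One alternative is to complete a Promise from the terminal callbacks. The sketch below assumes a local stand-in `Observer` trait with the same three methods as StreamObserver so that it runs without gRPC; `PromiseObserver` and `fakeServerStream` are hypothetical names, and the latter only imitates a server pushing results from another thread.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical helper: buffers streamed values and exposes the whole stream
// as a Future that succeeds on onCompleted and fails on onError.
final class PromiseObserver[T] extends Observer[T] {
  private val buffer = ListBuffer.empty[T]
  private val promise = Promise[List[T]]()
  def result: Future[List[T]] = promise.future
  def onNext(value: T): Unit = buffer.synchronized { buffer += value; () }
  def onError(t: Throwable): Unit = { promise.tryFailure(t); () }
  def onCompleted(): Unit = {
    promise.trySuccess(buffer.synchronized(buffer.toList))
    ()
  }
}

object AwaitStreamDemo {
  // Imitates the server side of addOneToMany: pushes 1..n from another thread.
  def fakeServerStream(n: Int, observer: Observer[Int]): Future[Unit] = Future {
    (1 to n).foreach(i => observer.onNext(i))
    observer.onCompleted()
  }

  def main(args: Array[String]): Unit = {
    val observer = new PromiseObserver[Int]
    fakeServerStream(6, observer)
    // Block until the terminal callback fires instead of waiting on stdin.
    val values = Await.result(observer.result, 5.seconds)
    println(s"received: $values")
  }
}
```

With a real stub, the same `PromiseObserver` shape could be passed wherever a StreamObserver[SumResponse] is expected, provided it extends StreamObserver instead of the stand-in trait.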

The IDL for the client-streaming service is as follows:

    /*
     * responding a result from a request of stream of numbers
     */
    service SumManyToOne {
      rpc AddManyToOne(stream SumRequest) returns (SumResponse) {}
    }

It takes a stream of SumRequest and returns a SumResponse. The addManyToOne function in the Scala code that ScalaPB generates has the following signature:

    def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

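The caller supplies a StreamObserver[SumResponse] to receive the result, and the function returns the StreamObserver[SumRequest] that the caller uses to send the request stream. Note: although AddManyToOne returns a single SumResponse in the .proto file, the generated Scala function still takes a StreamObserver[SumResponse], so keep in mind that onNext may be called on it only once. Below is the implementation of this service: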

    class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
      override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
        new StreamObserver[SumRequest] {
          // one running sum per call
          val currentSum: AtomicInt = Atomic(0)
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done summing!")
          override def onNext(value: SumRequest): Unit = {
            // only allow one response: a non-positive number marks the end of input
            if (value.toAdd > 0)
              currentSum.add(value.toAdd)
            else
              responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
          }
        }
    }

A sample client call looks like this:

    // pass to server for result
    val respStreamObserver = new StreamObserver[SumResponse] {
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = println("Done responding!")
      override def onNext(value: SumResponse): Unit = println(s"Result: ${value.currentResult}")
    }

    // get async stub
    val client = SumManyToOneGrpc.stub(channel)

    // get request stream observer from server
    val reqStreamObserver = client.addManyToOne(respStreamObserver)

    // the trailing 0 tells the server to send back the sum
    List(2, 5, 8, 4, 0).foreach { n =>
      reqStreamObserver.onNext(SumRequest(n))
    }
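The example above uses a non-positive number as an end-of-input marker. A common alternative, sketched here, is for the client to call onCompleted on the request observer once it has nothing more to send, and for the server to emit its single response at that point. The sketch runs in memory with a local stand-in `Observer` trait that mirrors StreamObserver's three methods; `sumManyToOne` and `ManyToOneSketch` are hypothetical names and no gRPC call is made.

```scala
// Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object ManyToOneSketch {
  // Same shape as addManyToOne: takes the response observer and returns
  // the observer that the caller feeds requests into.
  def sumManyToOne(responseObserver: Observer[Int]): Observer[Int] =
    new Observer[Int] {
      private var sum = 0 // per-call state; this sketch is single-threaded
      def onNext(value: Int): Unit = sum += value
      def onError(t: Throwable): Unit = responseObserver.onError(t)
      def onCompleted(): Unit = {
        responseObserver.onNext(sum)   // the one and only response
        responseObserver.onCompleted() // then close the response side
      }
    }

  def main(args: Array[String]): Unit = {
    val printer = new Observer[Int] {
      def onNext(value: Int): Unit = println(s"Result: $value")
      def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = println("Done responding!")
    }
    val requests = sumManyToOne(printer)
    List(2, 5, 8, 4).foreach(n => requests.onNext(n))
    requests.onCompleted() // end of input triggers the response: Result: 19
  }
}
```

Tying the response to onCompleted removes the need for a sentinel value, so zero and negative numbers become ordinary inputs.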

The IDL for bidirectional streaming is as follows:

    /*
     * Sums up numbers received from the client and returns the current result after each received request.
     */
    service SumInter {
      rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
    }

The SumInter service describes an operation that takes a stream of SumRequest and returns a stream of SumResponse. The corresponding generated Scala function is:

    def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

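This function has the same signature as the client-streaming one. The difference is that here we can send multiple SumResponse messages through responseObserver. The service is implemented like this: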

    class Many2ManyService extends SumInterGrpc.SumInter {
      override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
        new StreamObserver[SumRequest] {
          val currentSum: AtomicInt = Atomic(0)
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done requesting!")
          override def onNext(value: SumRequest): Unit = {
            // reply with the running sum after every request
            responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
          }
        }
    }

Here responseObserver.onNext can be called many times. The client source code is as follows:

    // create stream observer for result stream
    val responseObserver = new StreamObserver[SumResponse] {
      def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
      def onCompleted(): Unit = println("ON_COMPLETED")
      def onNext(value: SumResponse): Unit =
        println(s"ON_NEXT: Current sum: ${value.currentResult}")
    }

    // get request container
    val requestObserver = client.addInter(responseObserver)

    // send a random number between 0 and 10 every second
    scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
      val toBeAdded = Random.nextInt(11)
      println(s"Adding number: $toBeAdded")
      requestObserver.onNext(SumRequest(toBeAdded))
    }
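The running-sum behaviour can be checked without a network by driving the same logic in memory. The sketch assumes a local stand-in `Observer` trait with StreamObserver's three methods; `runningSum`, `collect` and `InterSketch` are hypothetical names. Unlike the service above, it also forwards onCompleted to the response side, which is an addition made by this sketch.

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object InterSketch {
  // Same shape as addInter: one response is emitted for every request.
  def runningSum(responseObserver: Observer[Int]): Observer[Int] =
    new Observer[Int] {
      private var sum = 0 // per-call state; this sketch is single-threaded
      def onNext(value: Int): Unit = {
        sum += value
        responseObserver.onNext(sum)
      }
      def onError(t: Throwable): Unit = responseObserver.onError(t)
      def onCompleted(): Unit = responseObserver.onCompleted()
    }

  // Feeds the inputs through runningSum and returns the emitted responses.
  def collect(inputs: Seq[Int]): List[Int] = {
    val out = ListBuffer.empty[Int]
    val requests = runningSum(new Observer[Int] {
      def onNext(value: Int): Unit = { out += value; () }
      def onError(t: Throwable): Unit = ()
      def onCompleted(): Unit = ()
    })
    inputs.foreach(n => requests.onNext(n))
    requests.onCompleted()
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(collect(Seq(3, 4, 5))) // List(3, 7, 12)
}
```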

Below is the full source code for this demonstration:

project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion

    name := "learn-gRPC"

    version := "0.1"

    scalaVersion := "2.12.6"

    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
      "io.monix" %% "monix" % "2.3.0"
    )

    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )
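The PB.targets setting above uses sbt's `in` scoping syntax. sbt 1.x also offers a slash syntax for scoping, and on builds that prefer it the same setting can be written as in the fragment below. This is a sketch that assumes the same sbt-protoc plugin as above; it has not been checked against other plugin versions.

```scala
// build.sbt fragment: same setting as above, written with sbt's slash syntax
Compile / PB.targets := Seq(
  scalapb.gen() -> (Compile / sourceManaged).value
)
```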

src/main/protobuf/sum.proto

    syntax = "proto3";

    package learn.grpc.services;

    /*
     * responding stream of increment results
     */
    service SumOneToMany {
      rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
    }

    /*
     * responding a result from a request of stream of numbers
     */
    service SumManyToOne {
      rpc AddManyToOne(stream SumRequest) returns (SumResponse) {}
    }

    /*
     * Sums up numbers received from the client and returns the current result after each received request.
     */
    service SumInter {
      rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
    }

    message SumRequest {
      int32 toAdd = 1;
    }

    message SumResponse {
      int32 currentResult = 1;
    }

gRPCServer.scala

    package learn.grpc.server

    import io.grpc.{ServerBuilder, ServerServiceDefinition}

    trait gRPCServer {
      def runServer(service: ServerServiceDefinition): Unit = {
        val server = ServerBuilder
          .forPort(50051)
          .addService(service)
          .build
          .start

        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = server.shutdown()
        })

        server.awaitTermination()
      }
    }

OneToManyServer.scala

    package learn.grpc.sum.one2many.server

    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import monix.execution.atomic.{Atomic, AtomicInt}
    import learn.grpc.server.gRPCServer

    object One2ManyServer extends gRPCServer {

      class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
        override def addOneToMany(request: SumRequest,
                                  responseObserver: StreamObserver[SumResponse]): Unit = {
          val currentSum: AtomicInt = Atomic(0)
          // emit 1, 2, ..., request.toAdd, one response per number
          (1 to request.toAdd).foreach { _ =>
            responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
          }
          Thread.sleep(1000) // delay and then finish
          responseObserver.onCompleted()
        }
      }

      def main(args: Array[String]): Unit = {
        val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
        runServer(svc)
      }
    }

OneToManyClient.scala

    package learn.grpc.sum.one2many.client

    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._

    object One2ManyClient {
      def main(args: Array[String]): Unit = {

        // build connection channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("localhost", 50051)
          .usePlaintext(true)
          .build()

        // get async stub
        val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)

        // prepare stream observer
        val streamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done incrementing !!!")
          override def onNext(value: SumResponse): Unit =
            println(s"current value: ${value.currentResult}")
        }

        // call service with stream observer
        client.addOneToMany(SumRequest().withToAdd(6), streamObserver)

        // wait for async execution
        scala.io.StdIn.readLine()
      }
    }

ManyToOneServer.scala

    package learn.grpc.sum.many2one.server

    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import learn.grpc.server.gRPCServer
    import monix.execution.atomic.{Atomic, AtomicInt}

    object Many2OneServer extends gRPCServer {

      class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
        override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
          new StreamObserver[SumRequest] {
            // one running sum per call
            val currentSum: AtomicInt = Atomic(0)
            override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
            override def onCompleted(): Unit = println("Done summing!")
            override def onNext(value: SumRequest): Unit = {
              // only allow one response: a non-positive number marks the end of input
              if (value.toAdd > 0)
                currentSum.add(value.toAdd)
              else
                responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
            }
          }
      }

      def main(args: Array[String]): Unit = {
        val svc = SumManyToOneGrpc.bindService(new Many2OneService, scala.concurrent.ExecutionContext.global)
        runServer(svc)
      }
    }

ManyToOneClient.scala

    package learn.grpc.sum.many2one.client

    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._

    object Many2OneClient {
      def main(args: Array[String]): Unit = {

        // build channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("localhost", 50051)
          .usePlaintext(true)
          .build()

        // pass to server for result
        val respStreamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done responding!")
          override def onNext(value: SumResponse): Unit = println(s"Result: ${value.currentResult}")
        }

        // get async stub
        val client = SumManyToOneGrpc.stub(channel)

        // get request stream observer from server
        val reqStreamObserver = client.addManyToOne(respStreamObserver)

        // the trailing 0 tells the server to send back the sum
        List(2, 5, 8, 4, 0).foreach { n =>
          reqStreamObserver.onNext(SumRequest(n))
        }

        // wait for async execution
        scala.io.StdIn.readLine()
      }
    }

ManyToManyServer.scala 

    package learn.grpc.sum.many2many.server

    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import learn.grpc.server.gRPCServer
    import monix.execution.atomic.{Atomic, AtomicInt}

    object Many2ManyServer extends gRPCServer {

      class Many2ManyService extends SumInterGrpc.SumInter {
        override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
          new StreamObserver[SumRequest] {
            val currentSum: AtomicInt = Atomic(0)
            override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
            override def onCompleted(): Unit = println("Done requesting!")
            override def onNext(value: SumRequest): Unit = {
              // reply with the running sum after every request
              responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
            }
          }
      }

      def main(args: Array[String]): Unit = {
        val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
        runServer(svc)
      }
    }

ManyToManyClient.scala

    package learn.grpc.sum.many2many.client

    import monix.execution.Scheduler.{global => scheduler}
    import learn.grpc.services.sum._

    import scala.concurrent.duration._
    import scala.util.Random
    import io.grpc._
    import io.grpc.stub.StreamObserver

    object Many2ManyClient {
      def main(args: Array[String]): Unit = {

        val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build
        val client = SumInterGrpc.stub(channel)

        // create stream observer for result stream
        val responseObserver = new StreamObserver[SumResponse] {
          def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
          def onCompleted(): Unit = println("ON_COMPLETED")
          def onNext(value: SumResponse): Unit =
            println(s"ON_NEXT: Current sum: ${value.currentResult}")
        }

        // get request container
        val requestObserver = client.addInter(responseObserver)

        // send a random number between 0 and 10 every second
        scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
          val toBeAdded = Random.nextInt(11)
          println(s"Adding number: $toBeAdded")
          requestObserver.onNext(SumRequest(toBeAdded))
        }

        // keep the client running until Enter is pressed
        scala.io.StdIn.readLine()
      }
    }

 

