grpc的四種服務類型


上次簡單介紹了grpc的使用方法,並創建了一個方法調用,在grpc中有四種服務類型,下面分別進行介紹

簡單rpc

這就是一般的rpc調用,一個請求對象對應一個返回對象

proto語法:
rpc simpleHello(Person) returns (Result) {}

service代碼
@Override
public void simpleHello(ProtoObj.Person request,
                  io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
	//返回結果
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
	responseObserver.onCompleted();
}

client代碼
@Test
public void  simple() throws InterruptedException {

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定義同步阻塞的stub
	HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
    //simple
    System.out.println("---simple rpc---");
    System.out.println(blockingStub.simpleHello(person).getString());
	channel.shutdown();
}

輸出
---simple rpc---
hello, World

服務端流式rpc

一個請求對象,服務端可以傳回多個結果對象

proto語法
rpc serverStreamHello(Person) returns (stream Result) {}

service代碼
@Override
public void serverStreamHello(ProtoObj.Person request,
                        io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
	//返回多個結果
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+request.getMyName()).build());
    responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());
    responseObserver.onCompleted();
}

client代碼
@Test
public void serverStream(){

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定義同步阻塞的stub
	HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //server side
    System.out.println("---server stream rpc---");
	//返回結果是Iterator
    Iterator<ProtoObj.Result> it = blockingStub.serverStreamHello(person);
    while (it.hasNext()) {
        System.out.print(it.next());
    }
	channel.shutdown();
}

輸出
---server stream rpc---
string: "hello, World"
string: "hello2, World"
string: "hello3, World"

客戶端流式rpc

客戶端傳入多個請求對象,服務端返回一個響應結果

proto語法
rpc clientStreamHello(stream Person) returns (Result) {}

service代碼
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> clientStreamHello(
       final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
   //返回observer應對多個請求對象
   return new StreamObserver<ProtoObj.Person>(){
       private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
       @Override
       public void onNext(ProtoObj.Person value) {
            builder.setString(builder.getString() +"," + value.getMyName());
       }

       @Override
       public void onError(Throwable t) {

       }

       @Override
       public void onCompleted() {
           builder.setString("hello"+builder.getString());
           responseObserver.onNext(builder.build());
           responseObserver.onCompleted();
       }
   };
}

client代碼
@Test
public void clientStream() throws InterruptedException {
    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定義異步的stub
	HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //client side
    System.out.println("---client stream rpc---");
    StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
        @Override
        public void onNext(ProtoObj.Result result) {
            System.out.println("client stream--" + result.getString());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
			//關閉channel
			channel.shutdown();
        }
    };
    StreamObserver<ProtoObj.Person> clientStreamObserver = asyncStub.clientStreamHello(responseObserver);
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
    clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
    clientStreamObserver.onCompleted();
	//由於是異步獲得結果,所以sleep一秒
    Thread.sleep(1000);
}

輸出
---client stream rpc---
client stream--hello,World,World2

雙向流式rpc

結合客戶端流式rpc和服務端流式rpc,可以傳入多個對象,返回多個響應對象

proto語法
rpc biStreamHello(stream Person) returns (stream Result) {}

service代碼
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> biStreamHello(
        final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
    //返回observer應對多個請求對象
    return new StreamObserver<ProtoObj.Person>(){
        private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
        @Override
        public void onNext(ProtoObj.Person value) {
            responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+value.getMyName()).build());
            responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+value.getMyName()).build());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
            responseObserver.onCompleted();
        }
    };
}

client代碼
@Test
public void bidirectStream() throws InterruptedException {

    final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
    //定義異步的stub
	HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);

    ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

    //bi stream
    System.out.println("---bidirectional stream rpc---");
    StreamObserver<ProtoObj.Result>  responseObserver = new StreamObserver<ProtoObj.Result>() {
        @Override
        public void onNext(ProtoObj.Result result) {
            System.out.println("bidirectional stream--"+result.getString());
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
            channel.shutdown();
        }
    };
    StreamObserver<ProtoObj.Person> biStreamObserver=asyncStub.biStreamHello(responseObserver);
    biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
    biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
    biStreamObserver.onCompleted();
	//由於是異步獲得結果,所以sleep一秒
    Thread.sleep(1000);

}

輸出
---bidirectional stream rpc---
bidirectional stream--hello2, World
bidirectional stream--hello3, World
bidirectional stream--hello2, World2
bidirectional stream--hello3, World2	

總結

grpc通過使用流式的方式,返回/接受多個實例可以用於類似不定長數組的入參和出參


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM