上次簡單介紹了grpc的使用方法,並創建了一個方法調用,在grpc中有四種服務類型,下面分別進行介紹
簡單rpc
這就是一般的rpc調用,一個請求對象對應一個返回對象
proto語法:
rpc simpleHello(Person) returns (Result) {}
service代碼
@Override
public void simpleHello(ProtoObj.Person request,
io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
//返回結果
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
responseObserver.onCompleted();
}
client代碼
@Test
public void simple() throws InterruptedException {
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
//定義同步阻塞的stub
HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
//simple
System.out.println("---simple rpc---");
System.out.println(blockingStub.simpleHello(person).getString());
channel.shutdown();
}
輸出
---simple rpc---
hello, World
服務端流式rpc
一個請求對象,服務端可以傳回多個結果對象
proto語法
rpc serverStreamHello(Person) returns (stream Result) {}
service代碼
@Override
public void serverStreamHello(ProtoObj.Person request,
io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
//返回多個結果
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+request.getMyName()).build());
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());
responseObserver.onCompleted();
}
client代碼
@Test
public void serverStream(){
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
//定義同步阻塞的stub
HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
//server side
System.out.println("---server stream rpc---");
//返回結果是Iterator
Iterator<ProtoObj.Result> it = blockingStub.serverStreamHello(person);
while (it.hasNext()) {
System.out.print(it.next());
}
channel.shutdown();
}
輸出
---server stream rpc---
string: "hello, World"
string: "hello2, World"
string: "hello3, World"
客戶端流式rpc
客戶端傳入多個請求對象,服務端返回一個響應結果
proto語法
rpc clientStreamHello(stream Person) returns (Result) {}
service代碼
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> clientStreamHello(
final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
//返回observer應對多個請求對象
return new StreamObserver<ProtoObj.Person>(){
private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
@Override
public void onNext(ProtoObj.Person value) {
builder.setString(builder.getString() +"," + value.getMyName());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
builder.setString("hello"+builder.getString());
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
};
}
client代碼
@Test
public void clientStream() throws InterruptedException {
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
//定義異步的stub
HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
//client side
System.out.println("---client stream rpc---");
StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
@Override
public void onNext(ProtoObj.Result result) {
System.out.println("client stream--" + result.getString());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
//關閉channel
channel.shutdown();
}
};
StreamObserver<ProtoObj.Person> clientStreamObserver = asyncStub.clientStreamHello(responseObserver);
clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
clientStreamObserver.onCompleted();
//由於是異步獲得結果,所以sleep一秒
Thread.sleep(1000);
}
輸出
---client stream rpc---
client stream--hello,World,World2
雙向流式rpc
結合客戶端流式rpc和服務端流式rpc,可以傳入多個對象,返回多個響應對象
proto語法
rpc biStreamHello(stream Person) returns (stream Result) {}
service代碼
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> biStreamHello(
final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {
//返回observer應對多個請求對象
return new StreamObserver<ProtoObj.Person>(){
private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
@Override
public void onNext(ProtoObj.Person value) {
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+value.getMyName()).build());
responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+value.getMyName()).build());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
client代碼
@Test
public void bidirectStream() throws InterruptedException {
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
//定義異步的stub
HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
//bi stream
System.out.println("---bidirectional stream rpc---");
StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
@Override
public void onNext(ProtoObj.Result result) {
System.out.println("bidirectional stream--"+result.getString());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
channel.shutdown();
}
};
StreamObserver<ProtoObj.Person> biStreamObserver=asyncStub.biStreamHello(responseObserver);
biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
biStreamObserver.onCompleted();
//由於是異步獲得結果,所以sleep一秒
Thread.sleep(1000);
}
輸出
---bidirectional stream rpc---
bidirectional stream--hello2, World
bidirectional stream--hello3, World
bidirectional stream--hello2, World2
bidirectional stream--hello3, World2
總結
grpc通過使用流式的方式,返回/接受多個實例可以用於類似不定長數組的入參和出參
