場景
gPRC簡介以及Java中使用gPRC實現客戶端與服務端通信(附代碼下載):
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108711541
在上面的博客中介紹了gRPC以及使用最基本的rpc通信方式即一個請求對象返回一個響應的方式進行通信。
除此之外gRPC還有以下三種方式。
服務端流式
一個請求對象,服務端返回多個結果對象
proto示例語法
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
客戶端流式
客戶端傳入多個請求對象,服務端返回一個響應結果。
proto示例語法
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
雙向流式
傳入多個對象可以返回多個響應對象
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。
實現
服務端流式實現
在上面博客的基礎上,打開Person.proto文件
message StudentRequest { int32 age = 1; } message StudentResponse { string name = 1; int32 age = 2; string city = 3; }
添加兩個message作為請求和響應對象。
因為gRPC的請求和響應對象必須在message中定義,不能直接使用string或者int32這種作為參數。
然后在新建接口方法
service PersonService {
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
}
此方法是要請求參數為一個age,然后返回多個學生對象。
然后調用插件生成代碼。
然后來到PersonServiceImpl中對接口方法進行實現
@Override public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) { System.out.println("接收到的客戶端消息為:"+request.getAge()); responseObserver.onNext(StudentResponse.newBuilder().setName("1公眾號:霸道的程序猿") .setAge(30) .setCity("北京") .build()); responseObserver.onNext(StudentResponse.newBuilder().setName("2公眾號:霸道的程序猿") .setAge(40) .setCity("上海") .build()); responseObserver.onNext(StudentResponse.newBuilder().setName("3公眾號:霸道的程序猿") .setAge(50) .setCity("廣州") .build()); responseObserver.onCompleted(); }
然后來到客戶端中
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899) .usePlaintext().build(); PersonServiceGrpc.PersonServiceBlockingStub blockingStub = PersonServiceGrpc.newBlockingStub(managedChannel); System.out.println("請求-流式響應,調用getRealNameByUsername"); Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build()); while (iter.hasNext()) { StudentResponse studentResponse = iter.next(); System.out.println(studentResponse.getName()); System.out.println(studentResponse.getAge()); System.out.println(studentResponse.getCity()); }
然后運行服務端后再運行客戶端
此時服務端
客戶端流式實現
打開proto文件
message StudentRequest { int32 age = 1; } message StudentResponse { string name = 1; int32 age = 2; string city = 3; } message StudentResponseList { repeated StudentResponse studentResponse = 1; }
添加響應的list,要實現客戶端發動流式多個請求參數(年齡),服務端返回單個list對象,其中每個List的數據是學生對象。
添加接口方法
service PersonService {
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
}
然后調用插件生成代碼。打開PersonServiceImpl進行方法的實現
@Override public StreamObserver<StudentRequest> getStudentsWrapperByAges(final StreamObserver<StudentResponseList> responseObserver) { return new StreamObserver<StudentRequest>() { public void onNext(StudentRequest studentRequest) { System.out.println("onNext:"+studentRequest.getAge()); } public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); } public void onCompleted() { StudentResponse studentResponse = StudentResponse.newBuilder() .setName("公眾號:霸道的程序猿") .setAge(20) .setCity("北京").build(); StudentResponse studentResponse1 = StudentResponse.newBuilder() .setName("1公眾號:霸道的程序猿") .setAge(30) .setCity("上海").build(); StudentResponseList studentResponseList = StudentResponseList.newBuilder() .addStudentResponse(studentResponse).addStudentResponse(studentResponse1).build(); responseObserver.onNext(studentResponseList); responseObserver.onCompleted(); } }; }
與上面不同,客戶端如果是流式請求的話,那么客戶端必須使用異步的stub
PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);
客戶端代碼為
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899) .usePlaintext().build(); PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel); System.out.println("-----------------------------"); System.out.println("流式請求-響應,調用GetStudentsWrapperByAges"); StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() { public void onNext(StudentResponseList studentResponseList) { studentResponseList.getStudentResponseList().forEach(studengResponse ->{ System.out.println(studengResponse.getName()); System.out.println(studengResponse.getAge()); System.out.println(studengResponse.getCity()); }); } public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); } public void onCompleted() { System.out.println("completed"); } }; StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWrapperByAges(studentResponseListStreamObserver); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build()); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build()); studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build()); studentRequestStreamObserver.onCompleted(); try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); }
因為是異步的所以必須使進程進行休眠才能看到效果
運行服務端后運行客戶端
此時服務端
雙向流式實現
打開proto文件
message StreamRequest { string request_info = 1; } message StreamResponse { string response_info = 1; }
新建流式請求與響應參數,然后新建接口方法
service PersonService {
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}
然后實現接口方法
@Override public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) { return new StreamObserver<StreamRequest>() { @Override public void onNext(StreamRequest streamRequest) { System.out.println(streamRequest.getRequestInfo()); responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build()); } @Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
在客戶端中
package com.badao.grpcjava; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import java.time.LocalDate; import java.util.Iterator; public class GrpcClient { public static void main(String[] args) { ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost",8899) .usePlaintext().build(); PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel); System.out.println("-----------------------------"); System.out.println("流式請求-流式響應,調用BiTalk"); StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() { @Override public void onNext(StreamResponse streamResponse) { System.out.println(streamResponse.getResponseInfo()); } @Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); } @Override public void onCompleted() { System.out.println("onComplated"); } }); for(int i =0;i<10;i++) { requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDate.now().toString()).build()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); } } }
運行服務端后運行客戶端
示例代碼下載
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12883063