開始食用grpc(之二)
https://www.cnblogs.com/funnyzpc/p/9570992.html
開始食用grpc(之一)
https://www.cnblogs.com/funnyzpc/p/9501353.html
https://grpc.io
https://github.com/grpc/grpc-java/tree/master/examples/example-tls
https://github.com/grpc/grpc-java
https://www.myssl.cn/tools/merge-pem-cert.html
雙向流式調用方法及注意事項:
由於雙向流的使用方式不用於上期所講的,這里我從編寫一步步講。
先在preview-grpc-lib工程先的proto文件夾下編寫一個包含雙向流的是proto文件以生成客戶端和服務器相關代碼(記得把生成的代碼放入工程內)。
(MultiStream.proto)
1 syntax = "proto3";
2
3 option java_multiple_files = true;
4 option java_package = "com.funnyzpc.XXX.grpc.lib.multiStream";
5 //定義一個服務
6 service MultiStreamService{
7 rpc queryStream (stream MultiStreamReq) returns (stream MultiStreamResp) {
8
9 }
10
11 }
12 //定義一個請求體(用於傳參)
13 message MultiStreamReq{
14 int32 page_no=1;
15 int32 page_size=2;
16 MultiStreamDataReq data=3;
17 }
18
19 message MultiStreamDataReq{
20 string name=1;
21 bool type=2;
22 }
23 //定義一個響應體(用於回參)
24 message MultiStreamResp{
25 string req_str=1;
26 MultiStreamFirstResp first=2;
27 }
28 message MultiStreamFirstResp{
29 string f_content=1;
30 int64 idx=2;
31
32 }
這里可能需要對比着上一節所講的復雜proto文件編寫的內容,可以看到:請求體和響應體的定義大致都是一樣的,只是在服務定義的時候會有一些些差別>請求體和響應體的前面多了一個關鍵字"stream” ,就是(請求或響應)只要一方是以流的方式發送就需要聲明為 “stream" 。
編寫個客戶端服務代碼:
1 @Service
2 public class GrpcMultiStreamClientService {
3 private static final Logger LOG=LoggerFactory.getLogger(GrpcMultiStreamClientService.class);
4
5 @GrpcClient("preview-grpc-server")
6 private Channel rpcChannel;
7
8 /**
9 * grpc>雙向流方式
10 * @return
11 */
12 public Object queryByStream()throws Exception{
13 Map<String,Object> resp=new HashMap<>();
14
15 StreamObserver<MultiStreamResp> req= new StreamObserver<MultiStreamResp>() {
16 @Override
17 public void onNext(MultiStreamResp multiStreamResp) {
18 resp.put("req_str",multiStreamResp.getReqStr());
19 resp.put("f_content",multiStreamResp.getFirst().getFContent());
20 resp.put("idx",multiStreamResp.getFirst().getIdx());
21 LOG.info("onNext()");
22 //return resp;
23 }
24
25 @Override
26 public void onError(Throwable throwable) {
27 LOG.info("onError()");
28 }
29
30 @Override
31 public void onCompleted() {
32 LOG.info("onCompleted()");
33 }
34 };
35
36 MultiStreamServiceGrpc.MultiStreamServiceStub stud=MultiStreamServiceGrpc.newStub(rpcChannel);
37 StreamObserver<MultiStreamReq> reqStream=stud.queryStream(req);
38
39 MultiStreamDataReq streamDataReq=MultiStreamDataReq.newBuilder()
40 .setName("req>name field")
41 .setType(false)
42 .build();
43 MultiStreamReq streamReq= MultiStreamReq.newBuilder()
44 .setPageNo(1)
45 .setPageSize(20)
46 .setData(streamDataReq).build();
47
48 reqStream.onNext(streamReq);
49 reqStream.onCompleted();
50 return resp;
51 }
52 }
可以在上圖看到,請求方法內首先是要放入一個構造的內部請求方法,請求體也需要放入到StreamObserver這個對象中,這是與之前編寫的grpc客戶端(阻塞)所不一樣的地方,同時構造stub的時候是newStub而不是newBlockingStub ,當然這兩者是有區別的,前者僅適用於http2二進制流的方式 並且是一個異步的(這是重點),而后者是僅適用於http1.1的字符明文方式 並且是阻塞方式(這也是重點),后面我會說說這兩者的具體使用區別。
接下來寫一個grpc服務端服務類,這是代碼:
1 @GrpcService(value= MultiStreamServiceGrpc.class)
2 public class GrpcMultiStreamService extends MultiStreamServiceGrpc.MultiStreamServiceImplBase{
3 private static final Logger LOG= LoggerFactory.getLogger(GrpcMultiStreamService.class);
4
5 @Override
6 public StreamObserver<MultiStreamReq> queryStream(StreamObserver<MultiStreamResp> resp) {
7 return new StreamObserver<MultiStreamReq>() {
8 @Override
9 public void onNext(MultiStreamReq multiStreamReq) {
10 MultiStreamFirstResp firstResp=MultiStreamFirstResp.newBuilder()
11 .setFContent("f_content")
12 .setIdx(99).build();
13 MultiStreamResp req=MultiStreamResp.newBuilder()
14 .setReqStr("req_str")
15 .setFirst(firstResp).build();
16 resp.onNext(req);
17 resp.onCompleted();
18 }
19
20 @Override
21 public void onError(Throwable throwable) {
22 LOG.info("onError()");
23 }
24
25 @Override
26 public void onCompleted() {
27 LOG.info("onCompleted()");
28 }
29 };
30 31 32 }
整體的構造方法和邏輯代碼和客戶端代碼相似,同時服務端的邏輯代碼基本上全在StreamObserver這個異步對象中處理,同時這個構造方法也提供了錯誤和完成所對的重載方法,要進行業務處理也必須在重載的onNext方法中編寫。
主題的服務已經編寫完成,現在添加一個控制器來看看這個服務有沒問題。
1 @Autowired
2 private GrpcMultiStreamClientService multiStreamClientService;
3
4 @RequestMapping("/grpc4")
5 public Object grpc4()throws Exception{
6 return multiStreamClientService.queryByStream();
7 }

可能你會咦的一聲說:請求是成功的,但為什么取到的服務端的參數是空呢?
其實這個很好理解,因為客戶端的請求服務方式是流,此種方式下響應當然是異步的,這里方便調試,需要添加線程阻塞,才可能獲取到服務端的響應參數(下圖中紅色部分)>
1 @Service
2 public class GrpcMultiStreamClientService {
3 private static final Logger LOG=LoggerFactory.getLogger(GrpcMultiStreamClientService.class);
4
5 @GrpcClient("preview-grpc-server")
6 private Channel rpcChannel;
7
8 /**
9 * grpc>雙向流方式
10 * @return
11 */
12 public Object queryByStream()throws Exception{
13 Map<String,Object> resp=new HashMap<>();
14
15 StreamObserver<MultiStreamResp> req= new StreamObserver<MultiStreamResp>() {
16 @Override
17 public void onNext(MultiStreamResp multiStreamResp) {
18 resp.put("req_str",multiStreamResp.getReqStr());
19 resp.put("f_content",multiStreamResp.getFirst().getFContent());
20 resp.put("idx",multiStreamResp.getFirst().getIdx());
21 LOG.info("onNext()");
22 //return resp;
23 }
24
25 @Override
26 public void onError(Throwable throwable) {
27 LOG.info("onError()");
28 }
29
30 @Override
31 public void onCompleted() {
32 LOG.info("onCompleted()");
33 }
34 };
35
36 MultiStreamServiceGrpc.MultiStreamServiceStub stud=MultiStreamServiceGrpc.newStub(rpcChannel);
37 StreamObserver<MultiStreamReq> reqStream=stud.queryStream(req);
38
39 MultiStreamDataReq streamDataReq=MultiStreamDataReq.newBuilder()
40 .setName("req>name field")
41 .setType(false)
42 .build();
43 MultiStreamReq streamReq= MultiStreamReq.newBuilder()
44 .setPageNo(1)
45 .setPageSize(20)
46 .setData(streamDataReq).build();
47
48 reqStream.onNext(streamReq);
49 reqStream.onCompleted();
50 Thread.sleep(10000);
51 return resp;
52 }
53 }
可以看到線程睡眠了10秒,如果打斷點可以看到 睡眠的過程中會響應客戶端中的onNext方法,再就是把參數放入到resp中,當然客戶端服務為流的方式下一般不做線程睡眠的操作,因為服務器有可能超時,如果超時那可就麻煩了。所以說grpc異步是有極好的應用場景,比如業務費阻塞,日志處理等等,同時如果需要直接響應請使用阻塞的方式(上面已經說過了),好了,這個時候,我們看看結果>

ok,可以順利的看到服務器的響應結果了。
grpc安全問題及攔截器:
對於grpc安全問題,grpc只在服務端提供了 服務端證書驗證 的方式,具體就是在在客戶端請求的時候驗證客戶地址是否是有效而已,默認不使用的時候服務端證書的開關是關閉着的,這個驗證其實也很簡陋,具體的可以看看源碼便知:

如若開發的系統要保證極高的安全度,建議使用這兩類方式:
A>將客戶端應用和服務端應用放置在同一個內往下,服務端關閉外網直接訪問
B>可以在服務端添加攔截器,使用token的方式來驗證客戶端身份是否合法(這種方式可能需要客戶端設置請求頭)
對於以上兩種安全訪問方式,也可以以混合的方式使用,對於以上后者,我簡單的列舉下如何使用攔截器,就一個簡單的例子呵~
首先填寫一個服務端攔截器>
1 public class GrpcInterceptor implements ServerInterceptor {
2 private static final Logger LOG=LoggerFactory.getLogger(GrpcInterceptor.class);
3
4 @Override
5 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
6 LOG.info(call.getAttributes().toString());
7 String inetSocketString = call.getAttributes()
8 .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
9 LOG.info(inetSocketString);
10 return next.startCall(call,headers);
11 }
12 }
如上,攔截器實現於grpc 的 ServerInterceptor 來編寫的,如果需要做攔截處理 可以直接在interceptCall方法中編寫相應的邏輯。
然后需要在服務端服務類的注解中聲明所使用的攔截器>
1 @GrpcService(value= MultiStreamServiceGrpc.class,interceptors = GrpcInterceptor.class)
2 public class GrpcMultiStreamService extends MultiStreamServiceGrpc.MultiStreamServiceImplBase{
3 //此處略
4 }
攔截器聲明可以見以上代碼紅色部分,以上代碼的具體邏輯部分與以上GrpcMultiStreamService內容相同,同時順帶說下上面注解中的value變量,這個變量只是聲明當前服務端服務類所使用的grpc的服務類是什么,當然可以填寫其他的grpc的服務類(一定是proto文件生成的類才可以),並且不能為空!,同時這里就不給測試結果囖,讀者打個斷點就知道了。

