歡迎回來!
2.請求流接口
(客戶端可以源源不斷的給服務端傳參數,服務端會源源不斷的接受服務端的參數,最后在客戶端完成請求的時候,服務端返回一個結果)
在.proto文件中新加一個方法,這個方法的參數被 stream 關鍵字修飾
rpc methodRequestStream(stream Request) returns (Result) {}
然后用maven,清理一下緩存,重新編譯一下
2.1.服務端
重新編譯之后,實現剛剛新加的方法
@Override public StreamObserver<Request> methodRequestStream(StreamObserver<Result> responseObserver) { return new StreamObserver<Request>() { @Override public void onNext(Request request) { System.out.print("收到了請求 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onCompleted(); } }; }
(友情提示,如果 StreamObserver 的的泛型是Result 我們就叫 返回流觀察者,如果是 Request 就叫請求流觀察者,這樣好描述一些)
這個和普通的有點不一樣,直接返回了一個 請求流觀察者 的接口實現,而且方法的參數還是一個 返回流觀察者 ,好像搞反了一樣,至於為什么,一會在客戶端那里 統一說
2.2.客戶端
請求流式異步調用,普通的是同步調用,我們在普通的方法里創建的實例 也是同步的,所以我們要在 JavaGrpcClient 中新加一個 異步調用的方法,添加一個異步的實例
public <Result> Result runAsync(Functional<TestServiceGrpc.TestServiceStub,Result> functional) { TestServiceGrpc.TestServiceStub testServiceStub = TestServiceGrpc.newStub(channel); return functional.run(testServiceStub); }
TestServiceGrpc.newStub 返回的是一個異步的實例
再加一個測試
@Test public void contextLoads2() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
這里我們實現了一個 返回流觀察者
StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } };
調用方法的時候,將我們實現的 返回流觀察者 傳進去,返回給我們一個 請求流觀察者
StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
其實這里返回的 請求流觀察者 就是服務端那里返回給我們的內個實現,服務端那里 返回流觀察者 是我們實現的 傳給他的
由於是異步調用,最后暫停一下,要不測試跑完,程序結束 開沒開始就結束了
try { Thread.sleep(600000); } catch (Exception ex){}
運行起來看結果
服務端的打印
客戶端的打印
這里我們發送了三次參數過去
result.onNext(request); result.onNext(request); result.onNext(request);
就相當於 服務端 那邊返回的 請求流觀察者 被調用了 三次 ,所以就打印了三句話
發送完參數結束請求
result.onCompleted();
服務端那里的結束請求中調用了一次我們傳給他的 返回流觀察者 中的 onNext 方法
所以客戶端就打印了一次
這里會有人問 這里不能返回 多個嗎
不能,雖然 這兩個觀察者 看上去一樣 都是 StreamObserver 接口,但是,這個方法只是請求流調用,在grpc的內部 最后返回的時候 只返回第一個指定的返回只,不管返回了多少個,在客戶端那邊只會收到 第一個返回的結果
3.響應流接口
(和請求流接口完全相反,請求流是異步,響應流是同步,請求流是接受多個請求返回一個結果,響應流是接受一個請求返回多個結果)
我們在.proto文件中再增加一個方法,這回這個方法的返回值被 stream 關鍵字修飾
rpc methodResultStream(Request) returns (stream Result){}
清緩存,重新編譯
3.1.服務端
實現剛剛新加的方法
@Override public void methodResultStream(Request request, StreamObserver<Result> responseObserver) { System.out.print("收到了請求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result); responseObserver.onCompleted(); }
這里跟普通的差不多,只是我們返回了三次結果
responseObserver.onNext(result); responseObserver.onNext(result); try { Thread.sleep(2000); } catch (Exception ex){} responseObserver.onNext(result);
3.2.客戶端
沒啥好加的了,直接上測試
@Test public void contextLoads3() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request)); result.forEachRemaining(o -> { System.out.print("返回了結果 \n"); }); System.out.print("結束 \n"); }
返回流請求是同步的,所以要調同步的方法,返回了一個迭代器
Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));
迭代器中有服務端的所有返回結果
result.forEachRemaining(o -> { System.out.print("返回了結果 \n"); });
運行結果
服務端結果
客戶端結果
由於是同步調用,在forEach中會等待服務端的每一個返回結果
4.雙向流接口
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
歇會,抽根煙!
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
在.proto文件中再加一個方法
rpc methodDoubleStream(stream Request) returns (stream Result){}
實現她
雙向流的服務端和請求流的沒啥區別,只是在接收到請求的時候沒有立刻結束請求
@Override public StreamObserver<Request> methodDoubleStream(StreamObserver<Result> responseObserver) { return new StreamObserver<Request>() { @Override public void onNext(Request value) { System.out.print("收到了請求 \n"); Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build(); responseObserver.onNext(result); } @Override public void onError(Throwable t) { } @Override public void onCompleted() { responseObserver.onCompleted(); } }; }
客戶端也沒啥區別
@Test public void contextLoads4() { Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build(); StreamObserver<Result> responseObserver = new StreamObserver<Result>() { @Override public void onNext(Result result) { System.out.print("返回了結果 \n"); } @Override public void onError(Throwable throwable) { } @Override public void onCompleted() { } }; StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver)); result.onNext(request); result.onNext(request); result.onNext(request); result.onCompleted(); try { Thread.sleep(600000); } catch (Exception ex){} }
雙向流也是異步的,所以要等待
try { Thread.sleep(600000); } catch (Exception ex){}
服務端結果
客戶端結果
完結!撒花!