java grpc 簡單易懂 ---2


 

 

 

 

 

歡迎回來!

2.請求流接口

(客戶端可以源源不斷的給服務端傳參數,服務端會源源不斷的接受服務端的參數,最后在客戶端完成請求的時候,服務端返回一個結果)

 

在.proto文件中新加一個方法,這個方法的參數被 stream 關鍵字修飾

rpc methodRequestStream(stream Request) returns (Result) {}

  

然后用maven,清理一下緩存,重新編譯一下

 

2.1.服務端

 重新編譯之后,實現剛剛新加的方法

    @Override
    public StreamObserver<Request> methodRequestStream(StreamObserver<Result> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.print("收到了請求 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                responseObserver.onNext(result);
                responseObserver.onCompleted();
            }
        };
    }

  

(友情提示,如果 StreamObserver  的的泛型是Result 我們就叫 返回流觀察者,如果是 Request 就叫請求流觀察者,這樣好描述一些)

這個和普通的有點不一樣,直接返回了一個 請求流觀察者 的接口實現,而且方法的參數還是一個 返回流觀察者 ,好像搞反了一樣,至於為什么,一會在客戶端那里 統一說

 

2.2.客戶端

請求流式異步調用,普通的是同步調用,我們在普通的方法里創建的實例 也是同步的,所以我們要在 JavaGrpcClient 中新加一個 異步調用的方法,添加一個異步的實例

public <Result> Result runAsync(Functional<TestServiceGrpc.TestServiceStub,Result> functional)
    {
        TestServiceGrpc.TestServiceStub testServiceStub =
                TestServiceGrpc.newStub(channel);

        return functional.run(testServiceStub);
    }

TestServiceGrpc.newStub 返回的是一個異步的實例

 

再加一個測試

 

@Test
    public void contextLoads2() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };
        StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
        result.onNext(request);
        result.onNext(request);
        result.onNext(request);
        result.onCompleted();

        try {
            Thread.sleep(600000);
        }
        catch (Exception ex){}
    }

  

這里我們實現了一個 返回流觀察者 

StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };

  

調用方法的時候,將我們實現的 返回流觀察者 傳進去,返回給我們一個 請求流觀察者

StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));

  

其實這里返回的 請求流觀察者 就是服務端那里返回給我們的內個實現,服務端那里 返回流觀察者 是我們實現的 傳給他的

 

由於是異步調用,最后暫停一下,要不測試跑完,程序結束 開沒開始就結束了

try {
    Thread.sleep(600000);
}
catch (Exception ex){}

  

 

運行起來看結果

服務端的打印

 

客戶端的打印

 

這里我們發送了三次參數過去

result.onNext(request);
result.onNext(request);
result.onNext(request);

  

就相當於 服務端 那邊返回的 請求流觀察者 被調用了 三次 ,所以就打印了三句話

 

發送完參數結束請求

result.onCompleted();

  

服務端那里的結束請求中調用了一次我們傳給他的 返回流觀察者 中的 onNext 方法

所以客戶端就打印了一次

 

這里會有人問 這里不能返回 多個嗎

不能,雖然 這兩個觀察者 看上去一樣 都是 StreamObserver 接口,但是,這個方法只是請求流調用,在grpc的內部 最后返回的時候 只返回第一個指定的返回只,不管返回了多少個,在客戶端那邊只會收到 第一個返回的結果

 

 

3.響應流接口

(和請求流接口完全相反,請求流是異步,響應流是同步,請求流是接受多個請求返回一個結果,響應流是接受一個請求返回多個結果)

 

 我們在.proto文件中再增加一個方法,這回這個方法的返回值被 stream 關鍵字修飾

rpc methodResultStream(Request) returns (stream Result){}

  

清緩存,重新編譯

3.1.服務端

 實現剛剛新加的方法

@Override
    public void methodResultStream(Request request, StreamObserver<Result> responseObserver) {
        System.out.print("收到了請求 \n");
        Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
        responseObserver.onNext(result);
        responseObserver.onNext(result);
        try {
            Thread.sleep(2000);
        }
        catch (Exception ex){}
        responseObserver.onNext(result);
        responseObserver.onCompleted();
    }

  

 

這里跟普通的差不多,只是我們返回了三次結果

responseObserver.onNext(result);
responseObserver.onNext(result);
try {
    Thread.sleep(2000);
}
catch (Exception ex){}
responseObserver.onNext(result);

  

 

3.2.客戶端

沒啥好加的了,直接上測試

@Test
    public void contextLoads3() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));

        result.forEachRemaining(o ->
        {
            System.out.print("返回了結果 \n");
        });
        System.out.print("結束 \n");
    }

  

 

返回流請求是同步的,所以要調同步的方法,返回了一個迭代器

Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));

  

迭代器中有服務端的所有返回結果

result.forEachRemaining(o ->
{
    System.out.print("返回了結果 \n");
});

  

運行結果

服務端結果

 

客戶端結果

由於是同步調用,在forEach中會等待服務端的每一個返回結果

 

 

4.雙向流接口

 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

歇會,抽根煙!

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

在.proto文件中再加一個方法

rpc methodDoubleStream(stream Request) returns (stream Result){}

  

實現

 

雙向流的服務端和請求流的沒啥區別,只是在接收到請求的時候沒有立刻結束請求

@Override
    public StreamObserver<Request> methodDoubleStream(StreamObserver<Result> responseObserver) {
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request value) {
                System.out.print("收到了請求 \n");
                Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                responseObserver.onNext(result);
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

  

客戶端也沒啥區別

@Test
    public void contextLoads4() {
        Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
        StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
            @Override
            public void onNext(Result result) {
                System.out.print("返回了結果 \n");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {

            }
        };
        StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver));
        result.onNext(request);
        result.onNext(request);
        result.onNext(request);
        result.onCompleted();

        try {
            Thread.sleep(600000);
        }
        catch (Exception ex){}
    }

  

雙向流也是異步的,所以要等待

try {
    Thread.sleep(600000);
}
catch (Exception ex){}

  

 服務端結果

 

客戶端結果

 

完結!撒花!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM