gRPC的通信方式-客戶端流式、服務端流式、雙向流式在Java的調用示例


場景

gPRC簡介以及Java中使用gPRC實現客戶端與服務端通信(附代碼下載):

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108711541

在上面的博客中介紹了gRPC以及使用最基本的rpc通信方式即一個請求對象返回一個響應的方式進行通信。

除此之外gRPC還有以下三種方式。

服務端流式

一個請求對象,服務端返回多個結果對象

proto示例語法

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

 

客戶端流式

客戶端傳入多個請求對象,服務端返回一個響應結果。

proto示例語法

rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

 

雙向流式

傳入多個對象可以返回多個響應對象

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。

實現

服務端流式實現

在上面博客的基礎上,打開Person.proto文件

message StudentRequest {
    int32 age = 1;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

 

添加兩個message作為請求和響應對象。

因為gRPC的請求和響應對象必須在message中定義,不能直接使用string或者int32這種作為參數。

然后在新建接口方法

service PersonService {
    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
}

 

此方法是要請求參數為一個age,然后返回多個學生對象。

然后調用插件生成代碼。

然后來到PersonServiceImpl中對接口方法進行實現

    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接收到的客戶端消息為:"+request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("1公眾號:霸道的程序猿")
                .setAge(30)
                .setCity("北京")
                .build());

        responseObserver.onNext(StudentResponse.newBuilder().setName("2公眾號:霸道的程序猿")
                .setAge(40)
                .setCity("上海")
                .build());

        responseObserver.onNext(StudentResponse.newBuilder().setName("3公眾號:霸道的程序猿")
                .setAge(50)
                .setCity("廣州")
                .build());
        responseObserver.onCompleted();
    }

 

然后來到客戶端中

 

       ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();
        PersonServiceGrpc.PersonServiceBlockingStub blockingStub = PersonServiceGrpc.newBlockingStub(managedChannel);

        System.out.println("請求-流式響應,調用getRealNameByUsername");
        Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
        while (iter.hasNext())
        {
            StudentResponse studentResponse = iter.next();
            System.out.println(studentResponse.getName());
            System.out.println(studentResponse.getAge());
            System.out.println(studentResponse.getCity());
        }

 

然后運行服務端后再運行客戶端

 

 

此時服務端

 

 

 

客戶端流式實現

打開proto文件

message StudentRequest {
    int32 age = 1;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

 

添加響應的list,要實現客戶端發動流式多個請求參數(年齡),服務端返回單個list對象,其中每個List的數據是學生對象。

添加接口方法

service PersonService {
    rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}
}

 

然后調用插件生成代碼。打開PersonServiceImpl進行方法的實現

 

   @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(final StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            public void onNext(StudentRequest studentRequest) {
                System.out.println("onNext:"+studentRequest.getAge());
            }

            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            public void onCompleted() {
                StudentResponse studentResponse = StudentResponse.newBuilder()
                        .setName("公眾號:霸道的程序猿")
                        .setAge(20)
                        .setCity("北京").build();
                StudentResponse studentResponse1 = StudentResponse.newBuilder()
                        .setName("1公眾號:霸道的程序猿")
                        .setAge(30)
                        .setCity("上海").build();

                StudentResponseList studentResponseList = StudentResponseList.newBuilder()
                        .addStudentResponse(studentResponse).addStudentResponse(studentResponse1).build();

                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();

            }
        };
    }

 

與上面不同,客戶端如果是流式請求的話,那么客戶端必須使用異步的stub

PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);

客戶端代碼為

        ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();

        PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);
  
        System.out.println("-----------------------------");
        System.out.println("流式請求-響應,調用GetStudentsWrapperByAges");
        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            public void onNext(StudentResponseList studentResponseList) {
                studentResponseList.getStudentResponseList().forEach(studengResponse ->{
                    System.out.println(studengResponse.getName());
                    System.out.println(studengResponse.getAge());
                    System.out.println(studengResponse.getCity());
                });
            }

            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            public void onCompleted() {
                System.out.println("completed");
            }
        };
        StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentRequestStreamObserver.onCompleted();

        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

 

因為是異步的所以必須使進程進行休眠才能看到效果

運行服務端后運行客戶端

 

 

此時服務端

 

 

 

雙向流式實現

打開proto文件

message StreamRequest {
    string request_info = 1;
}

message StreamResponse {
    string response_info = 1;
}

 

新建流式請求與響應參數,然后新建接口方法

service PersonService {
    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}

 

然后實現接口方法

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            @Override
            public void onNext(StreamRequest streamRequest) {
                System.out.println(streamRequest.getRequestInfo());
                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

 

在客戶端中

package com.badao.grpcjava;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDate;
import java.util.Iterator;

public class GrpcClient {
    public static void main(String[] args) {
        ManagedChannel managedChannel  = ManagedChannelBuilder.forAddress("localhost",8899)
                .usePlaintext().build();
        PersonServiceGrpc.PersonServiceStub stub = PersonServiceGrpc.newStub(managedChannel);

        System.out.println("-----------------------------");
        System.out.println("流式請求-流式響應,調用BiTalk");

        StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse streamResponse) {
                System.out.println(streamResponse.getResponseInfo());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onComplated");
            }
        });


        for(int i =0;i<10;i++)
        {
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDate.now().toString()).build());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

運行服務端后運行客戶端

 

 

 

 

 

示例代碼下載

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12883063


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM