Netty章節十四:Grpc四種服務方法的Java遠程調用


Grpc四種服務方法的Java遠程調用

快速入門安裝grpc請參考官方案例

詳細說明

也可參考官方GitHub的grpc-java說明

下載/添加JAR包

添加JAR包。或對於非Android的Maven,添加到您的pom.xml

<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-netty-shaded</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-protobuf</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency>
  <groupId>io.grpc</groupId>
  <artifactId>grpc-stub</artifactId>
  <version>1.29.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
  <groupId>org.apache.tomcat</groupId>
  <artifactId>annotations-api</artifactId>
  <version>6.0.53</version>
  <scope>provided</scope>
</dependency>

或者對於非android的Gradle,增加你的依賴

implementation 'io.grpc:grpc-netty-shaded:1.29.0'
implementation 'io.grpc:grpc-protobuf:1.29.0'
implementation 'io.grpc:grpc-stub:1.29.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

對於Android客戶端,使用grpc-okhttp代替grpc-net -,使用grpc-protobuf-lite代替grpc-protobuf

implementation 'io.grpc:grpc-okhttp:1.29.0'
implementation 'io.grpc:grpc-protobuf-lite:1.29.0'
implementation 'io.grpc:grpc-stub:1.29.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+

生成代碼

對與基於prtobuf的代碼生成你可以將proto文件放置到src/main/protosrc/test/proto 的目錄下同時搭配上恰當的插件。

對於使用Maven構建系統集成的基於protobuf的代碼生成,您可以使用 protobuf-maven-plugin(Eclipse 與 NetBeans 還應查看IDE文檔)

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.2</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.11.0:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.29.0:exe:${os.detected.classifier}</pluginArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>compile-custom</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

對於使用Gradle構建系統集成的基於protobuf的代碼生成,您可以使用 protobuf-gradle-plugin

plugins {
    id 'com.google.protobuf' version '0.8.8'
}

protobuf {
  protoc {
    artifact = "com.google.protobuf:protoc:3.11.0"
  }
  plugins {
    grpc {
      artifact = 'io.grpc:protoc-gen-grpc-java:1.29.0'
    }
  }
  generateProtoTasks {
    all()*.plugins {
      grpc {}
    }
  }
}

protobuf-gradle-plugin:

  1. 他會使用protoc命令行工具,根據你的*.proto文件來生成Java源文件
  2. 它將生成的Java源文件添加到相應的Java編譯單元(Java項目中的sourceSet;,以便它們可以與Java源代碼一起編譯。

傳輸層

傳輸層負責將字節從連線中取出和放入的繁重工作。它的接口是足夠的抽象,可以加入不同的實現,傳輸是以stream/流工廠的形式來進行的建模,注意,傳輸層API被認為是gRPC內部的API,它比包io.grpc下的核心API具有更弱的API保證。

  1. 基於Netty的傳輸是基於Netty的主要傳輸實現。它同時適用於客戶機和服務器。
  2. 基於OkHttp的傳輸是一種基於OkHttp的輕量級傳輸。它主要用於Android系統,只適用於客戶端。
  3. 進程內傳輸用於服務器與客戶端位於同一進程中的情況。它對於測試非常有用,同時對於生產使用也是安全的。

加入依賴

plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.8'
}

group 'com.sakura'
version '1.0'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    testCompile (
            "junit:junit:4.12"

    )
    compile (
            "io.netty:netty-all:4.1.46.Final",
            'com.google.protobuf:protobuf-java:3.11.4',
            'com.google.protobuf:protobuf-java-util:3.11.4',
            'io.grpc:grpc-netty-shaded:1.28.0',
            'io.grpc:grpc-protobuf:1.28.0',
            'io.grpc:grpc-stub:1.28.0'
    )
}

//構建protobuf插件配置
protobuf {
    //輸出目錄的根目錄名,生成的java文件的位置,會在指定的目錄下的main下生成
    generateProtoTasks.generatedFilesBaseDir = "$projectDir/src"

    protoc {
        artifact = "com.google.protobuf:protoc:3.11.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.28.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                //grpc的接口類的生成目錄,默認是grpc
                outputSubDir = 'java'
            }
        }
    }
}

編寫一個Proto文件

syntax = "proto3";//定義使用的proto版本

package com.sakura.proto;//所有語言適用的包路徑定義語法
option java_package = "com.sakura.proto";//java包路徑 優先級高於package
option java_outer_classname = "Student";//生成的外部類名
option java_multiple_files = true;//是否生成多個文件


//定義rpc的方法
service StudentService{
    //1.客戶端發出一個普通的請求,服務器的返回一個普通的響應
    rpc GetRealNameByUsername(MyRequest) returns (MyResponse);
    //grpc的請求以及響應不能是基本數據類型,必須是一個message類型,不管請求里有幾個參數
    //他必須是你定義的一個message類型的
    //2.根據學生的年齡獲取與這個年齡相等的學生對象客戶端發生一個普通的請求,服務器的以流的形式返回
    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse);
    //3.以流式的方式請求一個StudentRequest服務器會返回一個StudentResponseList
    rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList){}
    //4.客戶源與服務端都以流式的方式,雙向的數據流傳遞
    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse);
}


//消息
message MyRequest{
    string username = 1;
}
message MyResponse{
    string realname = 2;
}

//單向流使用的消息
message StudentResponse{
    string name = 1;
    int32 age = 2;
    string city = 3;
}
message StudentRequest{
    int32 age = 1;
}
message StudentResponseList{
    repeated StudentResponse studentResponse = 1;
}

//雙向數據流傳遞使用的消息
message StreamRequest{
    string request_info = 1;
}

message StreamResponse{
    string response_info = 1;
}

服務端代碼

public class GrpcServer {

    private Server server;

    public static void main(String[] args) throws Exception{
        GrpcServer server = new GrpcServer();

        server.start();
        server.awaitTermination();

    }

    private void start()throws Exception{
        //創建服務通道配置端口,傳入映射的方法的實現類,然后構建並啟動
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl())
                .build().start();

        System.out.println("server started!");
        //設置一個回調鈎子
        Runtime.getRuntime().addShutdownHook(new Thread(() ->{
            System.out.println("JVM 關閉");
            try {
                this.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        System.out.println("執行到這里");
    }

    private void stop() throws InterruptedException {
        if(this.server != null){
            //關閉服務
            this.server.shutdown();
        }
    }

    private void awaitTermination() throws InterruptedException {
        if(this.server != null){
            //等待終止,讓服務不停止,可以設置超時時長
            this.server.awaitTermination();
            //this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
        }
    }

}

服務接口實現類

/**
 * @ClassName : StudentServiceImpl
 * @Description : 遠程調用的方法的具體實現,實現生成代碼中的內部抽象類
 */
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

    /**
     * 重寫父類的方法
     * @param request 客戶端發來的數據
     * @param responseObserver 響應觀察者 用於響應客戶端的對象
     */
    @Override
    public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("接收到客戶端信息:" + request.getUsername());
        /*
            onCompleted()   標示這個方法調用結束,只能調用一次
            onError()   異常時調用
            onNext()    接下來要做什么事,可以用於結果返回
         */
        //構造響應對象,並返回
        responseObserver.onNext(MyResponse.newBuilder().setRealname("星空").build());
        //標示服務器處理結束
        responseObserver.onCompleted();
    }


    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接受到客戶端信息:" + request.getAge());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海").setAge(18).setCity("北京").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("上海").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海3").setAge(22).setCity("廣州").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海4").setAge(18).setCity("深圳").build());

        responseObserver.onCompleted();
    }

    @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        //實現StreamObserver接口,實現方法當特定的事件觸發時,回調方法就會的到調用
        return new StreamObserver<StudentRequest>() {
            /**
             * 接收客戶端的請求,請求到來時被調用
             * 每來一次請求,onNext()方法就會被調用一次
             * 因為請求是流式的,onNext會被調用多次
             * @param value
             */
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext:" + value.getAge());
            }

            /**
             * 出現異常時被調用
             * @param t
             */
            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 表示客戶端將流式數據全部發給服務器端之后,客戶端就會有一個onCompleted事件,服務器端就會感知到
             * 然后服務器端在onCompleted中為客戶端返回最終結果
             */
            @Override
            public void onCompleted() {
                StudentResponse.Builder studentResponse =
                        StudentResponse.newBuilder().setName("彩虹海1").setAge(18).setCity("宇宙");
                StudentResponse.Builder studentResponse2 =
                        StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("宇宙");
                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse)
                        .addStudentResponse(studentResponse2).build();
                //返回結果
                responseObserver.onNext(studentResponseList);
                //表示處理完成
                responseObserver.onCompleted();
            }
        };
    }

    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            /**
             * 客戶端發來請求時被調用,每請求一次則被調用一次
             * @param value 客戶端發來的數據
             */
            @Override
            public void onNext(StreamRequest value) {
                //打印客戶端發來的數據
                System.out.println(value.getRequestInfo());

                //向客戶端返回數據
                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 客戶端的onCompleted方法被調用時,被調用
             */
            @Override
            public void onCompleted() {
                //雙向的數據流傳遞,雖然是在兩個不同的流中傳遞互不干擾
                //但是當一方的流被關閉時另一方也要關閉與之交互的流
                responseObserver.onCompleted();
            }
        };
    }
}

客戶端代碼

/**
 * @ClassName : GrpcClient
 * @Description : grpc client
 */
public class GrpcClient {


    public static void main(String[] args) throws InterruptedException {
        //usePlaintext()使用純文本的方式,不加密
        ManagedChannel managedChannel =
                ManagedChannelBuilder.forTarget("localhost:8899").usePlaintext().build();
        //客戶端與服務端交互的對象  server與client通信的對象
        //blockingStub 阻塞的方式/同步  發出一個請求一定要等到另一端返回了響應才繼續往下執行
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub =
                StudentServiceGrpc.newBlockingStub(managedChannel);
        //只要是客戶端是以流式的方式向服務器發送請求,這種請求一定以異步的
        //blockingStub是同步的阻塞的,則不會被提供方法
        //獲取一個異步的通信對象
        //創建一個支持該服務的所有呼叫類型的新異步存根,不會等待對方響應會一直向下執行
        StudentServiceGrpc.StudentServiceStub stub =
                StudentServiceGrpc.newStub(managedChannel);
/*        //構建消息
        MyRequest request = MyRequest.newBuilder().setUsername("出發,目標彩虹海").build();
        //調用具體方法,接收到響應
        MyResponse response = blockingStub.getRealNameByUsername(request);
        System.out.println("接收到服務器信息:" + response.getRealname());

        System.out.println("--------------------普通請求與響應 結束----------------------");
*/
 /*       
        //返回一個流式的響應就是一個迭代器,每返回一個對象就進入到迭代器中,再返回對象再進入迭代器,以此類推
        Iterator<StudentResponse> iter =
                blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(18).build());
        //iter.hasNext()  還有沒有下一個
        while (iter.hasNext()){
            StudentResponse studentResponse = iter.next();
            System.out.println(studentResponse.getName() + " , " +
                    studentResponse.getAge() + " , " + studentResponse.getCity());
        }
        System.out.println("-----------------------普通請求 流式響應 結束-------------------");
        
*/
        //客戶端請求一個steam(流式) blockingStub(同步)無法使用 只有使用異步形式
        //構造接收服務端信息的方法
/*      StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            /**
             * 服務端向客戶端響應結果時會被調用
             * 服務端返回的數據,每返回一次數據則被調用一次
             * 如果服務器端也是流式的並且返回了多個數據,那么每次返回數據的時候都會被調用一次
             * @param value
             *//*
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName() + " , " +
                            studentResponse.getAge() + " , " + studentResponse.getCity());
                    System.out.println("**************");
                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };

        //構造客戶端向服務端發送的數據
        //getStudentsWrapperByAges(傳入處理服務端返回數據的回調對象)
        StreamObserver<StudentRequest> studentsWrapperByAges =
                stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        //發送數據
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(18).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(28).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(38).build());
        studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(48).build());
        //表示客戶端調用結束
        studentsWrapperByAges.onCompleted();
        System.out.println("-----------------------流式請求 普通響應 結束-------------------");
*/

        StreamObserver<StreamRequest> streamRequestStreamObserver =
                stub.biTalk(new StreamObserver<StreamResponse>() {
            /**
             * 收到服務器響應結果時,被調用
             * @param value 服務器返回的數據
             */
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            /**
             * 服務器端onCompleted()被調用時,被觸發
             */
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        });

        for (int i = 0; i < 10; i++) {
            streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            Thread.sleep(1000);
        }

        streamRequestStreamObserver.onCompleted();
		System.out.println("-----------------------雙向數據流傳遞 結束-------------------");

        //客戶端向服務器端發送數據,數據還沒發送就繼續向下執行走到了onCompleted(),
        //然后在數據還未發出時程序就正常執行完成結束了
        //線程睡眠,強制讓程序等待,等待studentsWrapperByAges將數據全部發送給服務器端
        //如果不睡眠的話,數據還沒發送給服務器端,jvm就停止了,也就發送不出去了
        Thread.sleep(5000);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM