java版gRPC實戰之四:客戶端流


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC實戰》全系列鏈接

  1. 用proto生成代碼
  2. 服務發布和調用
  3. 服務端流
  4. 客戶端流
  5. 雙向流
  6. 客戶端動態獲取服務端地址
  7. 基於eureka的注冊發現

本篇概覽

  • 本文是《java版gRPC實戰》系列的第四篇,前文掌握了服務端流,適合從服務端獲取大量數據的場景,今天的目標是掌握客戶端流類型的服務,包括服務提供方和使用方兩側的開發;
  • 先來看看官方資料對客戶端流式RPC的介紹:客戶端寫入一個消息序列並將其發送到服務器,同樣也是使用流。一旦客戶端完成寫入消息,它等待服務器完成讀取返回它的響應;
  • 本文由以下幾部分組成:
  1. 提前小結幾個重要的知識點,稍后開發過程中要重點關注這幾個地方;
  2. 在proto文件中定義客戶端流類型的gRPC接口,再通過proto生成java代碼;
  3. 開發服務端應用;
  4. 開發客戶端應用;
  5. 驗證;

提前小結

為了突出重點,這里將幾個關鍵的知識點提前給出:

  1. 客戶端流的特點,是請求方以流的形式提交數據到響應方;
  2. 一次RPC請求中,請求方可以通過流的方式源源不斷的提交數據,直到調用了StreamObserver的onCompleted方法,才算提交數據完成;
  3. 平時咱們調用方法時,方法內部用到的數據是通過入參傳進來的,但這里不一樣,客戶端要傳給服務端的數據和gRPC方法的入參沒有關系,而是和方法的返回對象有關(執行返回對象的onNext方法可以將數據傳給服務端);
  4. 客戶端在A線程上傳完數據后,服務端的響應是在另一個線程B執行的,因此,如果A線程拿到服務端響應,就要B線程的異步響應方法執行完畢,等待的方法有多種,我用的是CountDownLatch;
  5. 在服務端,開發者要編寫的代碼和以往web開發不同,不是將數據處理好返回,而是返回一個StreamObserver實例給上層框架,由框架負責處理的邏輯,開發者專注開發StreamObserver的實現即可,例如重寫onNext方法,客戶端通過流每上傳一筆數據,onNext方法都會被外層框架執行一次;
  6. 如果您用的是IDEA,記得勾選下圖紅框中的選框,否則運行應用的時候可能遇到lombok相關的問題:

在這里插入圖片描述

  • 上面提到的這些,會在接下來的開發過程中充分體現出來;

源碼下載

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,《java版gRPC實戰》系列的源碼在grpc-tutorials文件夾下,如下圖紅框所示:

在這里插入圖片描述

  • grpc-tutorials文件夾下有多個目錄,本篇文章對應的服務端代碼在client-stream-server-side目錄下,客戶端代碼在client-stream-client-side目錄下,如下圖:

在這里插入圖片描述

在proto文件中定義客戶端流類型的gRPC接口

  • 首先要做的就是定義gRPC接口,打開mall.proto,在里面新增方法和相關的數據結構,需要重點關注的是AddToCart方法的入參ProductOrder前面添加了stream修飾,代表該方法是客戶端流類型:
// gRPC服務,這是個在線商城的購物車服務
service CartService {
    // 客戶端流式:添加多個商品到購物車
    rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}
}

// 提交購物車時的產品信息
message ProductOrder {
    // 商品ID
    int32 productId = 1;
    // 商品數量
    int32 number = 2;
}

// 提交購物車返回結果的數據結構
message AddCartReply {
    // 返回碼
    int32 code = 1;
    // 描述信息
    string message = 2;
}
  • 雙擊下圖紅框中的task即可生成java代碼:

在這里插入圖片描述

  • 生成下圖紅框中的文件:

在這里插入圖片描述

  • 接下來開發服務端;

開發服務端應用

  • 在父工程grpc-turtorials下面新建名為client-stream-server-side的模塊,其build.gradle內容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作為gRPC服務提供方,需要用到此庫
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依賴自動生成源碼的工程
    implementation project(':grpc-lib')
    // annotationProcessor不會傳遞,使用了lombok生成代碼的模塊,需要自己聲明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
}
  • 配置文件application.yml:
spring:
  application:
    name: client-stream-server-side
# gRPC有關的配置,這里只需要配置服務端口號
grpc:
  server:
    port: 9900
  • 啟動類ClientStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;

  • 重點是提供grpc服務的GrpcServerService.java,請結合前面小結的第五點來閱讀代碼,咱們要做的就是給上層框架返回一個匿名類,至於里面的onNext、onCompleted方法何時被調用是上層框架決定的,另外還准備了成員變量totalCount,這樣就可以記錄總數了:

package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {
        // 返回匿名類,給上層框架使用
        return new StreamObserver<ProductOrder>() {

            // 記錄處理產品的總量
            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在處理商品[{}],數量為[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加總量
                totalCount += value.getNumber();
            }

            @Override
            public void onError(Throwable t) {
                log.error("添加購物車異常", t);
            }

            @Override
            public void onCompleted() {
                log.info("添加購物車完成,共計[{}]件商品", totalCount);
                responseObserver.onNext(AddCartReply.newBuilder()
                                                    .setCode(10000)
                                                    .setMessage(String.format("添加購物車完成,共計[%d]件商品", totalCount))
                                                    .build());
                responseObserver.onCompleted();
            }
        };
    }
}

開發客戶端應用

  • 在父工程grpc-turtorials下面新建名為client-stream-server-side的模塊,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,設置自己的web端口號和服務端地址:
server:
  port: 8082
spring:
  application:
    name: client-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解會用到
    client-stream-server-side:
      # gRPC服務端地址
      address: 'static://127.0.0.1:9900'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 啟動類ClientStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
  • 正常情況下我們都是用StreamObserver處理服務端響應,這里由於是異步響應,需要額外的方法從StreamObserver中取出業務數據,於是定一個新接口,繼承自StreamObserver,新增getExtra方法可以返回String對象,詳細的用法稍后會看到:
package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • 重頭戲來了,看看如何遠程調用客戶端流類型的gRPC接口,前面小結提到的2、3、4點都會涉及到,代碼中已經添加詳細注釋:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("client-stream-server-side")
    private CartServiceGrpc.CartServiceStub cartServiceStub;

    public String addToCart(int count) {
        
        CountDownLatch countDownLatch = new CountDownLatch(1);
        
        // responseObserver的onNext和onCompleted會在另一個線程中被執行,
        // ExtendResponseObserver繼承自StreamObserver
        ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {

            String extraStr;

            @Override
            public String getExtra() {
                return extraStr;
            }

            private int code;

            private String message;

            @Override
            public void onNext(AddCartReply value) {
                log.info("on next");
                code = value.getCode();
                message = value.getMessage();
            }

            @Override
            public void onError(Throwable t) {
                log.error("gRPC request error", t);
                extraStr = "gRPC error, " + t.getMessage();
                countDownLatch.countDown();
            }

            @Override
            public void onCompleted() {
                log.info("on complete");
                extraStr = String.format("返回碼[%d],返回信息:%s" , code, message);
                countDownLatch.countDown();
            }
        };
        
        // 遠程調用,此時數據還沒有給到服務端
        StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);
        
        for(int i=0; i<count; i++) {
            // 發送一筆數據到服務端
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客戶端告訴服務端:數據已經發完了
        requestObserver.onCompleted();

        try {
            // 開始等待,如果服務端處理完成,那么responseObserver的onCompleted方法會在另一個線程被執行,
            // 那里會執行countDownLatch的countDown方法,一但countDown被執行,下面的await就執行完畢了,
            // await的超時時間設置為2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服務端返回的內容被放置在requestObserver中,從getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 創建ProductOrder對象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做個web接口,可以通過web請求驗證遠程調用:
package com.bolingcavalry.grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.addToCart(count);
    }
}
  • 編碼完成,開始驗證;

驗證

  • 啟動服務端ClientStreamServerSideApplication:

在這里插入圖片描述

  • 啟動客戶端ClientStreamClientSideApplication:

在這里插入圖片描述

在這里插入圖片描述

  • 下面是服務端日志,可見逐一處理了客戶端的每一筆數據:

在這里插入圖片描述

  • 下面是客戶端日志,可見由於CountDownLatch的作用,發起gRPC請求的線程一直等待responseObserver.onCompleted在另一個線程被執行完后,才會繼續執行:

在這里插入圖片描述

  • 至此,客戶端流類型的gRPC服務及其客戶端開發就完成了,這種異步操作與咱們平時開發同步類型的web接口還是有差別的,希望本文能給您帶來一些參考,下一篇咱們實戰最后一種類型:雙向流式;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM