java版gRPC實戰之五:雙向流


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概覽

  • 本文是《java版gRPC實戰》系列的第五篇,目標是掌握雙向流類型的服務,即請求參數是流的形式,響應的內容也是流的形式;
  • 先來看看官方資料對雙向流式RPC的介紹:是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器 可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,或者其他讀寫的組合。 每個流中的消息順序被預留;
  • 掌握了客戶端流和服務端流兩種類型的開發后,雙向流類型就很好理解了,就是之前兩種類型的結合體,請求和響應都按照流的方式處理即可;
  • 今天的實戰,咱們來設計一個在線商城的功能:批量減扣庫存,即客戶端提交多個商品和數量,服務端返回每個商品減扣庫存成功和失敗的情況;
  • 咱們盡快進入編碼環節吧,具體內容如下:
  1. 在proto文件中定義雙向流類型的gRPC接口,再通過proto生成java代碼
  2. 開發服務端應用
  3. 開發客戶端應用
  4. 驗證

源碼下載

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,《java版gRPC實戰》系列的源碼在grpc-tutorials文件夾下,如下圖紅框所示:

在這里插入圖片描述

  • grpc-tutorials文件夾下有多個目錄,本篇文章對應的服務端代碼在double-stream-server-side目錄下,客戶端代碼在double-stream-client-side目錄下,如下圖:

在這里插入圖片描述

在proto文件中定義雙向流類型的gRPC接口

  • 首先要做的就是定義gRPC接口,打開mall.proto,在里面新增方法和相關的數據結構,需要重點關注的是BatchDeduct方法的入參ProductOrder和返回值DeductReply都添加了stream修飾(ProductOrder是上一章定義的),代表該方法是雙向流類型:
// gRPC服務,這是個在線商城的庫存服務
service StockService {
    // 雙向流式:批量扣減庫存
    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}

// 扣減庫存返回結果的數據結構
message DeductReply {
    // 返回碼
    int32 code = 1;
    // 描述信息
    string message = 2;
}
  • 雙擊下圖紅框中的task即可生成java代碼:

在這里插入圖片描述

  • 生成下圖紅框中的文件,即服務端定義和返回值數據結構:

在這里插入圖片描述

  • 接下來開發服務端;

開發服務端應用

  • 在父工程grpc-turtorials下面新建名為double-stream-server-side的模塊,其build.gradle內容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作為gRPC服務提供方,需要用到此庫
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依賴自動生成源碼的工程
    implementation project(':grpc-lib')
    // annotationProcessor不會傳遞,使用了lombok生成代碼的模塊,需要自己聲明annotationProcessor
    annotationProcessor 'org.projectlombok:lombok'
}
  • 配置文件application.yml:
spring:
  application:
    name: double-stream-server-side
# gRPC有關的配置,這里只需要配置服務端口號
grpc:
  server:
    port: 9901
  • 啟動類DoubleStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
  • 重點是提供grpc服務的GrpcServerService.java,咱們要做的就是給上層框架返回一個匿名類,至於里面的onNext、onCompleted方法何時被調用是上層框架決定的,另外還准備了成員變量totalCount,這樣就可以記錄總數了,由於請求參數是流,因此匿名類的onNext會被多次調用,並且由於返回值是流,因此onNext中調用了responseObserver.onNext方法來響應流中的每個請求,這樣客戶端就不斷收到服務端的響應數據(即客戶端的onNext方法會被多次調用):
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
        // 返回匿名類,給上層框架使用
        return new StreamObserver<ProductOrder>() {

            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("正在處理商品[{}],數量為[{}]",
                        value.getProductId(),
                        value.getNumber());

                // 增加總量
                totalCount += value.getNumber();

                int code;
                String message;

                // 假設單數的都有庫存不足的問題
                if (0 == value.getNumber() % 2) {
                    code = 10000;
                    message = String.format("商品[%d]扣減庫存數[%d]成功", value.getProductId(), value.getNumber());
                } else {
                    code = 10001;
                    message = String.format("商品[%d]扣減庫存數[%d]失敗", value.getProductId(), value.getNumber());
                }

                responseObserver.onNext(DeductReply.newBuilder()
                        .setCode(code)
                        .setMessage(message)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.error("批量減扣庫存異常", t);
            }

            @Override
            public void onCompleted() {
                log.info("批量減扣庫存完成,共計[{}]件商品", totalCount);
                responseObserver.onCompleted();
            }
        };
    }
}

開發客戶端應用

  • 在父工程grpc-turtorials下面新建名為double-stream-server-side的模塊,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,設置自己的web端口號和服務端地址:
server:
  port: 8082
spring:
  application:
    name: double-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解會用到
    double-stream-server-side:
      # gRPC服務端地址
      address: 'static://127.0.0.1:9901'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 啟動類DoubleStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;

  • 正常情況下我們都是用StreamObserver處理服務端響應,這里由於是異步響應,需要額外的方法從StreamObserver中取出業務數據,於是定一個新接口,繼承自StreamObserver,新增getExtra方法可以返回String對象,詳細的用法稍后會看到:

package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • 重頭戲來了,看看如何遠程調用雙向流類型的gRPC接口,代碼中已經添加詳細注釋:
package grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("double-stream-server-side")
    private StockServiceGrpc.StockServiceStub stockServiceStub;

    /**
     * 批量減庫存
     * @param count
     * @return
     */
    public String batchDeduct(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // responseObserver的onNext和onCompleted會在另一個線程中被執行,
        // ExtendResponseObserver繼承自StreamObserver
        ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {

            // 用stringBuilder保存所有來自服務端的響應
            private StringBuilder stringBuilder = new StringBuilder();

            @Override
            public String getExtra() {
                return stringBuilder.toString();
            }

            /**
             * 客戶端的流式請求期間,每一筆請求都會收到服務端的一個響應,
             * 對應每個響應,這里的onNext方法都會被執行一次,入參是響應內容
             * @param value
             */
            @Override
            public void onNext(DeductReply value) {
                log.info("batch deduct on next");
                // 放入匿名類的成員變量中
                stringBuilder.append(String.format("返回碼[%d],返回信息:%s<br>" , value.getCode(), value.getMessage()));
            }

            @Override
            public void onError(Throwable t) {
                log.error("batch deduct gRPC request error", t);
                stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
                countDownLatch.countDown();
            }

            /**
             * 服務端確認響應完成后,這里的onCompleted方法會被調用
             */
            @Override
            public void onCompleted() {
                log.info("batch deduct on complete");
                // 執行了countDown方法后,前面執行countDownLatch.await方法的線程就不再wait了,
                // 會繼續往下執行
                countDownLatch.countDown();
            }
        };

        // 遠程調用,此時數據還沒有給到服務端
        StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

        for(int i=0; i<count; i++) {
            // 每次執行onNext都會發送一筆數據到服務端,
            // 服務端的onNext方法都會被執行一次
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // 客戶端告訴服務端:數據已經發完了
        requestObserver.onCompleted();

        try {
            // 開始等待,如果服務端處理完成,那么responseObserver的onCompleted方法會在另一個線程被執行,
            // 那里會執行countDownLatch的countDown方法,一但countDown被執行,下面的await就執行完畢了,
            // await的超時時間設置為2秒
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");
        // 服務端返回的內容被放置在requestObserver中,從getExtra方法可以取得
        return responseObserver.getExtra();
    }

    /**
     * 創建ProductOrder對象
     * @param productId
     * @param num
     * @return
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
  • 最后做個web接口,可以通過web請求驗證遠程調用:
package grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.batchDeduct(count);
    }
}
  • 編碼完成,開始驗證;

驗證

  • 啟動服務端DoubleStreamServerSideApplication:

在這里插入圖片描述

  • 啟動客戶端DoubleStreamClientSideApplication:

在這里插入圖片描述

  • 這里要改:瀏覽器輸入http://localhost:8083/?count=10,響應如下,可見遠程調用gRPC服務成功,流式響應的每一筆返回都被客戶端收到:

在這里插入圖片描述

  • 下面是服務端日志,可見逐一處理了客戶端的每一筆數據:

在這里插入圖片描述

  • 下面是客戶端日志,可見由於CountDownLatch的作用,發起gRPC請求的線程一直等待responseObserver.onCompleted在另一個線程被執行完后,才會繼續執行:

在這里插入圖片描述

  • 至此,四種類型的gRPC服務及其客戶端開發就完成了,一般的業務場景咱們都能應付自如,接下來的文章咱們會繼續深入學習,了解復雜場景下的gRPC操作;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM