java版gRPC實戰之三:服務端流


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC實戰》全系列鏈接

  1. 用proto生成代碼
  2. 服務發布和調用
  3. 服務端流
  4. 客戶端流
  5. 雙向流
  6. 客戶端動態獲取服務端地址
  7. 基於eureka的注冊發現

關於gRPC定義的四種類型

本文是《java版gRPC實戰》系列的第三篇,前文咱們實戰體驗了簡單的RPC請求和響應,那種簡單的請求響應方式其實只是gRPC定義的四種類型之一,這里給出《gRPC 官方文檔中文版》對這四種gRPC類型的描述:

  1. 簡單 RPC:客戶端使用存根(stub)發送請求到服務器並等待響應返回,就像平常的函數調用一樣;
  2. 服務器端流式 RPC:客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到里面沒有任何消息;(即本篇內容)
  3. 客戶端流式 RPC:客戶端寫入一個消息序列並將其發送到服務器,同樣也是使用流。一旦 客戶端完成寫入消息,它等待服務器完成讀取返回它的響應;
  4. 雙向流式 RPC:是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器 可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,或者其他讀寫的組合。 每個流中的消息順序被預留;

本篇概覽

本篇是服務端流類型的gRPC服務實戰,包括以下內容:

  1. 開發一個gRPC服務,類型是服務端流;
  2. 開發一個客戶端,調用前面發布的gRPC服務;
  3. 驗證;
  • 不多說了,開始上代碼;

源碼下載

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,《java版gRPC實戰》系列的源碼在grpc-tutorials文件夾下,如下圖紅框所示:

在這里插入圖片描述

  • grpc-tutorials文件夾下有多個目錄,本篇文章對應的服務端代碼在server-stream-server-side目錄下,客戶端代碼在server-stream-client-side目錄下,如下圖:

在這里插入圖片描述

開發一個gRPC服務,類型是服務端流

  • 首先要開發的是gRPC服務端,一共要做下圖所示的七件事:

在這里插入圖片描述

  • 打開grpc-lib模塊,在src/main/proto目錄下新增文件mall.proto,里面定一個了一個gRPC方法ListOrders及其入參和返回對象,內容如下,要注意的是返回值要用關鍵字stream修飾,表示該接口類型是服務端流:
syntax = "proto3";

option java_multiple_files = true;
// 生成java代碼的package
option java_package = "com.bolingcavalry.grpctutorials.lib";
// 類名
option java_outer_classname = "MallProto";

// gRPC服務,這是個在線商城的訂單查詢服務
service OrderQuery {
    // 服務端流式:訂單列表接口,入參是買家信息,返回訂單列表(用stream修飾返回值)
    rpc ListOrders (Buyer) returns (stream Order) {}
}

// 買家ID
message Buyer {
    int32 buyerId = 1;
}

// 返回結果的數據結構
message Order {
    // 訂單ID
    int32 orderId = 1;
    // 商品ID
    int32 productId = 2;
    // 交易時間
    int64 orderTime = 3;
    // 買家備注
    string buyerRemark = 4;
}
  • 雙擊下圖紅框位置的generateProto,即可根據proto生成java代碼:

在這里插入圖片描述

  • 新生成的java代碼如下圖紅框:

在這里插入圖片描述

  • 在父工程grpc-turtorials下面新建名為server-stream-server-side的模塊,其build.gradle內容如下:
// 使用springboot插件
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // 作為gRPC服務提供方,需要用到此庫
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // 依賴自動生成源碼的工程
    implementation project(':grpc-lib')
}
  • 新建配置文件application.yml:
spring:
  application:
    name: server-stream-server-side
# gRPC有關的配置,這里只需要配置服務端口號
grpc:
  server:
    port: 9899
  • 啟動類:
package com.bolingcavalry.grpctutorials;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerStreamServerSideApplication {
    public static void main(String[] args) {
        SpringApplication.run(ServerStreamServerSideApplication.class, args);
    }
}
  • 接下來是最關鍵的gRPC服務,代碼如下,可見responseObserver.onNext方法被多次調用,用以向客戶端持續輸出數據,最后通過responseObserver.onCompleted結束輸出:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import java.util.ArrayList;
import java.util.List;

@GrpcService
public class GrpcServerService extends OrderQueryGrpc.OrderQueryImplBase {

    /**
     * mock一批數據
     * @return
     */
    private static List<Order> mockOrders(){
        List<Order> list = new ArrayList<>();
        Order.Builder builder = Order.newBuilder();

        for (int i = 0; i < 10; i++) {
            list.add(builder
                    .setOrderId(i)
                    .setProductId(1000+i)
                    .setOrderTime(System.currentTimeMillis()/1000)
                    .setBuyerRemark(("remark-" + i))
                    .build());
        }

        return list;
    }

    @Override
    public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
        // 持續輸出到client
        for (Order order : mockOrders()) {
            responseObserver.onNext(order);
        }
        // 結束輸出
        responseObserver.onCompleted();
    }
}
  • 至此,服務端開發完成,咱們再開發一個springboot應用作為客戶端,看看如何遠程調用listOrders接口,得到responseObserver.onNext方法輸出的數據;

開發一個客戶端,調用前面發布的gRPC服務

  • 客戶端模塊的基本功能是提供一個web接口,其內部會調用服務端的listOrders接口,將得到的數據返回給前端,如下圖:

在這里插入圖片描述

  • 在父工程grpc-turtorials下面新建名為server-stream-client-side的模塊,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • 應用配置信息application.yml內容如下,可見是端口和gRPC服務端地址的配置:
server:
  port: 8081
spring:
  application:
    name: server-stream-client-side

grpc:
  client:
    # gRPC配置的名字,GrpcClient注解會用到
    server-stream-server-side:
      # gRPC服務端地址
      address: 'static://127.0.0.1:9899'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • 服務端的listOrders接口返回的Order對象里面有很多gRPC相關的內容,不適合作為web接口的返回值,因此定義一個DispOrder類作為web接口返回值:
package com.bolingcavalry.grpctutorials;

import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;

@Data
@AllArgsConstructor
public class DispOrder {
    private int orderId;
    private int productId;
    private String orderTime;
    private String buyerRemark;
}
  • 平淡無奇的啟動類:
package com.bolingcavalry.grpctutorials;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerStreamClientSideApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerStreamClientSideApplication.class, args);
    }
}
  • 重點來了,GrpcClientService.java,里面展示了如何遠程調用gRPC服務的listOrders接口,可見對於服務端流類型的接口,客戶端這邊通過stub調用會得到Iterator類型的返回值,接下來要做的就是遍歷Iterator:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Buyer;
import com.bolingcavalry.grpctutorials.lib.Order;
import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("server-stream-server-side")
    private OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;

    public List<DispOrder> listOrders(final String name) {
        // gRPC的請求參數
        Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();

        // gRPC的響應
        Iterator<Order> orderIterator;

        // 當前方法的返回值
        List<DispOrder> orders = new ArrayList<>();

        // 通過stub發起遠程gRPC請求
        try {
            orderIterator = orderQueryBlockingStub.listOrders(buyer);
        } catch (final StatusRuntimeException e) {
            log.error("error grpc invoke", e);
            return new ArrayList<>();
        }

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        log.info("start put order to list");
        while (orderIterator.hasNext()) {
            Order order = orderIterator.next();

            orders.add(new DispOrder(order.getOrderId(),
                                    order.getProductId(),
                                    // 使用DateTimeFormatter將時間戳轉為字符串
                                    dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
                                    order.getBuyerRemark()));
            log.info("");
        }

        log.info("end put order to list");

        return orders;
    }
}
  • 最后做一個controller類,對外提供一個web接口,里面會調用GrpcClientService的方法:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public List<DispOrder> printMessage(@RequestParam(defaultValue = "will") String name) {
        return grpcClientService.listOrders(name);
    }
}
  • 至此,編碼完成,開始驗證

驗證

  1. 啟動server-stream-server-side,啟動成功后會監聽9989端口:

在這里插入圖片描述

  1. 啟動server-stream-client-side,再在瀏覽器訪問:http://localhost:8081/?name=Tom ,得到結果如下(firefox自動格式化json數據),可見成功地獲取了gRPC的遠程數據:

在這里插入圖片描述

至此,服務端流類型的gRPC接口的開發和使用實戰就完成了,接下來的章節還會繼續學習另外兩種類型;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM