歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《java版gRPC實戰》全系列鏈接
本篇概覽
- 本文是《java版gRPC實戰》系列的第四篇,前文掌握了服務端流,適合從服務端獲取大量數據的場景,今天的目標是掌握客戶端流類型的服務,包括服務提供方和使用方兩側的開發;
- 先來看看官方資料對客戶端流式RPC的介紹:客戶端寫入一個消息序列並將其發送到服務器,同樣也是使用流。一旦客戶端完成寫入消息,它等待服務器完成讀取返回它的響應;
- 本文由以下幾部分組成:
- 提前小結幾個重要的知識點,稍后開發過程中要重點關注這幾個地方;
- 在proto文件中定義客戶端流類型的gRPC接口,再通過proto生成java代碼;
- 開發服務端應用;
- 開發客戶端應用;
- 驗證;
提前小結
為了突出重點,這里將幾個關鍵的知識點提前給出:
- 客戶端流的特點,是請求方以流的形式提交數據到響應方;
- 一次RPC請求中,請求方可以通過流的方式源源不斷的提交數據,直到調用了StreamObserver的onCompleted方法,才算提交數據完成;
- 平時咱們調用方法時,方法內部用到的數據是通過入參傳進來的,但這里不一樣,客戶端要傳給服務端的數據和gRPC方法的入參沒有關系,而是和方法的返回對象有關(執行返回對象的onNext方法可以將數據傳給服務端);
- 客戶端在A線程上傳完數據后,服務端的響應是在另一個線程B執行的,因此,如果A線程拿到服務端響應,就要B線程的異步響應方法執行完畢,等待的方法有多種,我用的是CountDownLatch;
- 在服務端,開發者要編寫的代碼和以往web開發不同,不是將數據處理好返回,而是返回一個StreamObserver實例給上層框架,由框架負責處理的邏輯,開發者專注開發StreamObserver的實現即可,例如重寫onNext方法,客戶端通過流每上傳一筆數據,onNext方法都會被外層框架執行一次;
- 如果您用的是IDEA,記得勾選下圖紅框中的選框,否則運行應用的時候可能遇到lombok相關的問題:
- 上面提到的這些,會在接下來的開發過程中充分體現出來;
源碼下載
- 本篇實戰中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 鏈接 | 備注 |
---|---|---|
項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,《java版gRPC實戰》系列的源碼在grpc-tutorials文件夾下,如下圖紅框所示:
- grpc-tutorials文件夾下有多個目錄,本篇文章對應的服務端代碼在client-stream-server-side目錄下,客戶端代碼在client-stream-client-side目錄下,如下圖:
在proto文件中定義客戶端流類型的gRPC接口
- 首先要做的就是定義gRPC接口,打開mall.proto,在里面新增方法和相關的數據結構,需要重點關注的是AddToCart方法的入參ProductOrder前面添加了stream修飾,代表該方法是客戶端流類型:
// gRPC服務,這是個在線商城的購物車服務
service CartService {
// 客戶端流式:添加多個商品到購物車
rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}
}
// 提交購物車時的產品信息
message ProductOrder {
// 商品ID
int32 productId = 1;
// 商品數量
int32 number = 2;
}
// 提交購物車返回結果的數據結構
message AddCartReply {
// 返回碼
int32 code = 1;
// 描述信息
string message = 2;
}
- 雙擊下圖紅框中的task即可生成java代碼:
- 生成下圖紅框中的文件:
- 接下來開發服務端;
開發服務端應用
- 在父工程grpc-turtorials下面新建名為client-stream-server-side的模塊,其build.gradle內容如下:
// 使用springboot插件
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
// 作為gRPC服務提供方,需要用到此庫
implementation 'net.devh:grpc-server-spring-boot-starter'
// 依賴自動生成源碼的工程
implementation project(':grpc-lib')
// annotationProcessor不會傳遞,使用了lombok生成代碼的模塊,需要自己聲明annotationProcessor
annotationProcessor 'org.projectlombok:lombok'
}
- 配置文件application.yml:
spring:
application:
name: client-stream-server-side
# gRPC有關的配置,這里只需要配置服務端口號
grpc:
server:
port: 9900
-
啟動類ClientStreamServerSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
-
重點是提供grpc服務的GrpcServerService.java,請結合前面小結的第五點來閱讀代碼,咱們要做的就是給上層框架返回一個匿名類,至於里面的onNext、onCompleted方法何時被調用是上層框架決定的,另外還准備了成員變量totalCount,這樣就可以記錄總數了:
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;
@GrpcService
@Slf4j
public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {
@Override
public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {
// 返回匿名類,給上層框架使用
return new StreamObserver<ProductOrder>() {
// 記錄處理產品的總量
private int totalCount = 0;
@Override
public void onNext(ProductOrder value) {
log.info("正在處理商品[{}],數量為[{}]",
value.getProductId(),
value.getNumber());
// 增加總量
totalCount += value.getNumber();
}
@Override
public void onError(Throwable t) {
log.error("添加購物車異常", t);
}
@Override
public void onCompleted() {
log.info("添加購物車完成,共計[{}]件商品", totalCount);
responseObserver.onNext(AddCartReply.newBuilder()
.setCode(10000)
.setMessage(String.format("添加購物車完成,共計[%d]件商品", totalCount))
.build());
responseObserver.onCompleted();
}
};
}
}
開發客戶端應用
- 在父工程grpc-turtorials下面新建名為client-stream-server-side的模塊,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation project(':grpc-lib')
}
- 配置文件application.yml,設置自己的web端口號和服務端地址:
server:
port: 8082
spring:
application:
name: client-stream-client-side
grpc:
client:
# gRPC配置的名字,GrpcClient注解會用到
client-stream-server-side:
# gRPC服務端地址
address: 'static://127.0.0.1:9900'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
- 啟動類ClientStreamClientSideApplication.java的代碼就不貼了,普通的springboot啟動類而已;
- 正常情況下我們都是用StreamObserver處理服務端響應,這里由於是異步響應,需要額外的方法從StreamObserver中取出業務數據,於是定一個新接口,繼承自StreamObserver,新增getExtra方法可以返回String對象,詳細的用法稍后會看到:
package com.bolingcavalry.grpctutorials;
import io.grpc.stub.StreamObserver;
public interface ExtendResponseObserver<T> extends StreamObserver<T> {
String getExtra();
}
- 重頭戲來了,看看如何遠程調用客戶端流類型的gRPC接口,前面小結提到的2、3、4點都會涉及到,代碼中已經添加詳細注釋:
package com.bolingcavalry.grpctutorials;
import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class GrpcClientService {
@GrpcClient("client-stream-server-side")
private CartServiceGrpc.CartServiceStub cartServiceStub;
public String addToCart(int count) {
CountDownLatch countDownLatch = new CountDownLatch(1);
// responseObserver的onNext和onCompleted會在另一個線程中被執行,
// ExtendResponseObserver繼承自StreamObserver
ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {
String extraStr;
@Override
public String getExtra() {
return extraStr;
}
private int code;
private String message;
@Override
public void onNext(AddCartReply value) {
log.info("on next");
code = value.getCode();
message = value.getMessage();
}
@Override
public void onError(Throwable t) {
log.error("gRPC request error", t);
extraStr = "gRPC error, " + t.getMessage();
countDownLatch.countDown();
}
@Override
public void onCompleted() {
log.info("on complete");
extraStr = String.format("返回碼[%d],返回信息:%s" , code, message);
countDownLatch.countDown();
}
};
// 遠程調用,此時數據還沒有給到服務端
StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);
for(int i=0; i<count; i++) {
// 發送一筆數據到服務端
requestObserver.onNext(build(101 + i, 1 + i));
}
// 客戶端告訴服務端:數據已經發完了
requestObserver.onCompleted();
try {
// 開始等待,如果服務端處理完成,那么responseObserver的onCompleted方法會在另一個線程被執行,
// 那里會執行countDownLatch的countDown方法,一但countDown被執行,下面的await就執行完畢了,
// await的超時時間設置為2秒
countDownLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("countDownLatch await error", e);
}
log.info("service finish");
// 服務端返回的內容被放置在requestObserver中,從getExtra方法可以取得
return responseObserver.getExtra();
}
/**
* 創建ProductOrder對象
* @param productId
* @param num
* @return
*/
private static ProductOrder build(int productId, int num) {
return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
}
}
- 最后做個web接口,可以通過web請求驗證遠程調用:
package com.bolingcavalry.grpctutorials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class GrpcClientController {
@Autowired
private GrpcClientService grpcClientService;
@RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "1") int count) {
return grpcClientService.addToCart(count);
}
}
- 編碼完成,開始驗證;
驗證
- 啟動服務端ClientStreamServerSideApplication:
- 啟動客戶端ClientStreamClientSideApplication:
- 瀏覽器輸入http://localhost:8082/?count=100,響應如下,可見遠程調用gRPC服務成功:
- 下面是服務端日志,可見逐一處理了客戶端的每一筆數據:
- 下面是客戶端日志,可見由於CountDownLatch的作用,發起gRPC請求的線程一直等待responseObserver.onCompleted在另一個線程被執行完后,才會繼續執行:
- 至此,客戶端流類型的gRPC服務及其客戶端開發就完成了,這種異步操作與咱們平時開發同步類型的web接口還是有差別的,希望本文能給您帶來一些參考,下一篇咱們實戰最后一種類型:雙向流式;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos