grpc 詳解 java版
Java中gRPC的基本教程介紹。
本教程提供了有關使用gRPC的基本Java程序員介紹。
通過遍歷此示例,您將學習如何:
- grpc設計的核心概念。
- 在.proto文件中定義服務。
- 使用協議緩沖區編譯器生成服務器和客戶端代碼。
- 使用Java gRPC API為您的服務編寫一個簡單的客戶端和服務器。
假定您已經閱讀了gRPC簡介並且熟悉協議緩沖區。請注意,本教程中的示例使用了 proto3協議緩沖區語言的版本:您可以在proto3語言指南中找到更多信息和Java生成的代碼指南。
為什么要使用gRPC?
我們的示例是一個簡單的路由映射應用程序,它使客戶端可以獲取有關其路由功能的信息,創建其路由的摘要以及與服務器和其他客戶端交換路由信息(例如流量更新)。
借助gRPC,我們可以在一個.proto
文件中定義一次服務,並以gRPC支持的任何語言生成客戶端和服務器,而這又可以在從大型數據中心內的服務器到您自己的平板電腦的各種環境中運行– gRPC為您處理不同的語言和環境。我們還獲得了使用協議緩沖區的所有優點,包括有效的序列化,簡單的IDL和輕松的接口更新。
核心概念
GRPC包含三個不同的層:Stub, Channel和Transport。
Stu
Stub層是大多數開發人員都可以接觸到的,它為您要適應的任何數據模型/ IDL /接口提供類型安全的綁定。gRPC帶有協議緩沖區編譯器的插件,該插件可以從.proto
文件中生成Stub接口,但是與其他數據模型/ IDL的綁定是容易的並且值得鼓勵。
Channel
通道層是傳輸處理上的抽象,適合於偵聽/裝飾,並且比存根(Stub)層向應用程序公開更多的行為。對於應用程序框架而言,使用該層來解決諸如日志記錄,監視,身份驗證等跨領域的問題很容易。
Transport
傳輸層不費吹灰之力地將字節從導線中取出。它的接口是抽象的,足以允許插入不同的實現。請注意,傳輸層API被認為是gRPC內部的,並且與包中的核心API相比,其API保證較弱io.grpc
。
gRPC帶有三種傳輸實現:
- 基於Netty的傳輸是基於Netty的主要傳輸實現 。它適用於客戶端和服務器。
- 基於OkHttp的傳輸是基於OkHttp的輕量級傳輸 。它主要用於Android,並且僅用於客戶端。
- 進程內傳輸適用於服務器與客戶端處於同一進程中的情況。它對於測試很有用,同時也可以安全地用於生產。
示例代碼和設置
本教程的示例代碼在 grpc / grpc-java / examples / src / main / java / io / grpc / examples / routeguide中。要下載示例,請grpc-java
通過運行以下命令在存儲庫中克隆最新版本:
$ git clone -b v1.34.0 https://github.com/grpc/grpc-java.git
然后將當前目錄更改為grpc-java/examples
:
$ cd grpc-java/examples
定義服務
我們的第一步(如您從gRPC簡介中所知道的)是使用協議緩沖區定義gRPC服務以及方法請求和響應類型。 。您可以在grpc-java / examples / src / main / proto / route_guide.proto中看到完整的.proto文件。 。
在此示例中生成Java代碼時,我們java_package
在.proto中指定了一個文件選項:
option java_package = "io.grpc.examples.routeguide";
這指定了我們要用於生成的Java類的包。如果java_package
.proto文件中未提供顯式選項,則默認情況下將使用proto軟件包(使用“ package”關鍵字指定)。但是,proto軟件包通常不能成為良好的Java包,因為proto軟件包不應以反向域名開頭。如果我們從此.proto生成另一種語言的代碼,則該java_package
選項無效。
要定義服務,我們service
在.proto文件中指定一個名稱:
service RouteGuide {
...
}
然后,rpc
在服務定義中定義方法,並指定其請求和響應類型。gRPC允許您定義四種服務方法,所有這些方法都在RouteGuide
服務中使用:
-
一個簡單的RPC,客戶端使用存根將請求發送到服務器,然后等待響應返回,就像正常的函數調用一樣。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
-
一個服務器端流RPC,其中客戶端發送請求到服務器,並獲得一個流中讀取消息的序列后面。客戶端從返回的流中讀取,直到沒有更多消息為止。如我們的示例所示,您可以通過
stream
在響應類型之前放置關鍵字來指定服務器端流方法。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
-
一個客戶端流傳輸的RPC,其中客戶端將消息寫入Point序列,並且將它們發送到服務器,再次使用提供的流。客戶端寫完消息后,它將等待服務器讀取所有消息並返回其響應。您可以通過將
stream
關鍵字放在請求類型之前來指定客戶端流方法。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
-
一個雙向流RPC雙方都派出使用讀寫流的消息序列。這兩個流是獨立運行的,因此客戶端和服務器可以按照自己喜歡的順序進行讀寫:例如,服務器可以在寫響應之前等待接收所有客戶端消息,或者可以先讀取一條消息再寫入一條消息,或其他一些讀寫組合。每個流中的消息順序都會保留。您可以通過
stream
在請求和響應之前放置關鍵字來指定這種類型的方法。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我們的.proto
文件還包含用於服務方法中所有請求和響應類型的協議緩沖區消息類型定義-例如,以下是Point
消息類型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客戶端和服務器代碼
接下來,我們需要根據.proto服務定義生成gRPC客戶端和服務器接口。我們使用protoc
帶有特殊gRPC Java插件的協議緩沖區編譯器進行此操作。您需要使用 proto3 編譯器(同時支持proto2和proto3語法)以生成gRPC服務。
使用Gradle或Maven時,protoc構建插件可以生成必要的代碼作為構建的一部分。您可以參考grpc-java自述文件有關如何從您自己的.proto
文件生成代碼的信息。
以下類是根據我們的服務定義生成的:
-
Feature.java
,Point.java
,Rectangle.java
,和其他含有的所有協議緩存代碼來填充,序列化,並檢索我們的請求和響應消息的類型。 -
RouteGuideGrpc.java
其中包含(以及一些其他有用的代碼):
RouteGuide
服務器要實現 的基類RouteGuideGrpc.RouteGuideImplBase
,其中包含RouteGuide
服務中定義的所有方法 。- 客戶端可以用來與
RouteGuide
服務器對話的存根類。
創建服務器
首先讓我們看一下如何創建RouteGuide
服務器。如果您只對創建gRPC客戶端感興趣,則可以跳過本節,直接進入創建客戶端(盡管您可能仍然會發現它很有趣!)。
使我們的RouteGuide
服務發揮作用有兩個部分:
- 覆蓋根據我們的服務定義生成的服務基類:完成我們服務的實際“工作”。
- 運行gRPC服務器以偵聽來自客戶端的請求並返回服務響應。
您可以RouteGuide
在grpc-java / examples / src / main / java / io / grpc / examples / routeguide / RouteGuideServer.java中找到我們的示例服務器。 。讓我們仔細看看它是如何工作的。
實施RouteGuide
如您所見,我們的服務器有一個RouteGuideService
擴展生成的RouteGuideGrpc.RouteGuideImplBase
抽象類的類:
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
簡單的RPC
RouteGuideService
實現我們所有的服務方法。首先讓我們看一下最簡單的方法GetFeature()
,該方法Point
僅從客戶端獲取a ,並從其數據庫中的a中返回相應的特征信息Feature
。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
該getFeature()
方法有兩個參數:
Point
:請求StreamObserver<Feature>
:響應觀察者,這是服務器用來調用其響應的特殊接口。
要將我們的回復返回給客戶並完成通話,請執行以下操作:
Feature
根據服務定義中的說明,我們構造並填充一個響應對象以返回到客戶端。在此示例中,我們使用單獨的私有checkFeature()
方法進行此操作。- 我們使用響應觀察者的
onNext()
方法返回Feature
。 - 我們使用響應觀察者的
onCompleted()
方法來指定我們已經完成了對RPC的處理。
服務器端流式RPC
接下來,讓我們看一下我們的流式RPC。ListFeatures
是服務器端的流式RPC,因此我們需要將多個Feature
s發送回客戶端。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
像簡單的RPC一樣,此方法獲取一個請求對象(Rectangle
客戶端希望在其中找到Feature
)和一個StreamObserver
響應觀察者。
這次,我們獲得Feature
返回到客戶端所需的盡可能多的對象(在這種情況下,我們根據它們是否在我們的請求中從服務的功能集中選擇它們Rectangle
),並將每個對象依次寫入響應觀察器。使用其onNext()
方法。最后,就像在簡單的RPC中一樣,我們使用響應觀察者的onCompleted()
方法來告訴gRPC我們已經完成了響應的編寫。
客戶端流式RPC
現在,讓我們看一些更復雜的東西:客戶端流方法RecordRoute()
,Point
從客戶端獲取s的流,並返回RouteSummary
包含有關其行程的信息的單個流。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
如您所見,像以前的方法類型一樣,我們的方法獲得一個 StreamObserver
響應觀察者參數,但是這次它返回a StreamObserver
供客戶端編寫其Point
。
在方法主體中,我們實例化了一個匿名對象StreamObserver
以返回,其中:
onNext()
每次客戶端將a寫入Point
消息流時,都應重寫該方法以獲取功能和其他信息。- 覆蓋此
onCompleted()
方法(在客戶端完成寫消息時調用)以填充並構建我們的RouteSummary
。然后,我們調用我們的方法本身的響應觀察者的onNext()
我們RouteSummary
,然后調用它的onCompleted()
方法來完成從服務器端調用。
雙向流式RPC
最后,讓我們看一下雙向流式RPC RouteChat()
。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
與我們的客戶端流示例一樣,我們都獲取並返回了 StreamObserver
響應觀察器,只是這次我們在客戶端仍將消息寫入其 消息流時通過方法的響應觀察器返回值。此處讀取和寫入的語法與我們的客戶端流和服務器流方法完全相同。盡管雙方總是會按照對方的寫入順序獲得對方的消息,但是客戶端和服務器都可以按照任何順序進行讀取和寫入-流完全獨立地運行。
啟動服務器
一旦實現了所有方法,我們還需要啟動gRPC服務器,以便客戶端可以實際使用我們的服務。以下代碼段顯示了我們如何為RouteGuide
服務執行此操作:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
...
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
...
}
如您所見,我們使用構建和啟動服務器ServerBuilder
。
為此,我們:
- 使用構建器的
forPort()
方法指定我們要用於偵聽客戶端請求的地址和端口。 - 創建我們的服務實現類的實例,
RouteGuideService
並將其傳遞給構建器的addService()
方法。 - 調用
build()
並start()
在構建器上為我們的服務創建並啟動RPC服務器。
創建客戶端
在本節中,我們將研究為我們的RouteGuide
服務創建一個客戶端。您可以在grpc-java / examples / src / main / java / io / grpc / examples / routeguide / RouteGuideClient.java中看到我們完整的示例客戶端代碼。
Instantiating a stub
要調用服務方法,我們首先需要創建一個存根,或者說,創建兩個存根:
- 一阻塞/同步存根:這意味着該RPC調用等待服務器響應,並且將或者返回一個響應或拋出異常。
- 一個非阻塞/異步存根,它對服務器進行非阻塞調用,在該服務器上異步返回響應。您只能使用異步存根進行某些類型的流式調用。
首先,我們需要為存根創建一個gRPC通道,指定我們要連接的服務器地址和端口:
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
我們使用ManagedChannelBuilder
創建 channel。
現在,我們可以使用channel創建stub,method是使用從.proto生成的RouteGuideGrpc類中提供的newStub和newBlockingStub方法。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
Calling service method
現在讓我們看看我們如何調用我們的服務方法。
簡單的RPC
GetFeature
在阻塞存根上調用簡單的RPC與調用本地方法一樣簡單。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
我們創建並填充一個請求協議緩沖區對象(在本例中為Point
),將其傳遞給getFeature()
阻塞存根上的方法,然后返回一個 Feature
。
如果發生錯誤,則將其編碼為Status
,我們可以從獲取 StatusRuntimeException
。
服務器端流式RPC
接下來,讓我們看一下服務器端對的流式調用ListFeatures
,該調用返回一個geo的流Feature
:
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException ex) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
如您所見,它與我們剛剛看過的簡單RPC非常相似,除了返回一個Feature
,該方法返回一個Iterator
,客戶端可以用來讀取所有返回的Feature
s ,而不是返回單個。
客戶端流式RPC
現在,讓事情變得更復雜一些:客戶端流方法 RecordRoute
,我們將Point
s流發送到服務器並返回一個RouteSummary
。對於這種方法,我們需要使用異步存根。如果您已經閱讀了“創建服務器”,那么其中的一些內容可能看起來非常熟悉-異步流式RPC在兩側都以類似的方式實現。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
如您所見,要調用此方法,我們需要創建一個StreamObserver
,實現一個特殊的接口供服務器使用其RouteSummary
響應進行調用。在我們StreamObserver
我們:
onNext()
當服務器將a寫入RouteSummary
消息流時,重寫用於打印返回的信息的方法。- 重寫該
onCompleted()
方法(在服務器完成其側面的調用時調用)以減小aCountDownLatch
,我們可以檢查該方法以查看服務器是否已完成寫入。
然后,將傳遞StreamObserver
給異步存根的recordRoute()
方法,並返回我們自己的StreamObserver
請求觀察器,以編寫 Point
s發送給服務器。一旦完成編寫點,就使用請求觀察者的onCompleted()
方法來告訴gRPC我們已經完成了在客戶端的編寫。完成后,我們CountDownLatch
將檢查服務器端是否已完成。
雙向流式RPC
最后,讓我們看一下雙向流式RPC RouteChat()
。
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
正如我們的客戶端流的例子,我們都get和返回 StreamObserver
響應的觀察者,但這次我們通過我們的方法的反應派觀察員值,而服務器還在寫郵件給他們的 消息流。此處讀取和寫入的語法與我們的客戶端流方法完全相同。盡管雙方總是會按照對方的寫入順序獲得對方的消息,但是客戶端和服務器都可以按照任何順序進行讀取和寫入-流完全獨立地運行。
試試看!
按照示例目錄自述文件中的說明進行操作 構建並運行客戶端和服務器。
參考鏈接
https://grpc.io/docs/languages/java/quickstart/
https://github.com/grpc/grpc-java/blob/master/README.md
如果覺得寫得不好,歡迎指出;如果覺得寫得不錯,歡迎親們贊賞。