Netty-gRPC: Introduction and Usage


Reposted from: http://www.saily.top/2017/07/23/netty5/

gRPC

Define your service using Protocol Buffers, a powerful binary serialization toolset and language

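gRPC is an RPC framework built on Protobuf. It simplifies working with protobuf by supplying the networking code that lets the server and the client talk to each other.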

Demo

Follow https://grpc.io/docs/quickstart/java.html to try out the official demo.

Be sure to work through the "Update a gRPC service" part as well.

Integrating gRPC with Gradle and code generation

https://github.com/grpc/grpc-java
This is the gRPC-java project. Start by adding the gRPC dependencies.

compile 'io.grpc:grpc-netty:1.4.0'
compile 'io.grpc:grpc-protobuf:1.4.0'
compile 'io.grpc:grpc-stub:1.4.0'

Then configure the Gradle plugin for gRPC.

apply plugin: 'java'
apply plugin: 'com.google.protobuf'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier
        // gradle versions
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.1'
    }
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.2.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.4.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}

After that, the code can be generated directly with a Gradle task.

gRPC provides three transport implementations

  • gRPC comes with three Transport implementations:

    1. The Netty-based transport is the main transport implementation based on Netty. It is for both the client and the server.
    2. The OkHttp-based transport is a lightweight transport based on OkHttp. It is mainly for use on Android and is for client only.
    3. The inProcess transport is for when a server is in the same process as the client. It is useful for testing.

https://github.com/google/protobuf-gradle-plugin

The Gradle plugin that compiles Protocol Buffer (aka. Protobuf) definition files (*.proto) in your project. There are two pieces of its job:

  1. It assembles the Protobuf Compiler (protoc) command line and use it to generate Java source files out of your proto files.
  2. It adds the generated Java source files to the input of the corresponding Java compilation unit (sourceSet in a Java project; variant in an Android project), so that they can be compiled along with your Java sources.

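Hands-on

With the configuration in place, let's walk through a demonstration.

Create a new file, Student.proto, under src/main/proto.

By default the Gradle plugin looks for .proto sources in src/main/proto when generating code. The plugin documentation mentions this, and the path can be changed.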

syntax = "proto3";

package com.sail.proto;

option java_package = "com.sail.proto";
option java_outer_classname = "StudentProto";
option java_multiple_files = true;

service StudentService {
    rpc GetRealNameByUsername(MyRequest) returns (MyResponse) {}

    rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

    rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList) {}

    rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
}

message MyRequest {
    string username = 1;
}

message MyResponse {
    string realname = 2;
}

message StudentRequest {
    int32 age = 1;
}

message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

message StreamRequest {
    string request_info = 1;
}

message StreamResponse {
    string response_info = 1;
}
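
To make the field numbers in the messages above more concrete, here is a minimal sketch of how the protobuf wire format would encode a StudentRequest whose age (field number 1) is 20. It uses only the JDK rather than the generated classes. The class WireFormatSketch and its helper methods are hypothetical names made up for this illustration; real code should simply call toByteArray() on the generated message.

```java
import java.io.ByteArrayOutputStream;

final class WireFormatSketch {
    // Wire type 0 is "varint", used for int32, int64, bool and enum fields.
    static final int WIRETYPE_VARINT = 0;

    // Writes a varint: 7 payload bits per byte, high bit set on every byte except the last.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Encodes one int32 field: first the tag (fieldNumber << 3 | wireType), then the value.
    static byte[] encodeInt32Field(int fieldNumber, int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, ((long) fieldNumber << 3) | WIRETYPE_VARINT);
        // A negative int32 is sign-extended to 64 bits before being written.
        writeVarint(out, (long) value);
        return out.toByteArray();
    }

    // Formats bytes as lowercase hex pairs separated by spaces.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // StudentRequest { age = 20 }
        System.out.println(toHex(encodeInt32Field(1, 20)));  // prints "08 14"
        // A value above 127 needs more than one varint byte.
        System.out.println(toHex(encodeInt32Field(1, 300))); // prints "08 ac 02"
    }
}
```

Only the field number and the value travel on the wire, never the field name, which is why renaming a field is harmless while changing its number is not.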

Then run gradle generateProto. The generated code is placed under the build directory by default; here we copy it manually into src/main/java.

Implementation

package com.sail.grpc;

import com.sail.proto.*;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

/**
 * @author yangfan
 * @date 2017/08/01
 */
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {

    /**
     * Unary call: takes one MyRequest and returns one MyResponse.
     */
    @Override
    public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
        System.out.println("Received message from client: " + request.getUsername());
        responseObserver.onNext(MyResponse.newBuilder().setRealname("張三").build());
        responseObserver.onCompleted();
    }

    /**
     * Takes a single StudentRequest
     * and returns a stream of StudentResponse.
     */
    @Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("Received message from client: " + request.getAge());

        responseObserver.onNext(StudentResponse.newBuilder().setName("張三").setAge(20).setCity("北京").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("成都").build());
        responseObserver.onNext(StudentResponse.newBuilder().setName("趙六").setAge(50).setCity("深圳").build());

        responseObserver.onCompleted();
    }

    /**
     * Takes a stream of StudentRequest
     * and returns a single StudentResponseList.
     */
    @Override
    public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {

            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext: " + value.getAge());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            // The single reply is sent only after the client has finished its request stream.
            @Override
            public void onCompleted() {
                StudentResponse studentResponse = StudentResponse.newBuilder().setName("張三").setAge(20).setCity("西安").build();
                StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("成都").build();

                StudentResponseList studentResponseList = StudentResponseList.newBuilder()
                        .addStudentResponse(studentResponse).addStudentResponse(studentResponse2).build();

                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();
            }
        };
    }

    /**
     * Bidirectional streaming.
     */
    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            // Reply to every incoming message with a random UUID.
            @Override
            public void onNext(StreamRequest value) {
                System.out.println(value.getRequestInfo());

                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}
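
The client-streaming method above hands back an observer for the incoming requests and sends its single reply only from onCompleted. The following is a minimal JDK-only sketch of that control flow, so it can be run without a server. The nested Observer interface is a hypothetical stand-in with the same three callbacks as io.grpc.stub.StreamObserver, and collectAges is an invented example method, not part of the service above.

```java
import java.util.ArrayList;
import java.util.List;

final class ClientStreamingSketch {
    // Hypothetical stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Returns the observer that receives the request stream. A single summary is
    // pushed to responseObserver only once the request stream has completed.
    static Observer<Integer> collectAges(Observer<List<Integer>> responseObserver) {
        List<Integer> ages = new ArrayList<>();
        return new Observer<Integer>() {
            @Override
            public void onNext(Integer age) {
                ages.add(age);
            }

            @Override
            public void onError(Throwable t) {
                responseObserver.onError(t);
            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(new ArrayList<>(ages));
                responseObserver.onCompleted();
            }
        };
    }

    // Plays the client role and records the order in which events happen.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Observer<Integer> requests = collectAges(new Observer<List<Integer>>() {
            @Override
            public void onNext(List<Integer> value) {
                events.add("response " + value);
            }

            @Override
            public void onError(Throwable t) {
                events.add("error " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("completed");
            }
        });
        requests.onNext(20);
        requests.onNext(30);
        events.add("requests sent");
        requests.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        // prints "[requests sent, response [20, 30], completed]"
        System.out.println(demo());
    }
}
```

The recorded order shows that nothing reaches the response side until the request side calls onCompleted, which is the behaviour getStudentsWrapperByAges relies on.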

Server

package com.sail.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * @author yangfan
 * @date 2017/08/01
 */
public class GrpcServer {

    private Server server;

    private void start() throws IOException {
        this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl()).build().start();
        System.out.println("server started!");

        // The JVM runs this shutdown hook when it is shutting down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("shutting down the JVM");
            GrpcServer.this.stop();
        }));

        // start() does not block, so execution continues past this point.
        System.out.println("reached this point");
    }

    private void stop() {
        if (server != null) {
            this.server.shutdown();
        }
    }

    // Blocks the calling thread until the server has terminated.
    private void awaitTermination() throws InterruptedException {
        if (server != null) {
            this.server.awaitTermination();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        GrpcServer grpcServer = new GrpcServer();
        grpcServer.start();
        grpcServer.awaitTermination();
    }
}
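
The stop() method above only calls shutdown(), which stops accepting new calls but does not wait for calls already in progress. A common pattern is to wait for a bounded time and then force termination. The sketch below shows that sequence on a JDK ExecutorService, used here purely as a stand-in because it exposes methods with the same names (shutdown, awaitTermination with a timeout, shutdownNow) as io.grpc.Server. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {
    // Stops accepting new work, waits up to timeoutMillis for running tasks,
    // then forces termination. Returns true if everything finished in time.
    static boolean stopGracefully(ExecutorService service, long timeoutMillis) {
        service.shutdown();
        try {
            if (service.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            service.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            // Give up waiting, force termination and keep the interrupt flag set.
            service.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> System.out.println("handling a request"));
        System.out.println("stopped cleanly: " + stopGracefully(worker, 5000));
    }
}
```

In GrpcServer the same three steps could be placed inside the shutdown hook, so that the JVM gives in-flight calls a chance to finish before it exits.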

Client

package com.sail.grpc;

import com.sail.proto.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.Iterator;

/**
 * @author yangfan
 * @date 2017/08/01
 */
public class GrpcClient {
    public static void main(String[] args) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8899)
                .usePlaintext(true).build();

        // The blocking stub covers unary and server-streaming calls; the async stub covers all four kinds.
        StudentServiceGrpc.StudentServiceBlockingStub blockingStub = StudentServiceGrpc.newBlockingStub(managedChannel);
        StudentServiceGrpc.StudentServiceStub stub = StudentServiceGrpc.newStub(managedChannel);
        MyResponse myResponse = blockingStub.getRealNameByUsername(MyRequest.newBuilder().setUsername("zhangsan").build());

        System.out.println(myResponse.getRealname());

        System.out.println("----------------");

        // Server streaming: the blocking stub exposes the response stream as an Iterator.
        Iterator<StudentResponse> iter = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());

        while (iter.hasNext()) {
            StudentResponse studentResponse = iter.next();

            System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity());
        }

        System.out.println("----------------");

        // Calling getStudentsWrapperByAges (client streaming)

        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName() + ", " + studentResponse.getAge() + ", " + studentResponse.getCity());
                    System.out.println("*******");
                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed!");
            }
        };

        // Whenever the client sends its requests as a stream, the call is necessarily asynchronous.
        StreamObserver<StudentRequest> studentRequestStreamObserver = stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
        // Send several messages.
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
        studentRequestStreamObserver.onCompleted();

        // On its own, the code above prints nothing: the stub is asynchronous, so the program
        // would exit right after onCompleted() returns, before the requests had been sent.
        // The sleep below keeps the program running a little longer.
        try {
            Thread.sleep(50000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Calling the bidirectional stream

        StreamObserver<StreamRequest> requestStreamObserver = stub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        });

        for (int i = 0; i < 10; i++) {
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // Signal the end of the request stream so the server can complete the call.
        requestStreamObserver.onCompleted();
    }
}
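
The client above keeps the JVM alive with a fixed Thread.sleep(50000) so that the asynchronous response has time to arrive. A common alternative is a CountDownLatch that the response observer releases from onCompleted or onError, so the caller waits only as long as needed. The sketch below is JDK-only: Callback is a hypothetical stand-in for io.grpc.stub.StreamObserver, and fakeAsyncCall simulates the asynchronous stub with a plain thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AwaitCompletionSketch {
    // Hypothetical stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver.
    interface Callback<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Simulates an asynchronous call: the reply is delivered later on another thread.
    static void fakeAsyncCall(String request, Callback<String> callback) {
        Thread t = new Thread(() -> {
            callback.onNext("reply to " + request);
            callback.onCompleted();
        });
        t.setDaemon(true);
        t.start();
    }

    // Issues the call and blocks until it finishes or the timeout elapses.
    static String callAndWait(String request, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        fakeAsyncCall(request, new Callback<String>() {
            @Override
            public void onNext(String value) {
                result.set(value);
            }

            @Override
            public void onError(Throwable t) {
                done.countDown(); // release the waiter on failure too
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });
        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("call did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(callAndWait("zhangsan", 5000)); // prints "reply to zhangsan"
    }
}
```

Counting down in onError as well as onCompleted matters: otherwise a failed call would leave the caller waiting for the full timeout.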

