Nodejs使用gRPC與Java進行遠程通信
Java代碼
加入依賴
plugins {
id 'java'
id 'com.google.protobuf' version '0.8.8'
}
group 'com.sakura'
version '1.0'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
testCompile (
"junit:junit:4.12"
)
compile (
"io.netty:netty-all:4.1.46.Final",
'com.google.protobuf:protobuf-java:3.11.4',
'com.google.protobuf:protobuf-java-util:3.11.4',
'io.grpc:grpc-netty-shaded:1.28.0',
'io.grpc:grpc-protobuf:1.28.0',
'io.grpc:grpc-stub:1.28.0'
)
}
//構建protobuf插件配置
protobuf {
//輸出目錄的根目錄名,生成的java文件的位置,會在指定的目錄下的main下生成
generateProtoTasks.generatedFilesBaseDir = "$projectDir/src"
protoc {
artifact = "com.google.protobuf:protoc:3.11.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.28.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {
//grpc的接口類的生成目錄,默認是grpc
outputSubDir = 'java'
}
}
}
}
編寫一個Proto文件
syntax = "proto3";//定義使用的proto版本
package com.sakura.proto;//所有語言適用的包路徑定義語法
option java_package = "com.sakura.proto";//java包路徑 優先級高於package
option java_outer_classname = "Student";//生成的外部類名
option java_multiple_files = true;//是否生成多個文件
//定義rpc的方法
service StudentService{
//1.客戶端發出一個普通的請求,服務器的返回一個普通的響應
rpc GetRealNameByUsername(MyRequest) returns (MyResponse);
//grpc的請求以及響應不能是基本數據類型,必須是一個message類型,不管請求里有幾個參數
//他必須是你定義的一個message類型的
//2.根據學生的年齡獲取與這個年齡相等的學生對象客戶端發生一個普通的請求,服務器的以流的形式返回
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse);
//3.以流式的方式請求一個StudentRequest服務器會返回一個StudentResponseList
rpc GetStudentsWrapperByAges(stream StudentRequest) returns (StudentResponseList){}
//4.客戶源與服務端都以流式的方式,雙向的數據流傳遞
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse);
}
//消息
message MyRequest{
string username = 1;
}
message MyResponse{
string realname = 2;
}
//單向流使用的消息
message StudentResponse{
string name = 1;
int32 age = 2;
string city = 3;
}
message StudentRequest{
int32 age = 1;
}
message StudentResponseList{
repeated StudentResponse studentResponse = 1;
}
//雙向數據流傳遞使用的消息
message StreamRequest{
string request_info = 1;
}
message StreamResponse{
string response_info = 1;
}
服務端代碼
public class GrpcServer {
private Server server;
public static void main(String[] args) throws Exception{
GrpcServer server = new GrpcServer();
server.start();
server.awaitTermination();
}
private void start()throws Exception{
//創建服務通道配置端口,傳入映射的方法的實現類,然后構建並啟動
this.server = ServerBuilder.forPort(8899).addService(new StudentServiceImpl())
.build().start();
System.out.println("server started!");
//設置一個回調鈎子
Runtime.getRuntime().addShutdownHook(new Thread(() ->{
System.out.println("JVM 關閉");
try {
this.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
System.out.println("執行到這里");
}
private void stop() throws InterruptedException {
if(this.server != null){
//關閉服務
this.server.shutdown();
}
}
private void awaitTermination() throws InterruptedException {
if(this.server != null){
//等待終止,讓服務不停止,可以設置超時時長
this.server.awaitTermination();
//this.server.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}
}
服務接口實現類
/**
* @ClassName : StudentServiceImpl
* @Description : 遠程調用的方法的具體實現,實現生成代碼中的內部抽象類
*/
public class StudentServiceImpl extends StudentServiceGrpc.StudentServiceImplBase {
/**
* 重寫父類的方法
* @param request 客戶端發來的數據
* @param responseObserver 響應觀察者 用於響應客戶端的對象
*/
@Override
public void getRealNameByUsername(MyRequest request, StreamObserver<MyResponse> responseObserver) {
System.out.println("接收到客戶端信息:" + request.getUsername());
/*
onCompleted() 標示這個方法調用結束,只能調用一次
onError() 異常時調用
onNext() 接下來要做什么事,可以用於結果返回
*/
//構造響應對象,並返回
responseObserver.onNext(MyResponse.newBuilder().setRealname("星空").build());
//標示服務器處理結束
responseObserver.onCompleted();
}
@Override
public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接受到客戶端信息:" + request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海").setAge(18).setCity("北京").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("上海").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海3").setAge(22).setCity("廣州").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("彩虹海4").setAge(18).setCity("深圳").build());
responseObserver.onCompleted();
}
@Override
public StreamObserver<StudentRequest> getStudentsWrapperByAges(StreamObserver<StudentResponseList> responseObserver) {
//實現StreamObserver接口,實現方法當特定的事件觸發時,回調方法就會的到調用
return new StreamObserver<StudentRequest>() {
/**
* 接收客戶端的請求,請求到來時被調用
* 每來一次請求,onNext()方法就會被調用一次
* 因為請求是流式的,onNext會被調用多次
* @param value
*/
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext:" + value.getAge());
}
/**
* 出現異常時被調用
* @param t
*/
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 表示客戶端將流式數據全部發給服務器端之后,客戶端就會有一個onCompleted事件,服務器端就會感知到
* 然后服務器端在onCompleted中為客戶端返回最終結果
*/
@Override
public void onCompleted() {
StudentResponse.Builder studentResponse =
StudentResponse.newBuilder().setName("彩虹海1").setAge(18).setCity("宇宙");
StudentResponse.Builder studentResponse2 =
StudentResponse.newBuilder().setName("彩虹海2").setAge(20).setCity("宇宙");
StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse)
.addStudentResponse(studentResponse2).build();
//返回結果
responseObserver.onNext(studentResponseList);
//表示處理完成
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
/**
* 客戶端發來請求時被調用,每請求一次則被調用一次
* @param value 客戶端發來的數據
*/
@Override
public void onNext(StreamRequest value) {
//打印客戶端發來的數據
System.out.println(value.getRequestInfo());
//向客戶端返回數據
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 客戶端的onCompleted方法被調用時,被調用
*/
@Override
public void onCompleted() {
//雙向的數據流傳遞,雖然是在兩個不同的流中傳遞互不干擾
//但是當一方的流被關閉時另一方也要關閉與之交互的流
responseObserver.onCompleted();
}
};
}
}
客戶端代碼
/**
* @ClassName : GrpcClient
* @Description : grpc client
*/
public class GrpcClient {
public static void main(String[] args) throws InterruptedException {
//usePlaintext()使用純文本的方式,不加密
ManagedChannel managedChannel =
ManagedChannelBuilder.forTarget("localhost:8899").usePlaintext().build();
//客戶端與服務端交互的對象 server與client通信的對象
//blockingStub 阻塞的方式/同步 發出一個請求一定要等到另一端返回了響應才繼續往下執行
StudentServiceGrpc.StudentServiceBlockingStub blockingStub =
StudentServiceGrpc.newBlockingStub(managedChannel);
//只要是客戶端是以流式的方式向服務器發送請求,這種請求一定以異步的
//blockingStub是同步的阻塞的,則不會被提供方法
//獲取一個異步的通信對象
//創建一個支持該服務的所有呼叫類型的新異步存根,不會等待對方響應會一直向下執行
StudentServiceGrpc.StudentServiceStub stub =
StudentServiceGrpc.newStub(managedChannel);
//構建消息
MyRequest request = MyRequest.newBuilder().setUsername("出發,目標彩虹海").build();
//調用具體方法,接收到響應
MyResponse response = blockingStub.getRealNameByUsername(request);
System.out.println("接收到服務器信息:" + response.getRealname());
System.out.println("--------------------普通請求與響應 結束----------------------");
/*
//返回一個流式的響應就是一個迭代器,每返回一個對象就進入到迭代器中,再返回對象再進入迭代器,以此類推
Iterator<StudentResponse> iter =
blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(18).build());
//iter.hasNext() 還有沒有下一個
while (iter.hasNext()){
StudentResponse studentResponse = iter.next();
System.out.println(studentResponse.getName() + " , " +
studentResponse.getAge() + " , " + studentResponse.getCity());
}
System.out.println("-----------------------普通請求 流式響應 結束-------------------");
*/
//客戶端請求一個steam(流式) blockingStub(同步)無法使用 只有使用異步形式
//構造接收服務端信息的方法
/* StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
/**
* 服務端向客戶端響應結果時會被調用
* 服務端返回的數據,每返回一次數據則被調用一次
* 如果服務器端也是流式的並且返回了多個數據,那么每次返回數據的時候都會被調用一次
* @param value
*//*
@Override
public void onNext(StudentResponseList value) {
value.getStudentResponseList().forEach(studentResponse -> {
System.out.println(studentResponse.getName() + " , " +
studentResponse.getAge() + " , " + studentResponse.getCity());
System.out.println("**************");
});
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
//構造客戶端向服務端發送的數據
//getStudentsWrapperByAges(傳入處理服務端返回數據的回調對象)
StreamObserver<StudentRequest> studentsWrapperByAges =
stub.getStudentsWrapperByAges(studentResponseListStreamObserver);
//發送數據
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(18).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(28).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(38).build());
studentsWrapperByAges.onNext(StudentRequest.newBuilder().setAge(48).build());
//表示客戶端調用結束
studentsWrapperByAges.onCompleted();
System.out.println("-----------------------流式請求 普通響應 結束-------------------");
*/
/*
StreamObserver<StreamRequest> streamRequestStreamObserver =
stub.biTalk(new StreamObserver<StreamResponse>() {
/**
* 收到服務器響應結果時,被調用
* @param value 服務器返回的數據
*//*
@Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 服務器端onCompleted()被調用時,被觸發
*//*
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
});
for (int i = 0; i < 10; i++) {
streamRequestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
Thread.sleep(1000);
}
streamRequestStreamObserver.onCompleted();
System.out.println("-----------------------雙向數據流傳遞 結束-------------------");
*/
//客戶端向服務器端發送數據,數據還沒發送就繼續向下執行走到了onCompleted(),
//然后在數據還未發出時程序就正常執行完成結束了
//線程睡眠,強制讓程序等待,等待studentsWrapperByAges將數據全部發送給服務器端
//如果不睡眠的話,數據還沒發送給服務器端,jvm就停止了,也就發送不出去了
Thread.sleep(5000);
}
}
安裝Nodejs
ArchLinux通過nvm安裝node npm
nvm ls-remote //顯示所有遠端node版本
nvm install 版本號 //安裝node
nvm use 已安裝node版本號 //切換node
nvm alias default 已安裝node版本號 //設置默認使用的node版本
配置package.json
{
"name": "grpc-examples",
"version": "0.1.0",
"dependencies": {
"@grpc/proto-loader": "^0.1.0",
"async": "^1.5.2",
"google-protobuf": "^3.0.0",
"grpc": "^1.11.0",
"lodash": "^4.6.1",
"minimist": "^1.2.0"
}
}
安裝grpc代碼環境
npm config set registry https://registry.npm.taobao.org //npm設置淘寶源
sudo npm install //下載依賴,在項目目錄下 `package.json`同目錄
Nodejs動態代碼生成
編寫代碼時是不需要提前使用proto文件生成對應的js代碼的,只需要指定proto文件的位置,在運行的過程當中會動態的生成js文件,客戶端與服務端都可以使用動態生成
服務端代碼
//*.proto文件的絕對路徑
var PROTO_FILE_PATH = "/home/ideaHome/netty/nodejs/grpc_nodejs/proto/Student.proto";
//引入grpc
var grpc = require("grpc");
//加載rpc的方法
var grpcService = grpc.load(PROTO_FILE_PATH).com.sakura.proto;
//通過grpc獲取server
var server = new grpc.Server();
server.addService(grpcService.StudentService.service,{
//函數: 服務(想要自定義)
getRealNameByUsername: getRealNameByUsername,
getStudentsByAge: getStudentsByAge,
getStudentsWrapperByAges: getStudentsWrapperByAges,
biTalk: biTalk
});
//grpc.ServerCredentials.createInsecure() 使用純文本的方式傳輸 不使用加密的
server.bind("localhost:8899",grpc.ServerCredentials.createInsecure());
//啟動服務器
server.start();
//實現具體方法被調用時,處理的方法(服務)
//參數(客戶端的請求,回調(函數,請求收到之后最后調用這個回調把結果返回給客戶端))
function getRealNameByUsername(call, callback){
console.log("username:" + call.request.username);
//參數(錯誤對象,真正要給客戶端返回的結果值)
callback(null,{realname: "星空"});
}
//用不到 空實現
function getStudentsByAge() {}
function getStudentsWrapperByAges() {}
function biTalk() {}
客戶端代碼
//*.proto文件的絕對路徑
var PROTO_FILE_PATH = "/home/ideaHome/netty/nodejs/grpc_nodejs/proto/Student.proto";
//引入grpc
var grpc = require("grpc");
var grpcService = grpc.load(PROTO_FILE_PATH).com.sakura.proto;
//grpc.credentials.createInsecure() 不使用加密的
var client = new grpcService.StudentService("localhost:8899",grpc.credentials.createInsecure());
//node是一個異步的框架,絕大多數都是通過回調的方式獲取對端的響應,也有同步的操作,
//但是對與node來說絕大多數都是通過異步的方式來獲取對方返回的結果
//動態的代碼生成就是以一個json對象的方式傳輸給對端
//方法 (參數,回調方法(異常,對端返回的數據))
client.getRealNameByUsername({username: "麥當"},function(error,respData){
console.log(respData);
});
測試
Java服務端 node客戶端
可通過 node xxxx/xxxx.js //運行nodejs代碼
//java服務端顯示
server started!
執行到這里
接收到客戶端信息:麥當
//node客戶端顯示
{ realname: '星空' }
node服務端 Java客戶端
//node服務端顯示
username:出發,目標彩虹海
//java客戶端顯示
接收到服務器信息:星空
--------------------普通請求與響應 結束----------------------
Nodejs靜態代碼生成
安裝js代碼生成工具
npm install -g grpc-tools //通過npm全局安裝grpc-tools
生成js代碼
格式:grpc_tools_node_protoc --js_out=import_style=commonjs,binary:js普通代碼生成的文件位置 --grpc_out=調用接口的生成位置 --plugin=protoc-gen-grpc=grpc_tools_node_protoc_plugin插件的位置可通過which查看 proto文件
例如:grpc_tools_node_protoc --js_out=import_style=commonjs,binary:static_codegen/ --grpc_out=static_codegen/ --plugin=protoc-gen-grpc=/home/miki/.nvm/versions/node/v12.17.0/bin/grpc_tools_node_protoc_plugin proto/Student.proto
服務端代碼
var service = require('../static_codegen/proto/Student_grpc_pb');
var messages = require('../static_codegen/proto/Student_pb');
var grpc = require('grpc');
var server = new grpc.Server();
server.addService(service.StudentServiceService,{
//函數: 服務(想要自定義)
getRealNameByUsername: getRealNameByUsername,
getStudentsByAge: getStudentsByAge,
getStudentsWrapperByAges: getStudentsWrapperByAges,
biTalk: biTalk
});
server.bind("localhost:8899",grpc.ServerCredentials.createInsecure());
server.start();
//參數(客戶端的請求,回調(函數,請求收到之后最后調用這個回調把結果返回給客戶端))
function getRealNameByUsername(call,callback) {
console.log("request:" + call.request.getUsername());
var myResponse = new messages.MyResponse();
myResponse.setRealname("目標,彩虹海");
callback(null, myResponse);
}
//用不到 空實現
function getStudentsByAge() {}
function getStudentsWrapperByAges() {}
function biTalk() {}
客戶端代碼
var service = require('../static_codegen/proto/Student_grpc_pb');
var messages = require('../static_codegen/proto/Student_pb');
var grpc = require('grpc');
//創建一個客戶端
var client = new service.StudentServiceClient("localhost:8899",
grpc.credentials.createInsecure());
var request = new messages.MyRequest();
request.setUsername("米龍");
//發出請求
client.getRealNameByUsername(request, function (error, respData) {
console.log(respData.getRealname());
});
測試
Java服務端 node客戶端
可通過 node xxxx/xxxx.js //運行nodejs代碼
//java服務端顯示
server started!
執行到這里
接收到客戶端信息:米龍
//node客戶端顯示
星空
node服務端 Java客戶端
//node服務端顯示
request:出發,目標彩虹海
//java客戶端顯示
接收到服務器信息:目標,彩虹海
--------------------普通請求與響應 結束----------------------