目錄
因為設備的通信協議准備采用
protobuf,所以准備這篇protobuf的使用入門,golang作為客戶端,java作為服務端,這才能真正體現出protobuf的無關語言特性。本文采用
protobuf2,注重於如何快速入門使用,並不會涉及到具體的細節知識點。
整體結構說明
golang作為客戶端,java作為服務端,protobuf2為兩者的通信協議格式。

protobuf2文件
-
protobuf2簡介
-
helloworld.proto
syntax = "proto2"; package proto; message ProtocolMessage { message SearchRequest{ required string name = 1; optional int32 search = 2 ; } message ActionRequest{ required string name = 1; optional int32 action = 2 ; } message SearchResponse{ required string name = 1; optional int32 search = 2 ; } message ActionResponse{ required string name = 1; optional int32 action = 2 ; } optional SearchRequest searchRequest = 1; optional ActionRequest actionRequest = 2; optional SearchResponse searchResponse = 3; optional ActionResponse actionResponse = 4; }SearchRequest和SearchResponse為對應的請求和相應message;ActionRequest和ActionResponse為對應的請求和相應message;- 由於服務端使用
netty框架,限制了只能接受一個message進行編碼解碼,所以把SearchRequest、SearchResponse、ActionRequest和ActionResponse都內嵌到ProtocolMessage中,通過對ProtocolMessage編碼解碼進行數據交互。
golang客戶端
目錄結構
client_proto/
├── api
│ ├── proto # 存放proto協議文件以及生產的pd.go文件
│ ├── helloworld.pb.go
│ └── helloworld.proto
├── cmd
│ ├── main.go
│ ├── util
│ └── util.go
采用go mod 進行開發
生成pb.go文件
-
安裝proto
自行百度......
-
在.proto文件處,輸入
protoc --go_out=./ helloworld.proto -
即可生成
helloworld.pb.go文件
main.go
package main
import (
"github.com/gin-gonic/gin"
proto "grpc/api/grpc_proto"
"grpc/cmd/demo3/util"
"net/http"
"time"
)
func init() {
util.InitTransfer()
}
func main() {
router := gin.Default()
// search 測試
router.GET("/search", func(c *gin.Context) {
name := "search"
search := int32(12)
message := &proto.ProtocolMessage{
SearchRequest:&proto.ProtocolMessage_SearchRequest{
Name:&name,
Search:&search,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
c.JSON(200, gin.H{
"message": message.SearchResponse.Name,
})
})
// action測試
router.GET("/action", func(c *gin.Context) {
name := "action"
action := int32(34)
message := &proto.ProtocolMessage{
ActionRequest: &proto.ProtocolMessage_ActionRequest{
Name: &name,
Action: &action,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
c.JSON(200, gin.H{
"message": message.ActionResponse.Name,
})
})
ReadTimeout := time.Duration(60) * time.Second
WriteTimeout := time.Duration(60) * time.Second
s := &http.Server{
Addr: ":8090",
Handler: router,
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}
util.go
package util
import (
"encoding/binary"
"errors"
"github.com/gogo/protobuf/proto"
grpc_proto "grpc/api/grpc_proto"
"net"
)
var (
G_transfer *Transfer
)
func InitTransfer() {
var (
pTCPAddr *net.TCPAddr
conn net.Conn
err error
)
if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil {
return
}
if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil {
return
}
// 定義 Transfer 指針變量
G_transfer = &Transfer{
Conn: conn,
}
}
// 聲明 Transfer 結構體
type Transfer struct {
Conn net.Conn // 連接
Buf [1024 * 2]byte // 傳輸時,使用的緩沖
}
// 獲取並解析服務器的消息
func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) {
_, err = transfer.Conn.Read(transfer.Buf[:4])
if err != nil {
return
}
// 根據 buf[:4] 轉成一個 uint32 類型
var pkgLen uint32
pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4])
//根據pkglen 讀取消息內容
n, err := transfer.Conn.Read(transfer.Buf[:pkgLen])
if n != int(pkgLen) || err != nil {
return
}
if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil {
return
}
return
}
// 發送消息到服務器
func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) {
var (
sendBytes []byte
readLen int
)
//sendBytes, ints := action.Descriptor()
if sendBytes, err = proto.Marshal(action); err != nil {
return
}
pkgLen := uint32(len(sendBytes))
var buf [4]byte
binary.BigEndian.PutUint32(buf[:4],pkgLen)
if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil {
if readLen == 0 {
return errors.New("發送數據長度發生異常,長度為0")
}
return
}
// 發送消息
if readLen, err = transfer.Conn.Write(sendBytes); err != nil {
if readLen == 0 {
return errors.New("檢查到服務器關閉,客戶端也關閉")
}
return
}
return
}
- 這里發送消息和讀取消息都需要先發送/解析數據的長度,然后發送/解析數據本身;
- 這里與服務端怎么樣解析/發送數據有關,這是由於
netty框架中定義的編碼解碼器決定的。
java服務端
目錄結構
server_proto/
├── src
│ ├── main
│ ├── java
│ ├── com
│ ├── dust
│ ├── proto_server
│ ├── config
│ └── NettyConfig.java
│ ├── netty
│ └── NettyServerListener.java
│ └── SocketServerHandler.java
│ ├── proto
│ └── Helloworld.java
│ └── helloworld.proto # proto配置文件
│ └── Application.java # 啟動配置類
│ ├── resources
│ └── application.yml #配置文件
│ ├── test
└── pom.xml # maven配置文件
采用springBoot+netty+maven開發
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dust</groupId>
<artifactId>proto_server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>proto_server</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- protobuf依賴-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.19.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 注意:
protobuf-java的版本為3.8.0,必須和安裝proto.exe的版本保持一致。
application.yml
# netty配置
netty:
# 端口號
port: 3210
# 最大線程數
maxThreads: 1024
# 數據包的最大長度
max_frame_length: 65535
NettyConfig.java
package com.dust.proto_server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
private int port;
}
生成Helloworld.java
- 在.proto文件處,輸入
protoc --java_out=./ helloworld.proto - 即可生成
Helloworld.java文件
SocketServerHandler.java
package com.dust.proto_server.netty;
import com.dust.proto_server.proto.Helloworld;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
public class SocketServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);
public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"加入");
CHANNEL_GROUP.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"退出");
CHANNEL_GROUP.remove(channel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
//
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.info("開始讀取客戶端發送過來的數據");
Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();
if (protocolMessage.getSearchRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
LOGGER.info("searchRequest--{}",searchRequest);
Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build();
builder.setSearchResponse(searchResponse);
} else if (protocolMessage.getActionRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
LOGGER.info("actionRequest--{}",actionRequest);
Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build();
builder.setActionResponse(actionResponse);
}
Helloworld.ProtocolMessage message = builder.build();
// 發送數據長度
ctx.channel().writeAndFlush(message.toByteArray().length);
// 發送數據本身
ctx.channel().writeAndFlush(message);
}
}
NettyServerListener.java
package com.dust.proto_server.netty;
import com.dust.proto_server.config.NettyConfig;
import com.dust.proto_server.proto.Helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@Component
public class NettyServerListener {
/**
* NettyServerListener 日志輸出器
*
*/
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
/**
* 創建bootstrap
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* BOSS
*/
EventLoopGroup boss = new NioEventLoopGroup();
/**
* Worker
*/
EventLoopGroup work = new NioEventLoopGroup();
@Resource
private SocketServerHandler socketServerHandler;
/**
* NETT服務器配置類
*/
@Resource
private NettyConfig nettyConfig;
/**
* 關閉服務器方法
*/
@PreDestroy
public void close() {
LOGGER.info("關閉服務器....");
//優雅退出
boss.shutdownGracefully();
work.shutdownGracefully();
}
/**
* 開啟及服務線程
*/
public void start() {
// 從配置文件中(application.yml)獲取服務端監聽端口號
int port = nettyConfig.getPort();
serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 負責通過4字節Header指定的Body長度將消息切割
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
// 負責將frameDecoder處理后的完整的一條消息的protobuf字節碼轉成ProtocolMessage對象
pipeline.addLast("protobufDecoder",
new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
// 負責將寫入的字節碼加上4字節Header前綴來指定Body長度
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
// 負責將ProtocolMessage對象轉成protobuf字節碼
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast(socketServerHandler);
}
}).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
LOGGER.info("netty服務器在[{}]端口啟動監聽", port);
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.info("[出現異常] 釋放資源");
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
- 這個類就定義服務端是怎么樣處理接受和發送數據的;

-
frameDecoder和protobufDecoder對應的handler用於解碼Protobuf package數據包,他們都是Upstream Handles:先處理長度,然后再處理數據本身; -
frameEncoder和protobufEncoder對應的handler用於編碼Protobuf package數據包,他們都是Downstream Handles; -
此外還有一個handler,是一個自定義的Upstream Handles,用於開發者從網絡數據中解析得到自己所需的數據
socketServerHandler; -
上例Handles的執行順序為
upstream:frameDecoder,protobufDecoder,handler //解碼從Socket收到的數據 downstream:frameEncoder,protobufEncoder //編碼要通過Socket發送出去的數據
Application.java
package com.dust.proto_server;
import com.dust.proto_server.netty.NettyServerListener;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Resource
private NettyServerListener nettyServerListener;
@Override
public void run(String... args) throws Exception {
nettyServerListener.start();
}
}
測試
-
先啟動服務端,再啟動客戶端
-
search測試

- action測試

