前提
最近在看Netty相關的資料,剛好SOFA-BOLT是一個比較成熟的Netty自定義協議棧實現,於是決定研讀SOFA-BOLT的源碼,詳細分析其協議的組成,簡單分析其客戶端和服務端的源碼實現。
- 吐槽一下:
SOFA-BOLT的代碼縮進和FastJson類似,變量名稱強制對齊,對於一般開發者來說看着源碼會有不適感
當前閱讀的源碼是2021-08左右的SOFA-BOLT倉庫的master分支源碼。
SOFA-BOLT簡單介紹
SOFA-BOLT是螞蟻金融服務集團開發的一套基於Netty實現的網絡通信框架,本質是一套Netty私有協議棧封裝,目的是為了讓開發者能將更多的精力放在基於網絡通信的業務邏輯實現上,而不是過多的糾結於網絡底層NIO的實現以及處理難以調試的網絡問題和Netty二次開發問題。SOFA-BOLT的架構設計和功能如下:

上圖來源於SOFA-BOLT官網https://www.sofastack.tech/projects/sofa-bolt/overview
SOFA-BOLT協議透視
由於SOFA-BOLT協議是基於Netty實現的自定義協議棧,協議本身的實現可以快速地在Encoder和Decoder的實現中找到,進一步定位到com.alipay.remoting.rpc包中。從源碼得知,SOFA-BOLT協議目前有兩個版本,協議在RpcProtocol和RpcProtocolV2的類頂部注釋中有比較詳細的介紹,基於這些介紹可以簡單整理出兩個版本協議的基本構成。
V1版本協議的基本構成
V1版本的協議請求Frame基本構成:

V1版本的協議響應Frame基本構成:

針對V1版本的協議,各個屬性展開如下:
- 請求
Frame和響應Frame的公共屬性:
| 屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備注 |
|---|---|---|---|---|
| proto | 協議編碼 | byte | 1 | V1版本下,proto = 1,V2版本下,proto = 2 |
| type | 類型 | byte | 1 | 0 => RESPONSE,1 => REQUEST,2 => REQUEST_ONEWAY |
| cmdcode | 命令編碼 | short | 2 | 1 => rpc request,2 => rpc response |
| ver2 | 命令版本 | byte | 1 | 從源碼得知目前固定為1 |
| requestId | 請求ID | int | 4 | 某個請求CMD的全局唯一標識 |
| codec | 編碼解碼器 | byte | 1 | - |
上表中,codec從字面上理解是編碼解碼器,實際上是序列化和反序列實現的標記,V1和V2目前都是固定codec = 1,通過源碼跟蹤到SerializerManager的配置值為Hessian2 = 1,也就是默認使用Hessian2進行序列化和反序列化,詳細見源碼中的HessianSerializer
- 請求
Frame特有的屬性:
| 屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備注 |
|---|---|---|---|---|
| timeout | 請求超時時間 | int | 4 | |
| classLen | 請求對象(參數)類型的名稱長度 | short | 2 | 值>=0 |
| headerLen | 請求頭長度 | short | 2 | 值>=0 |
| contentLen | 請求內容長度 | int | 4 | 值>=0 |
| className bytes | 請求對象(參數)類型的名稱 | byte[] |
- | |
| header bytes | 請求頭 | byte[] |
- | |
| content bytes | 請求內容 | byte[] |
- |
- 響應
Frame特有的屬性:
| 屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備注 |
|---|---|---|---|---|
| respstatus | 響應狀態值 | short | 2 | 在ResponseStatus中定義,目前內置13種狀態,例如0 => SUCCESS |
| classLen | 響應對象(參數)類型的名稱長度 | short | 2 | 值>=0 |
| headerLen | 響應頭長度 | short | 2 | 值>=0 |
| contentLen | 響應內容長度 | int | 4 | 值>=0 |
| className bytes | 響應對象(參數)類型的名稱 | byte[] |
- | |
| header bytes | 響應頭 | byte[] |
- | |
| content bytes | 響應內容 | byte[] |
- |
這里可以看出V1版本中的請求Frame和響應Frame只有細微的差別,(請求Frame中獨立存在timeout屬性,而響應Frame獨立存在respstatus屬性),絕大部分的屬性都是復用的,並且三個長度和三個字節數組是相互制約的:
classLen <=> className bytesheaderLen <=> header bytescontentLen <=> content bytes
V2版本協議的基本構成
V2版本的協議請求Frame基本構成:

V2版本的協議響應Frame基本構成:

V2版本的協議相比V1版本多了2個必傳公共屬性和1個可選公共屬性:
| 屬性Code | 屬性含義 | Java類型 | 大小(byte) | 備注 |
|---|---|---|---|---|
| ver1 | 協議版本 | byte | 1 | 是為了在V2版本協議中兼容V1版本的協議 |
| switch | 協議開關 | byte | 1 | 基於BitSet實現的開關,最多8個 |
| CRC32 | 循環冗余校驗值 | int | 4 | 可選的,由開關ProtocolSwitch.CRC_SWITCH_INDEX決定是否啟用,啟用的時候會基於整個Frame進行計算 |
這幾個新增屬性中,switch代表ProtocolSwitch實現中的BitSet轉換出來的byte字段,由於byte只有8位,因此協議在傳輸過程中最多只能傳遞8個開關的狀態,這些開關的下標為[0,7]。CRC32是基於整個Frame轉換出來的byte數組進行計算,JDK中有原生從API,可以簡單構建一個工具類如下進行計算:
public enum Crc32Utils {
/**
* 單例
*/
X;
/**
* 進行CRC32結果計算
*
* @param content 內容
* @return crc32 result
*/
public long crc32(byte[] content) {
CRC32 crc32 = new CRC32();
crc32.update(content, 0, content.length);
long r = crc32.getValue();
// crc32.reset();
return r;
}
}
V2版本協議把CRC32的計算結果強制轉換為int類型,可以思考一下這里為什么不會溢出。
SOFA-BOLT架構
考慮到如果分析源碼,文章篇幅會比較長,並且如果有開發過Netty自定義協議棧的經驗,SOFA-BOLT的源碼並不復雜,這里僅僅分析SOFA-BOLT的架構和核心組件功能。協議由接口Protocol定義:
public interface Protocol {
// 命令編碼器
CommandEncoder getEncoder();
// 命令解碼器
CommandDecoder getDecoder();
// 心跳觸發器
HeartbeatTrigger getHeartbeatTrigger();
// 命令處理器
CommandHandler getCommandHandler();
// 命令工廠
CommandFactory getCommandFactory();
}
由V2版本協議實現RpcProtocolV2可以得知:

另外,所有需要發送或者接收的Frame都被封裝為Command,而Command的類族如下:

也就是:
RequestCommand定義了請求命令需要的所有屬性,最終由RpcCommandEncoderV2進行編碼ResponseCommand定義了響應命令需要的所有屬性,最終由RpcCommandDecoderV2進行解碼
梳理完上面的組件就可以畫出下面的一個基於SOFA-BOLT協議進行的Client => Server的交互圖:

SOFA-BOLT使用
由於sofa-bolt已經封裝好了完整的RpcClient和RpcServer,使用此協議只需要引用依賴,然后初始化客戶端和服務端,編寫對應的UserProcessor實現即可。引入相關依賴:
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.65</version>
</dependency>
新建請求實體類RequestMessage、響應實體類ResponseMessage和對應的處理器RequestMessageProcessor:
@Data
public class RequestMessage implements Serializable {
private Long id;
private String content;
}
@Data
public class ResponseMessage implements Serializable {
private Long id;
private String content;
private Long status;
}
public class RequestMessageProcessor extends SyncUserProcessor<RequestMessage> {
@Override
public Object handleRequest(BizContext bizContext, RequestMessage requestMessage) throws Exception {
ResponseMessage message = new ResponseMessage();
message.setContent(requestMessage.getContent());
message.setId(requestMessage.getId());
message.setStatus(10087L);
return message;
}
@Override
public String interest() {
return RequestMessage.class.getName();
}
}
其中處理器需要同步處理需要繼承超類SyncUserProcessor,選用異步處理的時候需要繼承超類AsyncUserProcessor,作為參數的所有實體類必須實現Serializable接口(如果有嵌套對象,每個嵌套對象所在類也必須實現Serializable接口),否則會出現序列化相關的異常。最后編寫客戶端和服務端的代碼:
@Slf4j
public class BlotApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
RequestMessageProcessor processor = new RequestMessageProcessor();
RpcServer server = new RpcServer(8081, true);
server.startup();
server.registerUserProcessor(processor);
RpcClient client = new RpcClient();
client.startup();
RequestMessage request = new RequestMessage();
request.setId(99L);
request.setContent("hello bolt");
ResponseMessage response = (ResponseMessage) client.invokeSync(ADDRESS, request, 2000);
log.info("響應結果:{}", response);
}
}
運行輸出結果:
響應結果:ResponseMessage(id=99, content=hello bolt, status=10087)
基於SOFA-BOLT協議編寫簡單CURD項目
本地測試MySQL服務構建客戶表如下:
CREATE DATABASE test;
USE test;
CREATE TABLE t_customer
(
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
customer_name VARCHAR(32) NOT NULL
);
為了簡化JDBC操作,引入spring-boot-starter-jdbc(這里只借用JdbcTemplate的輕度封裝)相關依賴:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.3.0.RELEASE</version>
</dependency>
編寫核心同步處理器:
// 創建
@Data
public class CreateCustomerReq implements Serializable {
private String customerName;
}
@Data
public class CreateCustomerResp implements Serializable {
private Long code;
private Long customerId;
}
public class CreateCustomerProcessor extends SyncUserProcessor<CreateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public CreateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, CreateCustomerReq req) throws Exception {
KeyHolder keyHolder = new GeneratedKeyHolder();
jdbcTemplate.update(connection -> {
PreparedStatement ps = connection.prepareStatement("insert into t_customer(customer_name) VALUES (?)",
Statement.RETURN_GENERATED_KEYS);
ps.setString(1, req.getCustomerName());
return ps;
}, keyHolder);
CreateCustomerResp resp = new CreateCustomerResp();
resp.setCustomerId(Objects.requireNonNull(keyHolder.getKey()).longValue());
resp.setCode(RespCode.SUCCESS);
return resp;
}
@Override
public String interest() {
return CreateCustomerReq.class.getName();
}
}
// 更新
@Data
public class UpdateCustomerReq implements Serializable {
private Long customerId;
private String customerName;
}
public class UpdateCustomerProcessor extends SyncUserProcessor<UpdateCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public UpdateCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, UpdateCustomerReq req) throws Exception {
UpdateCustomerResp resp = new UpdateCustomerResp();
int updateCount = jdbcTemplate.update("UPDATE t_customer SET customer_name = ? WHERE id = ?", ps -> {
ps.setString(1, req.getCustomerName());
ps.setLong(2, req.getCustomerId());
});
if (updateCount > 0) {
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return UpdateCustomerReq.class.getName();
}
}
// 刪除
@Data
public class DeleteCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class DeleteCustomerResp implements Serializable {
private Long code;
}
public class DeleteCustomerProcessor extends SyncUserProcessor<DeleteCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public DeleteCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, DeleteCustomerReq req) throws Exception {
DeleteCustomerResp resp = new DeleteCustomerResp();
int updateCount = jdbcTemplate.update("DELETE FROM t_customer WHERE id = ?", ps -> ps.setLong(1,req.getCustomerId()));
if (updateCount > 0){
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return DeleteCustomerReq.class.getName();
}
}
// 查詢
@Data
public class SelectCustomerReq implements Serializable {
private Long customerId;
}
@Data
public class SelectCustomerResp implements Serializable {
private Long code;
private Long customerId;
private String customerName;
}
public class SelectCustomerProcessor extends SyncUserProcessor<SelectCustomerReq> {
private final JdbcTemplate jdbcTemplate;
public SelectCustomerProcessor(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Object handleRequest(BizContext bizContext, SelectCustomerReq req) throws Exception {
SelectCustomerResp resp = new SelectCustomerResp();
Customer result = jdbcTemplate.query("SELECT * FROM t_customer WHERE id = ?", ps -> ps.setLong(1, req.getCustomerId()), rs -> {
Customer customer = null;
if (rs.next()) {
customer = new Customer();
customer.setId(rs.getLong("id"));
customer.setCustomerName(rs.getString("customer_name"));
}
return customer;
});
if (Objects.nonNull(result)) {
resp.setCustomerId(result.getId());
resp.setCustomerName(result.getCustomerName());
resp.setCode(RespCode.SUCCESS);
}
return resp;
}
@Override
public String interest() {
return SelectCustomerReq.class.getName();
}
@Data
public static class Customer {
private Long id;
private String customerName;
}
}
編寫數據源、客戶端和服務端代碼:
public class CurdApp {
private static final int PORT = 8081;
private static final String ADDRESS = "127.0.0.1:" + PORT;
public static void main(String[] args) throws Exception {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
config.setDriverClassName(Driver.class.getName());
config.setUsername("root");
config.setPassword("root");
HikariDataSource dataSource = new HikariDataSource(config);
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
CreateCustomerProcessor createCustomerProcessor = new CreateCustomerProcessor(jdbcTemplate);
UpdateCustomerProcessor updateCustomerProcessor = new UpdateCustomerProcessor(jdbcTemplate);
DeleteCustomerProcessor deleteCustomerProcessor = new DeleteCustomerProcessor(jdbcTemplate);
SelectCustomerProcessor selectCustomerProcessor = new SelectCustomerProcessor(jdbcTemplate);
RpcServer server = new RpcServer(PORT, true);
server.registerUserProcessor(createCustomerProcessor);
server.registerUserProcessor(updateCustomerProcessor);
server.registerUserProcessor(deleteCustomerProcessor);
server.registerUserProcessor(selectCustomerProcessor);
server.startup();
RpcClient client = new RpcClient();
client.startup();
CreateCustomerReq createCustomerReq = new CreateCustomerReq();
createCustomerReq.setCustomerName("throwable.club");
CreateCustomerResp createCustomerResp = (CreateCustomerResp)
client.invokeSync(ADDRESS, createCustomerReq, 5000);
System.out.println("創建用戶[throwable.club]結果:" + createCustomerResp);
SelectCustomerReq selectCustomerReq = new SelectCustomerReq();
selectCustomerReq.setCustomerId(createCustomerResp.getCustomerId());
SelectCustomerResp selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format("查詢用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(),
selectCustomerResp));
UpdateCustomerReq updateCustomerReq = new UpdateCustomerReq();
updateCustomerReq.setCustomerId(selectCustomerReq.getCustomerId());
updateCustomerReq.setCustomerName("throwx.cn");
UpdateCustomerResp updateCustomerResp = (UpdateCustomerResp)
client.invokeSync(ADDRESS, updateCustomerReq, 5000);
System.out.println(String.format("更新用戶[id=%d]結果:%s", updateCustomerReq.getCustomerId(),
updateCustomerResp));
selectCustomerReq.setCustomerId(updateCustomerReq.getCustomerId());
selectCustomerResp = (SelectCustomerResp)
client.invokeSync(ADDRESS, selectCustomerReq, 5000);
System.out.println(String.format("查詢更新后的用戶[id=%d]結果:%s", selectCustomerReq.getCustomerId(),
selectCustomerResp));
DeleteCustomerReq deleteCustomerReq = new DeleteCustomerReq();
deleteCustomerReq.setCustomerId(selectCustomerResp.getCustomerId());
DeleteCustomerResp deleteCustomerResp = (DeleteCustomerResp)
client.invokeSync(ADDRESS, deleteCustomerReq, 5000);
System.out.println(String.format("刪除用戶[id=%d]結果:%s", deleteCustomerReq.getCustomerId(),
deleteCustomerResp));
}
}
執行結果如下:
創建用戶[throwable.club]結果:CreateCustomerResp(code=0, customerId=1)
查詢用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwable.club)
更新用戶[id=1]結果:UpdateCustomerResp(code=0)
查詢更新后的用戶[id=1]結果:SelectCustomerResp(code=0, customerId=1, customerName=throwx.cn)
更新用戶[id=1]結果:DeleteCustomerResp(code=0)
確認最后刪除操作結束后驗證數據庫表,確認t_customer表為空。
基於GO語言編寫SOFA-BOLT協議客戶端
這里嘗試使用GO語言編寫一個SOFA-BOLT協議客戶端,考慮到實現一個完整版本會比較復雜,這里簡化為只實現Encode和命令調用部分,暫時不處理響應和Decode。編寫結構體RequestCommand如下:
// RequestCommand sofa-bolt v2 req cmd
type RequestCommand struct {
ProtocolCode uint8
ProtocolVersion uint8
Type uint8
CommandCode uint16
CommandVersion uint8
RequestId uint32
Codec uint8
Switch uint8
Timeout uint32
ClassLength uint16
HeaderLength uint16
ContentLength uint32
ClassName []byte
Header []byte
Content []byte
}
這里注意一點,所有的整數類型必須使用具體的類型,例如uint必須用uint32,否則會出現Buffer寫入異常的問題。接着編寫一個編碼方法:
// encode req => slice
func encode(cmd *RequestCommand) []byte {
container := make([]byte, 0)
buf := bytes.NewBuffer(container)
buf.WriteByte(cmd.ProtocolCode)
buf.WriteByte(cmd.ProtocolVersion)
buf.WriteByte(cmd.Type)
binary.Write(buf, binary.BigEndian, cmd.CommandCode)
buf.WriteByte(cmd.CommandVersion)
binary.Write(buf, binary.BigEndian, cmd.RequestId)
buf.WriteByte(cmd.Codec)
buf.WriteByte(cmd.Switch)
binary.Write(buf, binary.BigEndian, cmd.Timeout)
binary.Write(buf, binary.BigEndian, cmd.ClassLength)
binary.Write(buf, binary.BigEndian, cmd.HeaderLength)
binary.Write(buf, binary.BigEndian, cmd.ContentLength)
buf.Write(cmd.ClassName)
buf.Write(cmd.Header)
buf.Write(cmd.Content)
return buf.Bytes()
}
最后編寫TCP客戶端:
type Req struct {
Id int64 `json:"id"`
Name string `json:"name"`
}
package main
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
)
func main() {
con, err := net.Dial("tcp", "127.0.0.1:9999")
if err != nil {
fmt.Println("err:", err)
return
}
defer con.Close()
req := &Req{
Id: 8080,
Name: "throwx.cn",
}
content, err := json.Marshal(req)
if err != nil {
fmt.Println("err:", err)
return
}
var header []byte
className := []byte("com.alipay.remoting.Req")
cmd := &RequestCommand{
ProtocolCode: 2,
ProtocolVersion: 2,
Type: 1,
CommandCode: 1,
CommandVersion: 1,
RequestId: 10087,
Codec: 1,
Switch: 0,
Timeout: 5000,
ClassLength: uint16(len(className)),
HeaderLength: 0,
ContentLength: uint32(len(content)),
ClassName: className,
Header: header,
Content: content,
}
pkg := encode(cmd)
_, err = con.Write(pkg)
if err != nil {
fmt.Println("err:", err)
return
}
}
協議的V2版本Crc32屬性是可選的,這里為了簡化處理也暫時忽略了
這里看到Content屬性為了簡化處理使用了JSON做序列化,因此需要稍微改動SOFA-BOLT的源碼,引入FastJson和FastJsonSerializer,改動見下圖:

先啟動BoltApp(SOFA-BOLT服務端),再執行GO編寫的客戶端,結果如下:

小結
SOFA-BOLT是一個高性能成熟可擴展的Netty私有協議封裝,比起原生Netty編程,提供了便捷的同步、異步調用,提供基礎心跳支持和重連等特性。引入SyncUserProcessor和AsyncUserProcessor的功能,對於業務開發更加友好。SOFA-BOLT協議本質也是一個緊湊、高性能的RPC協議。在考慮引入Netty進行底層通訊的場景,可以優先考慮使用SOFA-BOLT或者考慮把SOFA-BOLT作為候選方案之一,只因SOFA-BOLT是輕量級的,學習曲線平緩,基本沒有其他中間件依賴。
Demo所在倉庫:
(本文完 c-5-d e-a-20210806)
