原文如下:
rpcx是一個類似阿里巴巴 Dubbo 和微博 Motan 的分布式的RPC服務框架,基於Golang net/rpc實現。
談起分布式的RPC框架,比較出名的是阿里巴巴的dubbo,包括由當當網維護的dubbox。
不知道dubbo在阿里的內部競爭中敗給了HSF,還是阿里有意將其閉源了,官方的代碼使用的spring還停留在2.5.6.SEC03的版本,dubbox的spring也只升級到3.2.9.RELEASE。
不管怎樣,dubbo還是在電商企業得到廣泛的應用,京東也有部分在使用dubbo開發。
DUBBO是一個分布式服務框架,致力於提供高性能和透明化的RPC遠程服務調用方案,是阿里巴巴SOA服務化治理方案的核心框架,每天為2,000+個服務提供3,000,000,000+次訪問量支持,並被廣泛應用於阿里巴巴集團的各成員站點。微博的RPC框架 Motan 也正式開源了,如張雷所說:
2013 年微博 RPC 框架 Motan 在前輩大師們(福林、fishermen、小麥、王喆等)的精心設計和辛勤工作中誕生,向各位大師們致敬,也得到了微博各個技術團隊的鼎力支持及不斷完善,如今 Motan 在微博平台中已經廣泛應用,每天為數百個服務完成近千億次的調用。
這兩個個優秀的框架都是使用Java開發的,國外的互聯網企業也有非常出名的的RPC框架如 thrift 、 finagle 。
本項目 rpcx 的目標就是實現一個Go生態圈的Dubbo,為Go生態圈提供一個分布式的、多插件的、帶有服務治理功能的產品級的RPC框架。
Go生態圈已經有一些RPC庫,如官方的 net/rpc 、 grpc-go 、 gorilla-rpc 等,為什么還要開發 rpcx 呢?
原因在於盡管這些框架都是為Go實現的RPC庫,但是它們的功能比較單一,只是實現了點對點(End-to-End)的通訊框架。缺乏服務治理的功能,比如服務注冊和發現、負載均衡、容災、服務監控等功能。因此我基於Go net/rpc框架實現了一個類似Dubbo的分布式框架。
和rpcx比較類似的Go RPC框架是 go-micro ,但是rpcx提供了更豐富的功能,基於TCP的通訊協議性能更好。
RPC是什么
遠程過程調用(英語:Remote Procedure Call,縮寫為 RPC)是一個計算機通信協議。該協議允許運行於一台計算機的程序調用另一台計算機的子程序,而程序員無需額外地為這個交互作用編程。如果涉及的軟件采用面向對象編程,那么遠程過程調用亦可稱作遠程調用或遠程方法調用,例:Java RMI。簡單地說就是能使應用像調用本地方法一樣的調用遠程的過程或服務。很顯然,這是一種client-server的交互形式,調用者(caller)是client,執行者(executor)是server。典型的實現方式就是request–response通訊機制。
RPC 是進程之間的通訊方式(inter-process communication, IPC), 不同的進程有不同的地址空間。
如果client和server在同一台機器上,盡管物理地址空間是相同的,但是虛擬地址空間不同。
如果它們在不同的主機上,物理地址空間也不同。
RPC的實現的技術各不相同,也不一定兼容。
一個正常的RPC過程可以分成下面幾步:
- client調用client stub,這是一次本地過程調用
- client stub將參數打包成一個消息,然后發送這個消息。打包過程也叫做 marshalling
- client所在的系統將消息發送給server
- server的的系統將收到的包傳給server stub
- server stub解包得到參數。 解包也被稱作 unmarshalling
- 最后server stub調用服務過程. 返回結果按照相反的步驟傳給client
RPC只是描繪了 Client 與 Server 之間的點對點調用流程,包括 stub、通信、RPC 消息解析等部分,在實際應用中,還需要考慮服務的高可用、負載均衡等問題,所以產品級的 RPC 框架除了點對點的 RPC 協議的具體實現外,還應包括服務的發現與注銷、提供服務的多台 Server 的負載均衡、服務的高可用等更多的功能。目前的 RPC 框架大致有兩種不同的側重方向,一種偏重於服務治理,另一種偏重於跨語言調用。
服務治理型的 RPC 框架有 Dubbo、DubboX、Motan 等,這類的 RPC 框架的特點是功能豐富,提供高性能的遠程調用以及服務發現及治理功能,適用於大型服務的微服務化拆分以及管理,對於特定語言(Java)的項目可以十分友好的透明化接入。但缺點是語言耦合度較高,跨語言支持難度較大。
跨語言調用型的 RPC 框架有 Thrift、gRPC、Hessian、Hprose 等,這一類的 RPC 框架重點關注於服務的跨語言調用,能夠支持大部分的語言進行語言無關的調用,非常適合於為不同語言提供通用遠程服務的場景。但這類框架沒有服務發現相關機制,實際使用時一般需要代理層進行請求轉發和負載均衡策略控制。
本項目 rpcx 屬於服務治理類型,是一個基於 Go 開發的高性能的輕量級 RPC 框架,Motan 提供了實用的服務治理功能和基於插件的擴展能力。
RPCX的特點
rpcx使用Go實現,適合使用Go語言實現RPC的功能。
- 基於net/rpc,可以將net/rpc實現的RPC項目輕松的轉換為分布式的RPC
- 插件式設計,可以配置所需的插件,比如服務發現、日志、統計分析等
- 基於TCP長連接,只需很小的額外的消息頭
- 支持多種編解碼協議,如Gob、Json、MessagePack、gencode、ProtoBuf等
- 服務發現:服務發布、訂閱、通知等,支持多種發現方式如ZooKeeper、Etcd等
- 高可用策略:失敗重試(Failover)、快速失敗(Failfast)
- 負載均衡:支持隨機請求、輪詢、低並發優先、一致性 Hash等
- 規模可擴展,可以根據性能的需求增減服務器
- 其他:調用統計、訪問日志等
rpcx目標是輕量級的,小而簡單,但是期望所有的功能都可以通過插件的方式搭積木的方式完成。
RPCX架構
rpcx中有服務提供者 RPC Server,服務調用者 RPC Client 和服務注冊中心 Registry 三個角色。
- Server 向 Registry 注冊服務,並向注冊中心發送心跳匯報狀態(基於不同的registry有不同的實現)。
- Client 需要向注冊中心查詢 RPC 服務者列表,Client 根據 Registry 返回的服務者列表,選取其中一個 Sever 進行 RPC 調用。
- 當 Server 發生宕機時,Registry 會監測到服務者不可用(zookeeper session機制或者手工心跳),Client 感知后會對本地的服務列表作相應調整。client可能被動感知(zookeeper)或者主動定時拉取。
- 可選地,Server可以定期向Registry匯報調用統計信息,Client可以根據調用次數選擇壓力最小的Server
當前rpcx支持zookeeper, etcd等注冊中心,Consul注冊中心正在開發中。
rpcx基於Go net/rpc的底層實現, Client和Server之間通訊是通過TCP進行通訊的,它們之間通過Client發送Request,Server返回Response實現。
Request和Response消息的格式都是 Header+Body
的格式。Header和Body具體的格式根據編碼方式的不同而不同,可以是二進制,也可以是結構化數據如JSON。
RPCX的特性
rpcx擁有眾多特性。
服務器特性
編碼 (序列化)
rpcx當前支持多種序列化/反序列化的方式,可以根據需求選擇合適的編碼庫。
特性 | 功能描述 |
---|---|
gob | 官方提供的序列化方式,基於一個包含元數據的流 |
jsonrpc | 也是官方提供的編碼庫,以JSON格式傳輸 |
msgp | 類似json格式的編碼,但是更小更快,可以直接編碼struct |
gencode | 一個超級快的序列化庫,需要定義schema,但是定義方式和struct類似 |
protobuf | Google推出的廣受關注的序列化庫,推薦使用 gogo-protobuf ,可以獲得更高的性能 |
在數據結構簡單的情況下,這幾種庫都可以滿足需求,參照本文中的benchmark測試。但是如果追求性能,建議采用后面三種序列化庫。
序列化庫的選擇對於RPC服務的影響是巨大的,我創建了另外一個項目專門比較各序列化庫的性能: gosercomp 。
新的序列化庫的實現也非常簡單,只需實現下面兩個方法即可:
funcNewXXXXXServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { …… } funcNewXXXXXClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { …… }
編碼庫負責marshal/unmarshal Reuqest/Response, 包括消息中的Header和Body。如果你想,你也可以對Header和Body實現不同的編碼。
注冊中心
目前提供了兩種注冊中心:
- ZooKeeperRegisterPlugin
通過ZooKeeper實現服務發現。
服務在注冊的時候會自動在ZooKeeper上創建一個Ephemeral節點,因此當服務宕機的時候此節點就被刪除,Client也會感知到。
同時,Server也會把調用次數定時更新到ZooKeeper,這樣Client可以根據一段時間的調用次數選擇壓力較小的服務器節點進行連接。
注冊中心的配置只需在服務器初始化的時候增加以下代碼,服務的實現無需做任何的改動,也不需要額外的配置。
plugin := &ZooKeeperRegisterPlugin{ ServiceAddress: "tcp@127.0.0.1:1234", ZooKeeperServers: []string{"127.0.0.1:2181"}, BasePath: "/betterrpc", metrics: metrics.NewRegistry(), Services: make([]string,1), updateInterval: time.Minute, } server.PluginContainer.Add(plugin)
其中ServiceAddress為本機(Server)要暴露給Client地址。因為ZooKeeper的節點名不支持"/",所以此處用"@"代替"://"。
ZooKeeperServers為ZK集群的地址。
BasePath為一個服務組,此組下的服務對於Client都是可見的。
- EtcdRegisterPlugin
通過etcd也可以實現服務發現。
etcd可以通過TTL判斷服務器的存活,另外此插件也會定時把調用次數定時更新到etcd。
此插件可以使用下面的代碼配置:
plugin := &EtcdRegisterPlugin{ ServiceAddress: "tcp@127.0.0.1:1234", EtcdServers: []string{"http://127.0.0.1:2379"}, BasePath: "/betterrpc", metrics: metrics.NewRegistry(), Services: make([]string,1), updateInterval: time.Minute, } server.PluginContainer.Add(plugin)
注意注冊中心插件必須在配置服務之前設置,否則注冊中心無法獲取要注冊的服務信息。
擴展點
當前rpcx為server提供了以下擴展點:
- 服務注冊時
- Client連接時
- 讀取Request Header的前后
- 讀取Request Body的前后
- 返回Response的前后
你可以根據這些擴展點編寫自己的插件,只需實現相應的接口即可。定義的接口你可以看godoc的IXXXXXXPlugin的定義。
上面介紹的注冊中心就是通過插件的方式實現。同時rpcx還實現了其它的插件,如下面的介紹。
- LogRegisterPlugin: 記錄服務注冊日志
- MetricsPlugin: 統計服務調用次數和處理時間
- RateLimitingPlugin: 限流操作,限定服務器的TPS
客戶端特性
負載均衡
負載均衡是通過不同的ClientSelector來實現的。
負載均衡器 | 功能描述 |
---|---|
DirectClientSelector | 點對點的直連,客戶端直接連接一個服務器 |
MultiClientSelector | 多對多的直連,一個客戶端可以從一組固定的服務器中選擇一個直連,無需注冊中心 |
ZooKeeperClientSelector | 從ZK注冊中心選擇一個服務器連接 |
EtcdClientSelector | 從Etcd注冊中心選擇一個服務器連接 |
一個Selector需要實現ClientSelector接口:
typeClientSelectorinterface{ Select(clientCodecFunc ClientCodecFunc) (*rpc.Client, error) }
Client的序列化方式必須和服務器的序列化方式保持一致。
容錯
Client提供了兩種容錯方式: Failfast
、 Failover
、 Failtry
:
- Failfast: 如果Client調用失敗,立即返回,不會重試
- Failover: 如果Client調用失敗,會嘗試從服務列表中選擇另外一個服務器調用,直到成功或者到達重試次數
- Failtry: 如果Client調用失敗,會繼續這個服務器重試,直到成功或者到達重試次數
重選算法
對於多個服務器,重選發送支持:
- 隨機選擇: 隨機選擇一個服務器並返回,可能和上一次的重復
- RoundRobin: 按順序選擇一個服務器
- 一致性哈希 [TODO]:使用 Jump Consistent Hash algorithm
- CallLeast [TODO]: 根據調用次數選擇壓力最小的服務器
擴展點
Client的擴展點如下:
- 讀取Response Header的前后
- 讀取Response Body的前后
- 寫Request的前后
RPCX例子
點對點
點對點的實現和Go net/rpc的使用基本一致。
Server
packagemain import"github.com/smallnest/rpcx" typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } typeArithint func(t *Arith) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B returnnil } func(t *Arith) Error(args *Args, reply *Reply) error { panic("ERROR") } funcmain() { server := rpcx.NewServer() server.RegisterName("Arith",new(Arith)) server.Serve("tcp","127.0.0.1:8972") }
Client
同步方式:
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { s := &rpcx.DirectClientSelector{Network: "tcp", Address:"127.0.0.1:8972", Timeout:10* time.Second} client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
異步方式(通過Channel獲得執行結果):
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { s := &rpcx.DirectClientSelector{Network: "tcp", Address:"127.0.0.1:8972", Timeout:10* time.Second} client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply divCall := client.Go("Arith.Mul", args, &reply,nil) replyCall := <-divCall.Done // will be equal to divCall ifreplyCall.Error !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, replyCall.Error) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
多服務器
Server
這里例子啟動了兩個服務器,其中一個服務器故意將 7 * 8
計算成 560
,以便和另外一個服務器進行區分,我們可以觀察計算結果。
packagemain import"github.com/smallnest/rpcx" typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } typeArithint func(t *Arith) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B returnnil } func(t *Arith) Error(args *Args, reply *Reply) error { panic("ERROR") } typeArith2int func(t *Arith2) Mul(args *Args, reply *Reply) error { reply.C = args.A * args.B *10 returnnil } func(t *Arith2) Error(args *Args, reply *Reply) error { panic("ERROR") } funcmain() { server1 := rpcx.NewServer() server1.RegisterName("Arith",new(Arith)) server1.Start("tcp","127.0.0.1:8972") server2 := rpcx.NewServer() server2.RegisterName("Arith",new(Arith2)) server2.Serve("tcp","127.0.0.1:8973") }
Client
隨機選取服務器的例子:
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} servers := []clientselector.ServerPair{server1, server2} s := clientselector.NewMultiClientSelector(servers, rpcx.RandomSelect,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
RoundRobin選取服務器的例子
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} servers := []clientselector.ServerPair{server1, server2} s := clientselector.NewMultiClientSelector(servers, rpcx.RoundRobin,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
Failover
packagemain import( "fmt" "time" "github.com/smallnest/rpcx" "github.com/smallnest/rpcx/clientselector" ) typeArgsstruct{ A int`msg:"a"` B int`msg:"b"` } typeReplystruct{ C int`msg:"c"` } funcmain() { server1 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8972"} server2 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8973"} server3 := clientselector.ServerPair{Network: "tcp", Address:"127.0.0.1:8974"} servers := []clientselector.ServerPair{server1, server2, server3} s := clientselector.NewMultiClientSelector(servers, rpcx.RoundRobin,10*time.Second) fori :=0; i <10; i++ { callServer(s) } } funccallServer(s rpcx.ClientSelector) { client := rpcx.NewClient(s) client.FailMode = rpcx.Failover args := &Args{7,8} varreply Reply err := client.Call("Arith.Mul", args, &reply) iferr !=nil{ fmt.Printf("error for Arith: %d*%d, %v \n", args.A, args.B, err) } else{ fmt.Printf("Arith: %d*%d=%d \n", args.A, args.B, reply.C) } client.Close() }
Benchmark
rpcx基於Go net/rpc框架實現,它的插件機制並不會帶來多少性能的損失,如下面的測試,rpcx性能和官方的Go net/rpc持平。
[root@localhostrpcx]# go test -bench . -test.benchmem PASS BenchmarkNetRPC_gob-1610000018742ns/op321B/op9allocs/op BenchmarkNetRPC_jsonrpc-1610000021360ns/op1170B/op31allocs/op BenchmarkNetRPC_msgp-1610000018617ns/op776B/op35allocs/op BenchmarkRPCX_gob-1610000018718ns/op320B/op9allocs/op BenchmarkRPCX_json-1610000021238ns/op1170B/op31allocs/op BenchmarkRPCX_msgp-1610000018635ns/op776B/op35allocs/op BenchmarkRPCX_gencodec-1610000018454ns/op4485B/op17allocs/op BenchmarkRPCX_protobuf-1610000017234ns/op733B/op13allocs/op
參考文檔
- 誰能用通俗的語言解釋一下什么是RPC框架?
- DUBBO
- 支撐微博千億調用的輕量級RPC框架:Motan
- 你應該知道的 RPC 原理
- Twitter的RPC框架Finagle簡介
- armeria: Netty的作者正在開發的一個RPC庫
- wikipedia RPC
參考資料:
RPCX: http://www.tuicool.com/m/articles/vYB3euv
go-micro: https://github.com/micro/go-micro
https://blog.micro.mu/2016/03/28/go-micro.html