原文:https://juejin.cn/post/6854573218884222989
前言
上一篇文章講述了RPC服務的概念和gRPC的基本使用、proto語法的使用教程。然而在我們真正把gRPC服務部署到生產環境上的時候,會遇到很多問題,首選要考慮的就是協議的數據認證問題,其次,gRPC也支持流式通信的模式,本篇文章會做一個介紹說明。
RPC的身份認證
RPC服務一般在服務內網使用,但是也有存在於外網的RPC服務,但是不論是哪一種RPC服務,走http2.0還是其他基於TCP實現socket的協議,在部署生產環境的時候還是需要增加身份加密認證機制的,客戶端與服務端需要做一個可信任的認證,以保障數據的安全性。
RPC服務一般的加密認證方法
基於 SSL/TLS 的通道加密 通常身份認證機制是通過 TLS/SSL 對傳輸通道進行加密,以防止請求和響應消息中的敏感數據泄漏。使用的場景主要有三種:
- 后端微服務直接開放給端側,例如手機 App、TV、多屏等,沒有統一的 API Gateway/SLB 做安全接入和認證;
- 后端微服務直接開放給 DMZ 部署的管理或者運維類 Portal;
- 后端微服務直接開放給第三方合作伙伴 / 渠道。 除了上述常用的跨網絡場景之外,對於一些安全等級要求比較高的業務場景,即便是內網通信,只要跨主機 /VM/ 容器通信,都強制要求對傳輸通道進行加密。在該場景下,即便只存在內網各模塊的 RPC 調用,仍然需要做 SSL/TLS。
使用 SSL/TLS 的典型場景如下所示:

目前使用最廣的 SSL/TLS 工具 / 類庫就是 OpenSSL,它是為網絡通信提供安全及數據完整性的一種安全協議,囊括了主要的密碼算法、常用的密鑰和證書封裝管理功能以及 SSL 協議。
多數 SSL 加密網站是用名為 OpenSSL 的開源軟件包,由於這也是互聯網應用最廣泛的安全傳輸方法,被網銀、在線支付、電商網站、門戶網站、電子郵件等重要網站廣泛使用。
針對敏感數據的單獨加密 有些 RPC 調用並不涉及敏感數據的傳輸,或者敏感字段占比較低,為了最大程度的提升吞吐量,降低調用時延,通常會采用 HTTP/TCP + 敏感字段單獨加密的方式,既保障了敏感信息的傳輸安全,同時也降低了采用 SSL/TLS 加密通道帶來的性能損耗,對於 JDK 原生的 SSL 類庫,這種性能提升尤其明顯。
它的工作原理如下所示:

通常使用 Handler 攔截機制,對請求和響應消息進行統一攔截,根據注解或者加解密標識對敏感字段進行加解密,這樣可以避免侵入業務。
采用該方案的缺點主要有兩個:
- 對敏感信息的識別可能存在偏差,容易遺漏或者過度保護,需要解讀數據和隱私保護方面的法律法規,而且不同國家對敏感數據的定義也不同,這會為識別帶來很多困難;
- 接口升級時容易遺漏,例如開發新增字段,忘記識別是否為敏感數據。
gRPC的加密認證方法
對於gRPC,SSL/TLS協議也是基本的身份加密認證方法,SSL/TLS協議采用公鑰加密法,客戶端向服務器端索要公鑰,然后用公鑰加密信息,服務器收到密文后,用自己的私鑰解密。
SSL/TLS SSL/TLS 分為單向認證和雙向認證,在實際業務中,單向認證使用較多,即客戶端認證服務端,服務端不認證客戶端。認證流程如下:
- 客戶端向服務端傳送客戶端 SSL 協議的版本號、支持的加密算法種類、產生的隨機數,以及其它可選信息;
- 服務端返回握手應答,向客戶端傳送確認 SSL 協議的版本號、加密算法的種類、隨機數以及其它相關信息;
- 服務端向客戶端發送自己的公鑰;
- 客戶端對服務端的證書進行認證,服務端的合法性校驗包括:證書是否過期、發行服務器證書的 CA 是否可靠、發行者證書的公鑰能否正確解開服務器證書的“發行者的數字簽名”、服務器證書上的域名是否和服務器的實際域名相匹配等;
- 客戶端隨機產生一個用於后面通訊的“對稱密碼”,然后用服務端的公鑰對其加密,將加密后的“預主密碼”傳給服務端;
- 服務端將用自己的私鑰解開加密的“預主密碼”,然后執行一系列步驟來產生主密碼;
- 客戶端向服務端發出信息,指明后面的數據通訊將使用主密碼為對稱密鑰,同時通知服務器客戶端的握手過程結束;
- 服務端向客戶端發出信息,指明后面的數據通訊將使用主密碼為對稱密鑰,同時通知客戶端服務器端的握手過程結束;
- SSL 的握手部分結束,SSL 安全通道建立,客戶端和服務端開始使用相同的對稱密鑰對數據進行加密,然后通過 Socket 進行傳輸。
實踐gRPC的TLS認證
我們可以簡單通過一個例子來實踐一下gRPC的服務端和客戶端的TLS認證機制,其中還包括證書的生成、服務端、客戶端初始化的編寫等等步驟。
生成證書
openssl req -newkey rsa:2048 -nodes -keyout server.key -x509 -days 3650 -out server.crt
復制代碼
在執行生成證書的過程中,需要填入Country Name、State or Province Name、Locality Name、Organization Name、Organizational Unit Name、Common Name、Email Address等等,這些按需要填入就可以了,或者都可以留空。 注:其中的Common Name最好是自己定義之后填上,這個在客戶端連接的時候可以指定連接的名字,否則留空的話有可能自動獲取不到
我們可以定義Common Name為rpc_service,全部回車填完就會生成server.key和server.crt文件
部署gRPC服務
接上篇文章的proto文件定義(blog.csdn.net/dream_succe…) 實現服務端代碼server.py:
from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc
# 實現 proto 文件中定義的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
# 實現 proto 文件中定義的 rpc 調用
def doRequest(self, request, context):
return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的數據是符合定義的SearchResponse格式
def serve():
# 啟動 rpc 服務,這里可定義最大接收和發送大小(單位M),默認只有4M
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)])
test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(60*60*24) # one day in seconds
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
復制代碼
客戶端代碼client.py:
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
# 連接 rpc 服務器
channel = grpc.insecure_channel('localhost:50051')
# 調用 rpc 服務
stub = test_pb2_grpc.SearchServiceStub(channel)
response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
print("client received: ", response)
if __name__ == '__main__':
run()
復制代碼
加入TLS認證
將生成的server.key和server.crt文件放在server.py的項目目錄下,其中去過client.py在不同項目的話,server.crt文件還需要放置在client.py的項目目錄下,我們給服務端加上TLS認證邏輯:
from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc
# 實現 proto 文件中定義的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
# 實現 proto 文件中定義的 rpc 調用
def doRequest(self, request, context):
return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的數據是符合定義的SearchResponse格式
def serve():
# 啟動 rpc 服務,這里可定義最大接收和發送大小(單位M),默認只有4M
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)])
test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
server.add_insecure_port('[::]:50051') # 注釋掉這句不安全的啟動服務的方法,加入以下寫法:
# read in key and certificate
with open('server.key', 'rb') as f:
private_key = f.read()
with open('server.crt', 'rb') as f:
certificate_chain = f.read()
# create server credentials
server_credentials = grpc.ssl_server_credentials(
((private_key, certificate_chain,),))
server.add_secure_port('[::]:50051', server_credentials)
server.start()
try:
while True:
time.sleep(60*60*24) # one day in seconds
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
復制代碼
對應的客戶端邏輯為:
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
def run():
# 讀取證書
with open('server.crt', 'rb') as f:
trusted_certs = f.read()
credentials = grpc.ssl_channel_credentials(
root_certificates=trusted_certs)
# 連接 rpc 服務器,這里填入證書的COMMON NAME,我們定義的是rpc_service
channel = grpc.secure_channel("{}:{}".format('localhost', 50051), credentials,
options=(('grpc.ssl_target_name_override', "rpc_service",),
('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024)))
# 調用 rpc 服務
stub = test_pb2_grpc.SearchServiceStub(channel)
response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
print("client received: ", response)
if __name__ == '__main__':
run()
復制代碼
以上就可以實現帶TLS認證的gRPC服務啦
gRPC的流式通信
流式通信的方式
grpc同http通訊一樣,也是基於“請求響應”模式的一種通訊。 根據不同業務場景,可以分為:
- 客戶端單次請求,服務端回應一次:
- 客戶端一次請求,服務端流式應答(其實相當於返回給客戶端多條數據)
- 客戶端流式請求,服務端回應一次
- 客戶端流式請求,服務端流式應答 類似於建立socket連接,做成聊天會話一樣,服務端也可以給客戶端推送很多數據,比如客戶端在初始化連接的時候發個空消息或者消息id同步的數據給服務端,服務端可以把要同步給客戶端的數據流式地傳遞給客戶端。
流式通信的具體實現
例如我們要實現一個客戶端試試從服務端獲取位置經緯度的功能,我們可以在通過這種流式處理來完成。 服務端,在server.start()之前add這個流式獲取經緯度的方法:
def ListFeatures(self, request, context):
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
if (feature.location.longitude >= left and
feature.location.longitude <= right and
feature.location.latitude >= bottom and
feature.location.latitude <= top):
yield feature
復制代碼
python里面是通過yield關鍵字實現一個生成器,流式響應的。 客戶端通過for循環或者也可以用生成器逐步獲取服務端的流式響應:
for feature in stub.ListFeatures(rectangle):
復制代碼
什么時候用Streaming RPC
- 大規模數據包
- 實時場景
gRPC的異常處理
隨着互聯網的快速發展,互聯網服務早已不是單體應用,而是由若干個模塊組成的微服務,每個模塊可以進行單獨的擴容、縮容,獨立上線部署等等;模塊與模塊之間通過網絡進行聯通。我們的應用必須對網絡錯誤進行妥善的處理。比如網絡出現抖動,正在通信的對端機器正好重新上線等。
gRPC的異常類型
gRPC有自己一套類似HTTP status code的錯誤碼,每個錯誤碼都是個字符串,如 INTERNAL、ABORTED、UNAVAILABLE。 我們經常遇到的就是“StatusCode=Unavailable, Detail="failed to connect to all addresses”這樣的報錯 原因分析
- RPC的客戶端請求沒有到達服務端,例如網絡解析抖動,雲主機ip地址變更等
- 服務端初始化的實例有問題,無法處理客戶端請求
- RPC客戶端與服務端的連接斷開或者連接未完成
對於以上情況,我們就需要增加重連機制、重發機制來保證服務可靠性。
重連機制
我們可以在連接gRPC服務的時候添加try except機制捕獲異常,一旦發現連接異常可以嘗試重試連接,如果在flask框架或者tornado框架里面,可以把連接方法寫成一個單例,然后在框架封裝好的請求異常處理機制里面添加重連,例如flask可以定義:
@app.errorhandler(Exception)
def handle_exception(e):
# 此處添加e的類型判斷,如果是RPC連接出錯,執行重新連接的代碼
復制代碼
如果是tornado可以定義:
def log_exception(self, typ, value, tb):
if issubclass(typ, RpcConnectError):
# 此處添加重新連接RPC的代碼
復制代碼
這樣就可以保證在你的系統中RPC的連接異常得到重連嘗試,以達到可靠性要求
重試機制
gRPC的channel在初始化的時候可以指定options參數,我們之前指定了最大發送和最大接收的字節數大小,這里也可以指定自動重試的配置,詳情參考:github.com/grpc/propos…透明重試,透明重試會在服務端的應用邏輯並沒有接收到請求的時候,gRPC 會進行自動的重試。透明重試可以解決了上訴原因分析里面的第1、2點,我們也可以自行配置重試,通過配置service config的retryPolicy參數。

實現案例:
options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')]
channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
options=options)
復制代碼
上面的代碼開啟了grpc.enable_retries,雖然默認開啟,但是設置為0也可以關閉掉透明重試; 另外grpc.service_config是一個配置,我們可以配置重試策略(參數配置可參考:grpc.github.io/grpc/core/g…):
{
"retryPolicy":{
"maxAttempts": 4,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMutiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE" ]
}
}
復制代碼
針對UNAVAILABLE這種錯誤進行重試,可以指定重試次數等等,具體參數含義可參考官網,簡單介紹一下:
- maxAttempts 必須是大於 1 的整數,對於大於5的值會被視為5
- initialBackoff 和 maxBackoff 必須指定,並且必須具有大於0
- backoffMultiplier 必須指定,並且大於零
- retryableStatusCodes 必須制定為狀態碼的數據,不能為空,並且沒有狀態碼必須是有效的 gPRC 狀態碼,可以是整數形式,並且不區分大小寫
對沖策略
對沖是指在不等待響應的情況主動發送單次調用的多個請求 如果一個方法使用對沖策略,那么首先會像正常的 RPC 調用一樣發送第一次請求,如果配置時間內沒有響應,那么直接發送第二次請求,以此類推,直到發送了 maxAttempts 次

實現案例
options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
('grpc.max_receive_message_length', 100 * 1024 * 1024),
('grpc.enable_retries', 1),
('grpc.service_config', '{ "hedgingPolicy":{ "maxAttempts": 4, "hedgingDelay": "0.5s", "nonFatalStatusCodes":[ "UNAVAILABLE", "INTERNAL", "ABORTED" ] } }')]
channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
options=options)
復制代碼
注意: 使用對沖的時候,請求可能會訪問到不同的后端(如果設置了負載均衡),那么就要求方法在多次執行下是安全,並且符合預期的
重試限流
當客戶端的失敗和成功比超過某個閾值時,gRPC 會通過禁用這些重試策略來防止由於重試導致服務器過載,這就是重試限流策略。同樣可以在servie config中配置:
"retryThrottling":{
"maxTokens": 10,
"tokenRatio": 0.1
}
復制代碼
對於每一個服務器,gRPC 客戶端會維護一個 token_count 變量,最初設置為 maxToken , 值的范圍是 0 - maxToken
對於每個 RPC 請求都會對 token_count 產生一下效果
- 每個失敗的 RPC 請求都會遞減token_count 1
- 成功 RPC 將會遞增 token_count和tokenRatio 如果 token_count <= ( maxTokens / 2), 則關閉重試策略,直到 token_count > (maxTokens/2),恢復重試
對於對沖 RPC,發送第一個RPC請求后,如果 token_count 大於(maxTokens/2),才會發送后續的對沖請求 當 token_count <= ( maxTokens / 2) 時,重試請求會被取消,並且將狀態碼返回給調用者
實際的限流參數配置,還是需要根據服務器的性能資源來衡量。