(轉)Python——gRPC詳解及實戰避坑方案(下)


原文:https://juejin.cn/post/6854573218884222989

前言

上一篇文章講述了RPC服務的概念和gRPC的基本使用、proto語法的使用教程。然而在我們真正把gRPC服務部署到生產環境上的時候,會遇到很多問題,首選要考慮的就是協議的數據認證問題,其次,gRPC也支持流式通信的模式,本篇文章會做一個介紹說明。

RPC的身份認證

RPC服務一般在服務內網使用,但是也有存在於外網的RPC服務,但是不論是哪一種RPC服務,走http2.0還是其他基於TCP實現socket的協議,在部署生產環境的時候還是需要增加身份加密認證機制的,客戶端與服務端需要做一個可信任的認證,以保障數據的安全性。

RPC服務一般的加密認證方法

基於 SSL/TLS 的通道加密 通常身份認證機制是通過 TLS/SSL 對傳輸通道進行加密,以防止請求和響應消息中的敏感數據泄漏。使用的場景主要有三種:

  • 后端微服務直接開放給端側,例如手機 App、TV、多屏等,沒有統一的 API Gateway/SLB 做安全接入和認證;
  • 后端微服務直接開放給 DMZ 部署的管理或者運維類 Portal;
  • 后端微服務直接開放給第三方合作伙伴 / 渠道。 除了上述常用的跨網絡場景之外,對於一些安全等級要求比較高的業務場景,即便是內網通信,只要跨主機 /VM/ 容器通信,都強制要求對傳輸通道進行加密。在該場景下,即便只存在內網各模塊的 RPC 調用,仍然需要做 SSL/TLS。

使用 SSL/TLS 的典型場景如下所示:

在這里插入圖片描述

 

目前使用最廣的 SSL/TLS 工具 / 類庫就是 OpenSSL,它是為網絡通信提供安全及數據完整性的一種安全協議,囊括了主要的密碼算法、常用的密鑰和證書封裝管理功能以及 SSL 協議。

多數 SSL 加密網站是用名為 OpenSSL 的開源軟件包,由於這也是互聯網應用最廣泛的安全傳輸方法,被網銀、在線支付、電商網站、門戶網站、電子郵件等重要網站廣泛使用。

針對敏感數據的單獨加密 有些 RPC 調用並不涉及敏感數據的傳輸,或者敏感字段占比較低,為了最大程度的提升吞吐量,降低調用時延,通常會采用 HTTP/TCP + 敏感字段單獨加密的方式,既保障了敏感信息的傳輸安全,同時也降低了采用 SSL/TLS 加密通道帶來的性能損耗,對於 JDK 原生的 SSL 類庫,這種性能提升尤其明顯。

它的工作原理如下所示:

在這里插入圖片描述

 

通常使用 Handler 攔截機制,對請求和響應消息進行統一攔截,根據注解或者加解密標識對敏感字段進行加解密,這樣可以避免侵入業務。

采用該方案的缺點主要有兩個:

  • 對敏感信息的識別可能存在偏差,容易遺漏或者過度保護,需要解讀數據和隱私保護方面的法律法規,而且不同國家對敏感數據的定義也不同,這會為識別帶來很多困難;
  • 接口升級時容易遺漏,例如開發新增字段,忘記識別是否為敏感數據。

gRPC的加密認證方法

對於gRPC,SSL/TLS協議也是基本的身份加密認證方法,SSL/TLS協議采用公鑰加密法,客戶端向服務器端索要公鑰,然后用公鑰加密信息,服務器收到密文后,用自己的私鑰解密。

SSL/TLS SSL/TLS 分為單向認證和雙向認證,在實際業務中,單向認證使用較多,即客戶端認證服務端,服務端不認證客戶端。認證流程如下:

  • 客戶端向服務端傳送客戶端 SSL 協議的版本號、支持的加密算法種類、產生的隨機數,以及其它可選信息;
  • 服務端返回握手應答,向客戶端傳送確認 SSL 協議的版本號、加密算法的種類、隨機數以及其它相關信息;
  • 服務端向客戶端發送自己的公鑰;
  • 客戶端對服務端的證書進行認證,服務端的合法性校驗包括:證書是否過期、發行服務器證書的 CA 是否可靠、發行者證書的公鑰能否正確解開服務器證書的“發行者的數字簽名”、服務器證書上的域名是否和服務器的實際域名相匹配等;
  • 客戶端隨機產生一個用於后面通訊的“對稱密碼”,然后用服務端的公鑰對其加密,將加密后的“預主密碼”傳給服務端;
  • 服務端將用自己的私鑰解開加密的“預主密碼”,然后執行一系列步驟來產生主密碼;
  • 客戶端向服務端發出信息,指明后面的數據通訊將使用主密碼為對稱密鑰,同時通知服務器客戶端的握手過程結束;
  • 服務端向客戶端發出信息,指明后面的數據通訊將使用主密碼為對稱密鑰,同時通知客戶端服務器端的握手過程結束;
  • SSL 的握手部分結束,SSL 安全通道建立,客戶端和服務端開始使用相同的對稱密鑰對數據進行加密,然后通過 Socket 進行傳輸。

實踐gRPC的TLS認證

我們可以簡單通過一個例子來實踐一下gRPC的服務端和客戶端的TLS認證機制,其中還包括證書的生成、服務端、客戶端初始化的編寫等等步驟。

生成證書

openssl req -newkey rsa:2048 -nodes -keyout server.key -x509 -days 3650 -out server.crt
復制代碼

在執行生成證書的過程中,需要填入Country Name、State or Province Name、Locality Name、Organization Name、Organizational Unit Name、Common Name、Email Address等等,這些按需要填入就可以了,或者都可以留空。 注:其中的Common Name最好是自己定義之后填上,這個在客戶端連接的時候可以指定連接的名字,否則留空的話有可能自動獲取不到

我們可以定義Common Name為rpc_service,全部回車填完就會生成server.keyserver.crt文件

部署gRPC服務

接上篇文章的proto文件定義(blog.csdn.net/dream_succe…) 實現服務端代碼server.py:

from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc

# 實現 proto 文件中定義的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
    # 實現 proto 文件中定義的 rpc 調用
    def doRequest(self, request, context):
        return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的數據是符合定義的SearchResponse格式

def serve():
    # 啟動 rpc 服務,這里可定義最大接收和發送大小(單位M),默認只有4M
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024)])
    
    test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        while True:
            time.sleep(60*60*24) # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()
復制代碼

客戶端代碼client.py:

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

def run():
    # 連接 rpc 服務器
    channel = grpc.insecure_channel('localhost:50051')
    # 調用 rpc 服務
    stub = test_pb2_grpc.SearchServiceStub(channel)
    response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
    print("client received: ", response)

if __name__ == '__main__':
    run()
復制代碼

加入TLS認證

將生成的server.keyserver.crt文件放在server.py的項目目錄下,其中去過client.py在不同項目的話,server.crt文件還需要放置在client.py的項目目錄下,我們給服務端加上TLS認證邏輯:

from concurrent import futures
import time
import grpc
import test_pb2
import test_pb2_grpc

# 實現 proto 文件中定義的 SearchService
class RequestRpc(test_pb2_grpc.SearchService):
    # 實現 proto 文件中定義的 rpc 調用
    def doRequest(self, request, context):
        return test_pb2.Search(query = 'hello {msg}'.format(msg = request.name)) # return的數據是符合定義的SearchResponse格式

def serve():
    # 啟動 rpc 服務,這里可定義最大接收和發送大小(單位M),默認只有4M
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
        ('grpc.max_send_message_length', 100 * 1024 * 1024),
        ('grpc.max_receive_message_length', 100 * 1024 * 1024)])
    
    test_pb2_grpc.add_SearchServiceServicer_to_server(RequestRpc(), server)
    
    server.add_insecure_port('[::]:50051') # 注釋掉這句不安全的啟動服務的方法,加入以下寫法:
    # read in key and certificate
        with open('server.key', 'rb') as f:
            private_key = f.read()
        with open('server.crt', 'rb') as f:
            certificate_chain = f.read()

        # create server credentials
        server_credentials = grpc.ssl_server_credentials(
            ((private_key, certificate_chain,),))
        server.add_secure_port('[::]:50051', server_credentials)
    
    server.start()
    try:
        while True:
            time.sleep(60*60*24) # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()
復制代碼

對應的客戶端邏輯為:

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

def run():
    # 讀取證書
    with open('server.crt', 'rb') as f:
        trusted_certs = f.read()
    credentials = grpc.ssl_channel_credentials(
        root_certificates=trusted_certs)
        
    # 連接 rpc 服務器,這里填入證書的COMMON NAME,我們定義的是rpc_service
    channel = grpc.secure_channel("{}:{}".format('localhost', 50051), credentials,
        options=(('grpc.ssl_target_name_override', "rpc_service",),
                 ('grpc.max_send_message_length', 100 * 1024 * 1024),
                 ('grpc.max_receive_message_length', 100 * 1024 * 1024)))    
    # 調用 rpc 服務
    stub = test_pb2_grpc.SearchServiceStub(channel)
    response = stub.doRequest(test_pb2.SearchRequest(query='henry'))
    print("client received: ", response)

if __name__ == '__main__':
    run()
復制代碼

以上就可以實現帶TLS認證的gRPC服務啦

gRPC的流式通信

流式通信的方式

grpc同http通訊一樣,也是基於“請求響應”模式的一種通訊。 根據不同業務場景,可以分為:

  1. 客戶端單次請求,服務端回應一次:
  2. 客戶端一次請求,服務端流式應答(其實相當於返回給客戶端多條數據)
  3. 客戶端流式請求,服務端回應一次
  4. 客戶端流式請求,服務端流式應答 類似於建立socket連接,做成聊天會話一樣,服務端也可以給客戶端推送很多數據,比如客戶端在初始化連接的時候發個空消息或者消息id同步的數據給服務端,服務端可以把要同步給客戶端的數據流式地傳遞給客戶端。 在這里插入圖片描述

流式通信的具體實現

例如我們要實現一個客戶端試試從服務端獲取位置經緯度的功能,我們可以在通過這種流式處理來完成。 服務端,在server.start()之前add這個流式獲取經緯度的方法:

def ListFeatures(self, request, context):
  left = min(request.lo.longitude, request.hi.longitude)
  right = max(request.lo.longitude, request.hi.longitude)
  top = max(request.lo.latitude, request.hi.latitude)
  bottom = min(request.lo.latitude, request.hi.latitude)
  for feature in self.db:
    if (feature.location.longitude >= left and
        feature.location.longitude <= right and
        feature.location.latitude >= bottom and
        feature.location.latitude <= top):
      yield feature
復制代碼

python里面是通過yield關鍵字實現一個生成器,流式響應的。 客戶端通過for循環或者也可以用生成器逐步獲取服務端的流式響應:

for feature in stub.ListFeatures(rectangle):
復制代碼

什么時候用Streaming RPC

  1. 大規模數據包
  2. 實時場景

gRPC的異常處理

隨着互聯網的快速發展,互聯網服務早已不是單體應用,而是由若干個模塊組成的微服務,每個模塊可以進行單獨的擴容、縮容,獨立上線部署等等;模塊與模塊之間通過網絡進行聯通。我們的應用必須對網絡錯誤進行妥善的處理。比如網絡出現抖動,正在通信的對端機器正好重新上線等。

gRPC的異常類型

gRPC有自己一套類似HTTP status code的錯誤碼,每個錯誤碼都是個字符串,如 INTERNAL、ABORTED、UNAVAILABLE。 我們經常遇到的就是“StatusCode=Unavailable, Detail="failed to connect to all addresses”這樣的報錯 原因分析

  1. RPC的客戶端請求沒有到達服務端,例如網絡解析抖動,雲主機ip地址變更等
  2. 服務端初始化的實例有問題,無法處理客戶端請求
  3. RPC客戶端與服務端的連接斷開或者連接未完成

對於以上情況,我們就需要增加重連機制、重發機制來保證服務可靠性。

重連機制

我們可以在連接gRPC服務的時候添加try except機制捕獲異常,一旦發現連接異常可以嘗試重試連接,如果在flask框架或者tornado框架里面,可以把連接方法寫成一個單例,然后在框架封裝好的請求異常處理機制里面添加重連,例如flask可以定義:

@app.errorhandler(Exception)
def handle_exception(e):
	# 此處添加e的類型判斷,如果是RPC連接出錯,執行重新連接的代碼
復制代碼

如果是tornado可以定義:

    def log_exception(self, typ, value, tb):
        if issubclass(typ, RpcConnectError):
        		# 此處添加重新連接RPC的代碼
復制代碼

這樣就可以保證在你的系統中RPC的連接異常得到重連嘗試,以達到可靠性要求

重試機制

gRPC的channel在初始化的時候可以指定options參數,我們之前指定了最大發送和最大接收的字節數大小,這里也可以指定自動重試的配置,詳情參考:github.com/grpc/propos…透明重試,透明重試會在服務端的應用邏輯並沒有接收到請求的時候,gRPC 會進行自動的重試。透明重試可以解決了上訴原因分析里面的第1、2點,我們也可以自行配置重試,通過配置service config的retryPolicy參數。

在這里插入圖片描述

 

實現案例:

options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
                       ('grpc.max_receive_message_length', 100 * 1024 * 1024),
                       ('grpc.enable_retries', 1),
                       ('grpc.service_config', '{ "retryPolicy":{ "maxAttempts": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }')]
            channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
                                            options=options)
復制代碼

上面的代碼開啟了grpc.enable_retries,雖然默認開啟,但是設置為0也可以關閉掉透明重試; 另外grpc.service_config是一個配置,我們可以配置重試策略(參數配置可參考:grpc.github.io/grpc/core/g…):

{
    "retryPolicy":{
        "maxAttempts": 4,
        "initialBackoff": "0.1s",
        "maxBackoff": "1s",
        "backoffMutiplier": 2,
        "retryableStatusCodes": [
            "UNAVAILABLE" ] 
    }
}
復制代碼

針對UNAVAILABLE這種錯誤進行重試,可以指定重試次數等等,具體參數含義可參考官網,簡單介紹一下:

  • maxAttempts 必須是大於 1 的整數,對於大於5的值會被視為5
  • initialBackoff 和 maxBackoff 必須指定,並且必須具有大於0
  • backoffMultiplier 必須指定,並且大於零
  • retryableStatusCodes 必須制定為狀態碼的數據,不能為空,並且沒有狀態碼必須是有效的 gPRC 狀態碼,可以是整數形式,並且不區分大小寫

對沖策略

對沖是指在不等待響應的情況主動發送單次調用的多個請求 如果一個方法使用對沖策略,那么首先會像正常的 RPC 調用一樣發送第一次請求,如果配置時間內沒有響應,那么直接發送第二次請求,以此類推,直到發送了 maxAttempts 次

在這里插入圖片描述 當對沖請求接收到 nonFatalStatusCodes后,會立即發送下一個對沖請求,不管 hedgingDelay 如果受到其他的狀態碼,則所有未完成的對沖請求都將被取消,並且將狀態碼返回給調用者

 

實現案例

options = [('grpc.max_send_message_length', 100 * 1024 * 1024),
                       ('grpc.max_receive_message_length', 100 * 1024 * 1024),
                       ('grpc.enable_retries', 1),
                       ('grpc.service_config', '{ "hedgingPolicy":{ "maxAttempts": 4, "hedgingDelay": "0.5s", "nonFatalStatusCodes":[ "UNAVAILABLE", "INTERNAL", "ABORTED" ] } }')]
            channel = grpc.insecure_channel("{}:{}".format('localhost', 50051),
                                            options=options)
復制代碼

注意: 使用對沖的時候,請求可能會訪問到不同的后端(如果設置了負載均衡),那么就要求方法在多次執行下是安全,並且符合預期的

重試限流

當客戶端的失敗和成功比超過某個閾值時,gRPC 會通過禁用這些重試策略來防止由於重試導致服務器過載,這就是重試限流策略。同樣可以在servie config中配置:

"retryThrottling":{
    "maxTokens": 10,
    "tokenRatio": 0.1
}
復制代碼

對於每一個服務器,gRPC 客戶端會維護一個 token_count 變量,最初設置為 maxToken , 值的范圍是 0 - maxToken

對於每個 RPC 請求都會對 token_count 產生一下效果

  • 每個失敗的 RPC 請求都會遞減token_count 1
  • 成功 RPC 將會遞增 token_count和tokenRatio 如果 token_count <= ( maxTokens / 2), 則關閉重試策略,直到 token_count > (maxTokens/2),恢復重試

對於對沖 RPC,發送第一個RPC請求后,如果 token_count 大於(maxTokens/2),才會發送后續的對沖請求 當 token_count <= ( maxTokens / 2) 時,重試請求會被取消,並且將狀態碼返回給調用者

實際的限流參數配置,還是需要根據服務器的性能資源來衡量。

參考文獻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM