Python中使用grpc與consul


gRPC 客戶端和服務端可以在多種環境中運行和交互,並且可以用任何 gRPC 支持的語言來編寫。

gRPC 支持 C++ Java Python Go Ruby C# Node.js PHP Dart 等語言

gRPC 默認使用 protocol buffers,這是 Google 開源的一種輕便高效的結構化數據存儲格式,可以用於結構化數據串行化,或者說序列化。它很適合做數據存儲或 RPC 數據交換格式。

安裝 Google Protocol Buffer

方法一(建議使用)

參考文檔:gRPC Python Quickstart

1. 安裝 gRPC 
python -m pip install grpcio
# 或者 sudo python -m pip install grpcio # 在 El Capitan OSX 系統下可能會看到以下報錯 $ OSError: [Errno 1] Operation not permitted: '/tmp/pip-qwTLbI-uninstall/System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/six-1.4.1-py2.7.egg-info' # 可以使用以下命令 python -m pip install grpcio --ignore-installed 復制代碼
2. 安裝 gRPC tools

Python gPRC tools 包含 protocol buffer 編譯器和用於從 .proto 文件生成服務端和客戶端代碼的插件

python -m pip install grpcio-tools
復制代碼

 

 

Protobuf 基本使用

定義一個消息類型

先來看一個非常簡單的例子。假設你想定義一個“搜索請求”的消息格式,每一個請求含有一個查詢字符串、你感興趣的查詢結果所在的頁數,以及每一頁多少條查詢結果。可以采用如下的方式來定義消息類型的.proto文件了:

syntax = "proto3"; // 聲明使用 proto3 語法 message SearchRequest { string query = 1; // 每個字段都要指定數據類型 int32 page_number = 2; // 這里的數字2 是標識符,最小的標識號可以從1開始,最大到2^29 - 1, or 536,870,911。不可以使用其中的[19000-19999] int32 result_per_page = 3; // 這里是注釋,使用 // } 復制代碼
  • 文章的第一行指定了你正在使用 proto3 語法:如果不指定,編譯器會使用 proto2。這個指定語法必須是文件的非空非注釋的第一行
  • SearchRequest消息格式有三個字段,在消息中承載的數據分別對應於每一個字段。其中每個字段都有一個名字和一種類型。
  • 向.proto文件添加注釋,可以使用C/C++/java風格的雙斜杠(//) 語法格式。
  • 在消息體中,每個字段都有唯一的一個數字標識符。這些標識符用來在消息的二進制格式中識別各個字段,一旦開始使用就不能再改變。

[1,15]之內的標識號在編碼的時候會占用一個字節。[16,2047]之內的標識號則占用2個字節。所以應該為那些頻繁出現的消息元素保留 [1,15]之內的標識號。切記:要為將來有可能添加的、頻繁出現的標識號預留一些標識號。

指定字段規則

所指定的消息字段修飾符必須是如下之一:

  • singular:一個格式良好的消息應該有0個或者1個這種字段(但是不能超過1個)。

  • repeated:在一個格式良好的消息中,這種字段可以重復任意多次(包括0次)。重復的值的順序會被保留。

    在proto3中,repeated的標量域默認情況蝦使用packed。

    message Test4 { repeated int32 d = 4 [packed=true]; } 復制代碼

數值類型

一個標量消息字段可以含有一個如下的類型——該表格展示了定義於.proto文件中的類型,以及與之對應的、在自動生成的訪問類中定義的類型:

.proto Type Notes C++ Type Java Type Python Type[2] Go Type Ruby Type
double   double double float float64 Float
float   float float float float32 Float
int32 使用變長編碼,對於負值的效率很低,如果你的域有可能有負值,請使用sint64替代 int32 int int int32 Fixnum 或者 Bignum(根據需要)
uint32 使用變長編碼 uint32 int int/long uint32 Fixnum 或者 Bignum(根據需要)
uint64 使用變長編碼 uint64 long int/long uint64 Bignum
sint32 使用變長編碼,這些編碼在負值時比int32高效的多 int32 int int int32 Fixnum 或者 Bignum(根據需要)
sint64 使用變長編碼,有符號的整型值。編碼時比通常的int64高效。 int64 long int/long int64 Bignum
fixed32 總是4個字節,如果數值總是比總是比228大的話,這個類型會比uint32高效。 uint32 int int uint32 Fixnum 或者 Bignum(根據需要)
fixed64 總是8個字節,如果數值總是比總是比256大的話,這個類型會比uint64高效。 uint64 long int/long uint64 Bignum
sfixed32 總是4個字節 int32 int int int32 Fixnum 或者 Bignum(根據需要)
sfixed64 總是8個字節 int64 long int/long int64 Bignum
bool   bool boolean bool bool TrueClass/FalseClass
string 一個字符串必須是UTF-8編碼或者7-bit ASCII編碼的文本。 string String str/unicode string String (UTF-8)
bytes 可能包含任意順序的字節數據。 string ByteString str []byte String (ASCII-8BIT)

默認值

當一個消息被解析的時候,如果被編碼的信息不包含一個特定的singular元素,被解析的對象鎖對應的域被設置位一個默認值,對於不同類型指定如下:

  • 對於strings,默認是一個空string

  • 對於bytes,默認是一個空的bytes

  • 對於bools,默認是false

  • 對於數值類型,默認是0

  • 對於枚舉,默認是第一個定義的枚舉值,必須為0;

  • 對於消息類型(message),域沒有被設置,確切的消息是根據語言確定的,詳見generated code guide

    對於可重復域的默認值是空(通常情況下是對應語言中空列表)。

嵌套類型

你可以在其他消息類型中定義、使用消息類型,在下面的例子中,Result消息就定義在SearchResponse消息內,如:

message SearchResponse { message Result { string url = 1; string title = 2; repeated string snippets = 3; } repeated Result results = 1; } 復制代碼

在 message SearchResponse 中,定義了嵌套消息 Result,並用來定義SearchResponse消息中的results域。

Protobuf 文件編譯

從.proto文件生成了什么?

當用protocol buffer編譯器來運行.proto文件時,編譯器將生成所選擇語言的代碼,這些代碼可以操作在.proto文件中定義的消息類型,包括獲取、設置字段值,將消息序列化到一個輸出流中,以及從一個輸入流中解析消息。

  • 對C++來說,編譯器會為每個.proto文件生成一個.h文件和一個.cc文件,.proto文件中的每一個消息有一個對應的類。
  • 對Java來說,編譯器為每一個消息類型生成了一個.java文件,以及一個特殊的Builder類(該類是用來創建消息類接口的)。
  • 對Python來說,有點不太一樣——Python編譯器為.proto文件中的每個消息類型生成一個含有靜態描述符的模塊,,該模塊與一個元類(metaclass)在運行時(runtime)被用來創建所需的Python數據訪問類。
  • 對go來說,編譯器會位每個消息類型生成了一個.pd.go文件。
  • 對於Ruby來說,編譯器會為每個消息類型生成了一個.rb文件。
  • javaNano來說,編譯器輸出類似域java但是沒有Builder類
  • 對於Objective-C來說,編譯器會為每個消息類型生成了一個pbobjc.h文件和pbobjcm文件,.proto文件中的每一個消息有一個對應的類。
  • 對於C#來說,編譯器會為每個消息類型生成了一個.cs文件,.proto文件中的每一個消息有一個對應的類。

Python gRPC 示例

編譯

這里我們用Python 編譯一下,看得到什么:

// 文件名 hello.proto
syntax = "proto3";

package hello;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

使用以下命令編譯:

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./hello.proto

生成了兩個文件:

  • hello_pb2.py 此文件包含生成的 request(HelloRequest) 和 response(HelloReply) 類。
  • hello_pb2_grpc.py 此文件包含生成的 客戶端(GreeterStub)和服務端(GreeterServicer)的類。

源碼地址為github.com/grpc/grpc/b…

雖然現在已經生成了服務端和客戶端代碼,但是我們還需要手動實現以及調用的方法。

創建服務端代碼

創建和運行 Greeter 服務可以分為兩個部分:

  • 實現我們服務定義的生成的服務接口:做我們的服務的實際的“工作”的函數。

  • 運行一個 gRPC 服務器,監聽來自客戶端的請求並傳輸服務的響應。

在當前目錄,打開文件 greeter_server.py,實現一個新的函數:

from concurrent import futures
import time

import grpc

import hello_pb2
import hello_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class Greeter(hello_pb2_grpc.GreeterServicer):
    # 工作函數
    def SayHello(self, request, context):
        return hello_pb2.HelloReply(message='Hello, %s!' % request.name)


def serve():
    # gRPC 服務器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()  # start() 不會阻塞,如果運行時你的代碼沒有其它的事情可做,你可能需要循環等待。
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()

 

更新客戶端代碼

在當前目錄,打開文件 greeter_client.py,實現一個新的函數:

from __future__ import print_function

import grpc

import hello_pb2
import hello_pb2_grpc


def run():
    channel = grpc.insecure_channel('localhost:50051')
    stub = hello_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(hello_pb2.HelloRequest(name='goodspeed'))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    run()

 

 

對於返回單個應答的 RPC 方法("response-unary" 方法),gRPC Python 同時支持同步(阻塞)和異步(非阻塞)的控制流語義。對於應答流式 RPC 方法,調用會立即返回一個應答值的迭代器。調用迭代器的 next() 方法會阻塞,直到從迭代器產生的應答變得可用。

運行代碼

  1. 首先運行服務端代碼
python greeter_server.py
  1. 然后運行客戶端代碼
python greeter_client.py
# output Greeter client received: Hello, goodspeed!

源碼地址: https://github.com/grpc/grpc/tree/master/examples/python

 

Consul

#pip install python-consul
import consul
 
class Consul(object):
    def __init__(self, host, port):
        '''初始化,連接consul服務器'''
        self._consul = consul.Consul(host, port)
 
    def RegisterService(self, name, host, port, tags=None):
        tags = tags or []
        # 注冊服務
        self._consul.agent.service.register(
            name,
            name,
            host,
            port,
            tags,
            # 健康檢查ip端口,檢查時間:5,超時時間:30,注銷時間:30s
            check=consul.Check().tcp(host, port, "5s", "30s", "30s"))
 
    def GetService(self, name):
        services = self._consul.agent.services()
        service = services.get(name)
        if not service:
            return None, None
        addr = "{0}:{1}".format(service['Address'], service['Port'])
        return service, addr
 
if __name__ == '__main__':
    host="10.0.0.11" #consul服務器的ip
    port="" #consul服務器對外的端口
    consul_client=Consul(host,port)
 
    name="maple"
    host="10.0.0.11"
    port=8900
    consul_client.RegisterService(name,host,port)
 
    check = consul.Check().tcp(host, port, "5s", "30s", "30s")
    print(check)
    res=consul_client.GetService("maple")
    print(res)

 

consul與grpc實現服務發現

參考:https://www.cnblogs.com/yuzhenjie/p/9398569.html

 

 

參考鏈接

作者:goodspeed
鏈接:https://juejin.im/post/6844903618026405902
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM