Using zerorpc in Django
Introduction
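As system architecture moves from centralized single-server deployments toward distributed microservices, RPC becomes unavoidable. How to bring an RPC service into the system that is both developer-friendly and reliable in performance deserves careful thought.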
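After evaluating Thrift, gRPC, zerorpc and other options, we chose zerorpc for two reasons:
- Thrift and gRPC have a steeper learning curve, and developers have to define return structures separately (in an IDL), which adds work
- zerorpc fits Python naturally, allows fast development, and also supports Node.js, which matches our current tech stack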
Problems
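Although zerorpc can be embedded directly into our existing framework, a few problems still have to be solved:
- How to define RPC interfaces
- How to start the RPC service
- Client reliability under high concurrency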
Server side
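Our system uses Celery extensively. django-celery defines tasks by placing a tasks.py file in each installed app and turning functions into Tasks with the @task decorator. To make RPC interfaces just as easy to define, we designed a similar convention: every app that exposes an RPC interface gets an rpcs.py file.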
# rpcs.py
# coding: utf-8
from eebo.core.utils.zrpc import rpc

from .models import Ticket
from .serializers import TicketSerializer


@rpc.register()
def get_ticket():
    # Exposed to clients as "get_ticket" (the function name).
    t = Ticket.objects.first()
    s = TicketSerializer(t)
    return s.data


@rpc.register(name='ticket_list', stream=True)
def get_tickets(n):
    # Exposed as "ticket_list"; stream=True requires returning an iterable.
    qs = Ticket.objects.all()[:n]
    s = TicketSerializer(qs, many=True)
    return iter(s.data)
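The rpc.register decorator registers a function with the RPC service. Optional parameters:
- name: the method name clients call; if omitted, the function name is used, e.g. get_ticket
- stream: defaults to False. If True, the function's result is sent with zerorpc's streaming response, which is meant for large payloads; the function must return an iterable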
Let's look at how eebo.core.utils.zrpc implements this registration:
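# zrpc.py
# coding: utf-8
import zerorpc


class RPC(object):

    @classmethod
    def register(cls, name=None, stream=False):
        def _wrapper(func):
            if stream:
                # zerorpc.stream marks the method as a streaming response;
                # the lambda accepts (and ignores) the bound instance.
                method = zerorpc.stream(
                    lambda self, *args, **kwargs: func(*args, **kwargs))
            else:
                method = staticmethod(func)
            # Bind onto the class under `name` (defaults to the function name).
            # An existing attribute with the same name is silently replaced.
            setattr(cls, name or func.__name__, method)
            return func
        return _wrapper


rpc = RPC()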
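A class method binds each function onto the class, so every name must be globally unique: registering a second function under the same name silently replaces the first.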
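Because setattr quietly replaces an existing attribute, a name clash between two apps goes unnoticed. Below is a minimal stdlib-only sketch of the same register-by-name idea with a duplicate check added. `Registry`, its `methods` dict and `call()` are hypothetical names, and zerorpc is left out entirely so the behaviour can be run on its own:

```python
# Sketch of the register-by-name pattern with a duplicate-name check.
# Registry / methods / call() are hypothetical, not the article's code.


class Registry(object):
    """Maps RPC method names to (function, stream_flag) pairs."""

    def __init__(self):
        self.methods = {}

    def register(self, name=None, stream=False):
        def _wrapper(func):
            key = name or func.__name__  # default: the function's own name
            if key in self.methods:
                # The setattr() version would silently overwrite here.
                raise ValueError("RPC name %r is already registered" % key)
            self.methods[key] = (func, stream)
            return func  # the decorated function itself is unchanged
        return _wrapper

    def call(self, key, *args, **kwargs):
        func, stream = self.methods[key]
        result = func(*args, **kwargs)
        # A streaming method returns an iterable; it is materialized here
        # only so the demo can print it.
        return list(result) if stream else result


registry = Registry()


@registry.register()
def get_ticket():
    return {"id": 1}


@registry.register(name="ticket_list", stream=True)
def get_tickets(n):
    return iter([{"id": i} for i in range(n)])


print(registry.call("get_ticket"))      # {'id': 1}
print(registry.call("ticket_list", 2))  # [{'id': 0}, {'id': 1}]

try:
    @registry.register(name="ticket_list")
    def another(n):
        return []
except ValueError as exc:
    print(exc)  # RPC name 'ticket_list' is already registered
```

If a check like this were added to RPC.register, a clash would surface as an exception while runrpc imports the rpcs.py modules, instead of the wrong function being served.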
Now that we have a way to define RPC interfaces, let's look at how to start the RPC server.
# runrpc.py
# coding: utf-8
import re
import sys
import imp as _imp  # deprecated; removed in Python 3.12
import importlib

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError

from eebo.core.utils.zrpc import rpc, ServerExecMiddleware

naiveip_re = re.compile(r"""^(?:
(?P<addr>
    (?P<ipv4>\d{1,3}(?:\.\d{1,3}){3}) |         # IPv4 address
    (?P<ipv6>\[[a-fA-F0-9:]+\]) |               # IPv6 address
    (?P<fqdn>[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN
):)?(?P<port>\d+)$""", re.X)


class Command(BaseCommand):
    help = "Starts a lightweight RPC server for development."

    default_addr = '127.0.0.1'
    default_addr_ipv6 = '::1'  # referenced in handle(), missing in the original
    default_port = '4242'

    def add_arguments(self, parser):
        parser.add_argument('addrport',
                            nargs='?',
                            help='Optional port number, or ipaddr:port')

    def handle(self, *args, **options):
        self.use_ipv6 = False
        if not options['addrport']:
            self.addr = ''
            self.port = self.default_port
        else:
            m = re.match(naiveip_re, options['addrport'])
            if m is None:
                raise CommandError('"%s" is not a valid port number '
                                   'or address:port pair.' %
                                   options['addrport'])
            self.addr, _ipv4, _ipv6, _fqdn, self.port = m.groups()
            if not self.port.isdigit():
                raise CommandError("%r is not a valid port number." %
                                   self.port)
            if self.addr:
                if _ipv6:
                    self.addr = self.addr[1:-1]  # strip the [ ] brackets
                    self.use_ipv6 = True
                    self._raw_ipv6 = True
                elif self.use_ipv6 and not _fqdn:
                    raise CommandError('"%s" is not a valid IPv6 address.' %
                                       self.addr)
        if not self.addr:
            self.addr = self.default_addr_ipv6 if self.use_ipv6 else self.default_addr
            self._raw_ipv6 = self.use_ipv6
        self.run(**options)

    def run(self, **options):
        """Discover the RPC interfaces, then serve until interrupted."""
        self.autodiscover_rpc()
        server = self.get_server()
        try:
            server.run()
        except KeyboardInterrupt:
            server.close()
            sys.exit(0)

    def autodiscover_rpc(self, related_name='rpcs'):
        """Import <app>.rpcs for every installed app that has one."""
        for pkg in settings.INSTALLED_APPS:
            try:
                pkg_path = importlib.import_module(pkg).__path__
            except AttributeError:
                continue  # a plain module, not a package
            try:
                _imp.find_module(related_name, pkg_path)
            except ImportError:
                continue  # this app has no rpcs.py
            try:
                importlib.import_module('{0}.{1}'.format(pkg, related_name))
            except ImportError:
                pass

    def get_server(self, *args, **options):
        """Return the default zerorpc server for the runner."""
        import zerorpc
        server = zerorpc.Server(rpc, heartbeat=30)
        server.bind("tcp://{0}:{1}".format(self.addr, self.port))
        # close stale Django DB connections around every call
        zerorpc.Context.get_instance().register_middleware(ServerExecMiddleware())
        # report exceptions to Sentry when raven is installed and configured
        try:
            from raven.contrib.zerorpc import SentryMiddleware
            if hasattr(settings, 'RAVEN_CONFIG'):
                sentry = SentryMiddleware(hide_zerorpc_frames=False,
                                          dsn=settings.RAVEN_CONFIG['dsn'])
                zerorpc.Context.get_instance().register_middleware(sentry)
        except ImportError:
            pass
        return server
runrpc.py is a Django management command. Put it under management/commands in one of the installed apps, then start the server with:
python manage.py runrpc 0.0.0.0:4242
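The optional addrport argument (such as 0.0.0.0:4242 above) is parsed with naiveip_re, the same pattern Django's runserver uses. The stdlib-only sketch below pulls that parsing out of handle() into a hypothetical parse_addrport() helper, which makes the accepted forms easy to check. Unlike handle(), it returns the port as an int:

```python
import re

# Same pattern as naiveip_re in runrpc.py (borrowed from Django's runserver).
naiveip_re = re.compile(r"""^(?:
(?P<addr>
    (?P<ipv4>\d{1,3}(?:\.\d{1,3}){3}) |         # IPv4 address
    (?P<ipv6>\[[a-fA-F0-9:]+\]) |               # IPv6 address
    (?P<fqdn>[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*) # FQDN
):)?(?P<port>\d+)$""", re.X)


def parse_addrport(value, default_addr="127.0.0.1"):
    """Hypothetical helper: return (addr, port) roughly as handle() resolves them."""
    m = naiveip_re.match(value)
    if m is None:
        raise ValueError("%r is not a valid port number or address:port pair." % value)
    addr, _ipv4, ipv6, _fqdn, port = m.groups()
    if ipv6:
        addr = addr[1:-1]  # strip the surrounding [ ] as handle() does
    return (addr or default_addr, int(port))


print(parse_addrport("0.0.0.0:4242"))  # ('0.0.0.0', 4242)
print(parse_addrport("4242"))          # ('127.0.0.1', 4242)
```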
autodiscover_rpc automatically discovers and imports the RPC interfaces (the rpcs.py modules), and get_server creates the zerorpc server object.
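The imp module used by autodiscover_rpc is deprecated and was removed in Python 3.12. Below is a minimal sketch of the same discovery step built on importlib.util.find_spec instead. autodiscover() is a hypothetical standalone helper that takes the app list as an argument rather than reading settings.INSTALLED_APPS. Unlike the original, it lets errors raised inside an existing rpcs.py propagate:

```python
import importlib
import importlib.util


def autodiscover(apps, related_name="rpcs"):
    """Import `<app>.<related_name>` for every app that has such a submodule.

    Returns the names of the modules that were imported.
    """
    imported = []
    for app in apps:
        full_name = "{0}.{1}".format(app, related_name)
        try:
            # find_spec imports the parent package and returns None when the
            # submodule does not exist. On Python 3.7+ it raises an ImportError
            # subclass when the app cannot be imported or is not a package.
            spec = importlib.util.find_spec(full_name)
        except ImportError:
            continue
        if spec is None:
            continue
        # Errors raised *inside* rpcs.py are not swallowed, so a broken
        # interface module fails loudly instead of being skipped.
        importlib.import_module(full_name)
        imported.append(full_name)
    return imported
```

Django also ships django.utils.module_loading.autodiscover_modules(), which django.contrib.admin uses to find admin.py modules. Calling autodiscover_modules('rpcs') may be a simpler alternative inside a Django project.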
get_server registers two middlewares with zerorpc: SentryMiddleware captures exceptions raised by RPC interfaces and sends them to Sentry, and ServerExecMiddleware takes care of Django DB connections. Here is its code:
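# zrpc.py
# coding: utf-8
from django.db import close_old_connections


class ServerExecMiddleware(object):
    """zerorpc middleware that cleans up Django DB connections per call."""

    def server_before_exec(self, request_event):
        # Drop connections that are unusable or past CONN_MAX_AGE before the
        # interface runs, so the ORM reconnects if necessary.
        close_old_connections()

    def server_after_exec(self, request_event, reply_event):
        # Same cleanup once the call has produced its reply.
        close_old_connections()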
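close_old_connections is called before and after every RPC interface call. It closes any DB connection that is unusable or has outlived CONN_MAX_AGE (with the default CONN_MAX_AGE = 0, that is every connection). This reproduces what django.db does for ordinary HTTP requests, where close_old_connections is connected to the request signals:
# django/db/__init__.py
signals.request_started.connect(close_old_connections)
signals.request_finished.connect(close_old_connections)
The goal is to make sure that when an RPC interface uses the ORM, its connection has not timed out or been dropped.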
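To see why hooking both sides of each call keeps ORM connections fresh, here is a simplified, stdlib-only model of that cleanup. FakeConnection, FakeExecMiddleware and dispatch() are hypothetical. close_if_obsolete() only loosely mirrors Django's close_if_unusable_or_obsolete() (it ignores the error and "unusable" checks), and max_age plays the role of CONN_MAX_AGE:

```python
import time


class FakeConnection(object):
    """Hypothetical stand-in for a Django DB connection wrapper.

    max_age mirrors CONN_MAX_AGE: 0 closes at every call boundary,
    None keeps the connection open indefinitely.
    """

    def __init__(self, max_age=0):
        self.max_age = max_age
        self.close_at = None
        self.is_open = False

    def ensure_connection(self):
        # Lazily (re)open, recording when the connection becomes obsolete.
        if not self.is_open:
            self.is_open = True
            now = time.monotonic()
            self.close_at = None if self.max_age is None else now + self.max_age

    def close_if_obsolete(self):
        if self.is_open and self.close_at is not None and time.monotonic() >= self.close_at:
            self.is_open = False


class FakeExecMiddleware(object):
    """Same hook names as ServerExecMiddleware, acting on fake connections."""

    def __init__(self, connections):
        self.connections = connections

    def _close_old(self):
        for conn in self.connections:
            conn.close_if_obsolete()

    def server_before_exec(self, request_event):
        self._close_old()

    def server_after_exec(self, request_event, reply_event):
        self._close_old()


def dispatch(middleware, func, *args):
    """Run one call with the before/after hooks around it (simplified)."""
    middleware.server_before_exec(None)
    result = func(*args)
    middleware.server_after_exec(None, result)
    return result
```

With max_age=0 (Django's default CONN_MAX_AGE) the connection is closed at the end of every call and reopened by the next query; with None it stays open. In this simplified dispatcher a call that raises skips the after hook, and the next call's before hook still performs the cleanup.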
Client side
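Because RPC calls are blocking, we cannot use a single global client, but creating a client for every request is not an option either. So, following redis-py's client implementation, we define a zerorpc client backed by a connection pool.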
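# zrpc.py
# coding: utf-8
import os

import zerorpc
from redis.connection import BlockingConnectionPool
from gevent.queue import LifoQueue


class Connection(object):
    """A pooled zerorpc client, shaped like a redis-py connection."""

    def __init__(self, connect_to, heartbeat=30):
        self.client = zerorpc.Client(heartbeat=heartbeat)
        self.client.connect(connect_to)
        self.pid = os.getpid()  # the pool compares this to detect forks

    # Newer redis-py releases (roughly 3.x and later) call connect() and
    # can_read() on every connection they hand out; these shims keep this
    # class compatible with them.
    def connect(self):
        pass  # the zerorpc client is already connected in __init__

    def can_read(self, timeout=0):
        return False  # never holds unread data from a previous command

    def disconnect(self):
        self.client.close()


class RPCClient(object):

    def __init__(self, connect_to, heartbeat=30):
        # Extra keyword arguments are forwarded to Connection(...);
        # timeout is how long get_connection() waits for a free connection.
        self.connection_pool = BlockingConnectionPool(
            connection_class=Connection, queue_class=LifoQueue,
            timeout=heartbeat, connect_to=connect_to, heartbeat=heartbeat)

    def close(self):
        self.connection_pool.disconnect()

    def __getattr__(self, name):
        # client.ticket_list(10) becomes client('ticket_list', 10).
        if name.startswith('_'):
            raise AttributeError(name)  # don't turn dunder lookups into RPC calls
        return lambda *args, **kwargs: self(name, *args, **kwargs)

    def __call__(self, name, *args, **kwargs):
        connection = self.connection_pool.get_connection('')
        try:
            return getattr(connection.client, name)(*args, **kwargs)
        finally:
            self.connection_pool.release(connection)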
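The connection pool is reused directly from redis-py. Our Django service runs under gunicorn + gevent, so queue_class is gevent's LifoQueue: waiting for a free connection then blocks only the current greenlet.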
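A LIFO queue hands out the most recently returned connection first, so a few warm clients tend to do most of the work. The stdlib-only model below sketches the checkout/release cycle and the timeout. BlockingLifoPool and PooledClient are hypothetical names, and queue.LifoQueue stands in for gevent's. The "prefill with None placeholders, create connections lazily" strategy reflects how redis-py's BlockingConnectionPool appears to work; the code is not a copy of it:

```python
import queue
import threading


class BlockingLifoPool(object):
    """Simplified model of a blocking LIFO connection pool.

    The queue starts filled with None placeholders; a placeholder becomes a
    real connection only when first checked out. Released connections go back
    on the LIFO stack, so the most recently used one is reused first.
    """

    def __init__(self, factory, max_connections=2, timeout=1.0):
        self.factory = factory    # callable that creates a new connection
        self.timeout = timeout    # seconds to wait for a free slot
        self.pool = queue.LifoQueue(max_connections)
        for _ in range(max_connections):
            self.pool.put_nowait(None)
        self.created = []
        self._lock = threading.Lock()

    def get_connection(self):
        try:
            conn = self.pool.get(block=True, timeout=self.timeout)
        except queue.Empty:
            raise RuntimeError("No connection available.")
        if conn is None:
            conn = self.factory()
            with self._lock:
                self.created.append(conn)
        return conn

    def release(self, conn):
        self.pool.put_nowait(conn)


class PooledClient(object):
    """Proxy that checks out a connection for each method call."""

    def __init__(self, pool):
        self.pool = pool

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)

        def call(*args, **kwargs):
            conn = self.pool.get_connection()
            try:
                return getattr(conn, name)(*args, **kwargs)
            finally:
                self.pool.release(conn)  # always hand the connection back
        return call
```

The __getattr__ proxy turns a call such as client.echo(5) into a checkout, a method call on the pooled object and a release in finally, which is the same shape as RPCClient.__call__.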
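In practice we also hit a problem involving pyzmq's garbage-collection thread. It has to be fixed with a patch that makes that thread use a green (gevent-compatible) context:
import zmq.green as zmq

# patch zmq garbage-collection Thread to use green Context:
from zmq.utils.garbage import gc
gc.context = zmq.Context()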
Summary
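Technology choices should fit the project's actual circumstances; don't adopt new technology blindly and take on unnecessary costs. To get a solution adopted, you have to consider as a whole whether it is easy to use and easy to deploy.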
Full code:
https://gist.github.com/zhu327/5b6c06eccc5758d4e642ee899a518687
