介紹
使用python做web開發面臨的一個最大的問題就是性能,在解決C10K問題上顯的有點吃力。有些異步框架Tornado、Twisted、Gevent 等就是為了解決性能問題。這些框架在性能上有些提升,但是也出現了各種古怪的問題難以解決。
在python3.6中,官方的異步協程庫asyncio正式成為標准。在保留便捷性的同時對性能有了很大的提升,已經出現許多的異步框架使用asyncio。
使用較早的異步框架是aiohttp,它提供了server端和client端,對asyncio做了很好的封裝。但是開發方式和最流行的微框架flask不同,flask開發簡單,輕量,高效。正是結合這些優點, 以Sanic為基礎,集成多個流行的庫來搭建微服務。Sanic框架是和Flask相似的異步協程框架,簡單輕量,並且性能很高。本項目就是以Sanic為基礎搭建的python微服務框架。(思想適用於其他語言)
微服務設計原則個人總結:

X 軸 :指的是水平復制,很好理解,就是講單體系統多運行幾個實例,做個集群加負載均衡的模式。Z 軸 :是基於類似的數據分區,比如一個互聯網打車應用突然或了,用戶量激增,集群模式撐不住了,那就按照用戶請求的地區進行數據分區,北京、上海、四川等多建幾個集群。簡單理解數據庫拆分,比如分庫分表 Y 軸 :就是我們所說的微服務的拆分模式,就是基於不同的業務拆分。
微服務總體架構: 
特點
- 使用sanic異步框架,簡單,輕量,高效。
- 使用uvloop為核心引擎,使sanic在很多情況下單機並發甚至不亞於Golang。
- 使用asyncpg為數據庫驅動,進行數據庫連接,執行sql語句執行。
- 使用aiohttp為Client,對其他微服務進行訪問。
- 使用peewee為ORM,但是只是用來做模型設計和migration。
- 使用opentracing為分布式追蹤系統。
- 使用unittest做單元測試,並且使用mock來避免訪問其他微服務。
- 使用swagger做API標准,能自動生成API文檔。
服務端
使用sanic異步框架,有較高的性能,但是使用不當會造成blocking, 對於有IO請求的都要選用異步庫。添加庫要慎重 。sanic使用uvloop異步驅動,uvloop基於libuv使用Cython編寫,性能比nodejs還要高。功能說明:
啟動前
@app.listener('before_server_start')
async def before_srver_start(app,loop):queue= asyncio.Queue()
app.queue=queueloop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
reporter = AioReporter(queue=queue)
tracer = BasicTracer(recorder=reporter)
tracer.register_required_propagators()
opentracing.tracer = tracer
app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
- 創建DB連接池
- 創建Client連接
- 創建queue, 消耗span,用於日志追蹤
- 創建opentracing.tracer進行日志追蹤
中間件
@app.middleware('request')asyncdefcros(request):ifrequest.method =='POST'orrequest.method =='PUT':
request['data'] = request.json
span = before_request(request)
request['span'] = span@app.middleware('response')asyncdefcors_res(request, response):span = request['span']if'span'inrequestelseNoneifresponseisNone:returnresponse
result = {'code':}ifnotisinstance(response, HTTPResponse):ifisinstance(response, tuple)andlen(response) ==2:
result.update({'data': response[],'pagination': response[1]
})else:
result.update({'data': response})
response = json(result)ifspan:
span.set_tag('http.status_code',"200")ifspan:
span.set_tag('component', request.app.name)
span.finish()returnresponse
- 創建span, 用於日志追蹤
- 對response進行封裝,統一格式
異常處理
對拋出的異常進行處理,返回統一格式
任務
創建task消費queue中對span,用於日志追蹤
異步處理
Example:
asyncdefasync_request(datas):# async handler requestresults =awaitasyncio.gather(*[data[2]fordataindatas])forindex, objinenumerate(results):
data = datas[index]
data[][data[1]] = results[index]@user_bp.get('/<id:int>')@doc.summary("get user info")@doc.description("get user info by id")@doc.produces(Users)asyncdefget_users_list(request, id):asyncwithrequest.app.db.acquire(request)ascur:
record =awaitcur.fetch(""" SELECT * FROM users WHERE id = $1 """, id)
datas = [
[record,'city_id', get_city_by_id(request, record['city_id'])]
[record,'role_id', get_role_by_id(request, record['role_id'])]
]awaitasync_request(datas)returnrecord
get_city_by_id, get_role_by_id是並行處理。
相關連接
sanic
模型設計 & ORM
Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。ORM使用peewee, 只是用來做模型設計和migration, 數據庫操作使用asyncpg。
Example:
# models.pyclassUsers(Model):id = PrimaryKeyField()
create_time = DateTimeField(verbose_name='create time',
default=datetime.datetime.utcnow)
name = CharField(max_length=128, verbose_name="user's name")
age = IntegerField(null=False, verbose_name="user's age")
sex = CharField(max_length=32, verbose_name="user's sex")
city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)classMeta:db_table ='users'# migrations.pyfromsanic_ms.migrationsimportMigrationModel, info, dbclassUserMigration(MigrationModel):_model = Users# @info(version="v1")# def migrate_v1(self):# migrate(self.add_column('sex'))defmigrations():try:
um = UserMigration()withdb.transaction():
um.auto_migrate()
print("Success Migration")exceptExceptionase:raiseeif__name__ =='__main__':
migrations()
- 運行命令 python migrations.py
- migrate_v1函數添加字段sex, 在BaseModel中要先添加name字段
- info裝飾器會創建表migrate_record來記錄migrate,version每個model中必須唯一,使用version來記錄是否執行過,還可以記錄author,datetime
- migrate函數必須以 migrate_ 開頭
相關連接
peewee
數據庫操作
asyncpg is the fastest driver among common Python, NodeJS and Go implementations使用asyncpg為數據庫驅動, 對數據庫連接進行封裝, 執行數據庫操作。
不使用ORM做數據庫操作,一個原因是性能,ORM會有性能的損耗,並且無法使用asyncpg高性能庫。另一個是單個微服務是很簡單的,表結構不會很復雜,簡單的SQL語句就可以處理來,沒必要引入ORM。使用peewee只是做模型設計
Example:
sql ="SELECT * FROM users WHERE name=$1"name="test"asyncwithrequest.app.db.acquire(request)ascur:data= await cur.fetchrow(sql,name)
asyncwithrequest.app.db.transaction(request)ascur:data= await cur.fetchrow(sql,name)
- acquire() 函數為非事務, 對於只涉及到查詢的使用非事務,可以提高查詢效率
- tansaction() 函數為事務操作,對於增刪改必須使用事務操作
- 傳入request參數是為了獲取到span,用於日志追蹤
- TODO 數據庫讀寫分離
相關連接
asyncpgbenchmarks
客戶端
使用aiohttp中的client,對客戶端進行了簡單的封裝,用於微服務之間訪問。Don’t create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.
Example:
@app.listener('before_server_start')asyncdefbefore_srver_start(app, loop):app.client = Client(loop, url='http://host:port')asyncdefget_role_by_id(request, id):cli = request.app.client.cli(request)asyncwithcli.get('/cities/{}'.format(id))asres:returnawaitres.json()@app.listener('before_server_stop')asyncdefbefore_server_stop(app, loop):app.client.close()
對於訪問不同的微服務可以創建多個不同的client,這樣每個client都會keep-alives
日志 & 分布式追蹤系統
裝飾器logger
@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
asyncdefget_city_by_id(request, id):
cli = request.app.client.cli(request)
- type: 日志類型,如 method, route
- category: 日志類別,默認為app的name
- detail: 日志詳細信息
- description: 日志描述,默認為函數的注釋
- tracing: 日志追蹤,默認為True
- level: 日志級別,默認為INFO
分布式追蹤系統
- OpenTracing是以Dapper,Zipkin等分布式追蹤系統為依據, 建立了統一的標准。
- Opentracing跟蹤每一個請求,記錄請求所經過的每一個微服務,以鏈條的方式串聯起來,對分析微服務的性能瓶頸至關重要。
- 使用opentracing框架,但是在輸出時轉換成zipkin格式。因為大多數分布式追蹤系統考慮到性能問題,都是使用的thrift進行通信的,本着簡單,Restful風格的精神,沒有使用RPC通信。以日志的方式輸出, 可以使用fluentd, logstash等日志收集再輸入到Zipkin。Zipkin是支持HTTP輸入的。
- 生成的span先無阻塞的放入queue中,在task中消費隊列的span。后期可以添加上采樣頻率。
- 對於DB,Client都加上了tracing
相關連接
opentracingzipkinjaeger
API接口
api文檔使用swagger標准。Example:
fromsanic_msimportdoc@user_bp.post('/')@doc.summary('create user')@doc.description('create user info')@doc.consumes(Users)@doc.produces({'id': int})asyncdefcreate_user(request):data = request['data']asyncwithrequest.app.db.transaction(request)ascur:
record =awaitcur.fetchrow(""" INSERT INTO users(name, age, city_id, role_id)
VALUES($1, $2, $3, $4, $5)
RETURNING id
""", data['name'], data['age'], data['city_id'], data['role_id']
)return{'id': record['id']}
- summary: api概要
- description: 詳細描述
- consumes: request的body數據
- produces: response的返回數據
- tag: API標簽
- 在consumes和produces中傳入的參數可以是peewee的model,會解析model生成API數據, 在field字段的help_text參數來表示引用對象
- http://host:ip/openapi/spec.json 獲取生成的json數據
相關連接
swagger
Response 數據
在返回時,不要返回sanic的response,直接返回原始數據,會在Middleware中對返回的數據進行處理,返回統一的格式,具體的格式可以[查看]
單元測試
單元測試使用unittest。mock是自己創建了MockClient,因為unittest還沒有asyncio的mock,並且sanic的測試接口也是發送request請求,所以比較麻煩. 后期可以使用pytest。Example:
from sanic_ms.tests import APITestCase
from server import appclassTestCase(APITestCase):_app = app
_blueprint ='visit'defsetUp(self):super(TestCase,self).setUp()self._mock.get('/cities/1',
payload={'id':1,'name':'shanghai'})self._mock.get('/roles/1',
payload={'id':1,'name':'shanghai'})deftest_create_user(self):
data = {'name':'test','age':2,'city_id':1,'role_id':1,
}
res =self.client.create_user(data=data)
body = ujson.loads(res.text)self.assertEqual(res.status,200)
- 其中_blueprint為blueprint名稱
- 在setUp函數中,使用_mock來注冊mock信息, 這樣就不會訪問真實的服務器, payload為返回的body信息
- 使用client變量調用各個函數, data為body信息,params為路徑的參數信息,其他參數是route的參數
代碼覆蓋
coverage erase
coverage run --source . -m sanic_ms tests
coveragexml-oreports/coverage.xmlcoverage2clover-i reports/coverage.xml-oreports/clover.xmlcoveragehtml -d reports
- coverage2colver 是將coverage.xml 轉換成 clover.xml,bamboo需要的格式是clover的。
相關連接
unittestcoverage
異常處理
使用 app.error_handler = CustomHander() 對拋出的異常進行處理 Example:
fromsanic_ms.exceptionimportServerError@visit_bp.delete('/users/<id:int>')asyncdefdel_user(request, id):raiseServerError(error='內部錯誤',code=10500, message="msg")
- code: 錯誤碼,無異常時為0,其余值都為異常
- message: 狀態碼信息
- error: 自定義錯誤信息
- status_code: http狀態碼,使用標准的http狀態碼
