前言:
最近幫朋友review其模塊服務代碼, 使用的是python的twisted網絡框架. 鑒於之前並沒有使用過, 於是決定好好研究一番.
twisted的reactor模型很好的處理了網絡IO事件, 以及定時任務觸發. 但包處理后的業務邏輯操作, 需要根據具體的場景來決定.
本文將講述twisted如何實現half-sync/half-async的模式, 其線程池和defer模式是如何設計和使用的.
場景構造:
twisted服務接受業務請求, 后端需要訪問mysql. 由於mysql的接口是同步的, 如果安裝twisted默認的方式處理話, 其業務操作(mysql)會阻塞reactor的IO事件循環. 這大大降低了twisted的服務能力.
為了解決該類問題, twisted支持線程池. 把業務邏輯和IO事件分離, IO操作依舊是異步的, 而業務邏輯則采用線程池來處理.
工作線程池:
在具體講述defer模式之前, 先談談reactor自帶的線程池, 這也符合使用half-sync/half-async模式的直觀理解.
先來構造下一個基礎樣例代碼:
#! /usr/bin/python #-*- coding: UTF-8 -*- from twisted.internet import reactor from twisted.internet import protocol from twisted.protocols.basic import LineReceiver import time class DemoProtocol(LineReceiver): def lineReceived(self, line): # 進行數據包的處理 reactor.callInThread(self.handle_request, line) def handle_request(self, line): """ hanlde_request: 進行具體的業務邏輯處理 """ # 邊使用sleep(1)來代替模擬 time.sleep(1) # 借助callFromThread響應結果 reactor.callFromThread(self.write_response, line) def write_response(self, result): self.transport.write("ack:" + str(result) + "\r\n") class DemoProtocolFactory(protocol.Factory): def buildProtocol(self, addr): return DemoProtocol() reactor.listenTCP(9090, DemoProtocolFactory()) reactor.run()
DemoProtocol在收到一行消息, 需要處理一個業務需耗時一秒, 於是其調用callInThread來借助reactor的線程池來執行.
其callInThread的函數定義如下:
def callInThread(self, _callable, *args, **kwargs): self.getThreadPool().callInThread(_callable, *args, **kwargs)
從中, 我們可以印證之前的觀點, 借助線程池來完成耗時阻塞的業務工作.
再來看一下callFromThread的函數定義:
def callFromThread(self, f, *args, **kw): assert callable(f), "%s is not callable" % (f,) self.threadCallQueue.append((f, args, kw)) self.wakeUp()
其作用是把回調放入主線程(也是reactor主事件循環)的待執行隊列中, 並及時喚醒reactor.
我們把寫入響應的操作放入主循環中, 是為了讓IO集中在主循環中進行, 避免潛在的線程不安全的問題.
defer模式:
直接使用reactor的線程池, 非常容易實現half-sync/half-async的模式, 也讓IO和業務邏輯隔離. 但reactor設計之初, 更傾向於隱藏其內部的線程池. 於是其引入了defer模式.
讓我們實現與上等同的代碼片段:
#! /usr/bin/python #-*- coding: UTF-8 -*- from twisted.internet import reactor from twisted.internet import protocol from twisted.protocols.basic import LineReceiver from twisted.internet.threads import deferToThread import time class DemoProtocol(LineReceiver): def lineReceived(self, line): # 進行數據包的處理 deferToThread(self.handle_request, line).addCallback(self.write_response) def handle_request(self, line): """ hanlde_request: 進行具體的業務邏輯處理 """ # 邊使用sleep(1)來代替模擬 time.sleep(1) return line def write_response(self, result): self.transport.write("ack:" + str(result) + "\r\n") class DemoProtocolFactory(protocol.Factory): def buildProtocol(self, addr): return DemoProtocol() reactor.listenTCP(9090, DemoProtocolFactory()) reactor.run()
使用defer后, 代碼更加的簡潔. 其defer對象, 其實借用了線程池.
threads.deferToThread定義如下:
def deferToThread(f, *args, **kwargs): from twisted.internet import reactor return deferToThreadPool(reactor, reactor.getThreadPool(), f, *args, **kwargs) def deferToThreadPool(reactor, threadpool, f, *args, **kwargs): d = defer.Deferred() def onResult(success, result): if success: reactor.callFromThread(d.callback, result) else: reactor.callFromThread(d.errback, result) threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs) return d
這邊我們可以發現deferToThread, 就是間接調用了callInThread函數, 另一方面, 對其回調函數的執行結果, 進行了onCallback, 以及onErrback的調用. 這些回調函數在主線程中運行.
defer模式簡化了程序編寫, 也改變了人們開發的思維模式.
測試回顧:
使用telnet進行測試, 結果正常.
另一方面, twisted的線程池, 其默認是采用延遲初始化的方式.
服務開啟時, 只有主線程一個, 隨着請求的到來, 其按需產生更多的worker thread.
而其線程池默認為10. 我們可以借助suggestThreadPoolSize方法來修改.
寫在最后:
如果你覺得這篇文章對你有幫助, 請小小打賞下. 其實我想試試, 看看寫博客能否給自己帶來一點小小的收益. 無論多少, 都是對樓主一種由衷的肯定.