twisted的defer模式和線程池


 

前言:
  最近幫朋友review其模塊服務代碼, 使用的是python的twisted網絡框架. 鑒於之前並沒有使用過, 於是決定好好研究一番.
  twisted的reactor模型很好的處理了網絡IO事件, 以及定時任務觸發. 但包處理后的業務邏輯操作, 需要根據具體的場景來決定.
  本文將講述twisted如何實現half-sync/half-async的模式, 其線程池和defer模式是如何設計和使用的.

場景構造:
  twisted服務接受業務請求, 后端需要訪問mysql. 由於mysql的接口是同步的, 如果安裝twisted默認的方式處理話, 其業務操作(mysql)會阻塞reactor的IO事件循環. 這大大降低了twisted的服務能力.
  為了解決該類問題, twisted支持線程池. 把業務邏輯和IO事件分離, IO操作依舊是異步的, 而業務邏輯則采用線程池來處理.

  

工作線程池:
  在具體講述defer模式之前, 先談談reactor自帶的線程池, 這也符合使用half-sync/half-async模式的直觀理解.
  先來構造下一個基礎樣例代碼:

#! /usr/bin/python
#-*- coding: UTF-8 -*-

from twisted.internet import reactor
from twisted.internet import protocol
from twisted.protocols.basic import LineReceiver

import time

class DemoProtocol(LineReceiver):
               
    def lineReceived(self, line):
        # 進行數據包的處理
        reactor.callInThread(self.handle_request, line)
    
    def handle_request(self, line):
        """
            hanlde_request:
                進行具體的業務邏輯處理 
        """
        # 邊使用sleep(1)來代替模擬
        time.sleep(1)
        # 借助callFromThread響應結果
        reactor.callFromThread(self.write_response, line)
    
    def write_response(self, result):
        self.transport.write("ack:" + str(result) + "\r\n")

class DemoProtocolFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return DemoProtocol()
    

reactor.listenTCP(9090, DemoProtocolFactory())
reactor.run()

  DemoProtocol在收到一行消息, 需要處理一個業務需耗時一秒, 於是其調用callInThread來借助reactor的線程池來執行.
  其callInThread的函數定義如下:

    def callInThread(self, _callable, *args, **kwargs):
            self.getThreadPool().callInThread(_callable, *args, **kwargs)

  從中, 我們可以印證之前的觀點, 借助線程池來完成耗時阻塞的業務工作.
  再來看一下callFromThread的函數定義:

    def callFromThread(self, f, *args, **kw):
            assert callable(f), "%s is not callable" % (f,)
            self.threadCallQueue.append((f, args, kw))
            self.wakeUp()

  其作用是把回調放入主線程(也是reactor主事件循環)的待執行隊列中, 並及時喚醒reactor.
  我們把寫入響應的操作放入主循環中, 是為了讓IO集中在主循環中進行, 避免潛在的線程不安全的問題.

defer模式:
  直接使用reactor的線程池, 非常容易實現half-sync/half-async的模式, 也讓IO和業務邏輯隔離. 但reactor設計之初, 更傾向於隱藏其內部的線程池. 於是其引入了defer模式.
  讓我們實現與上等同的代碼片段:

#! /usr/bin/python
#-*- coding: UTF-8 -*-

from twisted.internet import reactor
from twisted.internet import protocol
from twisted.protocols.basic import LineReceiver
from twisted.internet.threads import deferToThread

import time

class DemoProtocol(LineReceiver):
               
    def lineReceived(self, line):
        # 進行數據包的處理
        deferToThread(self.handle_request, line).addCallback(self.write_response)
    
    def handle_request(self, line):
        """
            hanlde_request:
                進行具體的業務邏輯處理 
        """
        # 邊使用sleep(1)來代替模擬
        time.sleep(1)
        return line
    
    def write_response(self, result):
        self.transport.write("ack:" + str(result) + "\r\n")
    

class DemoProtocolFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return DemoProtocol()
    

reactor.listenTCP(9090, DemoProtocolFactory())
reactor.run()

  使用defer后, 代碼更加的簡潔. 其defer對象, 其實借用了線程池.
  threads.deferToThread定義如下:

def deferToThread(f, *args, **kwargs):
    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)

def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
    d = defer.Deferred()

    def onResult(success, result):
        if success:
            reactor.callFromThread(d.callback, result)
        else:
            reactor.callFromThread(d.errback, result)

    threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)

    return d

  這邊我們可以發現deferToThread, 就是間接調用了callInThread函數, 另一方面, 對其回調函數的執行結果, 進行了onCallback, 以及onErrback的調用. 這些回調函數在主線程中運行.
  defer模式簡化了程序編寫, 也改變了人們開發的思維模式.

測試回顧:
  使用telnet進行測試, 結果正常.
  
  另一方面, twisted的線程池, 其默認是采用延遲初始化的方式.
  服務開啟時, 只有主線程一個, 隨着請求的到來, 其按需產生更多的worker thread.
  而其線程池默認為10. 我們可以借助suggestThreadPoolSize方法來修改.

寫在最后:
  
如果你覺得這篇文章對你有幫助, 請小小打賞下. 其實我想試試, 看看寫博客能否給自己帶來一點小小的收益. 無論多少, 都是對樓主一種由衷的肯定.

   

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM