基於python yield機制的異步操作同步化編程模型


又一個milestone即將結束,有了些許的時間總結研發過程中的點滴心得,今天總結下如何在編寫python代碼時對異步操作進行同步化模擬,從而提高代碼的可讀性和可擴展性。

     游戲引擎一般都采用分布式框架,通過一定的策略來均衡服務器集群的資源負載,從而保證服務器運算的高並發性和CPU高利用率,最終提高游戲的性能和負載。由於引擎的邏輯層調用是非搶占式的,服務器之間都是通過異步調用來進行通訊,導致游戲邏輯無法同步執行,所以在代碼層不得不人為地添加很多回調函數,使一個原本完整的功能碎片化地分布在各個回調函數中。

異步邏輯

     以游戲中的副本評分邏輯為例,在副本結束時副本管理進程需要收集副本中每個玩家的戰斗信息,再結合管理進程內部的統計信息最終給出一個副本評分,發放相應獎勵。因為每個玩家實體都隨機分布在不同進程中,所以管理進程需要通過異步調用來獲取玩家身上的戰斗信息。

實現代碼如下所示:

# -*- coding: gbk -*-
import random
 
# 玩家實體類
class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        # 玩家標識
        self.entityId = entityId
 
    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
 
        # 向副本管理進程發送自己的id和戰斗信息
        mailBox.onEvalFubenScore(self.entityId, score)
 
# 副本管理類
class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players
 
    def evalFubenScore(self):
        self.playerRelayCnt = 0
        self.totalScore = 0
 
        # 通知每個注冊的玩家,副本已經結束,索取戰斗信息
        for player in self.players:
            player.onFubenEnd(self)
 
    def onEvalFubenScore(self, entityId, score):
        # 收到其中一個玩家的戰斗信息
        print "onEvalFubenScore player %d score %d"%(entityId, score)
        self.playerRelayCnt += 1
        self.totalScore += score
 
        # 當收集完所有玩家的信息后,打印評分
        if len(self.players) == self.playerRelayCnt:
            print 'The fuben totalScore is %d'%self.totalScore
 
if __name__ == '__main__':
    # 模擬創建玩家實體
    players = [Player(i) for i in xrange(3)]
 
    # 副本開始時,每個玩家將自己的MailBox注冊到副本管理進程
    fs = FubenStub(players)
 
    # 副本進行中
    # ....
 
    # 副本結束,開始評分
    fs.evalFubenScore()

代碼簡化了副本評分邏輯的實現,其中Player類表示游戲的玩家實體,在游戲運行時無縫地在不同服務器中切換,FubenStub表示副本的管理進程,在副本剛開始的時候該副本內所有玩家會將自己的MailBox注冊到管理進程中,其中MailBox表示各個實體的遠程調用句柄。在副本結束時,FubenStub首先向各個玩家發送副本結束消息,同時請求玩家的戰斗信息,玩家在得到消息后,將自己的戰斗信息發送給FubenStub;然后當FubenStub收集完所有玩家的信息后,最終打印副本評分。

 

同步邏輯

    如果Player和FubenStub在同一進程中的話,那所有的操作都可以同步完成,在FubenStub向玩家發送副本結束消息的同時可以馬上得到該玩家的戰斗信息,實現代碼如下所示:

# -*- coding: gbk -*-
 
import random
 
class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId
 
    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        return self.entityId, score
 
class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players
 
    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = player.onFubenEnd(self)
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score
 
        print 'The fuben totalScore is %d'%totalScore
 
if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]
 
    fs = FubenStub(players)
    fs.evalFubenScore()

 從以上兩份代碼可以看到由於異步操作,FubenStub中的評分邏輯人為地分成兩個功能點:1)向玩家發送副本結束消息;2)接受玩家的戰斗信息;並且兩個功能點分布在兩個不同的函數中。如果游戲邏輯一旦復雜,勢必會造成功能點分散,出現過多onXXX異步回調函數,最終導致代碼的開發成本和維護成本提高,可讀性和可擴展性下降。

     如果有一種方法,可以讓函數在異步調用時暫時掛起,並且在回調函數得到返回值后恢復執行,那么就可以用同步化的編程模式開發異步邏輯。

 

yield 關鍵字

     yield 是 Python中的一個關鍵字,凡是函數體中出現了 yield 關鍵字, Python將改變整個函數的上下文,調用該函數不再返回值, 而是一個生成器對象。只有調用這個生成器的迭代函數next才能開始執行生成器對象,當生成器對象執行到包含 yield 表達式時, 函數將暫時掛起,等待下一次next調用來恢復執行,具體機制如下:

         1)調用生成器對象的next方法,啟動函數執行;

         2)當生成器對象執行到包含 yield 表達式時, 函數掛起;

         3)下一次 next 函數調用又會驅動該生成器對象繼續執行此后的語句, 直到遇見下一個 yield 再次掛起;

         4)如果某次 next 調用驅動了生成器繼續執行, 而此后函數正常結束,生成器會拋出 StopIteration 異常;

如下代碼所示:

def f():
    print "Before first yield"
    yield 1
    print "Before second yield"
    yield 2
    print "After second yield"
 
g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()

執行結果為:

Before first next

Before first yield

Before second next

Before second yield

Before third yield

After second yield

StopIteration

     哈,有了讓函數暫時掛起的機制,最后就剩下如何傳遞異步調用的返回值問題了。其實生成器的next函數已經實現了將參數從生成器對象內部向外傳遞的機制,並且python還提供了一個send函數將參數從外向生成器對象內部傳遞的機制,具體機制如下:

         1) 調用next 函數驅動生成器時, next會同時等待生成器中下一個 yield 掛起,並將該yield后面的參數返回給next;

         2)往生成器中傳遞參數,需要將next函數替換成send,此時send的功能與next相同(驅動生成器執行,等待返回值),同時send將后面的參數傳遞給生成器內部之前掛起的yield;

如下代碼所示:

def f():
    msg = yield 'first yield msg'
    print "generator inner receive:", msg
    msg = yield 'second yield msg'
    print "generator inner receive:", msg
 
g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')

執行結果為:

generator outer receive: first yield msg

generator inner receive: first send msg

generator outer receive: second yield msg

generator inner receive: second send msg

StopIteration

同步化實現

     好了,萬事俱備只欠東風,下面就是簡單對yield機制進行工程上封裝以方便之后開發。下面的代碼提供了一個叫IFakeSyncCall的interface,所有包含異步操作的邏輯類都可以繼承這個接口:

class IFakeSyncCall(object):
    def __init__(self):
        super(IFakeSyncCall, self).__init__()
        self.generators = {}
 
    @staticmethod
    def FAKE_SYNCALL():
        def fwrap(method):
            def fakeSyncCall(instance, *args, **kwargs):
                instance.generators[method.__name__] = method(instance, *args, **kwargs)
                func, args = instance.generators[method.__name__].next()
                func(*args)
            return fakeSyncCall
        return fwrap
 
    def onFakeSyncCall(self, identify, result):
        try:
            func, args  = self.generators[identify].send(result)
            func(*args)
        except StopIteration:
            self.generators.pop(identify)

 其中interface中屬性generators用來保存類中已經開始執行的生成器對象;函數FAKE_SYNCALL是一個decorator,裝飾類中包含有yield的函數,改變函數的調用上下文,在fakeSyncCall內部封裝了對生成器對象的next調用;函數onFakeSyncCall封裝了所有onXXX函數的邏輯,其他實體通過調用這個函數傳遞異步回調的返回值。

下面就是經過同步化改進后的異步副本評分邏輯代碼:

# -*- coding: gbk -*-
import random
 
class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId
 
    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))
 
class FubenStub(IFakeSyncCall):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players
 
    @IFakeSyncCall.FAKE_SYNCALL()
    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = yield (player.onFubenEnd, (self,))
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score
 
        print 'the totalScore is %d'%totalScore
 
if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]
 
    fs = FubenStub(players)
    fs.evalFubenScore()

比較evalFubenScore函數,基本已經和原本的同步邏輯代碼相差無幾。

      利用yield機制實現同步化編程模型的另外一個優點是可以保證所有異步調用的邏輯串行化,從而保證數據的一致性和有效性,特別是在各種異步初始化流程中可以摒棄傳統的timer sleep機制,從源頭上扼殺一些隱藏很深的由於數據不一致性所導致的bug。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM