量化交易平台—中層引擎設計


轉並修改於:用python的交易員

【前言】中層引擎在設計上主要是為了進一步封裝底層接口所暴露出的API函數,使得其更容易被上層的GUI和策略組件調用。本篇的內容會相對簡單,主要以LTS接口DEMO為例介紹一些設計方面的思路。

一、中層引擎設計

1、構造函數

 1 ########################################################################
 2 class MainEngine:
 3     """主引擎,負責對API的調度"""
 4 
 5     #----------------------------------------------------------------------
 6     def __init__(self):
 7         """Constructor"""
 8         self.ee = EventEngine()         # 創建事件驅動引擎
 9 
10         self.md = DemoMdApi(self.ee)    # 創建API接口
11         #self.md = DemoL2Api(self.ee)   # 如果使用L2行情就改為這行
12         self.td = DemoTdApi(self.ee)
13 
14         self.ee.start()                 # 啟動事件驅動引擎
15 
16         # 循環查詢持倉和賬戶相關
17         self.countGet = 0               # 查詢延時計數
18         self.lastGet = 'Account'        # 上次查詢的性質
19         self.ee.register(EVENT_TDLOGIN, self.initGet)  # 登錄成功后開始初始化查詢
20 
21         # 合約儲存相關
22         self.dictInstrument = {}        # 字典(保存合約查詢數據)
23         self.ee.register(EVENT_INSTRUMENT, self.insertInstrument)
  1. 首先以主引擎成員變量的形式創建事件驅動引擎ee、行情接口md和交易接口td的對象,兩個接口對象創建時傳入事件驅動引擎ee對象作為構造函數的參數。

  2. 在創建以上三個對象后,立即啟動事件驅動引擎,此后當用戶調用接口的連接、登錄等功能收到事件推送后,ee可以立即推送到監聽這些事件的組件進行處理。

  3. LTS和CTP接口的持倉情況和賬戶情況並不會通過回調函數主動推送,只有在投資者調用查詢函數時才會返回,我們的DEMO作為一個手動交易終端,選擇使用循環查詢的模式不斷獲取持倉和賬戶情況的更新。這里的每次查詢都會占用網絡帶寬導致系統延時的增加,對於某些運行全自動策略的程序,可以選擇不進行查詢,而通過對成交、行情等數據的統計來自行計算持倉和賬戶情況(甚至對於某些策略可以直接忽略該步驟,進一步降低延時水平)。

  4. LTS和CTP發送主動查詢指令時,存在流量控制(通常限制在1秒一次查詢),因此選擇間隔發送查詢賬戶和查詢持倉的指令。

  5. 在登錄成功后,我們需要查詢櫃台上所有可交易的合約信息,將這些信息保存到字典dictInstrument中,方便后面需要時進行查詢。這里很好的體現出了Python的方便,筆者在設計封裝API的回調函數時特別選擇使用Python字典的形式推送信息,包含合約信息的字典收到后,可以直接插入到dictInstrument字典進行保存(字典嵌字典),查詢時直接使用合約代碼即可。而用C++語言開發時通常還需要用到Sqlite之類的內存數據庫進行保存,麻煩了不少。

2、其他主動函數

 #----------------------------------------------------------------------
    def login(self, userid, mdPassword, tdPassword, brokerid, mdAddress, tdAddress):
        """登陸"""
        self.md.login(mdAddress, userid, mdPassword, brokerid)
        self.td.login(tdAddress, userid, tdPassword, brokerid)

    #----------------------------------------------------------------------
    def subscribe(self, instrumentid, exchangeid):
        """訂閱合約"""
        self.md.subscribe(instrumentid, exchangeid)

    #----------------------------------------------------------------------
    def getAccount(self):
        """查詢賬戶"""
        self.td.getAccount()

    #----------------------------------------------------------------------
    def getInvestor(self):
        """查詢投資者"""
        self.td.getInvestor()

    #----------------------------------------------------------------------
    def getPosition(self):
        """查詢持倉"""
        self.td.getPosition()

    #----------------------------------------------------------------------
    def getInstrument(self):
        """獲取合約"""
        event = Event(type_=EVENT_LOG)
        log = u'查詢合約信息'
        event.dict_['log'] = log
        self.ee.put(event)

        self.td.getInstrument()

    #----------------------------------------------------------------------
    def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
        """發單"""
        self.td.sendOrder(instrumentid, exchangeid, price, pricetype, volume, direction, offset)

    #----------------------------------------------------------------------
    def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
        """撤單"""
        self.td.cancelOrder(instrumentid, exchangeid, orderref, frontid, sessionid)

    #----------------------------------------------------------------------
    def getAccountPosition(self, event):
        """循環查詢賬戶和持倉"""
        self.countGet = self.countGet + 1

        # 每5秒發一次查詢
        if self.countGet > 5:
            self.countGet = 0   # 清空計數

            if self.lastGet == 'Account':
                self.getPosition()
                self.lastGet = 'Position'
            else:
                self.getAccount()
                self.lastGet = 'Account'

    #----------------------------------------------------------------------
    def initGet(self, event):
        """在交易服務器登錄成功后,開始初始化查詢"""
        # 打開設定文件setting.vn
        f = shelve.open('setting.vn')

        # 嘗試讀取設定字典,若該字典不存在,則發出查詢請求
        try:
            d = f['instrument']

            # 如果本地保存的合約數據是今日的,則載入,否則發出查詢請求
            today = date.today()
            if d['date'] == today:
                self.dictInstrument = d['dictInstrument']

                event = Event(type_=EVENT_LOG)
                log = u'合約信息讀取完成'
                event.dict_['log'] = log
                self.ee.put(event)

                self.getInvestor()

                # 開始循環查詢
                self.ee.register(EVENT_TIMER, self.getAccountPosition)                 
            else:
                self.getInstrument()
        except KeyError:
            self.getInstrument()

        f.close()

    #----------------------------------------------------------------------
    def insertInstrument(self, event):
        """插入合約對象"""
        data = event.dict_['data']
        last = event.dict_['last']

        self.dictInstrument[data['InstrumentID']] = data

        # 合約對象查詢完成后,查詢投資者信息並開始循環查詢
        if last:
            # 將查詢完成的合約信息保存到本地文件,今日登錄可直接使用不再查詢
            self.saveInstrument()

            event = Event(type_=EVENT_LOG)
            log = u'合約信息查詢完成'
            event.dict_['log'] = log
            self.ee.put(event)

            self.getInvestor()

            # 開始循環查詢
            self.ee.register(EVENT_TIMER, self.getAccountPosition)

    #----------------------------------------------------------------------
    def selectInstrument(self, instrumentid):
        """獲取合約信息對象"""
        try:
            instrument = self.dictInstrument[instrumentid]
        except KeyError:
            instrument = None
        return instrument

    #----------------------------------------------------------------------
    def exit(self):
        """退出"""
        # 銷毀API對象
        self.td = None
        self.md = None

        # 停止事件驅動引擎
        self.ee.stop()

    #----------------------------------------------------------------------
    def saveInstrument(self):
        """保存合約屬性數據"""
        f = shelve.open('setting.vn')
        d = {}
        d['dictInstrument'] = self.dictInstrument
        d['date'] = date.today()
        f['instrument'] = d
        f.close()
  1. LTS和CTP的行情和交易賬戶通常是由經紀商一起提供的,為了簡潔起見,把兩個接口的登錄一起封裝在了login函數中。

  2. subscribe, getAccount, getPosition, getInvestor, getInstrument, sendOrder, cancelOrder都只是直接調用行情和交易接口的功能,做這種設計的目的是為了讓用戶在寫上層組件代碼時,可以直接寫self.mainEngine.subscribe('IF1506', 'CFFEX'),而不必去寫self.mainEngine.md.subscribe('IF1506', 'CFFEX'),很小的區別,但是相信我,程序規模大了后會讓你煩到崩潰。

  3. getAccountPosition用於實現循環查詢賬戶和持倉,這里我設為了每5秒發一次,基本夠用就好。

  4. initGet和insertInstrument兩個函數實現了登錄后初始化查詢相關的功能。首先登錄完成后,程序用shelve模塊打開本地保存的文件setting.vn,檢查其中的合約信息數據,如果該數據的更新日期為今日,則直接讀取使用,無需再次查詢可交易合約信息(一般該信息每天日內保持不變)。否則調用getInstrument函數查詢合約信息,insertInstrument函數負責將收到的合約信息插入到dictInstrument字典中,當所有合約信息都插入完成后,通過saveInstrument函數將該字典保存到setting.vn中,即可實現日內再次登錄無需查詢直接讀取。

  5. 合約信息查詢完成后,通常會再查詢一次投資者姓名getInvestor,主要用於輸出在程序的標題欄上(方便用戶檢查,防止登錄錯賬戶等),並將循環查詢函數getAccountPosition注冊到每秒觸發的定時器事件監聽上,開始循環查詢。

  6. 當用戶退出程序時,需要調用主引擎的exit函數,釋放行情和交易API接口對象(在C++環境中會自動析構),並停止事件驅動引擎的工作線程(否則可能報線程退出錯誤等)。

二、總結

  中層引擎的設計思路主要是封裝底層接口,從而讓用戶在開發頂層GUI和策略組件時無需直接調用底層接口的主動函數,降低開發和維護的復雜度。

  DEMO中的這個中層引擎非常簡單,功能上只實現了個循環查詢(其實也可以在接口層中實現,視乎需求)。而在實際開發中,中層引擎通常會包含用戶最常用的一些功能模塊,比如做期權策略的會加入期權定價引擎,做期現套利的會加入持倉組合的敞口監控對沖引擎等等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM