轉並修改於:用python的交易員
【前言】中層引擎在設計上主要是為了進一步封裝底層接口所暴露出的API函數,使得其更容易被上層的GUI和策略組件調用。本篇的內容會相對簡單,主要以LTS接口DEMO為例介紹一些設計方面的思路。
一、中層引擎設計
1、構造函數
1 ######################################################################## 2 class MainEngine: 3 """主引擎,負責對API的調度""" 4 5 #---------------------------------------------------------------------- 6 def __init__(self): 7 """Constructor""" 8 self.ee = EventEngine() # 創建事件驅動引擎 9 10 self.md = DemoMdApi(self.ee) # 創建API接口 11 #self.md = DemoL2Api(self.ee) # 如果使用L2行情就改為這行 12 self.td = DemoTdApi(self.ee) 13 14 self.ee.start() # 啟動事件驅動引擎 15 16 # 循環查詢持倉和賬戶相關 17 self.countGet = 0 # 查詢延時計數 18 self.lastGet = 'Account' # 上次查詢的性質 19 self.ee.register(EVENT_TDLOGIN, self.initGet) # 登錄成功后開始初始化查詢 20 21 # 合約儲存相關 22 self.dictInstrument = {} # 字典(保存合約查詢數據) 23 self.ee.register(EVENT_INSTRUMENT, self.insertInstrument)
-
首先以主引擎成員變量的形式創建事件驅動引擎ee、行情接口md和交易接口td的對象,兩個接口對象創建時傳入事件驅動引擎ee對象作為構造函數的參數。
-
在創建以上三個對象后,立即啟動事件驅動引擎,此后當用戶調用接口的連接、登錄等功能收到事件推送后,ee可以立即推送到監聽這些事件的組件進行處理。
-
LTS和CTP接口的持倉情況和賬戶情況並不會通過回調函數主動推送,只有在投資者調用查詢函數時才會返回,我們的DEMO作為一個手動交易終端,選擇使用循環查詢的模式不斷獲取持倉和賬戶情況的更新。這里的每次查詢都會占用網絡帶寬導致系統延時的增加,對於某些運行全自動策略的程序,可以選擇不進行查詢,而通過對成交、行情等數據的統計來自行計算持倉和賬戶情況(甚至對於某些策略可以直接忽略該步驟,進一步降低延時水平)。
-
LTS和CTP發送主動查詢指令時,存在流量控制(通常限制在1秒一次查詢),因此選擇間隔發送查詢賬戶和查詢持倉的指令。
-
在登錄成功后,我們需要查詢櫃台上所有可交易的合約信息,將這些信息保存到字典dictInstrument中,方便后面需要時進行查詢。這里很好的體現出了Python的方便,筆者在設計封裝API的回調函數時特別選擇使用Python字典的形式推送信息,包含合約信息的字典收到后,可以直接插入到dictInstrument字典進行保存(字典嵌字典),查詢時直接使用合約代碼即可。而用C++語言開發時通常還需要用到Sqlite之類的內存數據庫進行保存,麻煩了不少。
2、其他主動函數
#---------------------------------------------------------------------- def login(self, userid, mdPassword, tdPassword, brokerid, mdAddress, tdAddress): """登陸""" self.md.login(mdAddress, userid, mdPassword, brokerid) self.td.login(tdAddress, userid, tdPassword, brokerid) #---------------------------------------------------------------------- def subscribe(self, instrumentid, exchangeid): """訂閱合約""" self.md.subscribe(instrumentid, exchangeid) #---------------------------------------------------------------------- def getAccount(self): """查詢賬戶""" self.td.getAccount() #---------------------------------------------------------------------- def getInvestor(self): """查詢投資者""" self.td.getInvestor() #---------------------------------------------------------------------- def getPosition(self): """查詢持倉""" self.td.getPosition() #---------------------------------------------------------------------- def getInstrument(self): """獲取合約""" event = Event(type_=EVENT_LOG) log = u'查詢合約信息' event.dict_['log'] = log self.ee.put(event) self.td.getInstrument() #---------------------------------------------------------------------- def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset): """發單""" self.td.sendOrder(instrumentid, exchangeid, price, pricetype, volume, direction, offset) #---------------------------------------------------------------------- def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid): """撤單""" self.td.cancelOrder(instrumentid, exchangeid, orderref, frontid, sessionid) #---------------------------------------------------------------------- def getAccountPosition(self, event): """循環查詢賬戶和持倉""" self.countGet = self.countGet + 1 # 每5秒發一次查詢 if self.countGet > 5: self.countGet = 0 # 清空計數 if self.lastGet == 'Account': self.getPosition() self.lastGet = 'Position' else: self.getAccount() self.lastGet = 'Account' #---------------------------------------------------------------------- def initGet(self, event): """在交易服務器登錄成功后,開始初始化查詢""" # 打開設定文件setting.vn f = shelve.open('setting.vn') # 嘗試讀取設定字典,若該字典不存在,則發出查詢請求 try: d = f['instrument'] # 如果本地保存的合約數據是今日的,則載入,否則發出查詢請求 today = date.today() if d['date'] == today: self.dictInstrument = d['dictInstrument'] event = Event(type_=EVENT_LOG) log = u'合約信息讀取完成' event.dict_['log'] = log self.ee.put(event) self.getInvestor() # 開始循環查詢 self.ee.register(EVENT_TIMER, self.getAccountPosition) else: self.getInstrument() except KeyError: self.getInstrument() f.close() #---------------------------------------------------------------------- def insertInstrument(self, event): """插入合約對象""" data = event.dict_['data'] last = event.dict_['last'] self.dictInstrument[data['InstrumentID']] = data # 合約對象查詢完成后,查詢投資者信息並開始循環查詢 if last: # 將查詢完成的合約信息保存到本地文件,今日登錄可直接使用不再查詢 self.saveInstrument() event = Event(type_=EVENT_LOG) log = u'合約信息查詢完成' event.dict_['log'] = log self.ee.put(event) self.getInvestor() # 開始循環查詢 self.ee.register(EVENT_TIMER, self.getAccountPosition) #---------------------------------------------------------------------- def selectInstrument(self, instrumentid): """獲取合約信息對象""" try: instrument = self.dictInstrument[instrumentid] except KeyError: instrument = None return instrument #---------------------------------------------------------------------- def exit(self): """退出""" # 銷毀API對象 self.td = None self.md = None # 停止事件驅動引擎 self.ee.stop() #---------------------------------------------------------------------- def saveInstrument(self): """保存合約屬性數據""" f = shelve.open('setting.vn') d = {} d['dictInstrument'] = self.dictInstrument d['date'] = date.today() f['instrument'] = d f.close()
-
LTS和CTP的行情和交易賬戶通常是由經紀商一起提供的,為了簡潔起見,把兩個接口的登錄一起封裝在了login函數中。
-
subscribe, getAccount, getPosition, getInvestor, getInstrument, sendOrder, cancelOrder都只是直接調用行情和交易接口的功能,做這種設計的目的是為了讓用戶在寫上層組件代碼時,可以直接寫self.mainEngine.subscribe('IF1506', 'CFFEX'),而不必去寫self.mainEngine.md.subscribe('IF1506', 'CFFEX'),很小的區別,但是相信我,程序規模大了后會讓你煩到崩潰。
-
getAccountPosition用於實現循環查詢賬戶和持倉,這里我設為了每5秒發一次,基本夠用就好。
-
initGet和insertInstrument兩個函數實現了登錄后初始化查詢相關的功能。首先登錄完成后,程序用shelve模塊打開本地保存的文件setting.vn,檢查其中的合約信息數據,如果該數據的更新日期為今日,則直接讀取使用,無需再次查詢可交易合約信息(一般該信息每天日內保持不變)。否則調用getInstrument函數查詢合約信息,insertInstrument函數負責將收到的合約信息插入到dictInstrument字典中,當所有合約信息都插入完成后,通過saveInstrument函數將該字典保存到setting.vn中,即可實現日內再次登錄無需查詢直接讀取。
-
合約信息查詢完成后,通常會再查詢一次投資者姓名getInvestor,主要用於輸出在程序的標題欄上(方便用戶檢查,防止登錄錯賬戶等),並將循環查詢函數getAccountPosition注冊到每秒觸發的定時器事件監聽上,開始循環查詢。
-
當用戶退出程序時,需要調用主引擎的exit函數,釋放行情和交易API接口對象(在C++環境中會自動析構),並停止事件驅動引擎的工作線程(否則可能報線程退出錯誤等)。
二、總結
中層引擎的設計思路主要是封裝底層接口,從而讓用戶在開發頂層GUI和策略組件時無需直接調用底層接口的主動函數,降低開發和維護的復雜度。
DEMO中的這個中層引擎非常簡單,功能上只實現了個循環查詢(其實也可以在接口層中實現,視乎需求)。而在實際開發中,中層引擎通常會包含用戶最常用的一些功能模塊,比如做期權策略的會加入期權定價引擎,做期現套利的會加入持倉組合的敞口監控對沖引擎等等。
