【前言】在搞定交易接口后,我們開發交易系統的第一步就是要弄清楚系統的工作原理。本文是我看的公司的中頻平台文檔的總結,公司是基於開源的vn.py修改而來,但是驅動引擎是一樣的。
會參考vn.py官方文檔,公司是參考其修改的。但在正在研發的高頻交易平台上是根據CTP接口接發數據的格式,合並成單線程,配合solarflare的網卡,速度不吃虧。本文以及下面的三篇文章主要是現有的中頻平台的,當然高頻類似於次,增加了共享內存等。
一、計算機程序分類
所有的計算機程序都可以大致分為兩類:腳本型(單次運行)和連續運行型(直到用戶主動退出)。
1、腳本型
腳本型的程序包括最早的批處理文件以及使用Python做交易策略回測等等,這類程序的特點是在用戶啟動后會按照編程時設計好的步驟一步步運行,所有步驟運行完后自動退出。
2、連續運行型
連續運行型的程序包含了操作系統和絕大部分我們日常使用的軟件等等,這類程序啟動后會處於一個無限循環中連續運行,直到用戶主動退出時才會結束。
二、連續運行型程序
我們要開發的交易系統就是屬於連續運行型程序,而這種程序根據其計算邏輯的運行機制不同,又可以粗略的分為時間驅動和事件驅動兩種。
1、時間驅動
時間驅動的程序邏輯相對容易設計,簡單來說就是讓電腦每隔一段時間自動做一些事情。這個事情本身可以很復雜、包括很多步驟,但這些步驟都是線性的,按照順序一步步執行下來。
以下代碼展示了一個非常簡單的時間驅動的Python程序。
from time import sleep def demo(): print u'時間驅動的程序每隔1秒運行demo函數' while 1: demo() sleep(1.0)
時間驅動的程序本質上就是每隔一段時間固定運行一次腳本(上面代碼中的demo函數)。盡管腳本自身可以很長、包含非常多的步驟,但是我們可以看出這種程序的運行機制相對比較簡單、容易理解。
舉一些量化交易相關的例子:
-
每隔5分鍾,通過新浪財經網頁的公開API讀取一次滬深300成分股的價格,根據當日漲幅進行排序后輸出到電腦屏幕上。
-
每隔1秒鍾,檢查一次最新收到的股指期貨TICK數據,更新K線和其他技術指標,檢查是否滿足趨勢策略的下單條件,若滿足則執行下單。
對速度要求較高的量化交易方面(日內CTA策略、高頻策略等等),時間驅動的程序會存在一個非常大的缺點:對數據信息在反應操作上的處理延時。例子中,在每次邏輯腳本運行完等待的那1秒鍾里,程序對於接收到的新數據信息(行情、成交推送等等)是不會做出任何反應的,只有在等待時間結束后腳本再次運行時才會進行相關的計算處理。而處理延時在量化交易中的直接后果就是:市價單滑點、限價單錯過本可成交的價格。
時間驅動的程序在量化交易方面還存在一些其他的缺點:如浪費CPU的計算資源、實現異步邏輯復雜度高等等。
2、事件驅動
與時間驅動對應的就是事件驅動的程序:當某個新的事件被推送到程序中時(如API推送新的行情、成交),程序立即調用和這個事件相對應的處理函數進行相關的操作。
上面例子的事件驅動版:交易程序對股指TICK數據進行監聽,當沒有新的行情過來時,程序保持監聽狀態不進行任何操作;當收到新的數據時,數據處理函數立即更新K線和其他技術指標,並檢查是否滿足趨勢策略的下單條件執行下單。
對於簡單的程序,我們可以采用上面測試代碼中的方案,直接在API的回調函數中寫入相應的邏輯。但隨着程序復雜度的增加,這種方案會變得越來越不可行。假設我們有一個帶有圖形界面的量化交易系統,系統在某一時刻接收到了API推送的股指期貨行情數據,針對這個數據系統需要進行如下處理:
-
更新圖表上顯示的K線圖形(繪圖)
-
更新行情監控表中股指期貨的行情數據(表格更新)
-
策略1需要運行一次內部算法,檢查該數據是否會觸發策略進行下單(運算、下單)
-
策略2同樣需要運行一次內部算法,檢查該數據是否會觸發策略進行下單(運算、下單)
-
風控系統需要檢查最新行情價格是否會導致賬戶的整體風險超限,若超限需要進行報警(運算、報警)
此時將上面所有的操作都寫到一個回調函數中無疑變成了非常差的方案,代碼過長容易出錯不說,可擴展性也差,每添加一個策略或者功能則又需要修改之前的源代碼(有經驗的讀者會知道,經常修改生產代碼是一種非常危險的運營管理方法)。
小結:雖然我們的交易平台上沒有圖形界面,因為這只是一種輔助功能,不是生產的核心功能。但是,也有可能有其他的信號或者事件需要我們處理,那么何時處理?分配多少資源處理?所以應該降低耦合,為了解決這種情況,我們需要用到事件驅動引擎來管理不同事件的事件監聽函數並執行所有和事件驅動相關的操作。
三、事件驅動引擎原理
vn.py框架中的vn.event模塊包含了一個可擴展的事件驅動引擎。整個引擎的實現並不復雜,除去注釋、空行后大概也就100行左右的代碼:
# encoding: UTF-8 # 系統模塊 from Queue import Queue, Empty from threading import Thread # 第三方模塊 from PyQt4.QtCore import QTimer # 自己開發的模塊 from eventType import * ######################################################################## class EventEngine: """ 事件驅動引擎 事件驅動引擎中所有的變量都設置為了私有,這是為了防止不小心 從外部修改了這些變量的值或狀態,導致bug。 變量說明 __queue:私有變量,事件隊列 __active:私有變量,事件引擎開關 __thread:私有變量,事件處理線程 __timer:私有變量,計時器 __handlers:私有變量,事件處理函數字典 方法說明 __run: 私有方法,事件處理線程連續運行用 __process: 私有方法,處理事件,調用注冊在引擎中的監聽函數 __onTimer:私有方法,計時器固定事件間隔觸發后,向事件隊列中存入計時器事件 start: 公共方法,啟動引擎 stop:公共方法,停止引擎 register:公共方法,向引擎中注冊監聽函數 unregister:公共方法,向引擎中注銷監聽函數 put:公共方法,向事件隊列中存入新的事件 事件監聽函數必須定義為輸入參數僅為一個event對象,即: 函數 def func(event) ... 對象方法 def method(self, event) ... """ #---------------------------------------------------------------------- def __init__(self): """初始化事件引擎""" # 事件隊列 self.__queue = Queue() # 事件引擎開關 self.__active = False # 事件處理線程 self.__thread = Thread(target = self.__run) # 計時器,用於觸發計時器事件 self.__timer = QTimer() self.__timer.timeout.connect(self.__onTimer) # 這里的__handlers是一個字典,用來保存對應的事件調用關系 # 其中每個鍵對應的值是一個列表,列表中保存了對該事件進行監聽的函數功能 self.__handlers = {} #---------------------------------------------------------------------- def __run(self): """引擎運行""" while self.__active == True: try: event = self.__queue.get(block = True, timeout = 1) # 獲取事件的阻塞時間設為1秒 self.__process(event) except Empty: pass #---------------------------------------------------------------------- def __process(self, event): """處理事件""" # 檢查是否存在對該事件進行監聽的處理函數 if event.type_ in self.__handlers: # 若存在,則按順序將事件傳遞給處理函數執行 [handler(event) for handler in self.__handlers[event.type_]] # 以上語句為Python列表解析方式的寫法,對應的常規循環寫法為: #for handler in self.__handlers[event.type_]: #handler(event) #---------------------------------------------------------------------- def __onTimer(self): """向事件隊列中存入計時器事件""" # 創建計時器事件 event = Event(type_=EVENT_TIMER) # 向隊列中存入計時器事件 self.put(event) #---------------------------------------------------------------------- def start(self): """引擎啟動""" # 將引擎設為啟動 self.__active = True # 啟動事件處理線程 self.__thread.start() # 啟動計時器,計時器事件間隔默認設定為1秒 self.__timer.start(1000) #---------------------------------------------------------------------- def stop(self): """停止引擎""" # 將引擎設為停止 self.__active = False # 停止計時器 self.__timer.stop() # 等待事件處理線程退出 self.__thread.join() #---------------------------------------------------------------------- def register(self, type_, handler): """注冊事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則創建 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要注冊的處理器不在該事件的處理器列表中,則注冊該事件 if handler not in handlerList: handlerList.append(handler) #---------------------------------------------------------------------- def unregister(self, type_, handler): """注銷事件處理函數監聽""" # 嘗試獲取該事件類型對應的處理函數列表,若無則忽略該次注銷請求 try: handlerList = self.handlers[type_] # 如果該函數存在於列表中,則移除 if handler in handlerList: handlerList.remove(handler) # 如果函數列表為空,則從引擎中移除該事件類型 if not handlerList: del self.handlers[type_] except KeyError: pass #---------------------------------------------------------------------- def put(self, event): """向事件隊列中存入事件""" self.__queue.put(event)
1、初始化
當事件驅動引擎對象被創建時,初始化函數__init__會創建以下私有變量:
-
__queue:用來保存事件的隊列
-
__active:用來控制引擎啟動、停止的開關
-
__thread:負責處理事件、執行具體操作的線程
-
__timer:用來每隔一段時間觸發定時事件的計時器
-
__handlers:用來保存不同類型事件所對應的事件處理函數的字典
2、注冊事件處理函數
引擎提供了register方法,用來向引擎注冊事件處理函數的監聽,傳入參數為
-
type_:表示事件類型的常量字符串,由用戶自行定義,注意不同事件類型間不能重復
-
handler:當該類型的事件被觸發時,用戶希望進行相應操作的事件處理函數,函數的定義方法參考代碼中的注釋
當用戶調用register方法注冊事件處理函數時,引擎會嘗試獲取__handlers字典中該事件類型所對應的處理函數列表(若無則創建一個空列表),並向這個列表中添加該事件處理函數。使用了Python的列表對象,用戶可以很容易的控制同一個事件類型下多個事件處理函數的工作順序,因此對某些涉及多步操作的復雜算法可以保證按照正確的順序執行,這點是相比於某些系統0消息機制(如Qt的Signal/Slot)最大的優勢。
如當標的物行情發生變化時,期權高頻套利算法需要執行以下操作:
-
使用定價引擎先計算新的期權理論價、希臘值
-
使用風控引擎對當前持倉的風險度匯總,並計算報價的中間價
-
使用套利引擎基於預先設定的價差、下單手數等參數,計算具體價格並發單
以上三步操作,只需在交易系統啟動時按順序注冊監聽到標的物行情事件上,就可以保證操作順序的正確。
和register對應的是unregister方法,用於注銷事件處理函數的監聽,傳入參數相同,具體原理請參照源代碼。在實際應用中,用戶可以動態的組合使用register和unregister方法,只在需要監聽某些事件的時候監聽,完成后取消監聽,從而節省CPU資源。
這里讓筆者吐槽一下某些國內的C++平台(當然不是指所有的),每個策略對系統里所有的訂單回報進行監聽,如果是自身相關的就處理,不相關的就PASS。這種寫法,光是判斷是否和自身相關就得多做多少無謂的判斷、浪費多少CPU資源,隨着策略數量的增加,浪費呈線性增加的趨勢,這種平台還叫囂做高頻,唉......
3、觸發事件
用戶可以通過引擎的put方法向事件隊列__queue中存入事件,等待事件處理線程來進行處理,事件類的實現如下:
######################################################################## class Event: """事件對象""" #---------------------------------------------------------------------- def __init__(self, type_=None): """Constructor""" self.type_ = type_ # 事件類型 self.dict_ = {} # 字典用於保存具體的事件數據
對象創建時用戶可以選擇傳入事件類型字符串type_作為參數。dict_字典用於保存具體事件相關的數據信息,以供事件處理函數進行操作。
4、事件處理線程的連續運行
事件引擎的事件處理線程__thread中執行連續運行工作的函數為__run:當事件引擎的開關__active沒有被關閉時,引擎嘗試從事件隊列中讀取最新的事件,若讀取成功則立即調用__process函數處理該事件,若無法讀取(隊列為空)則進入阻塞狀態節省CPU資源,當阻塞時間(默認為1秒)結束時再次進入以上循環。
__process函數工作時,首先檢查事件對象的事件類型在__handlers字典中是否存在,若存在(說明有事件處理函數在監聽該事件)則按照注冊順序調用監聽函數列表中的事件處理函數進行相關操作。
5、計時器
事件引擎中的__timer是一個PyQt中的QTimer對象,提供的功能非常簡單:每隔一段時間(由用戶設定)自動運行函數__onTimer。__onTimer函數會創建一個類型為EVENT_TIMER(在eventType.py文件中定義)的事件對象,並調用引擎的put方法存入到事件隊列中。
敏感的讀者可能已經意識到了,這個計時器本質上是一個由時間驅動的功能。盡管我們在前文中提到了事件驅動在量化交易平台開發中的重要性,但不可否認某些交易功能的實現必須基於時間驅動,例如:下單后若2秒不成交則立即撤單、每隔5分鍾將當日的成交記錄保存到數據庫中等。這類功能在實現時就可以選擇使用事件處理函數對EVENT_TIMER類型的計時器事件進行監聽(參考下一章節“事件驅動引擎使用”中的示例)。
6、啟動、停止
用戶可以通過start和stop兩個方法來啟動和停止事件驅動引擎,原理很簡單讀者可以直接參考源代碼。
當啟動計時器時,事件間隔默認設定為了1秒(1000毫秒),這個參數用戶可以視乎自己的需求進行調整。假設用戶使用時間驅動的函數工作間隔為分鍾級,則可以選擇將參數設置為60秒(600000毫秒),以此類推。
四、事件驅動引擎使用
同樣在eventEngine.py中,包含了一段測試代碼test函數,用來展示事件驅動引擎的使用方法:
#---------------------------------------------------------------------- def test(): """測試函數""" import sys from datetime import datetime from PyQt4.QtCore import QCoreApplication def simpletest(event): print u'處理每秒觸發的計時器事件:%s' % str(datetime.now()) app = QCoreApplication(sys.argv) ee = EventEngine() ee.register(EVENT_TIMER, simpletest) ee.start() app.exec_() # 直接運行腳本可以進行測試 if __name__ == '__main__': test()
test函數整體上包含了這幾步:
-
導入相關的包(sys、datetime、PyQt4),注意由於EventEngine的實現中使用了PyQt4的QTimer類,因此整個程序的運行必須包含在Qt事件循環中,即使用QCoreApplication(或者PyQt4.QtGui中的QApplication)的exec_()方法在程序主線程中啟動事件循環。
-
定義一個簡單的函數simpletest,該函數包含一個輸入參數event對象,函數被調用后會打印一段字符以及當前的時間
-
創建QCoreApplication對象app
-
創建事件驅動引擎EventEngine對象ee
-
向引擎中注冊simpletest函數對定時器事件EVENT_TIMER的監聽
-
啟動事件驅動引擎
-
啟動Qt事件循環
整體上看,當用戶開發自己的程序時,需要修改的只是第2步和第5步:創建自己的事件處理函數並將這些函數注冊到相應的事件類型上進行監聽。
總結
有了API接口和事件驅動引擎,接下來我們可以開始開發自己的平台了,后面的幾篇文章將會一步步展示一個簡單的LTS交易平台的開發過程。