關於MainEngine的代碼閱讀
在入口文件中,我們看到了除了窗體界面的產生,還有關於MainEngine
和EventEngin
部分。今天來學習下MainEngine
的代碼。
首先在run代碼中,我們看到以下的代碼
main_engine.add_gateway(DeribitGateway)
main_engine.add_app(OptionMasterApp)
從上述代碼可以基本猜測所有的網管,設置,甚至策略引擎行情,都跟MainEngine有關系,MainEngine應該是一個主線,把所有的組件穿插起來了。想必MainEngine一定是一條重要的大魚。下面我們進入MainEngin開始學習 位置:\vnpy\trader\engine.py
MainEngine的大概脈絡
class MainEngine:
#初始化
def __init__(self, event_engine: EventEngine = None):
pass
#添加引擎
def add_engine(self, engine_class: Any):
pass
#添加網管
def add_gateway(self, gateway_class: Type[BaseGateway]):
pass
#添加app
def add_app(self, app_class: Type[BaseApp]):
pass
#初始化引擎
def init_engines(self):
pass
#寫入日志
def write_log(self, msg: str, source: str = ""):
pass
#獲得引擎
def get_engine(self, engine_name: str):
pass
#獲得默認設置
def get_default_setting(self, gateway_name: str):
pass
#獲得所有引擎的名字
def get_all_gateway_names(self):
pass
#獲得所有的APP
def get_all_apps(self):
pass
#獲得所有的交易所
def get_all_exchanges(self):
pass
#連接到行情
def connect(self, setting: dict, gateway_name: str):
pass
#合約訂閱
def subscribe(self, req: SubscribeRequest, gateway_name: str):
pass
#下單
def send_order(self, req: OrderRequest, gateway_name: str):
pass
#取消訂單
def cancel_order(self, req: CancelRequest, gateway_name: str):
pass
#批量下單
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
pass
#批量取消訂單
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
pass
#歷史查詢
def query_history(self, req: HistoryRequest, gateway_name: str):
pass
#關閉
def close(self):
pass
我把所有的方法體都去掉,僅僅保留方法名稱和參數,從上面的結構基本上能看出來MainEngine
幾乎是一個把各個組件(GateWay, App, Engin)都連接在一起,同時提供了跟交易相關的連接、訂閱、下單、撤單、歷史記錄等都相關的柔和在一起的大雜燴。接下來我們就一個一個方法的閱讀學習。
__init__
def __init__(self, event_engine: EventEngine = None):
""""""
if event_engine:
self.event_engine = event_engine
else:
self.event_engine = EventEngine()
self.event_engine.start()
self.gateways = {}
self.engines = {}
self.apps = {}
self.exchanges = []
os.chdir(TRADER_DIR) # Change working directory
self.init_engines() # Initialize function engines
初始化的代碼基本上就是實例化事件引擎(EventEngin),然后啟動,同時為getways
,engines
,apps
,exchange
建立一個字典,同時調用init_engines()的方法。
各種add方法,AddEngine, AddGateWay, AddApp
def add_engine(self, engine_class: Any):
"""
Add function engine.
"""
engine = engine_class(self, self.event_engine)
self.engines[engine.engine_name] = engine
return engine
def add_gateway(self, gateway_class: Type[BaseGateway]):
"""
Add gateway.
"""
gateway = gateway_class(self.event_engine)
self.gateways[gateway.gateway_name] = gateway
# Add gateway supported exchanges into engine
for exchange in gateway.exchanges:
if exchange not in self.exchanges:
self.exchanges.append(exchange)
return gateway
def add_app(self, app_class: Type[BaseApp]):
"""
Add app.
"""
app = app_class()
self.apps[app.app_name] = app
engine = self.add_engine(app.engine_class)
return engine
上述3個Add方法比較簡單,基本上都是把添加各種APP、Engine、Gateway的話,都把它們的實例化和name形成鍵值對放入map中去。唯一我們可以得到的線索是gateway 和Exchanges之間應該是有對應關系的話,話一句話說,gateway就是交易場所的API對接網管。所以每次添加一個gateway,就添加了一個交易所名稱。
get方法
def get_gateway(self, gateway_name: str):
"""
Return gateway object by name.
"""
gateway = self.gateways.get(gateway_name, None)
if not gateway:
self.write_log(f"找不到底層接口:{gateway_name}")
return gateway
def get_engine(self, engine_name: str):
"""
Return engine object by name.
"""
engine = self.engines.get(engine_name, None)
if not engine:
self.write_log(f"找不到引擎:{engine_name}")
return engine
def get_default_setting(self, gateway_name: str):
"""
Get default setting dict of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.get_default_setting()
return None
基本上是從map中根據指定的名字獲得對象的方法。一目了然。get_default_setting
方法應該屬於get_gateway的一個下屬方法,獲得gateway的setting.
get_all方法
def get_all_apps(self):
"""
Get all app objects.
"""
return list(self.apps.values())
def get_all_exchanges(self):
"""
Get all exchanges.
"""
return self.exchanges
也是對add進入的對象進行全局獲取的方法。
connect
def connect(self, setting: dict, gateway_name: str):
"""
Start connection of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.connect(setting)
基本上可以理解connect是對gateway的一種裝飾方法。通過add可以插入gateway,然后調用connect方法的話,可以通過gateway_name獲得接口,然后再調用gateway的connect方法。不出意料下面的訂閱行情、下單、撤單、批量撤單都是類似的作用。
gateway的其他類似方法
def subscribe(self, req: SubscribeRequest, gateway_name: str):
"""
Subscribe tick data update of a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.subscribe(req)
def send_order(self, req: OrderRequest, gateway_name: str):
"""
Send new order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.send_order(req)
else:
return ""
def cancel_order(self, req: CancelRequest, gateway_name: str):
"""
Send cancel order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.cancel_order(req)
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.send_orders(reqs)
else:
return ["" for req in reqs]
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
"""
"""
gateway = self.get_gateway(gateway_name)
if gateway:
gateway.cancel_orders(reqs)
def query_history(self, req: HistoryRequest, gateway_name: str):
"""
Send cancel order request to a specific gateway.
"""
gateway = self.get_gateway(gateway_name)
if gateway:
return gateway.query_history(req)
else:
return None
init_engine
def init_engines(self):
"""
Init all engines.
"""
self.add_engine(LogEngine)
self.add_engine(OmsEngine)
self.add_engine(EmailEngine)
把日志, OMSEngin, EmialEngin裝配進來
close 方法
def close(self):
"""
Make sure every gateway and app is closed properly before
programme exit.
"""
# Stop event engine first to prevent new timer event.
self.event_engine.stop()
for engine in self.engines.values():
engine.close()
for gateway in self.gateways.values():
gateway.close()
基本上是程序退出以后的善后工作罷了。
總結
通過對MainEngin代碼的梳理,我們看到其實MainEngine本身沒有什么深奧之處。就是一個適配器的模式,把所有的操作抽象成了APP,Gateway, Engine。並且提供了一個統一的操作入口,這樣可以方便實現擴展。