目標
選取幾個比特幣交易量大的幾個交易平台,查看對應的API,獲取該市場下貨幣對的ticker和depth信息。我們從網站上選取4個交易平台:bitfinex、okex、binance、gdax。對應的交易對是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。

一、ccxt庫
開始想着直接請求市場的API,然后再解析獲取下來的數據,但到github上發現一個比較好得python庫,里面封裝好了獲取比特幣市場的相關函數,這樣一來就省掉分析API的時間了。因此我只要傳入市場以及對應的貨幣對,利用庫里面的函數 fetch_ticker 和 fetch_order_book 就可以獲取到市場的ticker和depth信息(具體的使用方法可以查看ccxt手冊)。接下來以市場okex為例,利用ccxt庫獲取okex的ticker和depth信息。
# 引入庫
import ccxt
# 實例化市場
exchange = ccxt.okex()
# 交易對
symbol = 'BTC/USDT'
# 獲取ticker信息
ticker = exchange.fetch_ticker(symbol)
# 獲取depth信息
depth = exchange.fetch_order_book(symbol)
print('ticker:%s, depth:%s' % (ticker, depth))
運行后會得到結果如下圖,從此可以看出已經獲取到了ticker和depth信息。

二、獲取四個市場的信息(for循環)
接下來我們獲取四個市場的信息,深度里面有asks和bids,數據量稍微有點兒多,這里depth信息我只去前面五個,對於ticker我也只提取里面的info信息(具體代表什么含義就要參考一下對應市場的API啦)。將其簡單的封裝后,最開始我想的是for循環。想到啥就開始吧:
# 引入庫
import ccxt
import time
now = lambda: time.time()
start = now()
def getData(exchange, symbol):
data = {} # 用於存儲ticker和depth信息
# 獲取ticker信息
tickerInfo = exchange.fetch_ticker(symbol)
# 獲取depth信息
depth = {}
# 獲取深度信息
exchange_depth = exchange.fetch_order_book(symbol)
# 獲取asks,bids 最低5個,最高5個信息
asks = exchange_depth.get('asks')[:5]
bids = exchange_depth.get('bids')[:5]
depth['asks'] = asks
depth['bids'] = bids
data['ticker'] = tickerInfo
data['depth'] = depth
return data
def main():
# 實例化市場
exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
# 交易對
symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
for i in range(len(exchanges)):
exchange = exchanges[i]
symbol = symbols[i]
data = getData(exchange, symbol)
print('exchange: %s data is %s' % (exchange.id, data))
if __name__ == '__main__':
main()
print('Run Time: %s' % (now() - start))
運行后會發現雖然每個市場的信息都獲取到了,執行完差不多花掉5.7秒,因為這是同步的,也就是按順序執行的,要是要想每隔一定時間同時獲取四個市場的信息,很顯然這種結果不符合我們的要求。

三、異步加協程(coroutine)
前面講的循環雖然可以輸出結果,但耗時長而且達不到想要的效果,接下來采用異步加協程(參考知乎上的一篇文章),要用到異步首先得引入asyncio庫,這個庫是3.4以后才有的,它提供了一種機制,使得你可以用協程(coroutines)、IO復用(multiplexing I/O)在單線程環境中編寫並發模型。這里python文檔有個小例子。
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

當事件循環開始運行時,它會在Task中尋找coroutine來執行調度,因為事件循環注冊了print_sum(),因此print_sum()被調用,執行result = await compute(x, y)這條語句(等同於result = yield from compute(x, y)),因為compute()自身就是一個coroutine,因此print_sum()這個協程就會暫時被掛起,compute()被加入到事件循環中,程序流執行compute()中的print語句,打印”Compute %s + %s …”,然后執行了await asyncio.sleep(1.0),因為asyncio.sleep()也是一個coroutine,接着compute()就會被掛起,等待計時器讀秒,在這1秒的過程中,事件循環會在隊列中查詢可以被調度的coroutine,而因為此前print_sum()與compute()都被掛起了,因此事件循環會停下來等待協程的調度,當計時器讀秒結束后,程序流便會返回到compute()中執行return語句,結果會返回到print_sum()中的result中,最后打印result,事件隊列中沒有可以調度的任務了,此時loop.close()把事件隊列關閉,程序結束。
接下來我們采用異步和協程(ps:ccxt庫也有對應的異步),運行后發現時間只用了1.9秒,比之前快了好多倍。
Run Time: 1.9661316871643066
相關代碼:
# 引入庫
import ccxt.async as ccxt
import asyncio
import time
now = lambda: time.time()
start = now()
async def getData(exchange, symbol):
data = {} # 用於存儲ticker和depth信息
# 獲取ticker信息
tickerInfo = await exchange.fetch_ticker(symbol)
# 獲取depth信息
depth = {}
# 獲取深度信息
exchange_depth = await exchange.fetch_order_book(symbol)
# 獲取asks,bids 最低5個,最高5個信息
asks = exchange_depth.get('asks')[:5]
bids = exchange_depth.get('bids')[:5]
depth['asks'] = asks
depth['bids'] = bids
data['ticker'] = tickerInfo
data['depth'] = depth
return data
def main():
# 實例化市場
exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
# 交易對
symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
tasks = []
for i in range(len(exchanges)):
task = getData(exchanges[i], symbols[i])
tasks.append(asyncio.ensure_future(task))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
main()
print('Run Time: %s' % (now() - start))
三、定時爬取並用mongodb保存數據
在前面的基礎上,添加一個定時任務,實現每隔一段時間爬取一次數據,並將數據保存到mongodb數據庫。只需再前面的代碼上稍微改改就可以啦,代碼和運行結果如下:
import asyncio
import ccxt.async as ccxt
import time
import pymongo
# 獲取ticker和depth信息
async def get_exchange_tickerDepth(exchange, symbol): # 其中exchange為實例化后的市場
# print('start get_ticker')
while True:
print('%s is run %s' % (exchange.id, time.ctime()))
# 獲取ticher信息
tickerInfo = await exchange.fetch_ticker(symbol)
ticker = tickerInfo.get('info')
if type(ticker) == type({}):
ticker['timestamp'] = tickerInfo.get('timestamp')
ticker['high'] = tickerInfo.get('high')
ticker['low'] = tickerInfo.get('low')
ticker['last'] = tickerInfo.get('last')
else:
ticker = tickerInfo
# print(ticker)
# 獲取深度信息
depth = {}
exchange_depth = await exchange.fetch_order_book(symbol)
# 獲取asks,bids 最低5個,最高5個信息
asks = exchange_depth.get('asks')[:5]
bids = exchange_depth.get('bids')[:5]
depth['asks'] = asks
depth['bids'] = bids
# print('depth:{}'.format(depth))
data = {
'exchange': exchange.id,
'countries': exchange.countries,
'symbol': symbol,
'ticker': ticker,
'depth': depth
}
# 保存數據
save_exchangeDate(exchange.id, data)
print('********* %s is finished, time %s *********' % (exchange.id, time.ctime()))
# 等待時間
await asyncio.sleep(2)
# 存庫
def save_exchangeDate(exchangeName, data):
# 鏈接MongoDB
connect = pymongo.MongoClient(host='localhost', port=27017)
# 創建數據庫
exchangeData = connect['exchangeDataAsyncio']
# 創建表
exchangeInformation = exchangeData[exchangeName]
# print(table_name)
# 數據去重后保存
count = exchangeInformation.count()
if not count > 0:
exchangeInformation.insert_one(data)
else:
for item in exchangeInformation.find().skip(count - 1):
lastdata = item
if lastdata['ticker']['timestamp'] != data['ticker']['timestamp']:
exchangeInformation.insert_one(data)
def main():
exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(),
ccxt.gdax()]
symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
tasks = []
for i in range(len(exchanges)):
task = get_exchange_tickerDepth(exchanges[i], symbols[i])
tasks.append(asyncio.ensure_future(task))
loop = asyncio.get_event_loop()
try:
# print(asyncio.Task.all_tasks(loop))
loop.run_forever()
except Exception as e:
print(e)
loop.stop()
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
main()

五、小結
使用協程可以實現高效的並發任務。Python在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎,3.5則確定了協程的語法。這里只簡單的使用了asyncio。當然實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。這里我有一個問題,就是運行一段時間后,里面的市場可能有請求超時等情況導致協程停止運行,我要怎樣才能獲取到錯誤然后重啟對應的協程。如果有大神知道的話請指點指點。
六、參考鏈接
1. Python黑魔法 --- 異步IO( asyncio) 協程 http://python.jobbole.com/87310/
2. Python並發編程之協程/異步IO https://www.ziwenxie.site/2016/12/19/python-asyncio/
3. 從0到1,Python異步編程的演進之路 https://zhuanlan.zhihu.com/p/25228075
4. Tasks and coroutines https://docs.python.org/3/library/asyncio-task.html
