python異步加協程獲取比特幣市場信息


目標

  選取幾個比特幣交易量大的幾個交易平台,查看對應的API,獲取該市場下貨幣對的ticker和depth信息。我們從網站上選取4個交易平台:bitfinex、okex、binance、gdax。對應的交易對是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。

 

、ccxt

  開始想着直接請求市場的API,然后再解析獲取下來的數據,但到github上發現一個比較好得python庫,里面封裝好了獲取比特幣市場的相關函數,這樣一來就省掉分析API的時間了。因此我只要傳入市場以及對應的貨幣對,利用庫里面的函數 fetch_ticker 和 fetch_order_book 就可以獲取到市場的ticker和depth信息(具體的使用方法可以查看ccxt手冊)。接下來以市場okex為例,利用ccxt庫獲取okex的ticker和depth信息。

# 引入庫
import ccxt

# 實例化市場
exchange = ccxt.okex()
# 交易對
symbol = 'BTC/USDT'

# 獲取ticker信息
ticker = exchange.fetch_ticker(symbol)
# 獲取depth信息
depth = exchange.fetch_order_book(symbol)

print('ticker:%s, depth:%s' % (ticker, depth))

   運行后會得到結果如下圖,從此可以看出已經獲取到了ticker和depth信息。

 

 

 二、獲取四個市場的信息(for循環)

   接下來我們獲取四個市場的信息,深度里面有asks和bids,數據量稍微有點兒多,這里depth信息我只去前面五個,對於ticker我也只提取里面的info信息(具體代表什么含義就要參考一下對應市場的API啦)。將其簡單的封裝后,最開始我想的是for循環。想到啥就開始吧:

# 引入庫
import ccxt
import time

now = lambda: time.time()
start = now()

def getData(exchange, symbol):
    data = {}  # 用於存儲ticker和depth信息
    # 獲取ticker信息
    tickerInfo = exchange.fetch_ticker(symbol)
    # 獲取depth信息
    depth = {}
    # 獲取深度信息
    exchange_depth = exchange.fetch_order_book(symbol)
    # 獲取asks,bids 最低5個,最高5個信息
    asks = exchange_depth.get('asks')[:5]
    bids = exchange_depth.get('bids')[:5]
    depth['asks'] = asks
    depth['bids'] = bids

    data['ticker'] = tickerInfo
    data['depth'] = depth

    return data

def main():
    # 實例化市場
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易對
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']

    for i in range(len(exchanges)):
        exchange = exchanges[i]
        symbol = symbols[i]
        data = getData(exchange, symbol)
        print('exchange: %s data is %s' % (exchange.id, data))

if __name__ == '__main__':
    main()
    print('Run Time: %s' % (now() - start))

   運行后會發現雖然每個市場的信息都獲取到了,執行完差不多花掉5.7秒,因為這是同步的,也就是按順序執行的,要是要想每隔一定時間同時獲取四個市場的信息,很顯然這種結果不符合我們的要求。

 

 

三、異步加協程(coroutine)

  前面講的循環雖然可以輸出結果,但耗時長而且達不到想要的效果,接下來采用異步加協程(參考知乎上的一篇文章),要用到異步首先得引入asyncio庫,這個庫是3.4以后才有的,它提供了一種機制,使得你可以用協程(coroutines)、IO復用(multiplexing I/O)在單線程環境中編寫並發模型。這里python文檔有個小例子。

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  

 

  當事件循環開始運行時,它會在Task中尋找coroutine來執行調度,因為事件循環注冊了print_sum(),因此print_sum()被調用,執行result = await compute(x, y)這條語句(等同於result = yield from compute(x, y)),因為compute()自身就是一個coroutine,因此print_sum()這個協程就會暫時被掛起,compute()被加入到事件循環中,程序流執行compute()中的print語句,打印”Compute %s + %s …”,然后執行了await asyncio.sleep(1.0),因為asyncio.sleep()也是一個coroutine,接着compute()就會被掛起,等待計時器讀秒,在這1秒的過程中,事件循環會在隊列中查詢可以被調度的coroutine,而因為此前print_sum()compute()都被掛起了,因此事件循環會停下來等待協程的調度,當計時器讀秒結束后,程序流便會返回到compute()中執行return語句,結果會返回到print_sum()中的result中,最后打印result,事件隊列中沒有可以調度的任務了,此時loop.close()把事件隊列關閉,程序結束。

  接下來我們采用異步和協程(ps:ccxt庫也有對應的異步),運行后發現時間只用了1.9秒,比之前快了好多倍。  

Run Time: 1.9661316871643066 

相關代碼:

# 引入庫
import ccxt.async as ccxt
import asyncio
import time

now = lambda: time.time()
start = now()

async def getData(exchange, symbol):
    data = {}  # 用於存儲ticker和depth信息
    # 獲取ticker信息
    tickerInfo = await exchange.fetch_ticker(symbol)
    # 獲取depth信息
    depth = {}
    # 獲取深度信息
    exchange_depth = await exchange.fetch_order_book(symbol)
    # 獲取asks,bids 最低5個,最高5個信息
    asks = exchange_depth.get('asks')[:5]
    bids = exchange_depth.get('bids')[:5]
    depth['asks'] = asks
    depth['bids'] = bids

    data['ticker'] = tickerInfo
    data['depth'] = depth

    return data

def main():
    # 實例化市場
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易對
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']

    tasks = []
    for i in range(len(exchanges)):
        task = getData(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

if __name__ == '__main__':
    main()
    print('Run Time: %s' % (now() - start))

  

 三、定時爬取並用mongodb保存數據

  在前面的基礎上,添加一個定時任務,實現每隔一段時間爬取一次數據,並將數據保存到mongodb數據庫。只需再前面的代碼上稍微改改就可以啦,代碼和運行結果如下:

 

import asyncio
import ccxt.async as ccxt
import time
import pymongo

# 獲取ticker和depth信息
async def get_exchange_tickerDepth(exchange, symbol):  # 其中exchange為實例化后的市場
    # print('start get_ticker')
    while True:
        print('%s is run %s' % (exchange.id, time.ctime()))

        # 獲取ticher信息
        tickerInfo = await exchange.fetch_ticker(symbol)
        ticker = tickerInfo.get('info')

        if type(ticker) == type({}):
            ticker['timestamp'] = tickerInfo.get('timestamp')
            ticker['high'] = tickerInfo.get('high')
            ticker['low'] = tickerInfo.get('low')
            ticker['last'] = tickerInfo.get('last')
        else:
            ticker = tickerInfo
        # print(ticker)

        # 獲取深度信息
        depth = {}
        exchange_depth = await exchange.fetch_order_book(symbol)
        # 獲取asks,bids 最低5個,最高5個信息
        asks = exchange_depth.get('asks')[:5]
        bids = exchange_depth.get('bids')[:5]
        depth['asks'] = asks
        depth['bids'] = bids
        # print('depth:{}'.format(depth))
        data = {
            'exchange': exchange.id,
            'countries': exchange.countries,
            'symbol': symbol,
            'ticker': ticker,
            'depth': depth
        }

        # 保存數據
        save_exchangeDate(exchange.id, data)
        print('********* %s is finished, time %s *********' % (exchange.id, time.ctime()))

        # 等待時間
        await asyncio.sleep(2)


# 存庫
def save_exchangeDate(exchangeName, data):
    # 鏈接MongoDB
    connect = pymongo.MongoClient(host='localhost', port=27017)
    # 創建數據庫
    exchangeData = connect['exchangeDataAsyncio']
    # 創建表
    exchangeInformation = exchangeData[exchangeName]
    # print(table_name)
    # 數據去重后保存
    count = exchangeInformation.count()
    if not count > 0:
        exchangeInformation.insert_one(data)
    else:
        for item in exchangeInformation.find().skip(count - 1):
            lastdata = item
        if lastdata['ticker']['timestamp'] != data['ticker']['timestamp']:
            exchangeInformation.insert_one(data)

def main():
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(),
                  ccxt.gdax()]
    symbols = ['BTC/USDT', 'BTC/USD', 'BTC/USDT', 'BTC/USD']
    tasks = []
    for i in range(len(exchanges)):
        task = get_exchange_tickerDepth(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()

    try:
        # print(asyncio.Task.all_tasks(loop))
        loop.run_forever()

    except Exception as e:
        print(e)
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

 

五、小結

  使用協程可以實現高效的並發任務。Python在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎,3.5則確定了協程的語法。這里只簡單的使用了asyncio。當然實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。這里我有一個問題,就是運行一段時間后,里面的市場可能有請求超時等情況導致協程停止運行,我要怎樣才能獲取到錯誤然后重啟對應的協程。如果有大神知道的話請指點指點。

 

六、參考鏈接

1. Python黑魔法 --- 異步IO( asyncio) 協程  http://python.jobbole.com/87310/

2. Python並發編程之協程/異步IO  https://www.ziwenxie.site/2016/12/19/python-asyncio/

3. 從0到1,Python異步編程的演進之路  https://zhuanlan.zhihu.com/p/25228075

4. Tasks and coroutines  https://docs.python.org/3/library/asyncio-task.html

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM