量化學習:批量將歷史數據以vnpy的數據格式添加到mongodb


由於期貨合約歷史的分鍾線數據過多,導入時間特別長,在這個過程中針對遇到的導入過慢的問題的解決思路如下:

1.首先,vnpy原本的loadcsv功能,是將csv文件讀取以后,按行進行數據轉換,按每一條的時間replaceone更新到mongodb中,由於本次是一次性向數據庫插入數據,所以處理后通過insert_many的動作,進行一次性插入,提高插入數據的效率。

2.再后來還是速度比較慢,考慮通過多線程的方式,增加線程數量,來提高效率。最后增加了一個線程池,來完成多線程的插入。

3.以上動作插入后,還覺得有些慢,考慮是不是向mongodb插入的時候,多個同時插入大量數據造成的插入效率低,后來沒有再驗證優化了。

5.考慮每一條數據都需要進行格式化處理,形成vnpy的vtBar結構,是否可以通過dataframe的操作,一次性形成所需要的格式,提升處理效率

6.由於有些文件有幾十M,pandas在讀取的時候也會花費一定的時間,是否可以通過其它的方式進行讀取,提高讀取的效率

在進行前兩步的優化后,導入效率比之前有較為名顯的提升,4403的文件,接近20個G

def loadHistoryData():
    file_list = os.listdir(data_path)
    file_list = file_list[last - 1:]
    global pos
    pos = last
    # 上次添加到670, BU1512已導入
    global count
    count = len(file_list)
    # 增加4個線程的線程池,多線程來提高導入效率
    pool = threadpool.ThreadPool(4)
    requests = threadpool.makeRequests(loadCsvData, file_list)
    for req in requests:
        pool.putRequest(req)
    pool.wait()

    print('--------歷史數據導入完成--------')

增加線程池的,提高處理效率

def loadCsvData(file_name):
    start = time()
    if file_lock.acquire():
        symbol_name = file_name[0: -8]
        file_path = data_path + '\\' + file_name
        print(u'合約%s數據開始導入' % (symbol_name))
        file_lock.release()

    if symbol_name[0: -4] in futures_symbol_map.keys():
        symbol_name = futures_symbol_map[symbol_name[0: -4]] + symbol_name[-4:]

    minute_df = pd.read_csv(file_path, encoding='GBK')
    global pos

    if pos_lock.acquire():
        pos += 1
        pos_index = pos
        pos_lock.release()
    if minute_df.empty:
        print(u'合約%s數據為空跳過,進度(%s / %s)' % (symbol_name, str(pos_index), str(count)))
        return

    cl = db[symbol_name]
    cl.ensure_index([('datetime', ASCENDING)], unique=True)  # 添加索引
    data_list = []
    for index, row in minute_df.iterrows():
        bar = generateVtBar(symbol_name, row)
        d = bar.__dict__
        data_list.append(d)

    cl.insert_many(data_list)

    e = time()
    cost = (e - start) * 1000

    print(u'合約%s數據導入完成,耗時%s毫秒,進度(%s / %s)' % (symbol_name, cost, str(pos_index), str(count)))

一次性插入一個合約的全部數據,減少與mongodb的交互,提高插入效率

 

Github地址:JQDataServiceForVNPY


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM