由於期貨合約歷史的分鍾線數據過多,導入時間特別長,在這個過程中針對遇到的導入過慢的問題的解決思路如下:
1.首先,vnpy原本的loadcsv功能,是將csv文件讀取以后,按行進行數據轉換,按每一條的時間replaceone更新到mongodb中,由於本次是一次性向數據庫插入數據,所以處理后通過insert_many的動作,進行一次性插入,提高插入數據的效率。
2.再后來還是速度比較慢,考慮通過多線程的方式,增加線程數量,來提高效率。最后增加了一個線程池,來完成多線程的插入。
3.以上動作插入后,還覺得有些慢,考慮是不是向mongodb插入的時候,多個同時插入大量數據造成的插入效率低,后來沒有再驗證優化了。
5.考慮每一條數據都需要進行格式化處理,形成vnpy的vtBar結構,是否可以通過dataframe的操作,一次性形成所需要的格式,提升處理效率
6.由於有些文件有幾十M,pandas在讀取的時候也會花費一定的時間,是否可以通過其它的方式進行讀取,提高讀取的效率

在進行前兩步的優化后,導入效率比之前有較為名顯的提升,4403的文件,接近20個G

def loadHistoryData(): file_list = os.listdir(data_path) file_list = file_list[last - 1:] global pos pos = last # 上次添加到670, BU1512已導入 global count count = len(file_list) # 增加4個線程的線程池,多線程來提高導入效率 pool = threadpool.ThreadPool(4) requests = threadpool.makeRequests(loadCsvData, file_list) for req in requests: pool.putRequest(req) pool.wait() print('--------歷史數據導入完成--------')
增加線程池的,提高處理效率
def loadCsvData(file_name): start = time() if file_lock.acquire(): symbol_name = file_name[0: -8] file_path = data_path + '\\' + file_name print(u'合約%s數據開始導入' % (symbol_name)) file_lock.release() if symbol_name[0: -4] in futures_symbol_map.keys(): symbol_name = futures_symbol_map[symbol_name[0: -4]] + symbol_name[-4:] minute_df = pd.read_csv(file_path, encoding='GBK') global pos if pos_lock.acquire(): pos += 1 pos_index = pos pos_lock.release() if minute_df.empty: print(u'合約%s數據為空跳過,進度(%s / %s)' % (symbol_name, str(pos_index), str(count))) return cl = db[symbol_name] cl.ensure_index([('datetime', ASCENDING)], unique=True) # 添加索引 data_list = [] for index, row in minute_df.iterrows(): bar = generateVtBar(symbol_name, row) d = bar.__dict__ data_list.append(d) cl.insert_many(data_list) e = time() cost = (e - start) * 1000 print(u'合約%s數據導入完成,耗時%s毫秒,進度(%s / %s)' % (symbol_name, cost, str(pos_index), str(count)))
一次性插入一個合約的全部數據,減少與mongodb的交互,提高插入效率
