Elasticsearch+Mongo億級別數據導入及查詢實踐


數據方案:
  • 在Elasticsearch中通過code及time字段查詢對應doc的mongo_id字段獲得mongodb中的主鍵_id
  • 通過獲得id再進入mongodb進行查詢
 
1,數據情況:
  • 全部為股票及指數的分鍾K線數據(股票代碼區分度較高)
  • Elasticsearch及mongodb都未分片且未優化參數配置,mongo表中只有主鍵_id索引
  • mongodb數據量:

    

  • Elasticsearch數據量:

    

2,將數據從mongo源庫導入Elasticsearch

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch()

conn = MongoClient('127.0.0.1', 27017)
db = conn.kline_db
my_set = db.min_kline
x = 1
tmp = []

#此處有個坑mongo查詢時由於數據量比較大時間較長需要設置游標不過期:no_cursor_timeout=True
for i in my_set.find(no_cursor_timeout=True):
    x+=1
    #每次插入100000條
    if x%100000 == 99999:
        #es批量插入
        success, _ = bulk(es, tmp, index='test_2', raise_on_error=True)
        print('Performed %d actions' % success)
        tmp = []
    if i['market'] == 'sz':
        market = 0
    else:
        market = 1
    #此處有個秒數時間類型及時區轉換
    tmp.append({"_index":'test_2',"_type": 'kline','_source':{'code':i['code'],'market':market,\
                'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i['kline_time']/1000 - 8*60*60))\
                ,'mongo_id':str(i['_id'])}})

#將最后剩余在tmp中的數據插入
if len(tmp)>0:
    success, _ = bulk(es, tmp, index='test_2', raise_on_error=True)
    print('Performed %d actions' % success)

3,Elasticsearch+mongo查詢時間統計

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from bson.objectid import ObjectId

#es連接
es = Elasticsearch()

#mongo連接
conn = MongoClient('127.0.0.1', 27017)
db = conn.kline_db  #連接kline_db數據庫,沒有則自動創建
my_set = db.min_kline

tmp = []

#計算運行時間裝飾器
def cal_run_time(func):
    def wrapper(*args,**kwargs):
        start_time = time.time()
        res = func(*args,**kwargs)
        end_time = time.time()
        print(str(func) +'---run time--- %s' % str(end_time-start_time))
        return res
    return wrapper

@cal_run_time
def query_in_mongo(tmp_list):
    k_list = []
    kline_data = my_set.find({'_id':{'$in':tmp_list}})
    for k in kline_data:
        k_list.append(k)
    return k_list

@cal_run_time
def query_in_es():
    #bool多條件查詢 must相當於and
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {#范圍查詢
                        "time": {
                            "gte": '2017-01-10 00:00:00',  # >=
                            "lte": '2017-04-12 00:00:00'  # <=
                        }
                    }
                },
                    {"terms": {# == 或  in:terms 精確查詢
                        "code": ['000002','000001']
                    }
                    }
                ]
            }

        }
    }

    #根據body條件記性查詢
    scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")

    #解析結果字典並放入tmp列表中
    for resp in scanResp:
        tmp.append(ObjectId(resp['_source']['mongo_id']))

    print(len(tmp))

    #--------------此處有個坑,直接使用search方法查詢到的結果集中最多只有10條記錄----------------
    # zz = es.search(index="test_2", doc_type="kline", body=body)
    # print(zz['hits']['total'])
    # for resp in zz['hits']['hits']:
    #     tmp.append(ObjectId(resp['_source']['mongo_id']))

query_in_es()

query_in_mongo(tmp)

運行結果如下:

第一行:查詢的doc個數:28320

第二行:es查詢所用時間:0.36s

第三行:mongo使用_id查詢所用時間 :0.34s

從結果來看對於3億多數據的查詢Elasticsearch的速度還是相當不錯的

※Elasticsearch主要的優勢在於可以進行快速的分詞模糊查詢,所以股票K線這個場景並沒有充分發揮其優勢,至於查詢效率,其實mysql,mongo等只要分庫分表合理一樣能夠達到。

※Elasticsearch+Mongo這個架構主要針對場景:使用mongo存儲海量數據,且這張表讀寫都很頻繁。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM