Elasticsearch使用備忘


最近我們需要對大約2T(6.5億條)日志做全文檢索,Elasticsearch看起來很火爆,又有很多產品使用(Facebook、github、stackoverflow),值得一試。以下是一些基礎知識筆記。

Elasticsearch是一個基於Lucene構建的開源、分布式、RESTful的搜索引擎,能夠實現近實時(NRT)搜索,穩定、可靠、安裝方便。性能不錯、水平擴展、文檔齊全、社區火爆,這幾點很重要。

如果之前已經了解過分布式存儲系統、query切詞、檢索相關性策略,Elasticsearch的使用學習起來會很快。

1 基礎概念

Elasticsearch是一個近實時的系統,從你寫入數據到數據可以被檢索到,一般會有1秒鍾的延時。Elasticsearch是基於Lucene的,Lucene的讀寫是兩個分開的句柄,往寫句柄寫入的數據刷新之后,讀句柄重新打開,這才能讀到新寫入的數據。

名詞解釋:

Cluster:集群。

Index:索引,Index相當於關系型數據庫的DataBase。

Type:類型,這是索引下的邏輯划分,一般把有共性的文檔放到一個類型里面,相當於關系型數據庫的table。

Document:文檔,Json結構,這點跟MongoDB差不多。

Shard、Replica:分片,副本。

分片有兩個好處,一個是可以水平擴展,另一個是可以並發提高性能。在網絡環境下,可能會有各種導致分片無法正常工作的問題,所以需要有失敗預案。ES支持把分片拷貝出一份或者多份,稱為副本分片,簡稱副本。副本有兩個好處,一個是實現高可用(HA,High Availability),另一個是利用副本提高並發檢索性能。

分片和副本的數量可以在創建index的時候指定,index創建之后,只能修改副本數量,不能修改分片。

健康狀態:

安裝了head插件之后,可以在web上看到集群健康狀態,集群處於綠色表示當前一切正常,集群處於黃色表示當前有些副本不正常,集群處於紅色表示部分數據無法正常提供。綠色和黃色狀態下,集群都是能提供完整數據的,紅色狀態下集群提供的數據是有缺失的。

2 搭建ElasticSearch

首先安裝java,設置好JAVA_HOME環境變量(export JAVA_HOME=.../java8),然后安裝Elasticsearch。

參考官方文檔:https://www.elastic.co/guide/en/elasticsearch/reference/current/_installation.html

設置配置的時候,ES可能因為各種原因不能自動找到集群,所以把地址也設置上,如:

discovery.zen.ping.unicast.hosts: ["host_name...:9301", "host_name_xxx:port_yyy"...]

安裝head插件:拉取 https://github.com/mobz/elasticsearch-head 代碼,將其放到./plugins/head 目錄下。

啟動之前設置ES使用的內存:export ES_HEAP_SIZE=10g。

elasticsearcy.yml配置文件中的一些配置點:

#設置集群名字
 cluster.name: cswuyg_qa_pair_test
#設置node名字
 node.name: xxx-node
#設置節點域名
 network.host: 10.111.111.1
#設置內部傳輸端口和外部HTTP訪問端口
 transport.tcp.port: 9302
 http.port: 8302
#設置集群其它節點地址
 discovery.zen.ping.unicast.hosts: ["xxxhost:yyyport"]
#設置中文切詞插件
 index.analysis.analyzer.ik.type: "ik"

elasticsearch -d 以守護進程方式啟動,啟動之后,就可以在瀏覽器里使用head插件看到集群信息,如:

http://host_name_xxx:port_yyy/_plugin/head/

上圖:啟動了三個Elasticsearch實例,創建了三個Index;ceshi Index有一主shard,兩replica shard;qa_pair1 Index只有主shard;website Index有一主shard,一replica shard。

3 測試Elasticsearch使用

Elasticsearch提供RESTful API,我采用Postman(chrome的一個插件)作為輔助客戶端向ES發送請求。

可以向任意一個節點發起請求,雖然ES有Master的概念,但任意一個node都可以接受讀寫請求。

先創建一個index:

POST http://10.11.111.11:8301/test_index

查看創建的index:

GET  http://10.11.111.11:8301/_cat/indices?v

寫入數據:

查詢數據:

(1)使用id直接查:
GET http://xxxhost:8201/qa_xx2/qa_xx3/1235
(2)DSL查詢:

往查詢url POST數據即可:
URL格式:http://xxxhost:8201/qa_xx2/qa_xx3/_search

a. 查詢title中包含有cswuyg字段的文檔。Highlight設置高亮命中的詞。POST方法的body:

{
    "query": {
        "match": {
            "title": {
                "query": "cswuyg "
            }
        }
    },
    "highlight": {
        "fields": {
            "title": {
                
            }
        }
    }
}
View Code

b. bool組合查詢,命中的文檔的title字段必須能命中“餐廳”、“好吃”、“深圳”,可以是完全命中,也可以是名字其中的個別字。“便宜”則是可選命中。
POST方法的body:

{
    "query": {
        "bool": {
            "must": [{
                "match": {
                    "title": {
                        "query": "餐廳"
                    }
                }
            },
            {
                "match": {
                    "title": {
                        "query": "好吃"
                    }
                }
            },
            {
                "match": {
                    "title": {
                        "query": "深圳"
                    }
                }
            }],
            "should": [{
                "match": {
                    "title": "便宜"
                }
            }]
        }
    },
    "highlight": {
        "fields": {
            "title": {
                
            }
        }
    }
}
View Code

如果要求每一個字都命中,可以把match修改為match_phrase。

{
    'query': {
        'bool': {
            'should': [{
                'match': {
                    'title': {
                        'query': '張三',
                        'boost': 0.2
                    }
                }
            }],
            'must': [{
                'match_phrase': {
                    'title': {
                        'query': '李四',
                        'boost': 0.69
                    }
                }
            },
            {
                'match_phrase': {
                    'title': {
                        'query': '王五',
                        'boost': 0.11
                    }
                }
            }]
        }
    }
}
View Code

例子:要求必須完全命中“酒后”和“標准",“駕駛”可以部分命中

{
    "query": {
        "bool": {
            "must": [{
                "match_phrase": {
                    "question": {
                        "query": "酒后",
                        "boost": 0.69
                    }
                }
            },
            {
                "match": {
                    "question": {
                        "query": "駕駛",
                        "boost": 0.11
                    }
                }
            },
            {
                "match_phrase": {
                    "question": {
                        "query": "標准",
                        "boost": 0.2
                    }
                }
            }]
        }
    }
}
View Code

c. 給查詢詞設置權重(boost)。POST方法的body:

{
    "query": {
        "bool": {
            "must": {
                "match": {
                    "title": {
                        "query": "好吃的餐廳",
                        "boost": 1
                    }
                }
            },
            "must": {
                "match": {
                    "title": {
                        "query": "深圳灣",
                        "boost": 100
                    }
                }
            },
            "should": [{
                "match": {
                    "title": "便宜"
                }
            }]
        }
    },
    "highlight": {
        "fields": {
            "title": {
                
            }
        }
    }
}
View Code

d. filter查詢,也就是kv查詢,不涉及檢索的相關性打分,title必須是完全命中,如果建庫時是有對這個字段切詞的,則查詢時,需要是切詞后的某個詞去查詢,如“今天天氣”,建庫切詞為“今天”和“天氣”,那么filter查詢的時候需要使用“今天”或者“天氣”才能命中。POST方法的body:

{
    "query": {
        "bool": {
            "filter": [{
                "term": {
                    "title": "好吃的"
                }
            }]
        }
    }
}
View Code

e. 完全匹配某個短語,這就要求“好厲害”三個字組成的詞必須在文檔中出現,不能是只出現其中的個別字(match就是這樣)。POST方法的body:

{
    "query": {
        "match_phrase": {
            "title": {
                "query": "好厲害"
            }
        }
    }
}
View Code

 (3)運維

a. 去掉副本,調研的時候希望不要副本,這樣子寫入會快點

PUT http://10.11.111.11:8202/qa_pair2/_settings
{
   "number_of_replicas" : 0
}

 4 使用ik中文切詞插件

Elasticsearch默認的中文切詞插件是單字切詞,這不能滿足我們要求,需要安裝中文切詞插件。

插件github地址:https://github.com/medcl/elasticsearch-analysis-ik
源碼安裝:編譯時需要聯網,可以在windows下編譯完之后,把elasticsearch-analysis-ik-1.9.3.zip拷貝到linux機器的./plugin/head目錄下解壓。
配置:在配置文件./config/elasticsearch.yml末尾添加配置: index.analysis.analyzer.ik.type: "ik"

測試ik切詞:http://host_name_xx:port_yyy/qa_pair/_analyze?analyzer=ik&pretty=true&text=我是中國人"

5 使用python讀寫Elasticsearch

驅動安裝:使用pip安裝elasticsearch

讀取文件批量插入數據示例:

#!/home/work/bin/python
#-*-coding:utf8-*-
"""
讀取文件,入庫到es
使用:python insert_demo.py xxx_file_name

Authors: cswuyg 
Date: 2016.06.18
"""

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch import exceptions
import traceback
import datetime
import sys

reload(sys)
sys.setdefaultencoding('utf-8')

#設置mappings
def _create_index(es, index_name="cswuyg", doc_type_name="cswuyg"):
    my_settingss = {
            'number_of_shards': 18,
            'number_of_replicas': 0
            }
    my_mappings = {
            "cswuyg": {
                '_all': {
                    'enabled': 'false'
                    },
                "properties": {
                    "title": {
                        'type': 'string',
                        'store': 'no',
                        'term_vector': 'with_positions_offsets',
                        'analyzer': 'ik_max_word',
                        'boost': 8
                        },
                    "url": {
                        "type": "string",
                        'index': 'not_analyzed'
                        },
                    'content': {
                        'type': 'string',
                        'store': 'no',
                        'term_vector': 'with_positions_offsets',
                        'analyzer': 'ik_max_word',
                        'boost': 8
                        }
                    }
                }
            }
    settings = {
            'settings': my_settingss, 
            'mappings': my_mappings
            }

    create_index = es.indices.create(index=index_name, body=settings)

#將文件中的數據存儲到es中
def _save_data(es, input_file):
    #讀入數據
    all_data = list()
    count = 0
    with open(input_file) as f_r:
        for line in f_r:
        count += 1
            all_data.append({
                '_index': 'cswuyg',
                '_type': 'cswuyg',
                '_source': {
                    'title': line
                    }
                })
            if len(all_data) == 100:
                success, _ = bulk(es, all_data, index='cswuyg', raise_on_error=True)
                all_data = list()
                print('{1}: finish {0}'.format(count, input_file))
    if len(all_data) != 0:
        success, _ = bulk(es, all_data, index='cswuyg', raise_on_error=True)
        all_data = list()
        print('{1}: finish {0}'.format(count, input_file))
    print('{0}: finish all'.format(input_file))

def _insert_data(es, file_name):
    start_time = datetime.datetime.now()
    _save_data(es, file_name)
    cost_time = datetime.datetime.now() - start_time
    print('all cost time{0}'.format(cost_time))

def _main():
    if len(sys.argv) != 2:
        print('need file argument')
        return 
    es = Elasticsearch(hosts=["10.200.100.80:8301"], timeout=500)
    try:
        _create_index(es)
    except exceptions.RequestError:
        print(traceback.format_exc())
    _insert_data(es, sys.argv[1]);

if __name__ == '__main__':
    _main()
View Code

測試文件數據格式:

我是中國人
我愛中國

檢索示例(從文件中獲取檢索條件:切好詞且打好分的Term):

#!/home/work/bin/python
#-*-coding:utf8-*-
"""
檢索 es
策略:
從文件中讀取已經切好詞,且給好權重的term,且選好必選term的一行文本,處理成match_phrase + bool查詢
默認策略2

使用方法:python search_demo.py test_file
output:
es_query
query\ttitle\tsall_score

output demo:
{'query': {'bool': {'should': [{'match': {'title': {'query': '\xe6\x88\x91', 'boost': 0.2}}}], 'must': [{'match_phrase': {'title': {'query': '\xe4\xb8\xad\xe5\x9b\xbd', 'boost': 0.69}}}, {'match_phrase': {'title': {'query': '\xe7\x88\xb1', 'boost': 0.11}}}]}}}
我愛中國        我愛中國        {"should": ["我"], "score": {"我": 0.2, "中國": 0.69, "愛": 0.11}, "must": ["中國", "愛"]}
我愛中國        我愛中國        {"should": ["我"], "score": {"我": 0.2, "中國": 0.69, "愛": 0.11}, "must": ["中國", "愛"]}
Authors: cswuyg
Date: 2016.06.18
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch import exceptions
import sys
import json
from log import logger
import traceback

reload(sys)
sys.setdefaultencoding('utf-8')

def _release_es_query_by_file_info(file_info):
    #輸入格式: raw_query\tmust_term\tshould_term\tall_score
    logger.debug('file_info:{0}'.format(file_info))
    file_info_list = file_info.split('\t')
    print file_info_list
    raw_query = file_info_list[0]
    must_term = file_info_list[3]
    should_term= file_info_list[4]
    all_score = file_info_list[5]
    json_score = json.loads(all_score, encoding='utf-8')
    
    ret_obj = {}
    ret_obj['must'] = must_term.split()
    ret_obj['should'] = should_term.split()
    ret_obj['score'] = json_score 

    bool_query = dict()
    must_query = list()
    should_query = list()
    for item in must_term.split(' '):
        must_query.append({'match_phrase': {'title': {'query': item, 'boost': json_score[unicode(item)]}}})
    bool_query['must'] = must_query

    for item in should_term.split(' '):
        should_query.append({'match': {'title': {'query': item, 'boost': json_score[unicode(item)]}}})
    bool_query['should'] = should_query

    es_query = {'query': {'bool': bool_query}}
    print es_query
    return raw_query, es_query, json.dumps(ret_obj, ensure_ascii=False) 

def _do_query_use_file_info(es, file_info):
    raw_query, query, all_score = _release_es_query_by_file_info(file_info.strip('\r\n'))
    res = es.search(index='cswuyg', doc_type='cswuyg', body=query, size=100)
    if (len(res['hits']['hits']) == 0):
        logger.debug('len(res["hits"]["hits"]) == 0')
        print("{0}\t{1}\t{2}".format(raw_query, "", all_score))
        return 

    for item in res['hits']['hits']:
        try:
            print("{0}\t{1}\t{2}".format(raw_query, item['_source']['title'].strip('\r\n'), all_score))
        except:
            logger.debug(traceback.format_exc())
            logger.debug(item['_source']['title'])
    print('\r\n')

def _main():
    if len(sys.argv) != 2:
        print('argv error')
        return
    else:
        print('argv[1] = {0}'.format(sys.argv[1]))
    es = Elasticsearch(hosts=["10.200.100.80:8301"], timeout=5000)
    with open(sys.argv[1]) as f_r:
        for item in f_r:
            try:
                _do_query_use_file_info(es, item)
            except:
                logger.debug(traceback.format_exc())
    
if __name__ == '__main__':
    _main()
View Code

測試文件數據格式:

我愛中國            中國 愛    我    {"我": 0.20, "中國": 0.69, "愛": 0.11} 

6 部分學習資料

官方入門文檔:https://www.elastic.co/guide/en/elasticsearch/reference/current/getting-started.html

使用ik中文分詞插件:http://blog.csdn.net/liuzhenfeng/article/details/39404435

提升性能:http://zhousheng29.iteye.com/blog/2101905

Elasticsearch有哪些用戶:https://www.elastic.co/use-cases

 

附上我們的ES使用效果:

使用了ES 2.3版本,集群使用了4台機器18塊磁盤,啟了18個節點,每個節點15G內存,共270G內存。Index無副本,disable掉了_all索引,2T數據入庫完占了大約4T磁盤空間,10進程並發寫入,速度可以達到1W條/s(寫入多了后面會變慢),部分磁盤寫入延遲達到幾百毫秒,瓶頸在磁盤IO上。首次召回100W+的文檔耗時2s+,但如果是觸發了緩存的召回,可以達到50ms級別的耗時。多加機器,增加shard可以提高讀寫性能。

 

 

后續如有繼續使用再持續更新。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM