ElasticSearch的基本原理與用法


一、簡介

ElasticSearch和Solr都是基於Lucene的搜索引擎,不過ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。

這里有一個詳細的ElasticSearch和Solr的對比:http://solr-vs-elasticsearch.com/

二、基本用法

集群(Cluster): ES是一個分布式的搜索引擎,一般由多台物理機組成。這些物理機,通過配置一個相同的cluster name,互相發現,把自己組織成一個集群。

節點(Node):同一個集群中的一個Elasticsearch主機。

Node類型:

1)data node: 存儲index數據。Data nodes hold data and perform data related operations such as CRUD, search, and aggregations.

2)client node: 不存儲index,處理轉發客戶端請求到Data Node。

3)master node: 不存儲index,集群管理,如管理路由信息(routing infomation),判斷node是否available,當有node出現或消失時重定位分片(shards),當有node failure時協調恢復。(所有的master node會選舉出一個master leader node)

詳情參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html

主分片(Primary shard):索引(下文介紹)的一個物理子集。同一個索引在物理上可以切多個分片,分布到不同的節點上。分片的實現是Lucene 中的索引。

注意:ES中一個索引的分片個數是建立索引時就要指定的,建立后不可再改變。所以開始建一個索引時,就要預計數據規模,將分片的個數分配在一個合理的范圍。

副本分片(Replica shard):每個主分片可以有一個或者多個副本,個數是用戶自己配置的。ES會盡量將同一索引的不同分片分布到不同的節點上,提高容錯性。對一個索引,只要不是所有shards所在的機器都掛了,就還能用。

索引(Index):邏輯概念,一個可檢索的文檔對象的集合。類似與DB中的database概念。同一個集群中可建立多個索引。比如,生產環境常見的一種方法,對每個月產生的數據建索引,以保證單個索引的量級可控。

類型(Type):索引的下一級概念,大概相當於數據庫中的table。同一個索引里可以包含多個 Type。 

文檔(Document):即搜索引擎中的文檔概念,也是ES中一個可以被檢索的基本單位,相當於數據庫中的row,一條記錄。

字段(Field):相當於數據庫中的column。ES中,每個文檔,其實是以json形式存儲的。而一個文檔可以被視為多個字段的集合。比如一篇文章,可能包括了主題、摘要、正文、作者、時間等信息,每個信息都是一個字段,最后被整合成一個json串,落地到磁盤。

映射(Mapping):相當於數據庫中的schema,用來約束字段的類型,不過 Elasticsearch 的 mapping 可以不顯示地指定、自動根據文檔數據創建。

Elasticsearch集群可以包含多個索引(indices),每一個索引可以包含多個類型(types),每一個類型包含多個文檔(documents),然后每個文檔包含多個字段(Fields),這種面向文檔型的儲存,也算是NoSQL的一種吧。

ES比傳統關系型數據庫,對一些概念上的理解:

Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices   -> Types  -> Documents -> Fields

從創建一個Client到添加、刪除、查詢等基本用法:

1、創建Client

public ElasticSearchService(String ipAddress, int port) {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(ipAddress,
                        port));
    }

這里是一個TransportClient。

ES下兩種客戶端對比:

TransportClient:輕量級的Client,使用Netty線程池,Socket連接到ES集群。本身不加入到集群,只作為請求的處理。

Node Client:客戶端節點本身也是ES節點,加入到集群,和其他ElasticSearch節點一樣。頻繁的開啟和關閉這類Node Clients會在集群中產生“噪音”。

2、創建/刪除Index和Type信息

    // 創建索引
    public void createIndex() {
        client.admin().indices().create(new CreateIndexRequest(IndexName))
                .actionGet();
    }

    // 清除所有索引
    public void deleteIndex() {
        IndicesExistsResponse indicesExistsResponse = client.admin().indices()
                .exists(new IndicesExistsRequest(new String[] { IndexName }))
                .actionGet();
        if (indicesExistsResponse.isExists()) {
            client.admin().indices().delete(new DeleteIndexRequest(IndexName))
                    .actionGet();
        }
    }
    
    // 刪除Index下的某個Type
    public void deleteType(){        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
    }

    // 定義索引的映射類型
    public void defineIndexTypeMapping() {
        try {
            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
            mapBuilder.startObject()
            .startObject(TypeName)
            .startObject("_all").field("enabled", false).endObject()
                .startObject("properties")
                    .startObject(IDFieldName).field("type", "long").endObject()
                    .startObject(SeqNumFieldName).field("type", "long").endObject()
                    .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").endObject()
                    .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
                .endObject()
            .endObject()
            .endObject();

            PutMappingRequest putMappingRequest = Requests
                    .putMappingRequest(IndexName).type(TypeName)
                    .source(mapBuilder);
            client.admin().indices().putMapping(putMappingRequest).actionGet();
        } catch (IOException e) {
            log.error(e.toString());
        }
    

這里自定義了某個Type的索引映射(Mapping):

1)默認ES會自動處理數據類型的映射:針對整型映射為long,浮點數為double,字符串映射為string,時間為date,true或false為boolean。

2)字段的默認配置是indexed,但不是stored的,也就是 field("index", "yes").field("store", "no")。

3)這里Disable了“_all”字段,_all字段會把所有的字段用空格連接,然后用“analyzed”的方式index這個字段,這個字段可以被search,但是不能被retrieve。

4)針對string,ES默認會做“analyzed”處理,即先做分詞、去掉stop words等處理再index。如果你需要把一個字符串做為整體被索引到,需要把這個字段這樣設置:field("index", "not_analyzed")。

5)默認_source字段是enabled,_source字段存儲了原始Json字符串(original JSON document body that was passed at index time)。

詳情參考:

https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-store.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-all-field.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html

3、索引數據

    // 批量索引數據
    public void indexHotSpotDataList(List<Hotspotdata> dataList) {
        if (dataList != null) {
            int size = dataList.size();
            if (size > 0) {
                BulkRequestBuilder bulkRequest = client.prepareBulk();
                for (int i = 0; i < size; ++i) {
                    Hotspotdata data = dataList.get(i);
                    String jsonSource = getIndexDataFromHotspotData(data);
                    if (jsonSource != null) {
                        bulkRequest.add(client
                                .prepareIndex(IndexName, TypeName,
                                        data.getId().toString())
                                .setRefresh(true).setSource(jsonSource));
                    }
                }

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                if (bulkResponse.hasFailures()) {
                    Iterator<BulkItemResponse> iter = bulkResponse.iterator();
                    while (iter.hasNext()) {
                        BulkItemResponse itemResponse = iter.next();
                        if (itemResponse.isFailed()) {
                            log.error(itemResponse.getFailureMessage());
                        }
                    }
                }
            }
        }
    }

    // 索引數據
    public boolean indexHotspotData(Hotspotdata data) {
        String jsonSource = getIndexDataFromHotspotData(data);
        if (jsonSource != null) {
            IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
                    TypeName).setRefresh(true);
            requestBuilder.setSource(jsonSource)
                    .execute().actionGet();
            return true;
        }

        return false;
    }

    // 得到索引字符串
    public String getIndexDataFromHotspotData(Hotspotdata data) {
        String jsonString = null;
        if (data != null) {
            try {
                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
                jsonBuilder.startObject().field(IDFieldName, data.getId())
                        .field(SeqNumFieldName, data.getSeqNum())
                        .field(IMSIFieldName, data.getImsi())
                        .field(IMEIFieldName, data.getImei())
                        .field(DeviceIDFieldName, data.getDeviceID())
                        .field(OwnAreaFieldName, data.getOwnArea())
                        .field(TeleOperFieldName, data.getTeleOper())
                        .field(TimeFieldName, data.getCollectTime())
                        .endObject();
                jsonString = jsonBuilder.string();
            } catch (IOException e) {
                log.equals(e);
            }
        }

        return jsonString;
    }

ES支持批量和單個數據索引。

4、查詢獲取數據

    // 獲取少量數據100個
    private List<Integer> getSearchData(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setTypes(TypeName).setQuery(queryBuilder).setSize(100)
                .execute().actionGet();
        SearchHits searchHits = searchResponse.getHits();
        for (SearchHit searchHit : searchHits) {
            Integer id = (Integer) searchHit.getSource().get("id");
            ids.add(id);
        }
        return ids;
    }

    // 獲取大量數據
    private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
        List<Integer> ids = new ArrayList<>();
        // 一次獲取100000數據
        SearchResponse scrollResp = client.prepareSearch(IndexName)
                .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
                .setQuery(queryBuilder).setSize(100000).execute().actionGet();
        while (true) {
            for (SearchHit searchHit : scrollResp.getHits().getHits()) {
                Integer id = (Integer) searchHit.getSource().get(IDFieldName);
                ids.add(id);
            }
            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                    .setScroll(new TimeValue(600000)).execute().actionGet();
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }

        return ids;
    }

這里的QueryBuilder是一個查詢條件,ES支持分頁查詢獲取數據,也可以一次性獲取大量數據,需要使用Scroll Search。

5、聚合(Aggregation Facet)查詢 

    // 得到某段時間內設備列表上每個設備的數據分布情況<設備ID,數量>
    public Map<String, String> getDeviceDistributedInfo(String startTime,
            String endTime, List<String> deviceList) {

        Map<String, String> resultsMap = new HashMap<>();

        QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
        QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
        QueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(deviceQueryBuilder).must(rangeBuilder);

        TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)
                .field(DeviceIDFieldName);
        SearchResponse searchResponse = client.prepareSearch(IndexName)
                .setQuery(queryBuilder).addAggregation(termsBuilder)
                .execute().actionGet();
        Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
        if (terms != null) {
            for (Terms.Bucket entry : terms.getBuckets()) {
                resultsMap.put(entry.getKey(),
                        String.valueOf(entry.getDocCount()));
            }
        }
        return resultsMap;
    }

Aggregation查詢可以查詢類似統計分析這樣的功能:如某個月的數據分布情況,某類數據的最大、最小、總和、平均值等。

詳情參考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

三、集群配置

配置文件elasticsearch.yml

集群名和節點名:

#cluster.name: elasticsearch

#node.name: "Franz Kafka"

是否參與master選舉和是否存儲數據

#node.master: true

#node.data: true

分片數和副本數

#index.number_of_shards: 5
#index.number_of_replicas: 1

允許其他網絡訪問:  

network.host: 0

master選舉最少的節點數,這個一定要設置為整個集群節點個數的一半加1,即N/2+1

#discovery.zen.minimum_master_nodes: 1

discovery ping的超時時間,擁塞網絡,網絡狀態不佳的情況下設置高一點

#discovery.zen.ping.timeout: 3s

注意,分布式系統整個集群節點個數N要為奇數個!!

如何避免ElasticSearch發生腦裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/

即使集群節點個數為奇數,minimum_master_nodes為整個集群節點個數一半加1,也難以避免腦裂的發生,詳情看討論:https://github.com/elastic/elasticsearch/issues/2488

四、常用查詢

curl -X<REST Verb> <Node>:<Port>/<Index>/<Type>/<ID>

Index info:

curl -XGET 'localhost:9200'
curl -XGET 'localhost:9200/_stats?pretty'
curl -XGET 'localhost:9200/{index}/_stats?pretty'
curl -XGET 'localhost:9200/_cluster/health?level=indices&pretty=true'
curl -XGET 'localhost:9200/{index}?pretty'
curl -XGET 'localhost:9200/_cat/indices?v'
curl -XGET 'localhost:9200/{index}/_mapping/{type}?pretty'

Mapping info:
curl -XGET 'localhost:9200/subscriber/_mapping/subscriber?pretty'

Index search:
curl -XGET 'localhost:9200/subscriber/subscriber/_search?pretty'

Search by ID:
curl -XGET 'localhost:9200/subscriber/subscriber/5000?pretty'

Search by field:
curl -XGET 'localhost:9200/subscriber/subscriber/_search?q=ipAddress:63.141.15.45&&pretty'

Delete index:
curl -XDELETE 'localhost:9200/subscriber?pretty'

Delete document by ID:
curl -XDELETE 'localhost:9200/subscriber/subscriber/5000?pretty'

Delete document by query:
curl -XDELETE 'localhost:9200/subscriber/subscriber/_query?q=ipAddress:63.141.15.45&&pretty'

五、基本原理

1、ES寫數據原理

每個doc,通過如下公式決定寫到哪個分片上:

shard= hash(routing) % number_of_primary_shards

Routing 是一個可變值,默認是文檔的 _id ,也可以自定義一個routing規則。

默認情況下,primary shard在寫操作前,需要確定大多數(a quorum, or majority)的shard copies是可用的。這樣是為了防止在有網絡分區(network partition)的情況下把數據寫到了錯誤的分區。

A quorum是由以下公式決定:

int( (primary + number_of_replicas) / 2 ) + 1,number_of_replicas是在index settings中指定的復制個數。

確定一致性的值有:one (只有primary shard),all (the primary and all replicas),或者是默認的quorum。

如果沒有足夠可用的shard copies,elasticsearch會等待直到超時,默認等待一分鍾。

  • 一個新文檔被索引之后,先被寫入到內存中,但是為了防止數據的丟失,會追加一份數據到事務日志(trans log)中。 不斷有新的文檔被寫入到內存,同時也都會記錄到事務日志中。這時新數據還不能被檢索和查詢。
  • 當達到默認的刷新時間或內存中的數據達到一定量后,會觸發一次 Refresh,將內存中的數據以一個新段形式刷新到文件緩存系統中並清空內存。這時雖然新段未被提交到磁盤,但是可以提供文檔的檢索功能且不能被修改。
  • 隨着新文檔索引不斷被寫入,當日志數據大小超過 512M 或者時間超過 30 分鍾時,會觸發一次 Flush。 內存中的數據被寫入到一個新段同時被寫入到文件緩存系統,文件系統緩存中數據通過 Fsync 刷新到磁盤中,生成提交點,日志文件被刪除,創建一個空的新日志。

2、ES讀數據原理

Elasticsearch中的查詢主要分為兩類,Get請求:通過ID查詢特定Doc;Search請求:通過Query查詢匹配Doc。

  • 對於Search類請求,查詢的時候是一起查詢內存和磁盤上的Segment,最后將結果合並后返回。這種查詢是近實時(Near Real Time)的,主要是由於內存中的Index數據需要一段時間后才會刷新為Segment。
  • 對於Get類請求,查詢的時候是先查詢內存中的TransLog,如果找到就立即返回,如果沒找到再查詢磁盤上的TransLog,如果還沒有則再去查詢磁盤上的Segment。這種查詢是實時(Real Time)的。這種查詢順序可以保證查詢到的Doc是最新版本的Doc,這個功能也是為了保證NoSQL場景下的實時性要求。

所有的搜索系統一般都是兩階段查詢,第一階段查詢到匹配的DocID,第二階段再查詢DocID對應的完整文檔,這種在Elasticsearch中稱為query_then_fetch,還有一種是一階段查詢的時候就返回完整Doc,在Elasticsearch中稱作query_and_fetch,一般第二種適用於只需要查詢一個Shard的請求。除了一階段,兩階段外,還有一種三階段查詢的情況。搜索里面有一種算分邏輯是根據TF(Term Frequency)和DF(Document Frequency)計算基礎分,但是Elasticsearch中查詢的時候,是在每個Shard中獨立查詢的,每個Shard中的TF和DF也是獨立的,雖然在寫入的時候通過_routing保證Doc分布均勻,但是沒法保證TF和DF均勻,那么就有會導致局部的TF和DF不准的情況出現,這個時候基於TF、DF的算分就不准。為了解決這個問題,Elasticsearch中引入了DFS查詢,比如DFS_query_then_fetch,會先收集所有Shard中的TF和DF值,然后將這些值帶入請求中,再次執行query_then_fetch,這樣算分的時候TF和DF就是准確的,類似的有DFS_query_and_fetch。這種查詢的優勢是算分更加精准,但是效率會變差。另一種選擇是用BM25代替TF/DF模型。

在新版本Elasticsearch中,用戶沒法指定DFS_query_and_fetch和query_and_fetch,這兩種只能被Elasticsearch系統改寫。

3、ES選主(select master)

ES的master選舉原理如下:

  1. 對所有可以成為master的節點根據nodeId排序,每次選舉每個節點都把自己所知道節點排一次序,然后選出第一個(第0位)節點,暫且認為它是master節點。
  2. 如果對某個節點的投票數達到一定的值(可以成為master節點數n/2+1)並且該節點自己也選舉自己,那這個節點就是master。否則重新選舉。
  3. 對於brain split問題,需要把候選master節點最小值設置為可以成為master節點數n/2+1(quorum)

六、Elasticsearch插件

1、elasticsearch-head是一個elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head

github地址:https://github.com/mobz/elasticsearch-head

2、elasticsearch-sql:使用SQL語法查詢elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql

github地址:https://github.com/NLPchina/elasticsearch-sql

3、elasticsearch-bigdesk是elasticsearch的一個集群監控工具,可以通過它來查看ES集群的各種狀態。

安裝:./bin/plugin -install lukas-vlcek/bigdesk

訪問:http://192.103.101.203:9200/_plugin/bigdesk/

github地址:https://github.com/hlstudio/bigdesk

4、elasticsearch-servicewrapper插件是ElasticSearch的服務化插件

https://github.com/elasticsearch/elasticsearch-servicewrapper 

DEPRECATED: The service wrapper is deprecated and not maintained. 該項目已不再維護。 

 

例子代碼在GitHub上:https://github.com/luxiaoxun/Code4Java

 

參考:

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

https://www.elastic.co/guide/en/elasticsearch/guide/current/distrib-write.html

http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM