Elasticsearch: deep pagination and sorted-query problems
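1. Introduction
To guard against deep pagination, ES by default does not let from & size paging go beyond the first 10,000 hits (from + size must be ≤ 10000). To read results past the 10,000th, use the scroll (cursor) API that ES provides.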
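When the requested page is deep (deep pagination), for example page 20, Elasticsearch has to fetch the documents of pages 1 through 20 from every shard, sort all of them, and only then take the size results after from as the final response.
With 16 shards, the coordinating node has to collect shards × (from + size) records. For page 20 with size = 10, from = 190, so it must gather 16 × (190 + 10) = 3,200 records and sort them globally, only to return 10 of them.
So when an index is very large (tens or hundreds of millions of documents), from + size cannot be used for deep paging. The deeper the page, the more likely an OOM, and even without an OOM it consumes a lot of CPU and memory.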
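To make the arithmetic concrete, here is a minimal sketch of the coordinating-node cost in plain Java. `DeepPagingCost`, `hitsToMerge` and `fromForPage` are hypothetical names, and the model only counts the hits each shard hands back for one request, ignoring the fetch phase.

```java
/** Back-of-the-envelope model of from + size paging (hypothetical helper, not an ES API). */
final class DeepPagingCost {

    /** Hits the coordinating node must merge and sort for a single request. */
    static long hitsToMerge(int shards, int from, int size) {
        // every shard returns its own top (from + size) hits
        return (long) shards * (from + size);
    }

    /** The "from" offset of a 1-based page number. */
    static int fromForPage(int page, int size) {
        return (page - 1) * size;
    }
}
```

With 16 shards, page 20 of size 10 means merging 3,200 hits, and the last page allowed by the default limit (from = 9990) already means 160,000.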
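That is why ES uses index.max_result_window: 10000 as a safeguard, meaning that by default from + size may not exceed 10000. This index-level setting can be changed dynamically (and, in older versions, also in the configuration file), but it is better not to; fetch the data with a scroll cursor instead.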
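The check itself is simple. The sketch below imitates it in plain Java; `ResultWindowGuard` is a hypothetical class, and the message only mimics the one in the error shown later in this article, it is not ES source code.

```java
/** Imitation of the max_result_window check (hypothetical, not ES source). */
final class ResultWindowGuard {
    // default value of index.max_result_window
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    /** Throws if from + size exceeds the allowed result window. */
    static void check(int from, int size, int maxResultWindow) {
        long window = (long) from + size;
        if (window > maxResultWindow) {
            throw new IllegalArgumentException(
                    "Result window is too large, from + size must be less than or equal to: ["
                            + maxResultWindow + "] but was [" + window + "]");
        }
    }
}
```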
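2. How scroll works
You can think of a scroll as a cursor in a relational database. For that reason scroll is not suited to real-time search; it fits background batch jobs better, such as sending bulk notifications.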
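A scroll works in two steps: initialization and iteration.
At initialization, ES keeps a search context that preserves a point-in-time view of all documents matching the query; you can think of it as a snapshot.
During iteration, results are read from this snapshot batch by batch.
Consequently, documents inserted, deleted, or updated after initialization do not affect the results of the iteration.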
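The snapshot behaviour can be illustrated with an in-memory analogy. This is only a sketch of the semantics, not how ES implements it: the hypothetical `SnapshotCursor` copies the matching documents once at initialization and then hands them out in batches, so later changes to the source list are invisible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** In-memory analogy of scroll semantics (hypothetical class, not ES internals). */
final class SnapshotCursor<T> {
    private final List<T> snapshot = new ArrayList<>(); // matches frozen at initialization
    private final int size;                             // batch size, like the scroll "size"
    private int position = 0;                           // start of the next batch

    SnapshotCursor(List<T> index, Predicate<T> query, int size) {
        for (T doc : index) {
            if (query.test(doc)) {
                snapshot.add(doc); // copied, so later changes to `index` are not seen
            }
        }
        this.size = size;
    }

    /** Returns the next batch; an empty list means the cursor is exhausted. */
    List<T> next() {
        int end = Math.min(position + size, snapshot.size());
        List<T> batch = new ArrayList<>(snapshot.subList(position, end));
        position = end;
        return batch;
    }
}
```

For example, scrolling the even numbers of 1..10 with size 2 yields [2, 4], [6, 8], [10] and then an empty batch, even if 12 is added to the list after the first call.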
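Scroll performs better because with deep from + size paging every request has to re-run the query and re-sort from + size hits on each shard, which is wasteful. A scroll runs the query once and keeps its search context, so each batch continues where the previous one stopped instead of starting over. For reading large result sets this is much cheaper than from + size.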
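A rough count shows the difference. The sketch below (hypothetical `PagingVsScroll` class) totals the hits the coordinating node merges to read the same result set both ways, under a simplified model: with from + size each page makes every shard return from + size hits, while each scroll batch makes every shard return at most size hits.

```java
/** Simplified cost model: from + size paging vs. scroll (hypothetical helper). */
final class PagingVsScroll {

    /** Total hits merged when reading `total` docs page by page with from + size. */
    static long fromSizeMerged(int shards, int total, int size) {
        long merged = 0;
        for (int from = 0; from < total; from += size) {
            // each page re-collects from + size hits on every shard
            merged += (long) shards * (from + size);
        }
        return merged;
    }

    /** Total hits merged when reading the same docs with scroll batches of `size`. */
    static long scrollMerged(int shards, int total, int size) {
        long merged = 0;
        for (int read = 0; read < total; read += size) {
            // each shard only contributes its next batch
            merged += (long) shards * size;
        }
        return merged;
    }
}
```

For 16 shards, 10,000 documents and batches of 100, the model gives 8,080,000 merged hits for from + size against 160,000 for scroll.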
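3. A worked example
Initialization
Request
Note that scroll=1m must be appended to the URL after _search; it cannot go in the request body. 1m means the cursor (search context) is kept alive for 1 minute; each subsequent scroll request that passes its own scroll value resets this timer.
You can set size, the number of documents returned per batch. When no data is left, the request still returns 200 (success); hits.hits is simply an empty list.
Besides _scroll_id, the initialization response also returns the first batch of documents, e.g. the first 100 when size=100.
The request body is the same as for an ordinary search. In other words, apart from adding scroll to set the cursor's keep-alive time, initialization is no different from a normal search: you specify the query conditions, and it returns the first size documents.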
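For reference, the two REST calls can be built with the JDK's `java.net.http` client (Java 11+). This is a sketch: `ScrollRequests` is a hypothetical helper, `http://localhost:9200` is an assumed address, and the requests are only built here, not sent. `scroll=1m` goes in the URL of the first call, while the follow-up call to `/_search/scroll` carries the latest `scroll_id` and a renewed keep-alive in its body.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the scroll REST calls (host is an assumption). */
final class ScrollRequests {
    static final String HOST = "http://localhost:9200"; // assumed cluster address

    /** Initial search: scroll=1m in the URL, query and size in the body. */
    static HttpRequest init(String index, String queryJson, int size) {
        String body = "{\"size\":" + size + ",\"query\":" + queryJson + "}";
        return HttpRequest.newBuilder(URI.create(HOST + "/" + index + "/_search?scroll=1m"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Follow-up call: pass the most recent _scroll_id and renew the keep-alive. */
    static HttpRequest next(String scrollId) {
        String body = "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}";
        return HttpRequest.newBuilder(URI.create(HOST + "/_search/scroll"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Sending them would go through `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, repeating `next(...)` with the newest `_scroll_id` until `hits.hits` comes back empty.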
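Summary:
Problem
Solution
1. Plain request
Suppose we want to return a large amount of data at once. The code below asks for 58,000 documents in a single request: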
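```java
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper class: the original shows only the method
public class PlainSearch {
    // Assumed: an SLF4J logger; the original does not show how `logger` is declared
    private static final Logger logger = LoggerFactory.getLogger(PlainSearch.class);

    /**
     * Plain search
     * @param client
     */
    public static void search(Client client) {
        String index = "simple-index";
        String type = "simple-type";
        // search conditions
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
        searchRequestBuilder.setIndices(index);
        searchRequestBuilder.setTypes(type);
        // from (0) + size (58000) exceeds index.max_result_window (10000 by default)
        searchRequestBuilder.setSize(58000);
        // execute
        SearchResponse searchResponse = searchRequestBuilder.get();
        // search results
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        for (SearchHit searchHit : searchHits) {
            String source = searchHit.getSourceAsString();
            logger.info("--------- search source {}", source);
        }
    }
}
```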
It fails with the following error:
```
Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [58000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.]
    at org.elasticsearch.search.internal.DefaultSearchContext.preProcess(DefaultSearchContext.java:212)
    at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:103)
    at org.elasticsearch.search.SearchService.createContext(SearchService.java:676)
    at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:620)
    at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:371)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:368)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:365)
    at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
    at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    ... 3 more
```
2. Using scroll:
```java
package com.smk.es.servicce;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

public class TestEs {
    private String clusterName = "es-smk-sit";
    private String clusterNode = "192.168.23.10";
    private String clusterPort = "9301";
    private String poolSize = "10";
    private boolean snf = true;
    private String index = "smk-label";
    private String type = "label";

    public TransportClient transportClient() {
        TransportClient transportClient = null;
        try {
            Settings esSetting = Settings.builder()
                    .put("cluster.name", clusterName)                           // cluster name
                    .put("client.transport.sniff", snf)                         // sniff to discover the other cluster nodes
                    .put("thread_pool.search.size", Integer.parseInt(poolSize)) // search thread pool size (poolSize = 10 here)
                    .build();                                                   // custom client settings
            transportClient = new PreBuiltTransportClient(esSetting);
            TransportAddress transportAddress = new TransportAddress(
                    InetAddress.getByName(clusterNode), Integer.valueOf(clusterPort));
            transportClient.addTransportAddresses(transportAddress);
            System.out.println("es client created");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("elasticsearch TransportClient create error!!");
        }
        return transportClient;
    }

    /**
     * First query: runs the search and opens the scroll cursor
     * @param client
     * @return the scroll id, plus the id of the first hit if anything matched
     */
    private Map<String, Object> my(TransportClient client) {
        BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();
        // query conditions
        mustQuery.must(QueryBuilders.matchQuery("sex", "男"));
        mustQuery.must(QueryBuilders.matchQuery("city", "杭州市"));
        SearchResponse rep = client.prepareSearch()
                .setIndices(index)                        // index
                .setTypes(type)                           // type
                .setQuery(mustQuery)
                .setScroll(TimeValue.timeValueMinutes(2)) // cursor keep-alive
                .setSize(100)                             // batch size
                .execute()
                .actionGet();
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("scrollId", rep.getScrollId());             // the returned cursor id
        SearchHit[] hits = rep.getHits().getHits();
        if (hits.length > 0) {                            // avoid an index error when nothing matched
            m.put("id", hits[0].getId());
        }
        return m;
    }

    /**
     * Following queries: fetch the next batch for the given scroll id
     * @return the latest scroll id and the id of the first hit, or null when no hits are left
     */
    private Map<String, Object> my2(String scrollId, TransportClient client) {
        SearchResponse rep1 = client.prepareSearchScroll(scrollId)   // cursor id
                .setScroll(TimeValue.timeValueMinutes(2))            // renew the keep-alive
                .execute()
                .actionGet();
        SearchHit[] s = rep1.getHits().getHits();
        if (s == null || s.length == 0) {
            return null;
        }
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("scrollId", rep1.getScrollId());
        m.put("id", s[0].getId());
        return m;
    }

    public static void main(String[] args) {
        TestEs t = new TestEs();
        TransportClient client = t.transportClient();
        Map<String, Object> m1 = t.my(client);
        System.out.println("first:" + m1.get("id"));
        String s = m1.get("scrollId").toString();
        System.out.println("first:" + s);
        int i = 0;
        while (true) {
            i++;
            Map<String, Object> m2 = t.my2(s, client);
            // no more data: the whole result set has been read
            if (m2 == null) {
                break;
            }
            s = m2.get("scrollId").toString(); // always continue with the most recent scroll id
            System.out.println("insert to mysql");
        }
        // release the search context instead of waiting for the keep-alive to expire
        client.prepareClearScroll().addScrollId(s).get();
        client.close();
        System.out.println("total calls:" + i);
        System.out.println("end");
    }
}
```
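The loop in `main` can be generalized into a client-agnostic pattern. The sketch below uses hypothetical names (`ScrollDrain`, `Page`, `drain`): it processes every batch including the first, always passes the most recent scroll id, and clears the scroll at the end even if processing fails. The `next` and `clear` callbacks are meant to wrap `prepareSearchScroll(...)` and `prepareClearScroll()`; that wiring is an assumption, not part of the original code.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, client-agnostic shape of a scroll loop. */
final class ScrollDrain {

    /** One scroll response: the id to use next and the hits in this batch. */
    static final class Page<T> {
        final String scrollId;
        final List<T> hits;

        Page(String scrollId, List<T> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    /** Handles every hit until an empty batch arrives; returns the number of hits handled. */
    static <T> long drain(Page<T> first, Function<String, Page<T>> next,
                          Consumer<T> handler, Consumer<String> clear) {
        long processed = 0;
        Page<T> page = first;
        try {
            while (!page.hits.isEmpty()) {
                for (T hit : page.hits) {
                    handler.accept(hit);
                    processed++;
                }
                page = next.apply(page.scrollId); // always continue with the latest id
            }
        } finally {
            clear.accept(page.scrollId); // release the server-side search context
        }
        return processed;
    }
}
```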