Elasticsearch Sliced Scroll分頁檢索案例分享


The best Elasticsearch high-level Java REST API — bboss

Elasticsearch Sliced Scroll分頁檢索案例分享 

我們在文章《Elasticsearch Scroll分頁檢索案例分享》中介紹了elasticsearch scroll的基本用法,本文介紹Elasticsearch Sliced Scroll分頁檢索功能。

1.準備工作

參考文檔《高性能elasticsearch ORM開發庫使用介紹》導入和配置es客戶端

2.定義Sliced Scroll檢索dsl

創建配置文件-在resources目錄下定義文件scroll.xml

esmapper/scroll.xml

文件內容包含Sliced Scroll檢索dsl語句-scrollSliceQuery

<!--
    Sliced scroll query template.
    $id, $max and $size are placeholders; the Java examples that use this
    template pass a parameter map with the keys "id" (slice number),
    "max" (total number of slices) and "size" (hits per page).
    The query is a term match on gc.jvmGcOldCount = 3.
-->
<property name="scrollSliceQuery">
        <![CDATA[
         {
           "slice": {
                "id": $id,
                "max": $max
            },
            "size":$size,
            "query": {
                "term" : {
                    "gc.jvmGcOldCount" : 3
                }
            }
        }
        ]]>
    </property>

3.串行方式執行slice檢索

/**
 * Runs a sliced scroll search serially: the slices are processed one after
 * another, and each slice is paged through with the scroll API until a page
 * with fewer hits than the page size (or no hits at all) comes back.
 *
 * <p>Prints the elapsed time and the number of documents actually fetched,
 * then clears the scroll contexts that were opened and prints the node search
 * stats before and after the cleanup.
 */
@Test
public void testSliceScroll() {
	ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
	List<String> scrollIds = new ArrayList<>();
	long starttime = System.currentTimeMillis();
	// Sliced scroll paging.
	final int max = 6;        // total number of slices (original note: should not exceed the shard count)
	final int pageSize = 100; // hits requested per page
	long realTotalSize = 0;
	for (int i = 0; i < max; i++) {
		Map<String, Object> params = new HashMap<>();
		params.put("id", i);
		params.put("max", max);
		params.put("size", pageSize);
		ESDatas<Map> sliceResponse = clientUtil.searchList("agentstat-*/_search?scroll=1m",
				"scrollSliceQuery", params, Map.class);
		String scrollId = sliceResponse.getScrollId();
		System.out.println("totalSize:" + sliceResponse.getTotalSize());
		System.out.println("scrollId:" + scrollId);
		while (true) {
			// Remember every distinct scroll id so its context can be cleared afterwards.
			if (scrollId != null && !scrollIds.contains(scrollId)) {
				scrollIds.add(scrollId);
			}
			// Count every page, including the final partial one; tolerate a null page.
			List<Map> sliceDatas = sliceResponse.getDatas();
			int fetched = sliceDatas == null ? 0 : sliceDatas.size();
			realTotalSize += fetched;
			// A short page means the slice is exhausted; without a scroll id we cannot continue.
			if (fetched < pageSize || scrollId == null) {
				break;
			}
			sliceResponse = clientUtil.searchScroll("1m", scrollId, Map.class);
			// Always continue with the most recently returned scroll id.
			String nextScrollId = sliceResponse.getScrollId();
			if (nextScrollId != null) {
				scrollId = nextScrollId;
			}
		}
	}
	// Print the elapsed time and the number of documents actually retrieved.
	long endtime = System.currentTimeMillis();
	System.out.println("耗時:"+(endtime - starttime)+",realTotalSize:"+realTotalSize);
	// Query the scroll context information currently held by the ES nodes.
	String scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
	System.out.println(scrolls);
	// Clear the scroll contexts once processing is finished.
	if (!scrollIds.isEmpty()) {
		scrolls = clientUtil.deleteScrolls(scrollIds);
		System.out.println(scrolls);
	}
	// Query the scroll context information again after the cleanup.
	scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
	System.out.println(scrolls);
}

4.並行方式執行slice檢索

// Running total of the documents actually fetched by the slice scroll tasks.
long realTotalSize ;
/**
 * Adds the number of records returned by one scroll request to the running total.
 * Declared synchronized because it is called from several slice threads.
 *
 * @param size number of records to add
 */
synchronized void incrementSize(int size){
	realTotalSize += size;
}
/** * 並行方式執行slice scroll操作 */
@Test
public void testParralSliceScroll() {
	final ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
	final List<String> scrollIds = new ArrayList<>();
	long starttime = System.currentTimeMillis();
	//scroll slice分頁檢索
	final int max = 6;
	final CountDownLatch countDownLatch = new CountDownLatch(max);//線程任務完成計數器,每個線程對應一個sclice,每運行完一個slice任務,countDownLatch計數減去1
<span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> j = <span class="hljs-number">0</span>; j &lt; max; j++) {<span class="hljs-comment">//啟動max個線程,並行處理每個slice任務</span>
	<span class="hljs-keyword">final</span> <span class="hljs-keyword">int</span> i = j;
	Thread sliceThread = <span class="hljs-keyword">new</span> Thread(<span class="hljs-keyword">new</span> Runnable() {<span class="hljs-comment">//多線程並行執行scroll操作做,每個線程對應一個sclice</span>

		<span class="hljs-meta">@Override</span>
		<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">run</span><span class="hljs-params">()</span> </span>{
			Map params = <span class="hljs-keyword">new</span> HashMap();
			params.put(<span class="hljs-string">"id"</span>, i);
			params.put(<span class="hljs-string">"max"</span>, max);<span class="hljs-comment">//最多6個slice,不能大於share數</span>
			params.put(<span class="hljs-string">"size"</span>, <span class="hljs-number">100</span>);<span class="hljs-comment">//每頁100條記錄</span>
			ESDatas&lt;Map&gt; sliceResponse = clientUtil.searchList(<span class="hljs-string">"agentstat-*/_search?scroll=1m"</span>,
					<span class="hljs-string">"scrollSliceQuery"</span>, params,Map.class);
			List&lt;Map&gt; sliceDatas = sliceResponse.getDatas();
			incrementSize( sliceDatas.size());<span class="hljs-comment">//統計實際處理的文檔數量</span>
			<span class="hljs-keyword">long</span> totalSize = sliceResponse.getTotalSize();
			String scrollId = sliceResponse.getScrollId();
			<span class="hljs-keyword">if</span> (scrollId != <span class="hljs-keyword">null</span>)
				scrollIds.add(scrollId);
			System.out.println(<span class="hljs-string">"totalSize:"</span> + totalSize);
			System.out.println(<span class="hljs-string">"scrollId:"</span> + scrollId);
			<span class="hljs-keyword">if</span> (sliceDatas != <span class="hljs-keyword">null</span> &amp;&amp; sliceDatas.size() &gt;= <span class="hljs-number">100</span>) {<span class="hljs-comment">//每頁100條記錄,迭代scrollid,遍歷scroll分頁結果</span>
				<span class="hljs-keyword">do</span> {
					sliceResponse = clientUtil.searchScroll(<span class="hljs-string">"1m"</span>, scrollId, Map.class);
					String sliceScrollId = sliceResponse.getScrollId();
					<span class="hljs-keyword">if</span> (sliceScrollId != <span class="hljs-keyword">null</span>)
						scrollIds.add(sliceScrollId);
					sliceDatas = sliceResponse.getDatas();
					<span class="hljs-keyword">if</span> (sliceDatas == <span class="hljs-keyword">null</span> || sliceDatas.size() &lt; <span class="hljs-number">100</span>) {
						<span class="hljs-keyword">break</span>;
					}
					incrementSize( sliceDatas.size());<span class="hljs-comment">//統計實際處理的文檔數量</span>
				} <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>);
			}
			countDownLatch.countDown();<span class="hljs-comment">//slice檢索完畢后計數器減1</span>
		}

	});
	sliceThread.start();<span class="hljs-comment">//啟動線程</span>
}
<span class="hljs-keyword">try</span> {
	countDownLatch.await();<span class="hljs-comment">//等待所有的線程執行完畢,計數器變成0</span>
} <span class="hljs-keyword">catch</span> (InterruptedException e) {
	e.printStackTrace();
}
  <span class="hljs-comment">//打印處理耗時和實際檢索到的數據</span>
<span class="hljs-keyword">long</span> endtime = System.currentTimeMillis();
System.out.println(<span class="hljs-string">"耗時:"</span>+(endtime - starttime)+<span class="hljs-string">",realTotalSize:"</span>+realTotalSize);
<span class="hljs-comment">//查詢存在es服務器上的scroll上下文信息</span>
String scrolls = clientUtil.executeHttp(<span class="hljs-string">"_nodes/stats/indices/search"</span>, ClientUtil.HTTP_GET);

// System.out.println(scrolls);
//處理完畢后清除scroll上下文信息
if(scrollIds.size() > 0) {
scrolls = clientUtil.deleteScrolls(scrollIds);
// System.out.println(scrolls);
}
//清理完畢后查看scroll上下文信息
scrolls = clientUtil.executeHttp("_nodes/stats/indices/search", ClientUtil.HTTP_GET);
// System.out.println(scrolls);
}

通過串行運行和並行運行的結果比較,並行處理的性能要好很多,而實際檢索到的文檔數量一致。

5.參考文檔

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/search-request-scroll.html

6.開發交流

elasticsearch技術交流群:166471282

posted @ 2019-05-05 16:34  星朝  閱讀(473)  評論(0)  編輯  收藏


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM