elasticsearch基本操作之--使用java操作elasticsearch


/**

 * 系統環境: vm12 下的centos 7.2

 * 當前安裝版本: elasticsearch-2.4.0.tar.gz

 */

es 查詢共有4種查詢類型

QUERY_AND_FETCH: 

 主節點將查詢請求分發到所有的分片中,各個分片按照自己的查詢規則即詞頻文檔頻率進行打分排序,然后將結果返回給主節點,主節點對所有數據進行匯總排序然后再返回給客戶端,此種方式只需要和es交互一次。

      這種查詢方式存在數據量和排序兩個問題:一是主節點會匯總所有分片返回的數據,數據量會比較大;二是各個分片上的打分規則可能不一致。

QUERY_THEN_FETCH: 

主節點將請求分發給所有分片,各個分片打分排序后將數據的id和分值返回給主節點,主節點收到后進行匯總排序再根據排序后的id到對應的節點讀取對應的數據再返回給客戶端,此種方式需要和es交互兩次。

      這種方式解決了數據量問題但是排序問題依然存在而且是es的默認查詢方式

DFS_QUERY_AND_FETCH 和 DFS_QUERY_THEN_FETCH: 

將各個分片的規則統一起來進行打分,解決了排序問題。但是DFS_QUERY_AND_FETCH仍然存在數據量問題;DFS_QUERY_THEN_FETCH兩種問題都解決了,但是效率是最差的。

 

 

1, 獲取client, 兩種方式獲取

/**
 * First way of obtaining a client: sets the cluster name explicitly and
 * connects to a node that is looked up by host name.
 *
 * @throws Exception if the host name cannot be resolved
 */
@Before
    public void before() throws Exception {
        // A single setting does not need an intermediate Map.
        Settings.Builder settings = Settings.builder()
                .put("cluster.name", "elasticsearch_wenbronk");
        // 9300 is the transport port; use the int literal instead of parsing a constant string.
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));
    }
/**
 * Second way of obtaining a client: builds a {@code Settings} object first
 * and connects to a node addressed by IP and transport port.
 *
 * @throws Exception declared for parity with the other setup variant
 */
@Before
    public void before11() throws Exception {
        // Alternative without any settings (relies on the default cluster name):
        //   client = TransportClient.builder().build()
        //           .addTransportAddress(new InetSocketTransportAddress(
        //                   InetAddress.getByName("www.wenbronk.com"), 9300));

        Settings.Builder config = Settings.settingsBuilder();
        // Name of the cluster to join.
        config.put("cluster.name", "elasticsearch_wenbronk");
        // Skip cluster-name validation, so the connection succeeds even if the name above is wrong.
        config.put("client.transport.ignore_cluster_name", true);
        // Options the author tried and left disabled:
        //   .put("client.transport.sniff", true)                 -- could never connect once enabled; cause unknown
        //   .put("network.host", "192.168.50.37")
        //   .put("client.transport.nodes_sampler_interval", 5)   -- raised an error (author notes the default is 5s)
        //   .put("client.transport.ping_timeout", 5)             -- raised an error (ping wait time, default 5s)
        Settings settings = config.build();

        InetSocketAddress endpoint = new InetSocketAddress("192.168.50.37", 9300);
        TransportClient unconnected = TransportClient.builder().settings(settings).build();
        client = unconnected.addTransportAddress(new InetSocketTransportAddress(endpoint));

        System.out.println("success connect");
    }

PS: 官網給的2種方式都不能用, 需要合起來才能用, 浪費老子一下午...

其他參數的意義: 

 

代碼: 

package com.wenbronk.javaes;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;

import com.alibaba.fastjson.JSONObject;

/**
 * 使用java API操作elasticSearch
 * 
 * @author 231
 *
 */
public class JavaESTest {

    private TransportClient client;
    private IndexRequest source;
    
    /**
     * 獲取連接, 第一種方式
     * @throws Exception
     */
//    @Before
    public void before() throws Exception {
        Map<String, String> map = new HashMap<String, String>();  
        map.put("cluster.name", "elasticsearch_wenbronk");  
        Settings.Builder settings = Settings.builder().put(map);  
        client = TransportClient.builder().settings(settings).build()  
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); 
    }

/** * 查看集群信息 */ @Test public void testInfo() { List<DiscoveryNode> nodes = client.connectedNodes(); for (DiscoveryNode node : nodes) { System.out.println(node.getHostAddress()); } } /** * 組織json串, 方式1,直接拼接 */ public String createJson1() { String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; return json; } /** * 使用map創建json */ public Map<String, Object> createJson2() { Map<String,Object> json = new HashMap<String, Object>(); json.put("user", "kimchy"); json.put("postDate", new Date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用fastjson創建 */ public JSONObject createJson3() { JSONObject json = new JSONObject(); json.put("user", "kimchy"); json.put("postDate", new Date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用es的幫助類 */ public XContentBuilder createJson4() throws Exception { // 創建json對象, 其中一個創建json的方式 XContentBuilder source = XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying to out ElasticSearch") .endObject(); return source; } /** * 存入索引中 * @throws Exception */ @Test public void test1() throws Exception { XContentBuilder source = createJson4(); // 存json入索引中 IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get(); // // 結果獲取 String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); boolean created = response.isCreated(); System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created); } /** * get API 獲取指定文檔信息 */ @Test public void testGet() { // GetResponse response = client.prepareGet("twitter", "tweet", "1") // .get(); GetResponse response = client.prepareGet("twitter", "tweet", "1") .setOperationThreaded(false) // 線程安全 .get(); System.out.println(response.getSourceAsString()); } /** * 測試 delete api */ @Test public 
void testDelete() { DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .get(); String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); System.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 測試更新 update API * 使用 updateRequest 對象 * @throws Exception */ @Test public void testUpdate() throws Exception { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("twitter"); updateRequest.type("tweet"); updateRequest.id("1"); updateRequest.doc(XContentFactory.jsonBuilder() .startObject() // 對沒有的字段添加, 對已有的字段替換 .field("gender", "male") .field("message", "hello") .endObject()); UpdateResponse response = client.update(updateRequest).get(); // 打印 String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); System.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 測試update api, 使用client * @throws Exception */ @Test public void testUpdate2() throws Exception { // 使用Script對象進行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setScript(new Script("hits._source.gender = \"male\"")) // .get(); // 使用XContFactory.jsonBuilder() 進行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setDoc(XContentFactory.jsonBuilder() // .startObject() // .field("gender", "malelelele") // .endObject()).get(); // 使用updateRequest對象及script // UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") // .script(new Script("ctx._source.gender=\"male\"")); // UpdateResponse response = client.update(updateRequest).get(); // 使用updateRequest對象及documents進行更新 UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1") .doc(XContentFactory.jsonBuilder() .startObject() .field("gender", "male") .endObject() )).get(); System.out.println(response.getIndex()); } /** * 測試update * 
使用updateRequest * @throws Exception * @throws InterruptedException */ @Test public void testUpdate3() throws InterruptedException, Exception { UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") .script(new Script("ctx._source.gender=\"male\"")); UpdateResponse response = client.update(updateRequest).get(); } /** * 測試upsert方法 * @throws Exception * */ @Test public void testUpsert() throws Exception { // 設置查詢條件, 查找不到則添加生效 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2") .source(XContentFactory.jsonBuilder() .startObject() .field("name", "214") .field("gender", "gfrerq") .endObject()); // 設置更新, 查找到更新下面的設置 UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2") .doc(XContentFactory.jsonBuilder() .startObject() .field("user", "wenbronk") .endObject()) .upsert(indexRequest); client.update(upsert).get(); } /** * 測試multi get api * 從不同的index, type, 和id中獲取 */ @Test public void testMultiGet() { MultiGetResponse multiGetResponse = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("anothoer", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetResponse) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); System.out.println(sourceAsString); } } } /** * bulk 批量執行 * 一次查詢可以update 或 delete多個document */ @Test public void testBulk() throws Exception { BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject())); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject())); BulkResponse response = 
bulkRequest.get(); System.out.println(response.getHeaders()); } /** * 使用bulk processor * @throws Exception */ @Test public void testBulkProcessor() throws Exception { // 創建BulkPorcessor對象 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() { public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) { // TODO Auto-generated method stub } // 執行出錯時執行 public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) { // TODO Auto-generated method stub } public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) { // TODO Auto-generated method stub } }) // 1w次請求執行一次bulk .setBulkActions(10000) // 1gb的數據刷新一次bulk .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // 固定5s必須刷新一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 並發請求數量, 0不並發, 1並發允許執行 .setConcurrentRequests(1) // 設置退避, 100ms后執行, 最大請求3次 .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); // 添加單次請求 bulkProcessor.add(new IndexRequest("twitter", "tweet", "1")); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2")); // 關閉 bulkProcessor.awaitClose(10, TimeUnit.MINUTES); // 或者 bulkProcessor.close(); } }

 

test2代碼: 

package com.wenbronk.javaes;

import java.net.InetSocketAddress;

import org.apache.lucene.queryparser.xml.FilterBuilderFactory;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises the search APIs of the Elasticsearch Java transport client:
 * plain search, scroll, multi search, aggregations, terminate-after and filtering.
 *
 * <p>NOTE(review): the client opened in {@link #testBefore()} is never closed in this
 * class; consider adding a teardown method that calls {@code client.close()}.
 *
 * @author 231
 */
public class JavaESTest2 {

    private TransportClient client;

    /**
     * Opens the transport client before every test.
     */
    @Before
    public void testBefore() {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "wenbronk_escluster")
                // .put("client.transport.ignore_cluster_name", true) would skip name validation.
                .build();

        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(
                        new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
        System.out.println("success connect to escluster");
    }

    /**
     * Runs a search with no index, type or query restriction and prints the raw response.
     *
     * <p>A restricted search would chain calls such as {@code setTypes},
     * {@code setSearchType}, {@code setQuery}, {@code setPostFilter},
     * {@code setFrom}/{@code setSize} and {@code setExplain} on the request builder.
     */
    @Test
    public void testSearch() {
        SearchResponse response = client.prepareSearch()
                .execute().actionGet();
        System.out.println(response);
    }

    /**
     * Scroll API: walks through a large result set page by page.
     */
    @Test
    public void testScrolls() {
        QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");
        TimeValue keepAlive = new TimeValue(60000);

        SearchResponse response = client.prepareSearch("twitter")
                .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)
                .setScroll(keepAlive)
                .setQuery(queryBuilder)
                .setSize(100).execute().actionGet();

        // Each iteration handles the current page and then replaces it with the next one;
        // an empty page marks the end of the scroll.
        while (response.getHits().getHits().length > 0) {
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println("i am coming");
            }
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(keepAlive).execute().actionGet();
        }
        System.out.println("oh no=====");

        // Release the server-side scroll context instead of waiting for the keep-alive to expire.
        client.prepareClearScroll().addScrollId(response.getScrollId()).get();
    }

    /**
     * Multi search API: sends two searches in one request and sums their total hits.
     */
    @Test
    public void testMultiSearch() {
        QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");
        SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);

        QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");
        SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);

        MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)
                .execute().actionGet();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
            // A failed item carries a failure message instead of a response.
            if (item.isFailure()) {
                System.out.println("search failed: " + item.getFailureMessage());
                continue;
            }
            long itemHits = item.getResponse().getHits().getTotalHits();
            nbHits += itemHits;
            System.out.println(itemHits);
        }
        System.out.println("total hits: " + nbHits);
    }

    /**
     * Aggregations: a terms aggregation on {@code user} and a yearly date histogram on {@code birth}.
     */
    @Test
    public void testAggregation() {
        SearchResponse response = client.prepareSearch()
                // The query narrows the documents the aggregations run on (here: all documents).
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(AggregationBuilders.terms("term").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")
                        .interval(DateHistogramInterval.YEAR))
                .execute().actionGet();
        Aggregation byUser = response.getAggregations().get("term");
        Aggregation byBirthYear = response.getAggregations().get("agg2");
    }

    /**
     * Terminate-after: stops collecting once 1000 documents have been collected
     * and reports whether the search ended early.
     */
    @Test
    public void testTerminateAfter() {
        SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();
        // Null-safe comparison in case the early-termination flag is reported as a nullable Boolean.
        if (Boolean.TRUE.equals(response.isTerminatedEarly())) {
            System.out.println("ternimate");
        }
    }

    /**
     * Filtered search using range operators: gt, lt, lte, gte.
     */
    @Test
    public void testFilter() {
        SearchResponse response = client.prepareSearch("twitter")
                .setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                // Filters are expressed as queries in 2.x (no FilterBuilders): 18 <= age <= 22.
                .setPostFilter(QueryBuilders.rangeQuery("age").gte(18).lte(22))
                // Requests a per-hit explanation of how the score was computed; it does not affect ordering.
                .setExplain(true)
                .get();
        System.out.println(response);
    }

    /**
     * Group-by: buckets the tweets by the {@code user} field with a terms aggregation.
     */
    @Test
    public void testGroupBy() {
        client.prepareSearch("twitter").setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addAggregation(AggregationBuilders.terms("user")
                        // In Elasticsearch 2.x size(0) asks for all buckets rather than the default 10.
                        .field("user").size(0)
                ).get();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM