Using the Elasticsearch TransportClient


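  Elastic plans to deprecate TransportClient in Elasticsearch 7.0 and remove it completely in 8.0. Going forward, you should use the Java High Level REST Client instead, which executes HTTP requests rather than serialized Java requests. The Java client is mainly used to:

  (1) Perform standard index, get, delete and search operations on an existing cluster

  (2) Perform administrative tasks on a running cluster

  The most common way to obtain an Elasticsearch client is to create a TransportClient that connects to a cluster.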

 

Maven dependency

  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.4.3</version>
  </dependency>

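  The client must have the same major version (e.g. 2.x or 5.x) as the nodes in the cluster. A client can connect to a cluster with a different minor version (e.g. 2.3.x), but new features may not be supported. Ideally, the client should have exactly the same version as the cluster. This post uses version 6.4.3.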
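The major-version rule above can be checked before connecting. The following is a minimal sketch, assuming version strings of the form "6.4.3"; VersionCompat and its methods are hypothetical helpers written for this post and are not part of the Elasticsearch client API.

```java
// Hypothetical helper for illustration; not part of the Elasticsearch client.
final class VersionCompat {
    private VersionCompat() {}

    /** Returns the leading numeric component of a version string such as "6.4.3". */
    static int major(String version) {
        int dot = version.indexOf('.');
        String head = dot < 0 ? version : version.substring(0, dot);
        return Integer.parseInt(head.trim());
    }

    /** True when client and cluster share the same major version. */
    static boolean sameMajor(String clientVersion, String clusterVersion) {
        return major(clientVersion) == major(clusterVersion);
    }

    public static void main(String[] args) {
        System.out.println(sameMajor("6.4.3", "6.8.0"));   // prints true
        System.out.println(sameMajor("6.4.3", "5.6.16"));  // prints false
    }
}
```

A check like this only covers the major-version requirement; a differing minor version still connects but may lack newer features, as noted above.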

 

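Getting a TransportClient

  TransportClient connects remotely to an Elasticsearch cluster using the transport module. It does not join the cluster; it simply takes one or more initial transport addresses and communicates with them in round-robin fashion on each operation.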

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Elasticsearch {
    public static void main(String[] args) throws UnknownHostException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses
        // on shutdown
        client.close();
    }
}
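To make the round-robin behaviour described above concrete, here is a minimal standalone sketch. It is not how TransportClient is implemented internally; RoundRobinAddresses and the second address are assumptions made up for illustration, and no Elasticsearch classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a generic round-robin selector over configured addresses.
final class RoundRobinAddresses {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddresses(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.addresses = new ArrayList<>(addresses);
    }

    /** Returns the next address, wrapping around after the last one. */
    String next() {
        // floorMod keeps the index non-negative even if the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        // Hypothetical addresses; only 127.0.0.1:9300 appears in this post
        RoundRobinAddresses rr =
                new RoundRobinAddresses(Arrays.asList("127.0.0.1:9300", "127.0.0.2:9300"));
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next());
        }
    }
}
```

Each call to next() hands out the following address in turn, which is the idea behind spreading successive operations across the configured nodes.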

  If a warning about a missing Log4j 2 configuration appears at startup, logging needs to be configured.

  Maven dependency

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.11.1</version>
</dependency>

Then add a log4j2.properties file under resources with the following content:
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout

rootLogger.level = info
rootLogger.appenderRef.console.ref = console

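  If you see "Exception in thread "main" java.lang.NoSuchMethodError: org.apache.logging.log4j.Logger.debug(Ljava/lang/String;Ljava/lang/Object;)", there is a version conflict, or the wrong package has been imported.

   Note that if the cluster name is not the default "elasticsearch", you must set the cluster name through Settings: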

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...

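   With sniffing enabled, the client automatically discovers the other nodes in the cluster as well as newly joined nodes, so you do not need to add every node with addTransportAddress. Enable it as follows: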

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

  

Document APIs

  Single document APIs

  (1)Index api

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class Elasticsearch {
    public static void main(String[] args) throws UnknownHostException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out Elasticsearch");
        // index: twitter, type: _doc, id: 1
        IndexResponse indexResponse = client.prepareIndex("twitter", "_doc", "1")
                .setSource(json, XContentType.JSON)
                .get();
        // Index name
        String _index = indexResponse.getIndex();
        // Type name
        String _type = indexResponse.getType();
        // Document ID (generated or not)
        String _id = indexResponse.getId();
        // Version (if it's the first time you index this document, you will get: 1)
        long _version = indexResponse.getVersion();
        // status has stored current instance statement.
        RestStatus status = indexResponse.status();
        // on shutdown
        client.close();
    }
}

  This inserts the document into the index.

  (2)GET API

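  The get API retrieves a typed JSON document from an index by its id. The following example gets a JSON document from an index named twitter, under a type named _doc, with id 1: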

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Elasticsearch {
    public static void main(String[] args) throws UnknownHostException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses
 
        GetResponse documentFields = client.prepareGet("twitter", "_doc", "1").get();
        System.out.println(documentFields.getIndex());
        System.out.println(documentFields.getType());
        System.out.println(documentFields.getId());
        System.out.println(documentFields.getSourceAsString());
        //on shutdown
        client.close();
    }
}

Running it prints the index name, type, id and source of the document.

  (3)DELETE  API

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Elasticsearch {
    public static void main(String[] args) throws UnknownHostException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        DeleteResponse response = client.prepareDelete("twitter", "_doc", "1").get();
        System.out.println(response.toString());
        //on shutdown
        client.close();
    }
}

  Running it prints the delete response.

   (4)Delete By Query API

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Elasticsearch {
    public static void main(String[] args) throws UnknownHostException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "kimchy"))
                .source("twitter")
                .get();
        long deleted = response.getDeleted();

        //on shutdown
        client.close();
    }
}

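Using the twitter index from above as an example, this API first finds all documents that match the query and deletes them when the request is executed with get(). getDeleted() does not delete anything itself; it returns the number of documents that were deleted.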
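The relationship between the deletion and the returned count can be pictured with a small in-memory analogy. This sketch makes no Elasticsearch calls: DeleteByQueryAnalogy, deleteWhere and doc are hypothetical names, and plain equality stands in for the match query, which in Elasticsearch is a full-text match rather than an exact comparison.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory analogy only; not the Elasticsearch implementation.
final class DeleteByQueryAnalogy {
    private DeleteByQueryAnalogy() {}

    /** Removes every document whose field equals value and returns how many were removed. */
    static long deleteWhere(List<Map<String, Object>> docs, String field, Object value) {
        int before = docs.size();
        docs.removeIf(doc -> value.equals(doc.get(field)));
        return before - docs.size();
    }

    /** Builds a one-field document for the demo. */
    static Map<String, Object> doc(String user) {
        Map<String, Object> d = new HashMap<>();
        d.put("user", user);
        return d;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> twitter = new ArrayList<>();
        twitter.add(doc("kimchy"));
        twitter.add(doc("lisi"));
        twitter.add(doc("kimchy"));

        // The removal happens inside deleteWhere; the return value is only a count
        long deleted = deleteWhere(twitter, "user", "kimchy");
        System.out.println(deleted + " deleted, " + twitter.size() + " remaining");
        // prints "2 deleted, 1 remaining"
    }
}
```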

   (5)Update API

  You can use the prepareUpdate() method:

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class Elasticsearch {
    public static void main(String[] args) throws IOException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        client.prepareUpdate("twitter", "_doc", "1")
                .setDoc(jsonBuilder()
                        .startObject()
                        .field("user", "chenmz")
                        .endObject())
                .get();
        //on shutdown
        client.close();
    }
}

Querying in Kibana afterwards shows that the user name has changed, so the update succeeded.

  Multi-document APIs

  (1)Multi Get API

  The multi get API retrieves a list of documents by index and id. First add a document with id 2 to Elasticsearch, then query:

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;

public class Elasticsearch {
    public static void main(String[] args) throws IOException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                .add("twitter", "_doc", "1", "2")
                .get();
        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }

        //on shutdown
        client.close();
    }
}

Running it prints the JSON source of each requested document that exists.

   (2)Bulk API

  The bulk API allows indexing and deleting several documents in a single request. The following example indexes several documents.

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class Elasticsearch {
    public static void main(String[] args) throws IOException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("twitter", "_doc", "3")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "zhangsan")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                        .endObject()
                )
        );

        bulkRequest.add(client.prepareIndex("twitter", "_doc", "4")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("user", "lisi")
                        .field("postDate", new Date())
                        .field("message", "another post")
                        .endObject()
                )
        );

        BulkResponse bulkResponse = bulkRequest.get();
        if (bulkResponse.hasFailures()) {
            // process failures by iterating through each bulk response item
        }

        //on shutdown
        client.close();
    }
}

  Querying afterwards shows the new documents, which confirms that the bulk indexing succeeded.

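   (3)Reindex API

   A query can be provided to filter which documents should be reindexed from the source index to the target index. In other words, selected documents from existing indices are copied into another index.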

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;

public class Elasticsearch {
    public static void main(String[] args) throws IOException {
        // on startup
        TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
                // chain further addTransportAddress calls above to add more addresses

        BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
                .source("_all")
                .destination("target_index")
                .filter(QueryBuilders.matchQuery("user", "lisi"))
                .get();
        System.out.println(response.toString());
        // on shutdown
        client.close();
    }
}

The .source call can also name specific indices, e.g. .source("index1", "index2").

Querying in Kibana shows that target_index has been created and contains a document whose user is lisi.

 

PS: This post focuses on the Document APIs. There are many other APIs as well, and Elasticsearch wraps many others that can achieve the same results.

