Elasticsearch Java API Explained, v1.0
Connecting to a Cluster
As an Elasticsearch Node
Instantiating a node-based client is the simplest way to obtain a Client. This Client can then execute Elasticsearch operations.
```java
import static org.elasticsearch.node.NodeBuilder.*;

// on startup
Node node = nodeBuilder().node();
Client client = node.client();

// on shutdown
node.close();
```
When you start a node, it joins the Elasticsearch cluster. You can make it join a different cluster by simply setting cluster.name, or by calling the clusterName method explicitly.

You can define cluster.name in the /src/main/resources/elasticsearch.yml file of your project. As long as elasticsearch.yml is on the classpath, it will be picked up when you start your node.
```yaml
cluster.name: yourclustername
```
Or via Java:
```java
Node node = nodeBuilder().clusterName("yourclustername").node();
Client client = node.client();
```
The benefit of using this Client is that operations are automatically routed to the node(s) where they need to be executed, without a double hop. For example, an index operation is executed directly on the shard where the document will ultimately live.
When you start a node, the most important decision is whether it should hold data. In most cases we only need a client, with no shards allocated to it. This is achieved simply by setting node.data to false, or node.client to true.
```java
import static org.elasticsearch.node.NodeBuilder.*;

// on startup
Node node = nodeBuilder().client(true).node();
Client client = node.client();

// on shutdown
node.close();
```
Transport Client
The TransportClient connects remotely to an Elasticsearch cluster using the transport module. It does not join the cluster; it simply takes one or more initial transport addresses and communicates with them in a round-robin fashion.
```java
// on startup
Client client = new TransportClient()
    .addTransportAddress(new InetSocketTransportAddress("host1", 9300))
    .addTransportAddress(new InetSocketTransportAddress("host2", 9300));

// on shutdown
client.close();
```
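The round-robin behavior can be sketched in plain Java. The class below is hypothetical, purely to illustrate the idea of cycling through configured transport addresses; it is not Elasticsearch internals:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of round-robin address selection: each request goes to
// the next address in the list, wrapping around at the end.
public class RoundRobinAddresses {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    public RoundRobinAddresses(List<String> addresses) {
        this.addresses = addresses;
    }

    // Returns the next address, wrapping around.
    public String next() {
        int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(i);
    }

    public static void main(String[] args) {
        RoundRobinAddresses rr =
            new RoundRobinAddresses(List.of("host1:9300", "host2:9300"));
        System.out.println(rr.next()); // host1:9300
        System.out.println(rr.next()); // host2:9300
        System.out.println(rr.next()); // host1:9300
    }
}
```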
Note that if your cluster uses a name other than the default elasticsearch, you need to set the cluster name:
```java
Settings settings = ImmutableSettings.settingsBuilder()
    .put("cluster.name", "myClusterName").build();
Client client = new TransportClient(settings);
// Add transport addresses and do something with the client...
```
You can also set it in the elasticsearch.yml file.

The client can sniff the rest of the cluster and add those machines to its address list. To enable this feature, set client.transport.sniff to true:
```java
Settings settings = ImmutableSettings.settingsBuilder()
    .put("client.transport.sniff", true).build();
TransportClient client = new TransportClient(settings);
```
Other transport client settings include:

Parameter | Description |
---|---|
client.transport.ignore_cluster_name | If true, skip cluster-name validation of connected nodes |
client.transport.ping_timeout | Timeout for a node ping response; defaults to 5s |
client.transport.nodes_sampler_interval | Interval for sampling/pinging the nodes; defaults to 5s |
Java Index API
The index API lets you index a typed JSON document into a specific index and make it searchable.
Generating a JSON Document
There are several ways to generate a JSON document:
- Manually, using a byte[] or a String
- Using a Map, which will be automatically converted to its JSON equivalent
- Using a third-party library such as Jackson to serialize your beans
- Using the built-in helper XContentFactory.jsonBuilder()
Manual Generation
Note that dates must be encoded using a valid date format.
```java
String json = "{" +
    "\"user\":\"kimchy\"," +
    "\"postDate\":\"2013-01-30\"," +
    "\"message\":\"trying out Elasticsearch\"" +
    "}";
```
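To make the date-format note concrete, here is a small standard-library (java.time) sketch that produces the same ISO-8601 date string used in the hand-written JSON above; the encode helper is illustrative, not part of any Elasticsearch API:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Encodes a date field the same way the hand-written JSON above does,
// using the ISO-8601 "yyyy-MM-dd" form that Elasticsearch's default
// date parsing accepts.
public class DateJson {
    public static String encode(LocalDate postDate) {
        String iso = postDate.format(DateTimeFormatter.ISO_LOCAL_DATE);
        return "{\"postDate\":\"" + iso + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(encode(LocalDate.of(2013, 1, 30)));
        // {"postDate":"2013-01-30"}
    }
}
```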
Using a Map
```java
Map<String, Object> json = new HashMap<String, Object>();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out Elasticsearch");
```
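To make the Map-to-JSON conversion concrete, here is a toy hand-rolled converter. It is illustrative only (in practice Elasticsearch or Jackson does this for you) and handles just strings and numbers:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy converter showing how Map entries become JSON fields.
// Only strings and numbers are handled; real converters cover
// dates, nested maps, lists, null, escaping, etc.
public class MapToJson {
    public static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : map.entrySet()) {
            if (!first) sb.append(",");
            first = false;
            sb.append("\"").append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);              // numbers are unquoted
            } else {
                sb.append("\"").append(v).append("\""); // everything else as a string
            }
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("user", "kimchy");
        json.put("message", "trying out Elasticsearch");
        System.out.println(toJson(json));
        // {"user":"kimchy","message":"trying out Elasticsearch"}
    }
}
```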
Serializing Beans
Elasticsearch already uses Jackson and ships it under org.elasticsearch.common.jackson. You can add your own Jackson version in your pom.xml file:
```xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.3</version>
</dependency>
```
You can then serialize your bean to JSON:
```java
import com.fasterxml.jackson.databind.*;

// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate JSON
String json = mapper.writeValueAsString(yourbeaninstance);
```
Using the Elasticsearch Helpers
Elasticsearch provides built-in helpers to convert data to JSON:
```java
import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject();
```
Note that you can also add arrays with the startArray(String) and endArray() methods. In addition, field accepts many object types: you can pass numbers, dates, and even other XContentBuilder objects directly.

You can inspect the resulting JSON with:
```java
String json = builder.string();
```
Indexing a Document
The following example indexes a JSON document into an index named "twitter", under the type "tweet", with id 1:
```java
import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
    .setSource(jsonBuilder()
        .startObject()
            .field("user", "kimchy")
            .field("postDate", new Date())
            .field("message", "trying out Elasticsearch")
        .endObject()
    )
    .execute()
    .actionGet();
```
You can also index without providing an id:
```java
String json = "{" +
    "\"user\":\"kimchy\"," +
    "\"postDate\":\"2013-01-30\"," +
    "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
    .setSource(json)
    .execute()
    .actionGet();
```
The IndexResponse gives you information about the indexing operation:
```java
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
```
If percolation was requested at index time, the IndexResponse will also report the matching percolator queries:
```java
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
    .setSource(json)
    .execute()
    .actionGet();

List<String> matches = response.matches();
```
Java Get API
The get API lets you retrieve a typed JSON document from an index by id, as in the following example:
```java
GetResponse response = client.prepareGet("twitter", "tweet", "1")
    .execute()
    .actionGet();
```
Operation Threading
The get API lets you set the threading model used when the operation executes locally, that is, when the API runs on a node that holds the target shard.
By default, operationThreaded is set to true, meaning the operation runs on a separate thread. Below is an example that sets it to false:
```java
GetResponse response = client.prepareGet("twitter", "tweet", "1")
    .setOperationThreaded(false)
    .execute()
    .actionGet();
```
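What the operationThreaded flag controls can be illustrated with plain java.util.concurrent code. This is a sketch of the idea (run on the calling thread vs. hand off to another thread), not Elasticsearch internals:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration of the threading model: with the flag off, the work runs
// on the calling thread; with it on, the work is handed to another thread.
public class ThreadingModel {
    public static String runOperation(boolean operationThreaded) {
        if (!operationThreaded) {
            // operationThreaded == false: execute on the calling thread
            return Thread.currentThread().getName();
        }
        // operationThreaded == true: execute on a separate thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(runOperation(false).equals(caller)); // true
        System.out.println(runOperation(true).equals(caller));  // false
    }
}
```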
Delete API
The delete API lets you delete a typed JSON document from a specific index by id, as in the following example:
```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
    .execute()
    .actionGet();
```
Operation Threading
The delete API lets you set the threading model used when the operation executes locally, that is, when the API runs on a node that holds the target shard.
By default, operationThreaded is set to true, meaning the operation runs on a separate thread. Below is an example that sets it to false:
```java
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
    .setOperationThreaded(false)
    .execute()
    .actionGet();
```
Update API
You can create an UpdateRequest and send it to the client:
```java
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
    .startObject()
        .field("gender", "male")
    .endObject());
client.update(updateRequest).get();
```
Or you can use the prepareUpdate method:
```java
client.prepareUpdate("ttl", "doc", "1")
    .setScript("ctx._source.gender = \"male\"", ScriptService.ScriptType.INLINE)
    .get();

client.prepareUpdate("ttl", "doc", "1")
    .setDoc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject())
    .get();
```
The first call updates the document with a script; the second updates it with a partial doc.
The Java API also supports upsert. If the document does not yet exist, it is created from the upsert content:
```java
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
    .source(jsonBuilder()
        .startObject()
            .field("name", "Joe Smith")
            .field("gender", "male")
        .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
    .doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject())
    .upsert(indexRequest);
client.update(updateRequest).get();
```
If the document index/type/1 already exists (here assumed to have name Joe Dalton), then after the update operation the document is:
```json
{
    "name" : "Joe Dalton",
    "gender": "male"
}
```
Otherwise, the document is:
```json
{
    "name" : "Joe Smith",
    "gender": "male"
}
```
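The merge-or-create behavior above can be sketched with plain maps. This is an illustration of the semantics only, not Elasticsearch code:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of upsert semantics: if the document exists, merge the partial
// doc into it; otherwise, the upsert document becomes the new document.
public class UpsertSketch {
    public static Map<String, Object> upsert(Map<String, Object> existing,
                                             Map<String, Object> doc,
                                             Map<String, Object> upsertDoc) {
        if (existing == null) {
            return new HashMap<>(upsertDoc); // document absent: create from upsert
        }
        Map<String, Object> merged = new HashMap<>(existing);
        merged.putAll(doc);                  // document present: apply partial doc
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("gender", "male");
        Map<String, Object> upsertDoc = Map.of("name", "Joe Smith", "gender", "male");

        // Document does not exist: the upsert document is used as-is.
        System.out.println(upsert(null, doc, upsertDoc));

        // Document exists: the partial doc is merged into it.
        Map<String, Object> existing = new HashMap<>(Map.of("name", "Joe Dalton"));
        System.out.println(upsert(existing, doc, upsertDoc));
    }
}
```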
Bulk API
The bulk API lets you index and delete multiple documents in a single request. Example usage:
```java
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
    .setSource(jsonBuilder()
        .startObject()
            .field("user", "kimchy")
            .field("postDate", new Date())
            .field("message", "trying out Elasticsearch")
        .endObject()
    )
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
    .setSource(jsonBuilder()
        .startObject()
            .field("user", "kimchy")
            .field("postDate", new Date())
            .field("message", "another post")
        .endObject()
    )
);

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
```
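In practice you usually cap the number of actions per bulk request. A generic helper for slicing a document list into fixed-size batches (a sketch, not part of the Elasticsearch API) could look like:

```java
import java.util.ArrayList;
import java.util.List;

// Splits a list of documents into batches of at most batchSize,
// e.g. so each batch can be sent as one bulk request.
public class Batching {
    public static <T> List<List<T>> batches(List<T> docs, int batchSize) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < docs.size(); i += batchSize) {
            result.add(docs.subList(i, Math.min(i + batchSize, docs.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> b = batches(List.of(1, 2, 3, 4, 5), 2);
        System.out.println(b); // [[1, 2], [3, 4], [5]]
    }
}
```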
Search API
The search API lets you execute a search query and get back hits matching it. It can be executed across multiple indices and multiple types. The query can be built with either the Java query API or the Java filter API. The request body is built by a SearchSourceBuilder.
```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilders.*;
import org.elasticsearch.index.query.QueryBuilders.*;

SearchResponse response = client.prepareSearch("index1", "index2")
    .setTypes("type1", "type2")
    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
    .setQuery(QueryBuilders.termQuery("multi", "test"))               // Query
    .setPostFilter(FilterBuilders.rangeFilter("age").from(12).to(18)) // Filter
    .setFrom(0).setSize(60).setExplain(true)
    .execute()
    .actionGet();
```
Note that all parameters are optional. Here is the most minimal form:
```java
// MatchAll on the whole cluster with all default options
SearchResponse response = client.prepareSearch().execute().actionGet();
```
Search Example (Java class)
```java
SearchRequestBuilder reqBuilder = client.prepareSearch(App.ESProp.INDEX_NAME)
    .setTypes("task_info")
    .setSearchType(SearchType.DEFAULT)
    .setExplain(true);

QueryStringQueryBuilder queryString = QueryBuilders.queryString("中華");
queryString.field("taskContent");
queryString.minimumShouldMatch("1");

reqBuilder.setQuery(QueryBuilders.boolQuery().should(queryString))
    .setExplain(true);

SearchResponse resp = reqBuilder.execute().actionGet();
SearchHit[] hits = resp.getHits().getHits();
List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
for (SearchHit hit : hits) {
    results.add(hit.getSource());
}

System.out.println("result ---->>>>");
for (int i = 0; i < results.size(); i++) {
    System.out.println(results.get(i));
}
```
The example above contains a simple query; a few personal observations on it follow.

- Basic search builders

```java
SearchResponse response = client.prepareSearch().execute().actionGet(); // match everything

SearchRequestBuilder searchRequestBuilder = client.prepareSearch("index1", "index2"); // search documents in indices index1 and index2
searchRequestBuilder.setTypes("type1", "type2"); // an ES search can span not only multiple indices (index1, index2) but also multiple types (type1, type2), i.e. it is cross-"table"
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // set the search type
searchRequestBuilder.setFrom(0).setSize(10); // set pagination
searchRequestBuilder.addSort("crawlDate", SortOrder.DESC); // sort by date, descending
searchRequestBuilder.setExplain(true); // explain how each hit was scored
```
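The setFrom/setSize pagination used above is plain offset arithmetic. A small helper (hypothetical, not part of the Elasticsearch API) makes the relationship between page number, page size, and from explicit:

```java
// from/size arithmetic behind setFrom(...).setSize(...):
// page numbers start at 0, and `from` is the offset of the first hit.
public class PageMath {
    public static int fromForPage(int page, int size) {
        return page * size;
    }

    public static int totalPages(long totalHits, int size) {
        return (int) ((totalHits + size - 1) / size); // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(fromForPage(0, 10)); // 0  -> setFrom(0).setSize(10)
        System.out.println(fromForPage(3, 10)); // 30 -> setFrom(30).setSize(10)
        System.out.println(totalPages(95, 10)); // 10 pages for 95 hits
    }
}
```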
searchRequestBuilder.setSearchType sets the search type. The main search types are:
QUERY_THEN_FETCH: the query is executed against all relevant shards, but each shard returns only enough information to rank the results, not the document content. The results are then sorted and ranked, and only the relevant documents are fetched from their shards. The number of hits returned therefore exactly matches the requested size. This is convenient for an index with many shards (results are merged and ranked, so there are no duplicates).
QUERY_AND_FETCH: the most naive (and possibly fastest) implementation simply executes the query on all relevant shards and returns the results. Each shard returns size results, so the caller actually receives up to size results per shard.
DFS_QUERY_THEN_FETCH: same as QUERY_THEN_FETCH, except for an initial scatter phase that computes distributed term frequencies for more accurate scoring.
DFS_QUERY_AND_FETCH: same as QUERY_AND_FETCH, except for an initial scatter phase that computes distributed term frequencies for more accurate scoring.
SCAN: performs scanning of the results with no sorting; scrolling over the result set starts automatically.
COUNT: only counts the results, while still executing facets if requested.
- Match Query (see the linked docs for details)

```java
QueryBuilder qb = QueryBuilders.matchQuery("name", "kimchy elasticsearch"); // "name" is the field; "kimchy elasticsearch" is the text to search for
```
- MultiMatch Query (see the linked docs for details)

```java
QueryBuilder qb = QueryBuilders.multiMatchQuery(
    "kimchy elasticsearch",  // text you are looking for
    "user", "message"        // fields you query on
);
```
- Building a query-string query

```java
QueryStringQueryBuilder queryString = QueryBuilders.queryString("\"" + content + "\""); // build the query-string query
queryString.field(k); // set the field to match against
```

- termQuery

A term query matches exactly: the query term is not analyzed (tokenized) before matching.
- should

A should query is by default split into several term-level queries; how many of them must match is controlled with the minimumShouldMatch parameter.
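How a minimumShouldMatch value such as "1" or "75%" translates into a required number of matching clauses can be sketched as follows. This is a simplification; the full Lucene syntax also allows negative values and combination forms:

```java
// Simplified interpretation of minimum_should_match: either an absolute
// count ("2") or a percentage of the optional clauses ("75%").
public class MinimumShouldMatch {
    public static int required(String spec, int optionalClauses) {
        if (spec.endsWith("%")) {
            int pct = Integer.parseInt(spec.substring(0, spec.length() - 1));
            return optionalClauses * pct / 100; // truncates toward zero
        }
        return Integer.parseInt(spec);
    }

    public static void main(String[] args) {
        System.out.println(required("1", 4));   // 1 clause must match
        System.out.println(required("75%", 4)); // 3 of 4 clauses must match
    }
}
```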
Spring ES Operations
Connecting the ES client
```java
@Bean
public ElasticsearchTemplate elasticsearchTemplate() {
    return new ElasticsearchTemplate(client());
}

@Bean
public Client client() {
    Settings settings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "elasticsearch")
        .put("client.transport.ping_timeout", "3s").build();
    TransportClient client = new TransportClient(settings);
    TransportAddress address = new InetSocketTransportAddress("120.24.165.15", 9300);
    client.addTransportAddress(address);
    return client;
}

@Bean
public ElasticsearchActionService elasticsearchService() {
    ElasticsearchActionService elasticsearchService = new ElasticsearchActionService();
    elasticsearchService.init(elasticsearchTemplate());
    return elasticsearchService;
}
```
Initializing the index
- Initialize the document store, create the index, and bulk-insert data.
```java
private ElasticsearchTemplate elasticsearchTemplate;

@Autowired
private Client esClient;

public void init(ElasticsearchTemplate clzz) {
    elasticsearchTemplate = (ElasticsearchTemplate) clzz;
    if (!elasticsearchTemplate.indexExists(App.ESProp.INDEX_NAME)) {
        elasticsearchTemplate.createIndex(App.ESProp.INDEX_NAME);
    }
    elasticsearchTemplate.putMapping(TaskInfo.class);
    elasticsearchTemplate.putMapping(NewsInfo.class);
}

/**
 * Create or update documents.
 * @author 高國藩
 * @date 2017年5月12日 下午3:16:27
 * @param taskInfoList
 * @return
 */
public boolean update(List<TaskInfo> taskInfoList) {
    List<IndexQuery> queries = new ArrayList<IndexQuery>();
    for (TaskInfo taskInfo : taskInfoList) {
        IndexQuery indexQuery = new IndexQueryBuilder()
            .withId(taskInfo.getTaskId())
            .withObject(taskInfo)
            .build();
        queries.add(indexQuery);
    }
    elasticsearchTemplate.bulkIndex(queries);
    return true;
}
```
- Initialize the mapping with annotations (class)
```java
package com.sk.system.es;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldIndex;
import org.springframework.data.elasticsearch.annotations.FieldType;

import com.sk.browser.config.App;

/**
 * store: whether the field is stored
 * FieldIndex.not_analyzed: do not analyze (tokenize) the field
 * indexAnalyzer = "ik": analyze with the IK analyzer
 */
//@Document(indexName = APP.ESProp.INDEX_NAME, type = APP.ESProp.TYPE_TASK_INFO, indexStoreType = APP.ESProp.INDEX_STORE_TYPE, shards = APP.ESProp.SHARDS, replicas = APP.ESProp.REPLICAS, refreshInterval = APP.ESProp.REFRESH_INTERVAL)
@Document(indexName = App.ESProp.INDEX_NAME, type = App.ESProp.TYPE_TASK_INFO)
public class TaskInfo {

    @Id // marks the document ID
    @Field(index = FieldIndex.not_analyzed, store = true)
    private String taskId;

    @Field(type = FieldType.Integer, index = FieldIndex.not_analyzed, store = true)
    private Integer userId;

    @Field(type = FieldType.String, indexAnalyzer = "ik", searchAnalyzer = "ik", store = true)
    private String taskContent;

    @Field(type = FieldType.String, indexAnalyzer = "ik", searchAnalyzer = "ik", store = true)
    private String taskArea;

    @Field(type = FieldType.String, indexAnalyzer = "ik", searchAnalyzer = "ik", store = true)
    private String taskTags;

    @Field(type = FieldType.Integer, index = FieldIndex.not_analyzed, store = true)
    private Integer taskState;

    @Field(type = FieldType.String, index = FieldIndex.not_analyzed, store = true)
    private String updateTime;

    @Field(type = FieldType.String, indexAnalyzer = "ik", searchAnalyzer = "ik", store = true)
    private String userNickName;

    public TaskInfo() {
    }

    public TaskInfo(String taskId, Integer userId, String taskContent, String taskArea,
            String taskTags, Integer taskState, String updateTime, String userNickName) {
        this.taskId = taskId;
        this.userId = userId;
        this.taskContent = taskContent;
        this.taskArea = taskArea;
        this.taskTags = taskTags;
        this.taskState = taskState;
        this.updateTime = updateTime;
        this.userNickName = userNickName;
    }

    public String getTaskId() { return taskId; }
    public void setTaskId(String taskId) { this.taskId = taskId; }
    public Integer getUserId() { return userId; }
    public void setUserId(Integer userId) { this.userId = userId; }
    public String getTaskContent() { return taskContent; }
    public void setTaskContent(String taskContent) { this.taskContent = taskContent; }
    public String getTaskArea() { return taskArea; }
    public void setTaskArea(String taskArea) { this.taskArea = taskArea; }
    public String getTaskTags() { return taskTags; }
    public void setTaskTags(String taskTags) { this.taskTags = taskTags; }
    public Integer getTaskState() { return taskState; }
    public void setTaskState(Integer taskState) { this.taskState = taskState; }
    public String getUpdateTime() { return updateTime; }
    public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }
    public String getUserNickName() { return userNickName; }
    public void setUserNickName(String userNickName) { this.userNickName = userNickName; }

    @Override
    public String toString() {
        return "TaskInfo [taskId=" + taskId + ", userId=" + userId
                + ", taskContent=" + taskContent + ", taskArea=" + taskArea
                + ", taskState=" + taskState + ", updateTime=" + updateTime
                + ", userNickName=" + userNickName + "]";
    }
}
```