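Steps for working with an ES cluster from Java: 1: configure the cluster settings; 2: create the client; 3: inspect the cluster information.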
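1: Cluster name
The default cluster name is elasticsearch. If the cluster.name configured on the client does not match the name of the cluster it connects to, the client ignores that cluster's nodes and requests fail with an error when they try to use them.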
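2: Sniffing
Sniffing is enabled with client.transport.sniff. You then only need to give the client one node of the cluster (not necessarily the master node), and the client loads the cluster's other nodes from it. As long as the program keeps running, it can still reach the other nodes even if that first node goes down.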
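The behaviour described in sections 1 and 2 can be sketched as a small plain-Java model. It is a hypothetical simplification, not the TransportClient implementation: `SniffingClientModel`, `Node`, `connect` and `pickNode` are illustrative names. The client starts from a single seed address and ignores nodes whose cluster name differs from its own `cluster.name`. With sniffing enabled, it learns the remaining nodes from the seed so it can fall back to them later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of cluster-name matching plus sniffing; not the real TransportClient code. */
public class SniffingClientModel {

    /** A node in the model: its address, the cluster it belongs to, and whether it is up. */
    public static class Node {
        final String address;
        final String clusterName;
        boolean alive = true;

        public Node(String address, String clusterName) {
            this.address = address;
            this.clusterName = clusterName;
        }

        public void setAlive(boolean alive) {
            this.alive = alive;
        }
    }

    private final String clusterName;                  // the client's "cluster.name" setting
    private final List<Node> usableNodes = new ArrayList<>();

    public SniffingClientModel(String clusterName) {
        this.clusterName = clusterName;
    }

    /** Connect through one seed address; with sniff=true, learn the rest of the cluster from it. */
    public void connect(String seedAddress, Map<String, Node> cluster, boolean sniff) {
        Node seed = cluster.get(seedAddress);
        if (seed == null || !seed.clusterName.equals(clusterName)) {
            return; // a node from a different cluster is ignored, so nothing is usable
        }
        usableNodes.add(seed);
        if (sniff) {
            for (Node n : cluster.values()) {
                if (n != seed && n.clusterName.equals(clusterName)) {
                    usableNodes.add(n);
                }
            }
        }
    }

    /** Return the address of the first live node, or fail the way a "no node available" error would. */
    public String pickNode() {
        for (Node n : usableNodes) {
            if (n.alive) {
                return n.address;
            }
        }
        throw new IllegalStateException("no node available");
    }
}
```

With sniffing off, losing the seed node leaves the model with nothing to connect to. Sniffing is meant to avoid exactly that situation.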
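3: Search type (SearchType.QUERY_THEN_FETCH)
ES 1.x offers four search types that differ in how scoring and document retrieval are spread across the shards. It also has the special-purpose SCAN and COUNT types, which are not covered here.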
QUERY_AND_FETCH:
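The coordinating node (the node that receives the request) sends the query to all shards. Each shard scores and sorts its documents using its own statistics, i.e. term frequency and document frequency, and returns the results to the coordinating node. The coordinating node merges and sorts everything and returns it to the client. This type needs only one round trip to ES.
Its drawbacks are data volume and ranking. First, the coordinating node collects the full results of every shard, which can be a lot of data. Second, the shards' scoring statistics may differ from one another.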
QUERY_THEN_FETCH:
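The coordinating node sends the query to all shards. Each shard scores and sorts its documents but returns only document ids and scores. The coordinating node merges and sorts these, then fetches the documents for the winning ids from the shards that hold them and returns them to the client. This type needs two round trips to ES.
It solves the data-volume problem, but the ranking problem remains. It is the default search type in ES.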
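A toy simulation makes the data-volume difference between the two types concrete. It is a sketch under simplifying assumptions: each shard is an in-memory list of scored hits, `QueryThenFetchDemo` and its methods are hypothetical names, and both types keep the final page at `size` so that their results can be compared directly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical simulation of how the coordinating node merges per-shard results. */
public class QueryThenFetchDemo {

    /** One scored document on a shard; "source" stands in for the full document body. */
    public static class Hit {
        final String id;
        final double score;
        final String source;

        public Hit(String id, double score, String source) {
            this.id = id;
            this.score = score;
            this.source = source;
        }
    }

    /** Result of one simulated search: the final page plus how many full documents were shipped. */
    public static class Result {
        public final List<String> ids = new ArrayList<>();
        public int documentsTransferred;
    }

    static final Comparator<Hit> BY_SCORE_DESC =
            Comparator.comparingDouble((Hit h) -> h.score).reversed();

    /** Top `size` hits of one shard, ranked by that shard's own scores. */
    static List<Hit> shardTop(List<Hit> shard, int size) {
        List<Hit> sorted = new ArrayList<>(shard);
        sorted.sort(BY_SCORE_DESC);
        return sorted.subList(0, Math.min(size, sorted.size()));
    }

    /** QUERY_AND_FETCH: one round trip, every shard ships its top `size` full documents. */
    public static Result queryAndFetch(List<List<Hit>> shards, int size) {
        Result r = new Result();
        List<Hit> merged = new ArrayList<>();
        for (List<Hit> shard : shards) {
            List<Hit> top = shardTop(shard, size);
            r.documentsTransferred += top.size();   // full documents travel in the only phase
            merged.addAll(top);
        }
        merged.sort(BY_SCORE_DESC);
        for (Hit h : merged.subList(0, Math.min(size, merged.size()))) {
            r.ids.add(h.id);
        }
        return r;
    }

    /** QUERY_THEN_FETCH: phase one ships only ids and scores, phase two fetches the winners. */
    public static Result queryThenFetch(List<List<Hit>> shards, int size) {
        Result r = new Result();
        List<Hit> merged = new ArrayList<>();
        for (List<Hit> shard : shards) {
            merged.addAll(shardTop(shard, size));   // only (id, score) is needed in phase one
        }
        merged.sort(BY_SCORE_DESC);
        for (Hit h : merged.subList(0, Math.min(size, merged.size()))) {
            r.ids.add(h.id);
            r.documentsTransferred++;               // phase two: fetch this document by id
        }
        return r;
    }
}
```

With 3 shards of 4 hits each and a page size of 2, QUERY_AND_FETCH ships 6 full documents to the coordinating node, while QUERY_THEN_FETCH ships only 2. Both still produce the same page, because the scores here are consistent across shards.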
DFS_QUERY_AND_FETCH和DFS_QUERY_THEN_FETCH:
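These differ from the previous two in that all shards score with the same, unified statistics: an extra round trip first collects the term statistics of every shard. This solves the ranking problem. DFS_QUERY_AND_FETCH still has the data-volume problem, while DFS_QUERY_THEN_FETCH solves both problems but is the slowest of the four.
Summary:
THEN_FETCH types take two round trips and AND_FETCH types take one (the DFS variants add one more for the statistics); DFS types score with unified statistics, the others do not; in AND_FETCH types the shards return full documents, in THEN_FETCH types they return only ids.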
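Why unified statistics matter can be shown with a deliberately simplified score, term frequency × (ln(N / df) + 1). This formula is an assumption chosen for illustration, not Lucene's exact similarity, and `DfsScoringDemo` is a hypothetical class. In the example, two shards of 10 documents each disagree about how rare the query term is: it appears in 1 document on shard A and in 9 on shard B.

```java
import java.util.List;

/** Hypothetical illustration of why DFS_* search types exist; the scoring formula is simplified. */
public class DfsScoringDemo {

    /** Per-shard statistics for a single query term. */
    public static class ShardStats {
        final int docCount;   // documents on the shard
        final int docFreq;    // documents on the shard that contain the term

        public ShardStats(int docCount, int docFreq) {
            this.docCount = docCount;
            this.docFreq = docFreq;
        }
    }

    /** Simplified score: term frequency times idf, with idf = ln(N / df) + 1. */
    public static double score(int termFreq, int docCount, int docFreq) {
        return termFreq * (Math.log((double) docCount / docFreq) + 1.0);
    }

    /** Non-DFS types: each shard scores with only its own statistics. */
    public static double localScore(int termFreq, ShardStats shard) {
        return score(termFreq, shard.docCount, shard.docFreq);
    }

    /** DFS types: a preliminary round trip sums the statistics of all shards first. */
    public static double globalScore(int termFreq, List<ShardStats> allShards) {
        int n = 0;
        int df = 0;
        for (ShardStats s : allShards) {
            n += s.docCount;
            df += s.docFreq;
        }
        return score(termFreq, n, df);
    }
}
```

Scored locally, document a1 (tf = 1 on shard A) beats b1 (tf = 2 on shard B), about 3.30 against 2.21, because the term looks rare on shard A. With the global statistics (N = 20, df = 10), every document uses the same idf and the order flips, about 1.69 against 3.39. This is the ranking problem the DFS types remove.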
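4: Paging cost
Both curl and Java queries can page through results, but the deeper the page, the heavier the load on the server. Most search engines do not offer very deep paging, for two reasons: users rarely look at deep pages, because results further down are less relevant; and the server load grows with page depth.
For example, with 5 shards and a page size of 10, fetching records 10000 to 10010 (from = 10000, size = 10) requires ES to query every shard. Each shard produces its top 10010 sorted entries and returns them to the coordinating node, which therefore receives 5 × 10010 = 50050 entries, merges them, and finally returns entries 10000 to 10010 to the client. Although only 10 records were requested, ES has to merge more than 50,000 entries, so the larger the page number, the heavier the load on the server.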
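The arithmetic above can be written out directly. This is a minimal sketch: `DeepPagingCost` and its methods are hypothetical names, and the from + size entries per shard assume the non-DFS query phase described in section 3.

```java
/** Worked version of the deep-paging arithmetic (hypothetical helper, not an ES API). */
public class DeepPagingCost {

    /** Entries each shard must sort and send back: it cannot know which of its hits land on the page. */
    public static long perShard(long from, long size) {
        return from + size;
    }

    /** Entries the coordinating node must merge before it can cut out the requested page. */
    public static long coordinatorTotal(int shards, long from, long size) {
        return shards * perShard(from, size);
    }

    public static void main(String[] args) {
        // 5 shards, page size 10, records 10000 to 10010 (from = 10000, size = 10)
        System.out.println(perShard(10000, 10));             // 10010
        System.out.println(coordinatorTotal(5, 10000, 10));  // 50050
    }
}
```

For the first page (from = 0) the coordinating node merges only 5 × 10 = 50 entries, 1001 times fewer than at from = 10000.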
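5: Timeout
When a query touches a lot of data, you can set a timeout: once it expires, the query returns the hits collected up to that point. This keeps response times predictable for users, at the cost that the returned data may be incomplete. Both the Java API and curl can set a timeout.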
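The partial-results trade-off can be sketched with a simulated clock, which keeps the example deterministic. This is a hypothetical model, not ES internals: `SearchTimeoutModel` is an illustrative name, and each shard is a list of per-document costs in simulated milliseconds. Shards are assumed to run in parallel, so each one has its own clock. A shard that runs out of time stops, keeps the hits it has already found, and the response is flagged as timed out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a search timeout using a simulated clock instead of real time. */
public class SearchTimeoutModel {

    /** What the coordinating node hands back: the hits gathered so far and whether time ran out. */
    public static class Response {
        public final List<String> hits = new ArrayList<>();
        public boolean timedOut;
    }

    /**
     * Each shard is an array of per-document costs in simulated milliseconds. A document
     * becomes a hit only if it is examined before the shard's elapsed time exceeds the timeout.
     */
    public static Response search(List<long[]> shardCosts, long timeoutMillis) {
        Response response = new Response();
        for (int s = 0; s < shardCosts.size(); s++) {
            long elapsed = 0;                    // each shard runs on its own clock
            long[] costs = shardCosts.get(s);
            for (int d = 0; d < costs.length; d++) {
                elapsed += costs[d];
                if (elapsed > timeoutMillis) {
                    response.timedOut = true;    // stop this shard, keep what it already found
                    break;
                }
                response.hits.add("shard" + s + "-doc" + d);
            }
        }
        return response;
    }
}
```

With shards costing {10, 10, 10} and {100, 200, 300} ms and a 250 ms timeout, the response holds 4 of the 6 hits and is flagged as timed out. With a 1000 ms timeout, all 6 hits come back.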
6: Maven dependencies
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>1.4.4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.3</version>
</dependency>
The Java code follows.
package elasticsearch;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import online.elasticsearch.bean.Student;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.MatchQueryBuilder.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class elastaicTest {
    TransportClient transportClient;
    // index name
    String index = "shb01";
    // type name
    String type = "stu";

    @Before
    public void before()
    {
        /**
         * 1: Specify the cluster configuration through a Settings object
         */
        Settings setting = ImmutableSettings.settingsBuilder()
                .put("cluster.name", "shb01")        // cluster name
                .put("client.transport.sniff", true) // enable sniffing
                .build();
        /**
         * 2: Create the client from the settings. Without a cluster.name setting
         * the client connects to a cluster named elasticsearch.
         * The client uses the TCP transport protocol on port 9300.
         */
        transportClient = new TransportClient(setting);
        TransportAddress transportAddress = new InetSocketTransportAddress("192.168.79.131", 9300);
        transportClient.addTransportAddresses(transportAddress);
        /**
         * 3: Inspect the cluster.
         * Note the layout of my cluster:
         * on 131, elasticsearch.yml makes it a master node that stores no data;
         * on 128, elasticsearch.yml makes it a data node that is not master-eligible.
         * So the console prints only 192.168.79.128: only data nodes are listed.
         */
        ImmutableList<DiscoveryNode> connectedNodes = transportClient.connectedNodes();
        for (DiscoveryNode node : connectedNodes)
        {
            System.out.println(node.getHostAddress());
        }
    }

    /**
     * Fetch the document with a given id via prepareGet
     */
    @Test
    public void testGet() {
        GetResponse getResponse = transportClient.prepareGet(index, type, "1").get();
        System.out.println(getResponse.getSourceAsString());
    }

    /**
     * Update a document in the index via prepareUpdate; fails if the document does not exist
     * @throws IOException
     */
    @Test
    public void testUpdate() throws IOException
    {
        XContentBuilder source = XContentFactory.jsonBuilder()
                .startObject()
                    .field("name", "will")
                .endObject();
        UpdateResponse updateResponse = transportClient
                .prepareUpdate(index, type, "6").setDoc(source).get();
        System.out.println(updateResponse.getVersion());
    }

    /**
     * Add a document via prepareIndex, passing the source as a JSON string
     */
    @Test
    public void testIndexJson()
    {
        String source = "{\"name\":\"will\",\"age\":18}";
        IndexResponse indexResponse = transportClient
                .prepareIndex(index, type, "3").setSource(source).get();
        System.out.println(indexResponse.getVersion());
    }

    /**
     * Add a document via prepareIndex, passing the source as a Map<String, Object>
     */
    @Test
    public void testIndexMap()
    {
        Map<String, Object> source = new HashMap<String, Object>(2);
        source.put("name", "Alice");
        source.put("age", 16);
        IndexResponse indexResponse = transportClient
                .prepareIndex(index, type, "4").setSource(source).get();
        System.out.println(indexResponse.getVersion());
    }

    /**
     * Add a document via prepareIndex, passing a JavaBean serialized to JSON with Jackson
     *
     * @throws ElasticsearchException
     * @throws JsonProcessingException
     */
    @Test
    public void testIndexBean() throws ElasticsearchException, JsonProcessingException
    {
        Student stu = new Student();
        stu.setName("Fresh");
        stu.setAge(22);
        ObjectMapper mapper = new ObjectMapper();
        IndexResponse indexResponse = transportClient
                .prepareIndex(index, type, "5").setSource(mapper.writeValueAsString(stu)).get();
        System.out.println(indexResponse.getVersion());
    }

    /**
     * Add a document via prepareIndex, passing the source as an XContentBuilder
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws ExecutionException
     */
    @Test
    public void testIndexXContentBuilder() throws IOException, InterruptedException, ExecutionException
    {
        XContentBuilder builder = XContentFactory.jsonBuilder()
                .startObject()
                    .field("name", "Avivi")
                    .field("age", 30)
                .endObject();
        IndexResponse indexResponse = transportClient
                .prepareIndex(index, type, "6")
                .setSource(builder)
                .execute().get();
        // .execute().get() has the same effect as .get()
        System.out.println(indexResponse.getVersion());
    }

    /**
     * Delete a document via prepareDelete
     */
    @Test
    public void testDelete()
    {
        String id = "9";
        DeleteResponse deleteResponse = transportClient.prepareDelete(index,
                type, id).get();
        System.out.println(deleteResponse.getVersion());
        // delete every document of this type (delete-by-query with match_all)
        transportClient.prepareDeleteByQuery(index).setTypes(type)
                .setQuery(QueryBuilders.matchAllQuery()).get();
    }

    /**
     * Delete whole indices. This is irreversible; use with care.
     */
    @Test
    public void testDeleteIndex()
    {
        transportClient.admin().indices().prepareDelete("shb01", "shb02").get();
    }

    /**
     * Count the documents in the index
     */
    @Test
    public void testCount()
    {
        long count = transportClient.prepareCount(index).get().getCount();
        System.out.println(count);
    }

    /**
     * Run several operations in one batch via prepareBulk
     *
     * @throws IOException
     */
    @Test
    public void testBulk() throws IOException
    {
        // 1: create the bulk request
        BulkRequestBuilder bulk = transportClient.prepareBulk();
        // 2: index (add) a document
        IndexRequest add = new IndexRequest(index, type, "10");
        add.source(XContentFactory.jsonBuilder()
                .startObject()
                    .field("name", "Henrry").field("age", 30)
                .endObject());
        // 3: delete a document
        DeleteRequest del = new DeleteRequest(index, type, "1");
        // 4: update a document
        XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("name", "jack_1").field("age", 19).endObject();
        UpdateRequest update = new UpdateRequest(index, type, "2");
        update.doc(source);
        bulk.add(del);
        bulk.add(add);
        bulk.add(update);
        // 5: execute the batch
        BulkResponse bulkResponse = bulk.get();
        if (bulkResponse.hasFailures())
        {
            // print only the items that failed; successful items carry no failure message
            for (BulkItemResponse item : bulkResponse.getItems())
            {
                if (item.isFailed())
                {
                    System.out.println(item.getFailureMessage());
                }
            }
        }
        else
        {
            System.out.println("All operations succeeded!");
        }
    }

    /**
     * Query the index via prepareSearch, e.g. with
     * setQuery(QueryBuilders.matchQuery("name", "jack"))
     * setSearchType(SearchType.QUERY_THEN_FETCH)
     */
    @Test
    public void testSearch()
    {
        SearchResponse searchResponse = transportClient.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchAllQuery()) // match all documents
                //.setQuery(QueryBuilders.matchQuery("name", "tom").operator(Operator.AND)) // analyze "tom" and match it against name; terms are ORed by default, ANDed here
                //.setQuery(QueryBuilders.multiMatchQuery("tom", "name", "age")) // search "tom" in the listed fields
                //.setQuery(QueryBuilders.queryString("name:to* AND age:[0 TO 19]")) // query-string syntax: wildcard on name, inclusive age range 0 to 19
                //.setQuery(QueryBuilders.termQuery("name", "tom")) // term query: the input is not analyzed
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setFrom(0).setSize(10) // paging: offset and page size
                .addSort("age", SortOrder.DESC) // sort by age, descending
                .get();
        SearchHits hits = searchResponse.getHits();
        long total = hits.getTotalHits();
        System.out.println(total);
        SearchHit[] searchHits = hits.hits();
        for (SearchHit s : searchHits)
        {
            System.out.println(s.getSourceAsString());
        }
    }

    /**
     * Search across several indices and types, with a timeout
     */
    @Test
    public void testSearchsAndTimeout()
    {
        SearchResponse searchResponse = transportClient.prepareSearch("shb01", "shb02").setTypes("stu", "tea")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setTimeout("3s") // give the unit explicitly: a bare "3" is read as 3 milliseconds
                .get();
        // true if the timeout expired, in which case the hits below may be incomplete
        System.out.println(searchResponse.isTimedOut());
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits();
        System.out.println(totalHits);
        SearchHit[] hits2 = hits.getHits();
        for (SearchHit h : hits2)
        {
            System.out.println(h.getSourceAsString());
        }
    }

    /**
     * Filtering with range comparisons:
     * lt  less than
     * gt  greater than
     * lte less than or equal to
     * gte greater than or equal to
     */
    @Test
    public void testFilter()
    {
        SearchResponse searchResponse = transportClient.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchAllQuery()) // match all documents
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                // .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)
                //         .includeLower(true).includeUpper(true))
                .setPostFilter(FilterBuilders.rangeFilter("age").gte(18).lte(22))
                .setExplain(true) // each hit carries an explanation of how its score was computed; ordering is unchanged
                .get();
        SearchHits hits = searchResponse.getHits();
        long total = hits.getTotalHits();
        System.out.println(total);
        SearchHit[] searchHits = hits.hits();
        for (SearchHit s : searchHits)
        {
            System.out.println(s.getSourceAsString());
        }
    }

    /**
     * Highlighting
     */
    @Test
    public void testHighLight()
    {
        SearchResponse searchResponse = transportClient.prepareSearch(index)
                .setTypes(type)
                //.setQuery(QueryBuilders.matchQuery("name", "Fresh")) // alternative: match query for "Fresh"
                .setQuery(QueryBuilders.queryString("name:F*")) // wildcard: name terms beginning with F
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addHighlightedField("name")
                .setHighlighterPreTags("<font color='red'>")
                .setHighlighterPostTags("</font>")
                .get();
        SearchHits hits = searchResponse.getHits();
        System.out.println("sum:" + hits.getTotalHits());
        SearchHit[] hits2 = hits.getHits();
        for (SearchHit s : hits2)
        {
            Map<String, HighlightField> highlightFields = s.getHighlightFields();
            HighlightField highlightField = highlightFields.get("name");
            if (null != highlightField)
            {
                Text[] fragments = highlightField.fragments();
                System.out.println(fragments[0]);
            }
            System.out.println(s.getSourceAsString());
        }
    }

    /**
     * Group by (terms aggregation)
     */
    @Test
    public void testGroupBy()
    {
        SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addAggregation(AggregationBuilders.terms("group_age")
                        .field("age").size(0)) // group by age; 10 buckets by default, size(0) returns all buckets in ES 1.x
                .get();
        Terms terms = searchResponse.getAggregations().get("group_age");
        List<Bucket> buckets = terms.getBuckets();
        for (Bucket bt : buckets)
        {
            System.out.println(bt.getKey() + " " + bt.getDocCount());
        }
    }

    /**
     * Metric aggregations: this example only uses sum, but the other
     * metric aggregations (avg, min, max, ...) work the same way
     */
    @Test
    public void testMethod()
    {
        SearchResponse searchResponse = transportClient.prepareSearch(index).setTypes(type)
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .addAggregation(AggregationBuilders.terms("group_name").field("name")
                        .subAggregation(AggregationBuilders.sum("sum_age").field("age")))
                .get();
        Terms terms = searchResponse.getAggregations().get("group_name");
        List<Bucket> buckets = terms.getBuckets();
        for (Bucket bt : buckets)
        {
            Sum sum = bt.getAggregations().get("sum_age");
            System.out.println(bt.getKey() + " " + bt.getDocCount() + " " + sum.getValue());
        }
    }
}
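testGroupBy and testMethod correspond to SQL's GROUP BY with COUNT and SUM. The same computation over an in-memory list, written with Java streams, makes the bucket semantics explicit. This is a sketch, and `AggregationEquivalent` and `StudentRow` are hypothetical stand-ins (the article's `Student` bean is not shown). Buckets are ordered by key here, whereas an ES terms aggregation orders them by document count by default. ES also buckets an analyzed string field such as name by its indexed tokens (for example lowercased words), so real bucket keys can differ from this whole-value grouping.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** In-memory equivalent of the terms + sum aggregations in testGroupBy and testMethod. */
public class AggregationEquivalent {

    /** Hypothetical stand-in for a stored document with the two fields the tests use. */
    public static class StudentRow {
        final String name;
        final int age;

        public StudentRow(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Like terms("group_age").field("age"): one bucket per distinct age with its document count. */
    public static Map<Integer, Long> countByAge(List<StudentRow> docs) {
        return docs.stream()
                .collect(Collectors.groupingBy((StudentRow s) -> s.age, TreeMap::new,
                        Collectors.counting()));
    }

    /** Like terms("group_name") with sub-aggregation sum("sum_age"): total age per name. */
    public static Map<String, Integer> sumAgeByName(List<StudentRow> docs) {
        return docs.stream()
                .collect(Collectors.groupingBy((StudentRow s) -> s.name, TreeMap::new,
                        Collectors.summingInt((StudentRow s) -> s.age)));
    }
}
```

For the documents ("will", 18), ("Fresh", 22), ("Avivi", 30) and ("will", 22), `countByAge` yields {18=1, 22=2, 30=1} and `sumAgeByName` yields {Avivi=30, Fresh=22, will=40}.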