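Preface
Over the previous 33 articles, every request to ES was issued as a RESTful call from Kibana, and no Java or Python client code was published. REST is the most direct way to use and understand the core features of ES, but in a real project the requests are issued, and the data processed, from Java or Python. With the core features already understood, using the Java API is straightforward; for any API not covered here, consult the official documentation.
Overview
This article walks through examples of client-side development against the Elasticsearch API, mainly in Java, covering the most common and essential APIs.
Code Examples
Adding Dependencies
Using a Maven project as an example, add the following dependencies: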
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
</dependency>
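Connecting to ES
- Create a Settings object and specify the cluster name
- Create a TransportClient object and specify the node's IP and port manually
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
// In 6.x the address class is TransportAddress (InetSocketTransportAddress belongs to 5.x and earlier)
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));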
If the cluster has many nodes, specifying the IP and port of every node is impractical. Enable the client's automatic node sniffing instead:
// Setting client.transport.sniff to true turns on automatic discovery of cluster nodes
Settings settings = Settings.builder().put("client.transport.sniff", true).put("cluster.name", "elasticsearch").build();
// Only one node needs to be specified
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
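When the seed node comes from configuration rather than being hard-coded, it usually arrives as a single string. The following is a minimal plain-JDK sketch of turning such a string into host/port pairs. The class name `SeedNodes`, the `host:port,host:port` format, and the fallback to port 9300 are assumptions made for illustration and are not part of the Elasticsearch API; IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Elasticsearch client library.
final class SeedNodes {
    // Assumed default: 9300 is the transport port used throughout this article.
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    /** Parses "host:port,host:port" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String seeds) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : seeds.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                    ? DEFAULT_TRANSPORT_PORT
                    : Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("192.168.17.137:9300, 192.168.17.138")) {
            System.out.println(a.getHostString() + " -> " + a.getPort());
        }
    }
}
```

Each pair can then be handed to the client in the same way as above, for example `client.addTransportAddress(new TransportAddress(InetAddress.getByName(a.getHostString()), a.getPort()))`.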
Basic CRUD
The most basic CRUD code makes a good introductory demo:
/**
* Create an employee record (index one document)
* @param client
*/
private static void createEmployee(TransportClient client) throws Exception {
IndexResponse response = client.prepareIndex("company", "employee", "1")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("name", "jack")
.field("age", 27)
.field("position", "technique")
.field("country", "china")
.field("join_date", "2017-01-01")
.field("salary", 10000)
.endObject())
.get();
System.out.println(response.getResult());
}
/**
* Get an employee record
* @param client
* @throws Exception
*/
private static void getEmployee(TransportClient client) throws Exception {
GetResponse response = client.prepareGet("company", "employee", "1").get();
System.out.println(response.getSourceAsString());
}
/**
* Update an employee record
* @param client
* @throws Exception
*/
private static void updateEmployee(TransportClient client) throws Exception {
UpdateResponse response = client.prepareUpdate("company", "employee", "1")
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field("position", "technique manager")
.endObject())
.get();
System.out.println(response.getResult());
}
/**
* Delete an employee record
* @param client
* @throws Exception
*/
private static void deleteEmployee(TransportClient client) throws Exception {
DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
System.out.println(response.getResult());
}
Search
Earlier we ran searches as RESTful requests; now we implement them in Java. The original RESTful example:
GET /company/employee/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"position": "technique"
}
}
],
"filter": {
"range": {
"age": {
"gte": 30,
"lte": 40
}
}
}
}
},
"from": 0,
"size": 1
}
The equivalent Java code is:
SearchResponse response = client.prepareSearch("company")
.setTypes("employee")
.setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("position", "technique")) // must clause, contributes to the score
.filter(QueryBuilders.rangeQuery("age").gte(30).lte(40))) // filter clause, not scored
.setFrom(0).setSize(1)
.get();
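Aggregation Queries
Aggregation queries are a little more involved: both building the request and parsing the response follow the structure of the aggregation result. Take the following query as an example.
Requirements:
- Group employees by country
- Within each country group, group again by the year they joined
- Finally, compute the average salary of each group
The RESTful request is: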
GET /company/employee/_search
{
"size": 0,
"aggs": {
"group_by_country": {
"terms": {
"field": "country"
},
"aggs": {
"group_by_join_date": {
"date_histogram": {
"field": "join_date",
"interval": "year"
},
"aggs": {
"avg_salary": {
"avg": {
"field": "salary"
}
}
}
}
}
}
}
}
The same request written in Java:
SearchResponse searchResponse = client.prepareSearch("company")
.setTypes("employee")
.setSize(0)
.addAggregation(
AggregationBuilders.terms("group_by_country").field("country")
.subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
.field("join_date")
.dateHistogramInterval(DateHistogramInterval.YEAR)
.subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))
)
)
.get();
The response has to be unpacked one level at a time:
Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
// Terms is the interface implemented by StringTerms and the other terms results
Terms groupByCountry = (Terms) aggrMap.get("group_by_country");
for (Terms.Bucket groupByCountryBucket : groupByCountry.getBuckets()) {
System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());
Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
for (Histogram.Bucket groupByJoinDateBucket : groupByJoinDate.getBuckets()) {
System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());
Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
System.out.println(avgSalary.getValue());
}
}
client.close();
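To make the three-level structure easier to picture, here is a plain-Java sketch that computes the same grouping (country, then join year, then average salary) over an in-memory list. It uses no Elasticsearch classes; the `Employee` class and the sample rows are made up for illustration. Unlike a real date_histogram, which may also return empty buckets for years without documents, this sketch only produces buckets that contain data.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class NestedAggregationSketch {

    /** Hypothetical in-memory stand-in for one employee document. */
    static final class Employee {
        final String country;
        final LocalDate joinDate;
        final double salary;

        Employee(String country, String joinDate, double salary) {
            this.country = country;
            this.joinDate = LocalDate.parse(joinDate); // ISO date such as 2017-01-01
            this.salary = salary;
        }
    }

    /** country -> join year -> salary statistics (doc count and average per bucket). */
    static Map<String, Map<Integer, DoubleSummaryStatistics>> aggregate(List<Employee> employees) {
        Map<String, Map<Integer, DoubleSummaryStatistics>> byCountry = new TreeMap<>();
        for (Employee e : employees) {
            byCountry.computeIfAbsent(e.country, k -> new TreeMap<>())                  // outer bucket
                     .computeIfAbsent(e.joinDate.getYear(), k -> new DoubleSummaryStatistics()) // inner bucket
                     .accept(e.salary);                                                 // metric input
        }
        return byCountry;
    }

    /** Made-up sample data, used only by the demonstration below. */
    static List<Employee> sample() {
        return Arrays.asList(
                new Employee("china", "2017-01-01", 10000),
                new Employee("china", "2017-06-15", 12000),
                new Employee("china", "2018-03-01", 15000),
                new Employee("usa", "2017-02-01", 20000));
    }

    public static void main(String[] args) {
        aggregate(sample()).forEach((country, byYear) -> {
            System.out.println(country);
            byYear.forEach((year, stats) ->
                    System.out.println("  " + year + "\t" + stats.getCount() + "\t" + stats.getAverage()));
        });
    }
}
```

The nested maps mirror the nested buckets in the response: each level of `computeIfAbsent` corresponds to one level of `getAggregations()` in the parsing code.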
upsert Request
private static void upsert(TransportClient transport) {
try {
IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
XContentFactory.jsonBuilder().startObject()
.field("name", "mysql從入門到刪庫跑路")
.field("tags", "mysql")
.field("price", 32.8)
.endObject());
UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
.doc(XContentFactory.jsonBuilder()
.startObject().field("price", 31.8)
.endObject())
.upsert(index);
UpdateResponse response = transport.update(update).get();
System.out.println(response.getVersion());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
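The behaviour of the upsert call above can be summarised as: if document 2 does not exist, the document passed to `upsert(...)` is indexed; if it already exists, the partial document passed to `doc(...)` is merged into it. The following plain-Java sketch imitates that decision with an in-memory map. The class `UpsertSketch` is hypothetical, and the merge here is shallow, which is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of update-with-upsert; no Elasticsearch classes involved.
final class UpsertSketch {
    /** Stand-in for an index: document id -> document fields. */
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    Map<String, Object> update(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            // Document is missing: store the upsert document; the partial doc is not applied
            existing = new HashMap<>(upsertDoc);
            docs.put(id, existing);
        } else {
            // Document exists: merge the partial document into it (shallow merge in this sketch)
            existing.putAll(partialDoc);
        }
        return existing;
    }

    public static void main(String[] args) {
        Map<String, Object> full = new HashMap<>();
        full.put("name", "mysql");
        full.put("price", 32.8);
        Map<String, Object> partial = new HashMap<>();
        partial.put("price", 31.8);

        UpsertSketch index = new UpsertSketch();
        System.out.println(index.update("2", partial, full).get("price")); // first call inserts
        System.out.println(index.update("2", partial, full).get("price")); // second call updates
    }
}
```

Running the same request twice therefore gives different results: the first call creates the document with the full field set, and only the second call changes the price.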
mget Request
public static void mget(TransportClient transport) {
MultiGetResponse res = transport.prepareMultiGet()
.add("book_shop", "books", "1")
.add("book_shop", "books", "2")
.get();
for (MultiGetItemResponse item : res.getResponses()) {
System.out.println(item.getResponse());
}
}
bulk Request
public static void bulk(TransportClient transport) {
try {
BulkRequestBuilder bulk = transport.prepareBulk();
bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
XContentFactory.jsonBuilder().startObject()
.field("name", "設計模式從入門到拷貝代碼")
.field("tags", "設計模式")
.field("price", 55.9)
.endObject()));
bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
XContentFactory.jsonBuilder().startObject()
.field("name", "架構設計從入門到google搜索")
.field("tags", "架構設計")
.field("price", 68.9)
.endObject()));
bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()
.startObject().field("price", 32.8)
.endObject())));
BulkResponse bulkRes = bulk.get();
if (bulkRes.hasFailures()) {
System.out.println("Error...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
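The bulk request above carries three actions. When many documents have to be written, a common approach is to split them into several bulk requests of bounded size instead of one very large request. The helper below is a plain-Java sketch of that splitting step only; `BulkBatches` and the batch size are assumptions for illustration, and a suitable size depends on document size and cluster capacity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits work items into fixed-size batches, one batch per bulk request.
final class BulkBatches {

    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1", "2", "3", "4", "5");
        for (List<String> batch : partition(ids, 2)) {
            // In real code, each batch would be added to its own prepareBulk() builder
            System.out.println(batch);
        }
    }
}
```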
scroll Request
public static void scroll(TransportClient client) {
SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();
int batchCnt = 0;
do {
// Keep fetching the next batch by scroll id until a batch comes back empty
for(SearchHit hit: bookShop.getHits().getHits()) {
System.out.println("batchCnt:" + ++batchCnt);
System.out.println(hit.getSourceAsString());
}
bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while (bookShop.getHits().getHits().length != 0);
}
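Stripped of the Elasticsearch types, the scroll loop is a cursor-based read: fetch a batch, process it, ask for the next batch with the cursor returned by the previous one, and stop at the first empty batch. The sketch below reproduces only that control flow against an in-memory list. `FakeIndex`, `Page`, and the integer cursor are hypothetical stand-ins for the index, the search response, and the scroll id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ScrollLoopSketch {

    /** One batch of hits plus the cursor for the next batch (stand-in for hits + scroll id). */
    static final class Page {
        final List<String> hits;
        final int nextCursor;

        Page(List<String> hits, int nextCursor) {
            this.hits = hits;
            this.nextCursor = nextCursor;
        }
    }

    /** Hypothetical in-memory source that serves batchSize documents per request. */
    static final class FakeIndex {
        private final List<String> docs;
        private final int batchSize;
        int requests = 0; // counts round trips, including the final empty one

        FakeIndex(List<String> docs, int batchSize) {
            this.docs = docs;
            this.batchSize = batchSize;
        }

        Page fetch(int cursor) {
            requests++;
            int start = Math.min(cursor, docs.size());
            int end = Math.min(start + batchSize, docs.size());
            return new Page(docs.subList(start, end), end);
        }
    }

    static List<String> readAll(FakeIndex index) {
        List<String> all = new ArrayList<>();
        Page page = index.fetch(0);               // initial search
        while (!page.hits.isEmpty()) {            // an empty batch ends the scroll
            all.addAll(page.hits);                // process the current batch
            page = index.fetch(page.nextCursor);  // request the next batch with the returned cursor
        }
        return all;
    }

    public static void main(String[] args) {
        FakeIndex index = new FakeIndex(Arrays.asList("a", "b", "c"), 2);
        System.out.println(readAll(index) + " in " + index.requests + " requests");
    }
}
```

Note that the loop always costs one extra request, the one that returns the empty batch.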
Search Templates
public static void searchTemplates(TransportClient client) {
Map<String,Object> params = new HashMap<>(10);
params.put("from",0);
params.put("size",10);
params.put("tags","java");
SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
.setScript("page_query_by_tags")
.setScriptType(ScriptType.STORED)
.setScriptParams(params)
.setRequest(new SearchRequest())
.get();
for(SearchHit hit:str.getResponse().getHits().getHits()) {
System.out.println(hit.getSourceAsString());
}
}
Combining Multiple Query Conditions
public static void otherSearch(TransportClient client) {
SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入門")).get();
SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();
System.out.println(response1.getHits().getHits()[0].getSourceAsString());
System.out.println(response2.getHits().getHits()[0].getSourceAsString());
System.out.println(response3.getHits().getHits()[0].getSourceAsString());
System.out.println(response4.getHits().getHits()[0].getSourceAsString());
// Combine multiple conditions in one bool query
SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("tags", "java"))
.mustNot(QueryBuilders.matchQuery("name", "跑路"))
.should(QueryBuilders.matchQuery("name", "入門"))
.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();
System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}
Geo Queries
public static void geo(TransportClient client) {
GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);
List<GeoPoint> points = new ArrayList<>();
points.add(new GeoPoint(23,115));
points.add(new GeoPoint(25,113));
points.add(new GeoPoint(21,112));
GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);
GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);
SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
for(SearchHit hit:response.getHits().getHits()) {
System.out.println(hit.getSourceAsString());
}
}
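Conceptually, the distance query used above keeps the documents whose `location` lies within 500 meters of the given point. As a rough illustration of that test, the sketch below computes great-circle distance with the haversine formula in plain Java. This is a swapped-in textbook formula with an assumed mean Earth radius; Elasticsearch's internal distance computation may differ slightly. The class `GeoDistanceSketch` and the candidate points are hypothetical.

```java
final class GeoDistanceSketch {
    // Assumed constant: mean Earth radius in meters
    static final double EARTH_RADIUS_METERS = 6_371_008.8;

    /** Great-circle distance between two lat/lon points (degrees), haversine formula. */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double phi1 = Math.toRadians(lat1);
        double phi2 = Math.toRadians(lat2);
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2)
                + Math.cos(phi1) * Math.cos(phi2) * Math.sin(dLambda / 2) * Math.sin(dLambda / 2);
        return 2 * EARTH_RADIUS_METERS * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    /** True when the point lies inside the circle around the center. */
    static boolean within(double centerLat, double centerLon, double lat, double lon, double radiusMeters) {
        return haversineMeters(centerLat, centerLon, lat, lon) <= radiusMeters;
    }

    public static void main(String[] args) {
        double centerLat = 22.523375;
        double centerLon = 113.911231;
        // Roughly 400 m north of the center: inside the 500 m circle
        System.out.println(within(centerLat, centerLon, 22.5270, 113.9112, 500));
        // Roughly 1.1 km north of the center: outside the 500 m circle
        System.out.println(within(centerLat, centerLon, 22.5334, 113.9112, 500));
    }
}
```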
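Summary
A quick read through the demos above is enough. If you are already building an ES-based project, rely on the official API documentation, which has detailed API descriptions and usage demos: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/index.html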