elasticsearch 導入基礎數據並索引之 geo_point


elasticsearch 中的地理信息存儲, 有geo_point形式和geo_shape兩種形式

此篇只敘述geo_point, 

地理位置需要聲明為特殊的類型。如果不在mapping中顯式定義, 則需要按如下格式寫入數據:

{
    "pin" : {
        "location" : {
            "lat" : 40.12,
            "lon" : -71.34
        },
        "tag" : ["food", "family"],
        "text" : "my favorite family restaurant"
    }
}

如果仍然要顯式地在mapping中定義, 則需要將其聲明為 geo_point 類型:

{
    "pin" : {
        "properties" : {
            "location" : {
                "type" : "geo_point"
            }
        }
    }
}

es的類型有(以後知道了再補充): string, long, date, geo_point, text(全文檢索), binary, 

  range(integer_range, float_range, long_range, double_range, date_range)

  boolean, geo_point, geo_shape, ip, keyword, nested, token_count.. 可以參見這兒

多說一句: location的數據存放有3種形式: 

  

  1) location: lat + "," + lon    // 字符串形式 "lat,lon"; 最開始用的這個, 但是做 geoHashCellQuery 查詢測試時報錯了
  2) location: {
       "lat": ...,
       "lon": ...
     }                             // 對象形式; 這個是我使用的導入方式
  3) location: [lon, lat]         // 數組形式, 注意順序是 [lon, lat]; 這個沒用過, 沒測試, 沒發言權

 

 1, 導入查詢數據: 采用先建立mapping再導入的方式, 因為需要聲明ik分詞器

package com.iwhere.geosearch;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.yaml.snakeyaml.tokens.StreamStartToken;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
 * 從json文件中導入數據到es中
 * @author 231
 *
 */
public final class ImportData {

    private TransportClient client;
    
    @Test
    public void test1() throws Exception {
        createMapping("test", "catchModel");
        importData("test", "catchModel");
        System.out.println("success");
    }
    
    /**
     * 插入https://www.elastic.co/blog/geo-location-and-search的測試數據
     * @throws Exception 
     */
    @Test
    public void test2() throws Exception {
        String index = "geo";
        String type = "test";
        BulkRequestBuilder prepareBulk = client.prepareBulk();
        for (int i = 0; i < 50; i++) {
            XContentBuilder source = getJson(40 + i, -71.34 + i, "my favorite family restaurant");
            prepareBulk.add(client.prepareIndex(index, type).setSource(source));
        }
        BulkResponse response = prepareBulk.execute().actionGet();
    }
    
    public XContentBuilder getJson(double lat, double lon, String text) throws IOException {
        return XContentFactory.jsonBuilder()
                .startObject()
                    .startObject("pin")
                        .startObject("location").field("lat", lat).field("lon", lon).endObject()
                        .field("tag", "food", "family")
                        .field("text", text)
                    .endObject()
                .endObject();
    }
    
    
    /**
     * 導入數據
     * @throws Exception 
     */
    public void importData(String index, String type) throws Exception {
        BufferedReader br = new BufferedReader(new FileReader(new File("D://catchModel.json")));
        StringBuilder sb = new StringBuilder();
        String line = null;
        while((line = br.readLine()) != null) {
            sb.append(line);
        }
        
        BulkRequestBuilder prepareBulk = client.prepareBulk();
        JSONArray parseArray = JSON.parseArray(sb.toString());
        for (Object object : parseArray) {
            // 強轉為map, 否則報錯  the number of object passed must be even
            Map<String, Object> source = (Map<String, Object>) object;
//            IndexResponse response = client.prepareIndex(index, type).setSource(source).get();
            XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
                .startObject()
                    .field("taskName", source.get("taskName"))
                    .field("sessionId", source.get("sessionId"))
                    .field("geoNum", source.get("geoNum"))
                    .field("geoLevel", source.get("geoLevel"))
                    .field("createTime", source.get("createTime"))
                    .field("endTime", source.get("endTime"))
                    .startObject("location").field("lat", source.get("lbLat"))
                            .field("lon", source.get("lbLng"))
//                    .field("location", source.get("lbLat") + "," + source.get("lbLng"))
//                            XContentFactory.jsonBuilder()
//                                        .startObject()
//                                            .field("lat", source.get("lbLat"))
//                                            .field("lon", source.get("lblng"))
//                                        .endObject())
                    .endObject();
            prepareBulk.add(client.prepareIndex(index, type).setSource(xContentBuilder));
        }
        BulkResponse response = prepareBulk.get();
        System.out.println(response);
    }
    
    
    /**
     * 創建mapping, 添加ik分詞器等, 相當於創建數據庫表
     * 索引庫名: indices
     * 類型      : mappingType
     * field("indexAnalyzer", "ik"): 字段分詞ik索引
     * field("searchAnalyzer", "ik"): ik分詞查詢
     * @throws Exception 
     */
    public void createMapping(String indices, String type) throws Exception {
        
        // 創建index
        Map<String, Object> settings = new HashMap<>();
        settings.put("number_of_shards", 4);    // 分片數量
        settings.put("number_of_replicas", 0);    // 復制數量, 導入時最好為0, 之后2-3即可
        settings.put("refresh_interval", "10s");// 刷新時間
        
        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate(indices);
        prepareCreate.setSettings(settings);
        
        // 創建mapping
        XContentBuilder mapping = XContentFactory.jsonBuilder()
            .startObject()
                .startObject(type)
//                    .startObject("_ttl")//有了這個設置,就等於在這個給索引的記錄增加了失效時間,  
//                    //ttl的使用地方如在分布式下,web系統用戶登錄狀態的維護.  
//                        .field("enabled", true)//默認的false的  
//                        .field("default", "5m")//默認的失效時間,d/h/m/s 即天/小時/分鍾/秒  
//                        .field("store", "yes")  
//                        .field("index", "not_analyzed")  
//                    .endObject() 
//                     .startObject("_timestamp")//這個字段為時間戳字段.即你添加一條索引記錄后,自動給該記錄增加個時間字段(記錄的創建時間),搜索中可以直接搜索該字段.  
//                        .field("enabled", true)  
//                        .field("store", "no")  
//                        .field("index", "not_analyzed")  
//                    .endObject() 
                .startObject("properties")
                .startObject("taskName").field("type", "string").field("analyzer", "ik").field("searchAnalyzer", "ik").endObject()
                .startObject("sessionId").field("type", "string").endObject()
                .startObject("geoNum").field("type", "string").endObject()
                .startObject("grandPaGeoNum").field("type", "string").endObject()
                .startArray("sonGeoNum").endArray()
                .startObject("geoLevel").field("type", "long").endObject()
                .startObject("state").field("type", "long").endObject()
                .startObject("createTime").field("type", "date").endObject()
                .startObject("endTime").field("type", "date").endObject()
                .startObject("location")
                    .field("type", "geo_point").field("geohash_prefix", true).field("geohash_precision", "1km").endObject()/*.field("lat_lon", true)*/.endObject()
            .endObject().endObject();
        System.out.println(mapping.string());
//        PutMappingResponse actionGet = client.admin().|indices().preparePutMapping(indices).setType(indices).setSource(mapping).execute().actionGet();
        prepareCreate.addMapping(type, mapping);
        CreateIndexResponse response = prepareCreate.execute().actionGet();
        System.out.println(response);
    }
    
    @Before
    public void before() {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContxt-escluster.xml");
        client = context.getBean(TransportClient.class);
    } 
}

2, 地理位置查詢

package com.iwhere.geosearch;

import java.net.InetSocketAddress;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.geo.GeoDistance;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.builders.ShapeBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
import org.elasticsearch.index.query.GeoDistanceRangeQueryBuilder;
import org.elasticsearch.index.query.GeoPolygonQueryBuilder;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.index.query.GeohashCellQuery.Builder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Before;
import org.junit.Test;

/**
 * 使用dsl查詢
 * @author 231
 */
public class JavaESGEO {

    private TransportClient client;
    
    /**
     * Caused by: [test] QueryParsingException[Field [location] is not a geo_shape]
   * 報錯, 沒運行出來
*/ @Test public void testGeoShapeQuery() { GeoShapeQueryBuilder geoShapeQuery = QueryBuilders.geoShapeQuery("location", ShapeBuilder.newMultiPoint() .point(0, 0) .point(0, 10) .point(10, 10) .point(10, 0) .point(0, 0) , ShapeRelation.WITHIN); System.out.println(geoShapeQuery); SearchResponse response = client.prepareSearch("geo") .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(geoShapeQuery).get(); } /** * Caused by: java.lang.IllegalStateException: Shape with name [AVqw3mb-kOe4Yke4p-lh] found but missing shape field */ @Test public void testGeoShapeQuery1() { GeoShapeQueryBuilder queryBuilder = QueryBuilders.geoShapeQuery( "model.location", // field "AVqxrMyikOe4Yke4p_Wx", // id of document "catchModel", ShapeRelation.WITHIN) // type, relation .indexedShapeIndex("test") // name of index .indexedShapePath("location"); // filed specified as path SearchResponse response = client.prepareSearch() .setQuery(queryBuilder).execute().actionGet(); String string = response.getHits().getHits().toString(); System.out.println(string); } /** * 使用 BoundingBoxQuery進行查詢 */ @Test public void testGeoBoundingBoxQuery( ){ GeoBoundingBoxQueryBuilder queryBuilder = QueryBuilders.geoBoundingBoxQuery("location") .topRight(40.0, 117) .bottomLeft(39.9, 116); SearchResponse searchResponse = client.prepareSearch("test") .setQuery(queryBuilder).get(); System.out.println(searchResponse); System.err.println(searchResponse.getHits().totalHits()); } /** * distance query 查詢 */ @Test public void testDistanceQuery() { GeoDistanceQueryBuilder queryBuilder = QueryBuilders.geoDistanceQuery("location") .point(40, 116.5) .distance(20, DistanceUnit.KILOMETERS) .optimizeBbox("memory") .geoDistance(GeoDistance.ARC); SearchResponse response = client.prepareSearch("geo", "test") .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(queryBuilder).execute().actionGet(); System.out.println(response); System.err.println(response.getHits().totalHits()); } /** * 環形查詢 */ @Test public void 
testDistanceRangeQuery() { GeoDistanceRangeQueryBuilder queryBuilder = QueryBuilders.geoDistanceRangeQuery("location") .point(40, 116.5) // 中心點 .from("20km") // 內環 .to("25km") //外環 .includeLower(true) // 包含上屆 .includeUpper(true) // 包含下屆 .optimizeBbox("memory") // 邊界框 .geoDistance(GeoDistance.SLOPPY_ARC); SearchResponse response = client.prepareSearch("test") .setSearchType(SearchType.DFS_QUERY_AND_FETCH) .setQuery(queryBuilder).execute().actionGet(); System.out.println(response); System.out.println(response.getHits().totalHits()); } /** * 多邊形查詢 */ @Test public void testPolygonQuery() { GeoPolygonQueryBuilder queryBuilder = QueryBuilders.geoPolygonQuery("location") .addPoint(39, 116) .addPoint(39, 117) .addPoint(40, 117); SearchResponse response = client.prepareSearch("test", "geo") .setQuery(queryBuilder).get(); System.out.println(response); System.err.println(response.getHits().totalHits()); } /** * geoHash查詢 * 要使用, 需要先開啟 * "location": { "type": "geo_point", "geohash_prefix": true, "geohash_precision": "1km" // 精度, 可在mapping中指定, 也可在代碼中指定 */ @Test public void testGeoHashCellQuery() { Builder precision = QueryBuilders.geoHashCellQuery("location", new GeoPoint(39.9, 116)) .neighbors(true) .precision(3); SearchResponse response = client.prepareSearch("test") .setQuery(precision).get(); System.out.println(response); System.err.println(response.getHits().totalHits()); } @Before public void testBefore() { Settings settings = Settings.settingsBuilder().put("cluster.name", "wenbronk_escluster") .put("client.transport.sniff", true).build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))); System.out.println("success to connect escluster"); } }

其他配置信息見: spring整合java一章

 基礎數據是從mongodb中拷貝來的, 在github上有一份小量的數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM