java-es查詢


前釋:此為結合項目中的應用場景自己想的開發方案,項目在自己本機搭建,自定義模擬數據(不是海量數據)。

1:應用背景。

  全國300多城市的小區,及多方發布的房源數據的一個檢索功能。

  全國的房源小區數據搜索。(多條件模糊查詢,經緯度查詢)

2:技術實現:ELK(elasticSearch+logstash+kibana)+ filebeat + kafka

3:數據來源:

  1. 原始數據

  2. 其它平台的定時推送過來的房源數據

  3. 平台本身用戶發布的房源數據

4:數據源處理方案:

  4.1. 原始數據采用sqoop處理。

    實現:這里不說明如何實現,本章主要實現的是es在java中的檢索應用。

  4.2. 定時推送的數據定的有2種解決方案,一種是采用binlog方式,二是采用接收數據的接口在裝數時候傳數據到kafka一份。

    本人想采用kafka方式,原因:對binlog不熟。

    實現:如同4.3中的數據。

  4.3. 對於本身的平台數據。

    3.1:將發布房源的數據寫入log的目錄文件中(寫入的格式及文件名稱形式可自定義)。

    3.2:filebeat采集日志信息寫入kafka中。

    3.3:logstash消費kafka中的數據並解析數據output到es中。

    實現:這里不說明如何實現,本章主要實現的是es在java中的檢索應用。

5:數據的檢索實現。(實現之后未能及時的隨筆記錄,有些點可能已經忘記了)

  5.1:版本及客戶端使用。

      我的版本使用的是6.4.2版本。

      客戶端有transport client和rest client,我選擇的是rest方式。(在8.0版本已經棄用了transport方式)。

  5.2:jar包引入。

      

  5.3:客戶端的連接類。

@Configuration
public class EsConfiguration {
 
    private static String hosts = "192.168.147.101,192.168.147.102,192.168.147.103"; // 集群地址,多個用,隔開
    private static int port = 9200; // 使用的端口號
    private static String schema = "http"; // 使用的協議
    private static ArrayList<HttpHost> hostList = null;
 
    private static int connectTimeOut = 1000; // 連接超時時間
    private static int socketTimeOut = 30000; // 連接超時時間
    private static int connectionRequestTimeOut = 500; // 獲取連接的超時時間
 
    private static int maxConnectNum = 100; // 最大連接數
    private static int maxConnectPerRoute = 100; // 最大路由連接數
 
    static {
        hostList = new ArrayList<>();
        String[] hostStrs = hosts.split(",");
        for (String host : hostStrs) {
            hostList.add(new HttpHost(host, port, schema));
        }
    }
 
    @Bean
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
        // 異步httpclient連接延時配置
        builder.setRequestConfigCallback(new RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeOut);
                requestConfigBuilder.setSocketTimeout(socketTimeOut);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                return requestConfigBuilder;
            }
        });
        // 異步httpclient連接數配置
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            }
        });
        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }
}

  5.4:實體類。

    

       

  5.5:重點,整合java使用的測試類。在搜索的方法中有測試場景時候的情況的注釋。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { Demo01Application.class })
public class Demo01ApplicationTests {

    @Autowired
    private RestHighLevelClient client;


    public static String INDEX_TEST = null;
    public static String TYPE_TEST = null;
    public static Tests tests = null;
    public static List<Tests> testsList = null;

    @Test
    public void contextLoads() throws IOException, CloneNotSupportedException {
//        // 判斷是否存在索引
//        existsIndex("indexName");
//        // 創建索引
//        createIndex("house", "house-info");
//        // 刪除索引
//        deleteIndex("house");
//        // 判斷數據是否存在
//        exists("user", "user-info", "JO3hP24BlvWqEof7y5BF");
//        // 根據ID獲取數據
//        get("index01", "type", "201");
//
//        List<String> idList = null;
//        List<Map<String, Object>> dataList = null;
//        // //批量更新
//        bulkUpdate("index_name", "index_type", idList, dataList);
//        // 批量添加
//        bulkAdd("index_name", "index_type", idList, dataList);
//        // 批量刪除
//       List<Map<String, Object>> dataList = null;
//       List<String> idList = new ArrayList<String>();
//       idList.add("ZsSfRW4B3jWdK-k5x4lo");
//       idList.add("ZcSZRW4B3jWdK-k5E4ld");
//        bulkDelete("user", "user-info", idList, dataList);
//        
//        //坐標范圍查詢
//        searchPoint();
//        //關鍵字查詢
//        search("蔣", "user-info", "user");
        
        client.close();
    }

    /**
     * 創建索引
     * 
     * @param index_name
     * @param index_type
     * @throws IOException
     */
    private void createIndex(String index_name, String index_type) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(index_name);// 創建索引
        // 創建的每個索引都可以有與之關聯的特定設置===================設置主片與副本數======================
        //主片的個數(默認5個)確定是不會變的,副本數(默認1個)是可以改變的
        request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1));
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
        xContentBuilder.startObject()
                // 索引庫名(類似數據庫中的表),可以指定字段的類型,String中text是分詞的,keyword不分詞
                .startObject(index_type).startObject("properties")
                .startObject("doc.haCode").field("type", "keyword").endObject()
                .startObject("doc.haStatus").field("type", "integer").field("index", false).endObject()
                .startObject("doc.haName").field("type", "text").endObject()
                .startObject("doc.haAddr").field("type", "text").endObject()
                .startObject("doc.haPrice").field("type", "float").field("index", false).endObject()
                .startObject("doc.location").field("type", "geo_point").endObject()
                //不參與索引創建
                .startObject("doc.haImage").field("type", "text").field("index", false).endObject()
                .startObject("doc.haDate").field("type", "date").endObject()
                .endObject().endObject().endObject();
        // 創建索引時創建文檔類型映射==========================定義mapping,指定字段類型,如一些特殊字段=========================
        request.mapping(index_type, xContentBuilder);

        // 異步執行
        // 異步執行創建索引請求需要將CreateIndexRequest實例和ActionListener實例傳遞給異步方法:
        // CreateIndexResponse的典型監聽器如下所示:
        // 異步方法不會阻塞並立即返回。
        ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {
                // 如果執行成功,則調用onResponse方法;
            }

            @Override
            public void onFailure(Exception e) {
                // 如果失敗,則調用onFailure方法。
            }
        };
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("createIndex: " + JSON.toJSONString(createIndexResponse));
        xContentBuilder.close();
        // client.indices().createAsync(request, RequestOptions.DEFAULT,
        // listener);//要執行的CreateIndexRequest和執行完成時要使用的ActionListener
    }

    /**
     * 判斷索引是否存在
     * 
     * @param index_name
     * @return
     * @throws IOException
     */
    public boolean existsIndex(String index_name) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index_name);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println("existsIndex: " + exists);
        return exists;
    }

    /**
     * 刪除索引
     * 
     * @param index
     * @return
     * @throws IOException
     */
    public void deleteIndex(String index_name) throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(index_name);
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
        System.out.println("deleteIndex: " + JSON.toJSONString(deleteIndexResponse));
    }

    /**
     * 判斷記錄是否存在
     * 
     * @param index
     * @param type
     * @param tests
     * @return
     * @throws IOException
     */
    public boolean exists(String index_name, String index_type, String index_id) throws IOException {
        GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println("exists: " + exists);
        return exists;
    }

    /**
     * 根據ID獲取記錄信息
     * 
     * @param index
     * @param type
     * @param id
     * @throws IOException
     */
    public void get(String index_name, String index_type, String index_id) throws IOException {
        GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        System.out.println("get: " + JSON.toJSONString(getResponse));
    }

    /**
     * 批量增加 addTestList方法封裝list數據
     * 
     * @throws IOException
     */
    private void bulkAdd(String index_name, String index_type, List<String> idList, List<Map<String, Object>> dataList)
            throws IOException {
        BulkRequest bulkAddRequest = new BulkRequest();
        for (int i = 0; i < dataList.size(); i++) {
            IndexRequest indexRequest = new IndexRequest(index_name, index_type, idList.get(i));
            indexRequest.source(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
            bulkAddRequest.add(indexRequest);
        }
        BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
        System.out.println("bulkAddResponse: " + JSON.toJSONString(bulkAddResponse));
    }
    
    /**
     * 批量更新
     * 
     * @param index
     * @param type
     * @param tests
     * @throws IOException
     */
    public void bulkUpdate(String index_name, String index_type, List<String> idList,
            List<Map<String, Object>> dataList) throws IOException {
        BulkRequest bulkUpdateRequest = new BulkRequest();
        for (int i = 0; i < dataList.size(); i++) {
            UpdateRequest updateRequest = new UpdateRequest(index_name, index_type, idList.get(i));
            updateRequest.doc(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
            bulkUpdateRequest.add(updateRequest);
        }
        BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
        System.out.println("bulkUpdate: " + JSON.toJSONString(bulkUpdateResponse));
    }

    /**
     * 刪除記錄
     * 
     * @param index
     * @param type
     * @param id
     * @throws IOException
     */
    public void bulkDelete(String index_name, String index_type, List<String> idList,
            List<Map<String, Object>> dataList) throws IOException {
        BulkRequest bulkDeleteRequest = new BulkRequest();
        for (int i = 0; i < idList.size(); i++) {
            DeleteRequest deleteRequest = new DeleteRequest(index_name, index_type, idList.get(i));
            bulkDeleteRequest.add(deleteRequest);
        }
        BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
        System.out.println("bulkDelete: " + JSON.toJSONString(bulkDeleteResponse));
    }

    /**
     * 搜索
     * 
     * @param index 要搜索的索引庫
     * @param type 要搜索的索引庫類型
     * @param name  要搜索的關鍵字
     * @throws IOException
     */
    public void search(String name, String type, String index) throws IOException {
        //query查詢:
            //match查詢:知道分詞器存在,會對查詢的關鍵字分詞;
            //team一個關鍵詞/teams多個關鍵詞查詢:不知道分詞器,不會對查詢的關鍵字分詞; 較精確的查詢
            //例子:條件是‘我你’ team查詢的是含有‘我你’的,match查詢含有‘我你’‘我’‘你’的都能查詢出 
            //QueryBuilders.matchQuery("name", name)
        
        //filter查詢:不計算相關性,且有cache,速度比query查詢快
            // boolBuilder多條件查詢:must相當於and, should相當於or,mustnot不符合條件的
        
        //聚合查詢
            // sum min max avg cardinality基數(類似去重之后的數量) teams分組
        
        // matchQuery(提高召回率,關鍵字會被分詞), 
        // matchPhraseQuery(關鍵字不會分詞), match_phrase提高精准度
        // matchQuery單一查詢QueryBuilders.matchQuery("name", name),
        // multiMatchQuery匹配多列查詢QueryBuilders.multiMatchQuery("music","name","interest"),
        // wildcardQuery模糊匹配查詢QueryBuilders.wildcardQuery("name", "*jack*") *多個 ?一個
        // QueryBuilders.matchPhraseQuery("字段名字", "前綴");//前綴查詢
        // QueryBuilders.fuzzyQuery("字段名字", "關鍵字");    //模糊查詢,跟關鍵字類似的都可以查詢出來 --> 關鍵字:tet 有可能會搜尋出 test text等
        // rangeQuery區間查詢
        // geoDistanceQuery經緯度范圍商家
        // 排序:ScoreSortBuilder,FieldSortBuilder
        //關鍵字查詢
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        // boolBuilder.must(QueryBuilders.matchQuery("name", name)); //
        // 這里可以根據字段進行搜索,must表示符合條件的,相反的mustnot表示不符合條件的
        // boolBuilder.must(QueryBuilders.matchQuery("sex", name));
        // boolBuilder.must(QueryBuilders.matchQuery("id", tests.getId().toString()));

        // boolQueryBuilder.must(QueryBuilders.termQuery("field","value"));
        // boolQueryBuilder.must(QueryBuilders.wildcardQuery("field","value"));
        // boolQueryBuilder.must(QueryBuilders.rangeQuery("field").gt("value"));
        // boolQueryBuilder.must(QueryBuilders.termsQuery("field","value"));

//        boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("name", name)));
//        boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("sex", name)));
//        boolBuilder.should(QueryBuilders.matchPhraseQuery("sex", name));
        boolBuilder.should(QueryBuilders.matchPhraseQuery("doc.username", name));
        // RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("id");
        // //區間查詢,都是閉區間
        // rangeQueryBuilder.gte(1);
        // rangeQueryBuilder.lte(1);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        // sourceBuilder.query(rangeQueryBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(100); // 獲取記錄數,默認10
        // sourceBuilder.sort("id", SortOrder.ASC);
//        sourceBuilder.fetchSource(new String[] { "id", "name", "sex", "age" }, new String[] {}); // 第一個是獲取字段,第二個是過濾的字段,默認獲取全部
        sourceBuilder.fetchSource(new String[] { "doc.username", "doc.password"}, new String[] {}); // 第一個是獲取字段,第二個是過濾的字段,默認獲取全部
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("search: " + JSON.toJSONString(response));
        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            System.out.println("search -> " + hit.getSourceAsString());
            System.out.println("search -> " + hit.getSourceAsMap().get("doc"));
            UserEntity parseObject = JSONObject.parseObject(JSONObject.toJSONString(hit.getSourceAsMap().get("doc")), UserEntity.class);
            System.out.println("search -> " + parseObject.getUsername());
        }
    }

    /**
     * 范圍查詢,左右都是閉集
     * 
     * @param fieldKey
     * @param start
     * @param end
     * @return
     */
    public RangeQueryBuilder rangeMathQuery(String fieldKey, String start, String end) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(fieldKey);
        rangeQueryBuilder.gte(start);
        rangeQueryBuilder.lte(end);
        return rangeQueryBuilder;
    }

    //坐標查詢 范圍查詢
    public void searchPoint() throws IOException {
        double a = 40.215193;
        double b = 116.680852;
        GeoDistanceQueryBuilder builder = QueryBuilders.geoDistanceQuery("location");
        builder.point(a, b);
        builder.distance(1000, DistanceUnit.MILES);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder);
        GeoDistanceSortBuilder sort = SortBuilders.geoDistanceSort("location", a, b).order(SortOrder.ASC)
                .unit(DistanceUnit.KILOMETERS);
        sourceBuilder.sort("id", SortOrder.ASC);
        SearchRequest searchRequest = new SearchRequest("index1");
        searchRequest.types("type1");
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            System.out.println("search -> " + hit.getSourceAsString());
        }
    }

    /**
     * 創建新增數據
     * 
     * @throws IOException
     */
    public void addTestList() throws IOException {
        double lat = 38.929986;
        double lon = 113.395645;
        for (long i = 201; i < 202; i++) {
            double max = 0.00001;
            double min = 0.000001;
            Random random = new Random();
            double s = random.nextDouble() % (max - min + 1) + max;
            DecimalFormat df = new DecimalFormat("######0.000000");
            // System.out.println(s);
            String lons = df.format(s + lon);
            String lats = df.format(s + lat);
            Double dlon = Double.valueOf(lons);
            Double dlat = Double.valueOf(lats);
            Tests tests = new Tests();
            tests.setId(i);
            tests.setName("名字啊" + i);
            tests.setSex("電話啊" + i);
            GeoPoint location = new GeoPoint();
            location.setLat(dlat);
            location.setLon(dlon);
            tests.setLocation(location);
            testsList.add(tests);
        }
    }
}

 

6:注意的點

  6.1:當時有想到可不可以關聯查詢?

      首先不建議關聯查詢,盡量一個表表述完整。原因關聯查詢會慢幾倍-幾百倍,失去了快速查詢的意義。

      這個是關聯查詢的方法:https://blog.csdn.net/tuposky/article/details/80988915

      這個是驗證關聯會慢的 :http://www.matools.com/blog/190652134

  6.2:如果說有海量的數據,且數據的字段很多怎么處理?

      目前我能想到的就是將海量數據存入hbase中,在es中存入要檢索的關鍵信息。能力一般水平有限,還希望各路大神指點。

  6.3:es的數據存儲字段定義的問題?

      es的數據是存儲在 "_source" 下面的,es本身會有很多屬性,所以如果字段中有type,host,path等值,那么數據插入不進去。

      解決辦法:logstash中的output配置target => "doc"  將數據放在  _source下的doc中,數據就是doc.xxx

      但是由於有經緯度數據,經緯度類型數據無法報存到上述方法的doc中,未找到原因,所以字段就盡量避開type,host,path等值就好了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM