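Preface: this is a development approach I worked out myself from a use case in my project. The project runs on my own machine with custom mock data (not a massive data set).
1: Application background.
A search feature over residential communities in more than 300 cities nationwide, together with the property listings published for them by multiple parties.
Nationwide search over listing and community data (multi-condition fuzzy queries, latitude/longitude queries).
2: Technology stack: ELK (Elasticsearch + Logstash + Kibana) + Filebeat + Kafka
3: Data sources:
1. The original data
2. Listing data pushed on a schedule by other platforms
3. Listing data published by the platform's own users
4: Handling each data source:
4.1. The original data is imported with Sqoop.
Implementation: not covered here; this post focuses on using ES for search from Java.
4.2. For the data pushed on a schedule I considered two options: the first reads the binlog; the second has the interface that receives the data send a copy to Kafka while it stores the data.
I prefer the Kafka option, because I am not familiar with binlog.
Implementation: the same as for the data in 4.3.
4.3. The platform's own data.
4.3.1: Write the published listing data to files in a log directory (the line format and file naming can be customized).
4.3.2: Filebeat collects the log lines and writes them to Kafka.
4.3.3: Logstash consumes the data from Kafka, parses it, and outputs it to ES.
Implementation: not covered here; this post focuses on using ES for search from Java.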
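Although the full pipeline is out of scope, step 4.3.1 can be pictured with a minimal sketch that appends each published listing as one JSON line to a log file, which Filebeat could then tail. The class name ListingLogWriter, the choice of JSON lines, and the three fields (borrowed from the index mapping later in this post) are assumptions for illustration, not the project's actual format.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

public class ListingLogWriter {

    /** Escapes backslashes and double quotes only (control characters are not handled in this sketch). */
    public static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Renders one listing as a single-line JSON object. */
    public static String toJsonLine(String haCode, String haName, double lat, double lon) {
        return "{\"haCode\":\"" + escape(haCode) + "\","
                + "\"haName\":\"" + escape(haName) + "\","
                + "\"location\":{\"lat\":" + lat + ",\"lon\":" + lon + "}}";
    }

    /** Appends one JSON line to the log file, creating the file if it does not exist. */
    public static void append(Path logFile, String haCode, String haName, double lat, double lon)
            throws IOException {
        Files.write(logFile,
                Collections.singletonList(toJsonLine(haCode, haName, lat, lon)),
                StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical log file; a real deployment would use the directory Filebeat watches.
        Path logFile = Files.createTempFile("house-publish", ".log");
        append(logFile, "H001", "Sunshine Garden", 39.9, 116.4);
        System.out.println(Files.readAllLines(logFile, StandardCharsets.UTF_8));
    }
}
```

One line per event keeps the Logstash side simple, since each Kafka message can then be parsed as a single JSON object.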
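5: Implementing the search. (I did not write these notes right after finishing, so I may have forgotten some points.)
5.1: Version and client.
I use version 6.4.2.
There are two clients, the transport client and the REST client; I chose REST. (The transport client was deprecated in 7.0 and removed in 8.0.)
5.2: Adding the jar dependencies.
5.3: The client connection class.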
import java.util.ArrayList;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfiguration {

    private static String hosts = "192.168.147.101,192.168.147.102,192.168.147.103"; // cluster addresses, comma-separated
    private static int port = 9200; // port
    private static String schema = "http"; // protocol
    private static ArrayList<HttpHost> hostList = null;

    private static int connectTimeOut = 1000; // connect timeout (ms)
    private static int socketTimeOut = 30000; // socket (read) timeout (ms)
    private static int connectionRequestTimeOut = 500; // timeout for obtaining a connection from the pool (ms)

    private static int maxConnectNum = 100; // maximum total connections
    private static int maxConnectPerRoute = 100; // maximum connections per route

    static {
        hostList = new ArrayList<>();
        String[] hostStrs = hosts.split(",");
        for (String host : hostStrs) {
            hostList.add(new HttpHost(host, port, schema));
        }
    }

    @Bean
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
        // Timeout settings for the async HTTP client
        builder.setRequestConfigCallback(new RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(connectTimeOut);
                requestConfigBuilder.setSocketTimeout(socketTimeOut);
                requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
                return requestConfigBuilder;
            }
        });
        // Connection pool settings for the async HTTP client
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(maxConnectNum);
                httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
                return httpClientBuilder;
            }
        });
        return new RestHighLevelClient(builder);
    }
}
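5.4: Entity classes.
5.5: The key part: a test class that ties everything together from Java. The search method carries comments on the scenarios I tried while testing.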
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Tests, UserEntity, GeoPoint and Demo01Application are the project's own classes (see 5.4).
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { Demo01Application.class })
public class Demo01ApplicationTests {

    @Autowired
    private RestHighLevelClient client;

    public static String INDEX_TEST = null;
    public static String TYPE_TEST = null;
    public static Tests tests = null;
    // Initialized here so that addTestList() can add to it without a NullPointerException.
    public static List<Tests> testsList = new ArrayList<>();

    @Test
    public void contextLoads() throws IOException, CloneNotSupportedException {
        // // Check whether an index exists
        // existsIndex("indexName");
        // // Create an index
        // createIndex("house", "house-info");
        // // Delete an index
        // deleteIndex("house");
        // // Check whether a document exists
        // exists("user", "user-info", "JO3hP24BlvWqEof7y5BF");
        // // Get a document by ID
        // get("index01", "type", "201");
        //
        // List<String> idList = null;
        // List<Map<String, Object>> dataList = null;
        // // Bulk update
        // bulkUpdate("index_name", "index_type", idList, dataList);
        // // Bulk add
        // bulkAdd("index_name", "index_type", idList, dataList);
        // // Bulk delete
        // List<String> idList = new ArrayList<String>();
        // idList.add("ZsSfRW4B3jWdK-k5x4lo");
        // idList.add("ZcSZRW4B3jWdK-k5E4ld");
        // bulkDelete("user", "user-info", idList, dataList);
        //
        // // Geo distance query
        // searchPoint();
        // // Keyword query
        // search("蔣", "user-info", "user");
        client.close();
    }

    /**
     * Create an index.
     *
     * @param index_name name of the index
     * @param index_type name of the mapping type
     * @throws IOException
     */
    private void createIndex(String index_name, String index_type) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(index_name);
        // Per-index settings: number of primary shards and replicas.
        // The number of primary shards (default 5) is fixed once the index is created;
        // the number of replicas (default 1) can be changed later.
        request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1));

        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
        xContentBuilder.startObject()
                // Mapping type name (similar to a table in a database). Field types can be set here;
                // for strings, text is analyzed (tokenized) and keyword is not.
                .startObject(index_type).startObject("properties")
                .startObject("doc.haCode").field("type", "keyword").endObject()
                .startObject("doc.haStatus").field("type", "integer").field("index", false).endObject()
                .startObject("doc.haName").field("type", "text").endObject()
                .startObject("doc.haAddr").field("type", "text").endObject()
                .startObject("doc.haPrice").field("type", "float").field("index", false).endObject()
                .startObject("doc.location").field("type", "geo_point").endObject()
                // index=false: stored in _source but not searchable
                .startObject("doc.haImage").field("type", "text").field("index", false).endObject()
                .startObject("doc.haDate").field("type", "date").endObject()
                .endObject().endObject().endObject();
        // Attach the mapping when the index is created, so special fields get explicit types.
        request.mapping(index_type, xContentBuilder);

        // Asynchronous variant: pass the CreateIndexRequest and an ActionListener to createAsync.
        // The call returns immediately without blocking; a typical listener looks like this.
        ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse createIndexResponse) {
                // Called when the request succeeds.
            }

            @Override
            public void onFailure(Exception e) {
                // Called when the request fails.
            }
        };
        // client.indices().createAsync(request, RequestOptions.DEFAULT, listener);

        // Synchronous execution, which is what this test uses.
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println("createIndex: " + JSON.toJSONString(createIndexResponse));
        xContentBuilder.close();
    }

    /**
     * Check whether an index exists.
     *
     * @param index_name name of the index
     * @return true if the index exists
     * @throws IOException
     */
    public boolean existsIndex(String index_name) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index_name);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println("existsIndex: " + exists);
        return exists;
    }

    /**
     * Delete an index.
     *
     * @param index_name name of the index
     * @throws IOException
     */
    public void deleteIndex(String index_name) throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(index_name);
        DeleteIndexResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
        System.out.println("deleteIndex: " + JSON.toJSONString(deleteIndexResponse));
    }

    /**
     * Check whether a document exists.
     *
     * @param index_name name of the index
     * @param index_type name of the mapping type
     * @param index_id   document ID
     * @return true if the document exists
     * @throws IOException
     */
    public boolean exists(String index_name, String index_type, String index_id) throws IOException {
        GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
        // Only existence matters, so skip fetching _source and stored fields.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        System.out.println("exists: " + exists);
        return exists;
    }

    /**
     * Get a document by ID.
     *
     * @param index_name name of the index
     * @param index_type name of the mapping type
     * @param index_id   document ID
     * @throws IOException
     */
    public void get(String index_name, String index_type, String index_id) throws IOException {
        GetRequest getRequest = new GetRequest(index_name, index_type, index_id);
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        System.out.println("get: " + JSON.toJSONString(getResponse));
    }

    /**
     * Bulk add. The addTestList method below can be used to prepare list data.
     *
     * @throws IOException
     */
    private void bulkAdd(String index_name, String index_type, List<String> idList, List<Map<String, Object>> dataList) throws IOException {
        BulkRequest bulkAddRequest = new BulkRequest();
        for (int i = 0; i < dataList.size(); i++) {
            IndexRequest indexRequest = new IndexRequest(index_name, index_type, idList.get(i));
            indexRequest.source(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
            bulkAddRequest.add(indexRequest);
        }
        BulkResponse bulkAddResponse = client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
        System.out.println("bulkAddResponse: " + JSON.toJSONString(bulkAddResponse));
    }

    /**
     * Bulk update.
     *
     * @param index_name name of the index
     * @param index_type name of the mapping type
     * @param idList     document IDs, in the same order as dataList
     * @param dataList   partial documents to merge into the existing ones
     * @throws IOException
     */
    public void bulkUpdate(String index_name, String index_type, List<String> idList, List<Map<String, Object>> dataList) throws IOException {
        BulkRequest bulkUpdateRequest = new BulkRequest();
        for (int i = 0; i < dataList.size(); i++) {
            UpdateRequest updateRequest = new UpdateRequest(index_name, index_type, idList.get(i));
            updateRequest.doc(JSON.toJSONString(dataList.get(i)), XContentType.JSON);
            bulkUpdateRequest.add(updateRequest);
        }
        BulkResponse bulkUpdateResponse = client.bulk(bulkUpdateRequest, RequestOptions.DEFAULT);
        System.out.println("bulkUpdate: " + JSON.toJSONString(bulkUpdateResponse));
    }

    /**
     * Bulk delete.
     *
     * @param index_name name of the index
     * @param index_type name of the mapping type
     * @param idList     IDs of the documents to delete
     * @param dataList   unused
     * @throws IOException
     */
    public void bulkDelete(String index_name, String index_type, List<String> idList, List<Map<String, Object>> dataList) throws IOException {
        BulkRequest bulkDeleteRequest = new BulkRequest();
        for (int i = 0; i < idList.size(); i++) {
            DeleteRequest deleteRequest = new DeleteRequest(index_name, index_type, idList.get(i));
            bulkDeleteRequest.add(deleteRequest);
        }
        BulkResponse bulkDeleteResponse = client.bulk(bulkDeleteRequest, RequestOptions.DEFAULT);
        System.out.println("bulkDelete: " + JSON.toJSONString(bulkDeleteResponse));
    }

    /**
     * Search.
     *
     * @param name  keyword to search for
     * @param type  mapping type to search
     * @param index index to search
     * @throws IOException
     */
    public void search(String name, String type, String index) throws IOException {
        // Query notes:
        // match: analyzer-aware, the search keyword is analyzed (tokenized) before matching.
        // term (one value) / terms (several values): the keyword is not analyzed and is matched as-is,
        //   so the match is more exact.
        // Example with the keyword "我你": term looks for the exact term "我你", while match also
        //   finds documents containing "我" or "你".
        //   QueryBuilders.matchQuery("name", name)
        // filter context: no relevance scoring and results can be cached, so it is faster than query context.
        // bool query for multiple conditions: must is like AND, should is like OR, mustNot excludes matches.
        // Aggregations: sum, min, max, avg, cardinality (roughly a distinct count), terms (grouping).
        // matchQuery: keyword is analyzed; improves recall.
        // matchPhraseQuery (match_phrase): the terms must appear together and in order; improves precision.
        // matchQuery, single field: QueryBuilders.matchQuery("name", name)
        // multiMatchQuery, several fields: QueryBuilders.multiMatchQuery("music", "name", "interest")
        // wildcardQuery: QueryBuilders.wildcardQuery("name", "*jack*"); * matches any number of characters, ? exactly one
        // prefixQuery: QueryBuilders.prefixQuery("fieldName", "prefix")
        // fuzzyQuery: QueryBuilders.fuzzyQuery("fieldName", "keyword"); finds terms similar to the keyword,
        //   e.g. "tet" may return "test", "text", etc.
        // rangeQuery: range query
        // geoDistanceQuery: everything within a distance of a latitude/longitude point
        // Sorting: ScoreSortBuilder, FieldSortBuilder

        // Keyword query
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        // boolBuilder.must(QueryBuilders.matchQuery("name", name));
        // // Conditions can be added per field; must means "has to match", mustNot means "must not match".
        // boolBuilder.must(QueryBuilders.matchQuery("sex", name));
        // boolBuilder.must(QueryBuilders.matchQuery("id", tests.getId().toString()));
        // boolBuilder.must(QueryBuilders.termQuery("field", "value"));
        // boolBuilder.must(QueryBuilders.wildcardQuery("field", "value"));
        // boolBuilder.must(QueryBuilders.rangeQuery("field").gt("value"));
        // boolBuilder.must(QueryBuilders.termsQuery("field", "value"));
        // boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("name", name)));
        // boolBuilder.should(boolBuilder.filter(QueryBuilders.matchPhraseQuery("sex", name)));
        // boolBuilder.should(QueryBuilders.matchPhraseQuery("sex", name));
        boolBuilder.should(QueryBuilders.matchPhraseQuery("doc.username", name));

        // RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("id");
        // // Range query; gte/lte make both ends inclusive.
        // rangeQueryBuilder.gte(1);
        // rangeQueryBuilder.lte(1);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        // sourceBuilder.query(rangeQueryBuilder);
        sourceBuilder.from(0);
        sourceBuilder.size(100); // number of hits to return, default 10
        // sourceBuilder.sort("id", SortOrder.ASC);
        // sourceBuilder.fetchSource(new String[] { "id", "name", "sex", "age" }, new String[] {});
        // First array: fields to include; second array: fields to exclude. By default everything is returned.
        sourceBuilder.fetchSource(new String[] { "doc.username", "doc.password" }, new String[] {});

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println("search: " + JSON.toJSONString(response));

        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            System.out.println("search -> " + hit.getSourceAsString());
            System.out.println("search -> " + hit.getSourceAsMap().get("doc"));
            UserEntity parseObject = JSONObject.parseObject(JSONObject.toJSONString(hit.getSourceAsMap().get("doc")), UserEntity.class);
            System.out.println("search -> " + parseObject.getUsername());
        }
    }

    /**
     * Range query, inclusive at both ends.
     *
     * @param fieldKey field to filter on
     * @param start    lower bound (inclusive)
     * @param end      upper bound (inclusive)
     * @return the range query builder
     */
    public RangeQueryBuilder rangeMathQuery(String fieldKey, String start, String end) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(fieldKey);
        rangeQueryBuilder.gte(start);
        rangeQueryBuilder.lte(end);
        return rangeQueryBuilder;
    }

    // Geo query: everything within a given distance of a point
    public void searchPoint() throws IOException {
        double a = 40.215193; // latitude
        double b = 116.680852; // longitude
        GeoDistanceQueryBuilder builder = QueryBuilders.geoDistanceQuery("location");
        builder.point(a, b);
        builder.distance(1000, DistanceUnit.MILES); // radius: 1000 miles
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder);
        GeoDistanceSortBuilder sort = SortBuilders.geoDistanceSort("location", a, b).order(SortOrder.ASC)
                .unit(DistanceUnit.KILOMETERS);
        sourceBuilder.sort(sort); // nearest first; sort values are reported in kilometers
        sourceBuilder.sort("id", SortOrder.ASC); // then by id

        SearchRequest searchRequest = new SearchRequest("index1");
        searchRequest.types("type1");
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

        SearchHits hits = response.getHits();
        SearchHit[] searchHits = hits.getHits();
        for (SearchHit hit : searchHits) {
            System.out.println("search -> " + hit.getSourceAsString());
        }
    }

    /**
     * Build test data to insert.
     *
     * @throws IOException
     */
    public void addTestList() throws IOException {
        double lat = 38.929986;
        double lon = 113.395645;
        for (long i = 201; i < 202; i++) {
            double max = 0.00001;
            double min = 0.000001;
            Random random = new Random();
            // Small random offset in [min, max) so the points differ slightly.
            double s = min + random.nextDouble() * (max - min);
            DecimalFormat df = new DecimalFormat("######0.000000");
            // System.out.println(s);
            String lons = df.format(s + lon);
            String lats = df.format(s + lat);
            Double dlon = Double.valueOf(lons);
            Double dlat = Double.valueOf(lats);
            Tests tests = new Tests();
            tests.setId(i);
            tests.setName("名字啊" + i);
            tests.setSex("電話啊" + i);
            GeoPoint location = new GeoPoint();
            location.setLat(dlat);
            location.setLon(dlon);
            tests.setLocation(location);
            testsList.add(tests);
        }
    }
}
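The query builders in the test class are ultimately serialized to the JSON query DSL that the REST client sends to ES. As a rough sketch of the use case from section 1 (a keyword condition combined with a latitude/longitude condition), the request body for a bool query with a match clause and a geo_distance filter can be assembled like this. HouseSearchBody and build are made-up names, the field names doc.haName and doc.location come from the mapping in createIndex, and the radius in kilometers is an arbitrary choice.

```java
public class HouseSearchBody {

    /** Escapes backslashes and double quotes only (enough for this sketch). */
    public static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Builds a search body: match on doc.haName (scored),
     * plus a geo_distance filter on doc.location (not scored).
     */
    public static String build(String keyword, double lat, double lon, int radiusKm) {
        return "{\"query\":{\"bool\":{"
                + "\"must\":[{\"match\":{\"doc.haName\":\"" + escape(keyword) + "\"}}],"
                + "\"filter\":[{\"geo_distance\":{\"distance\":\"" + radiusKm + "km\","
                + "\"doc.location\":{\"lat\":" + lat + ",\"lon\":" + lon + "}}}]"
                + "}}}";
    }

    public static void main(String[] args) {
        // The resulting string could be sent as the body of POST /house/_search.
        System.out.println(build("garden", 40.215193, 116.680852, 5));
    }
}
```

Putting the distance condition in filter rather than must keeps it out of relevance scoring, which matches the note in the search method that filters are cheaper than scored queries.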
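6: Points to note
6.1: At the time I wondered whether ES can do join (relational) queries.
First of all, joins are not recommended; try to describe each record completely in a single index (the "table"). The reason is that a join query can be several times to several hundred times slower, which defeats the purpose of fast search.
This article shows how to do join queries: https://blog.csdn.net/tuposky/article/details/80988915
This one verifies that joins are slower: http://www.matools.com/blog/190652134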
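The "one table describes everything" advice amounts to denormalizing at index time: the community's fields are copied into every listing document so that no join is needed at query time. A minimal sketch, assuming documents are plain maps; the class name DenormalizedListing and the community_ prefix are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DenormalizedListing {

    /**
     * Returns a new document containing all listing fields plus the
     * community fields, each prefixed with "community_" to avoid name clashes.
     * Neither input map is modified.
     */
    public static Map<String, Object> flatten(Map<String, Object> listing, Map<String, Object> community) {
        Map<String, Object> doc = new LinkedHashMap<>(listing);
        for (Map.Entry<String, Object> e : community.entrySet()) {
            doc.put("community_" + e.getKey(), e.getValue());
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> listing = new LinkedHashMap<>();
        listing.put("haCode", "H001");
        listing.put("haPrice", 5200.0);
        Map<String, Object> community = new LinkedHashMap<>();
        community.put("name", "Sunshine Garden");
        community.put("city", "Beijing");
        System.out.println(flatten(listing, community));
    }
}
```

The cost is duplicated data and a re-index of the affected listings whenever a community changes, which is usually an acceptable trade for query speed.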
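6.2: What if there is a massive amount of data and each record has many fields?
The only approach I can think of so far is to store the full data in HBase and keep only the key fields needed for searching in ES. My experience is limited, so I would welcome pointers from more experienced readers.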
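The idea can be sketched as a two-step lookup: search a small index to get row keys, then fetch the full records by key. In the sketch below two in-memory maps stand in for ES and HBase, so no real client API is shown; the class TwoStepLookup, its methods, and the choice of haName as the only searchable field are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoStepLookup {

    // Stand-in for the ES index: row key -> the one searchable field.
    private final Map<String, String> searchIndex = new LinkedHashMap<>();
    // Stand-in for HBase: row key -> full record with all fields.
    private final Map<String, Map<String, String>> fullStore = new HashMap<>();

    /** Stores the full record and indexes only its searchable field. */
    public void save(String rowKey, Map<String, String> fullRecord) {
        fullStore.put(rowKey, fullRecord);
        searchIndex.put(rowKey, fullRecord.get("haName"));
    }

    /** Step 1: find matching row keys in the index. Step 2: load full records by key. */
    public List<Map<String, String>> search(String keyword) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map.Entry<String, String> e : searchIndex.entrySet()) {
            if (e.getValue() != null && e.getValue().contains(keyword)) {
                result.add(fullStore.get(e.getKey()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TwoStepLookup lookup = new TwoStepLookup();
        Map<String, String> record = new HashMap<>();
        record.put("haName", "Sunshine Garden");
        record.put("haAddr", "No. 1 Example Road");
        lookup.save("row-001", record);
        System.out.println(lookup.search("Garden"));
    }
}
```

Using the HBase row key as the ES document ID would keep the second step a direct key lookup.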
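6.3: A problem with naming the fields stored in ES
ES stores the data under "_source", and ES (together with the Logstash/Filebeat pipeline feeding it) already has many attributes of its own, so if my fields use names such as type, host or path, the data cannot be inserted.
Workaround: set target => "doc" in the Logstash config so that the data is placed under doc inside _source; the fields then become doc.xxx.
However, I also have latitude/longitude data, and geo-point data could not be saved under doc with the approach above. I did not find the cause, so the simplest fix is to avoid field names such as type, host and path.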
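To picture what the target => "doc" workaround does to an event, here is a minimal sketch with plain maps: the parsed payload is nested under a single doc key, so a payload field called host no longer competes with a same-named field that the pipeline added at the top level. DocTarget and wrap are hypothetical names, and this only models the shape of the event, not Logstash itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DocTarget {

    /**
     * Builds the final event: pipeline fields stay at the top level,
     * the parsed payload goes under the "doc" key.
     */
    public static Map<String, Object> wrap(Map<String, Object> payload, Map<String, Object> pipelineFields) {
        Map<String, Object> event = new LinkedHashMap<>(pipelineFields);
        event.put("doc", new LinkedHashMap<>(payload));
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> pipelineFields = new LinkedHashMap<>();
        pipelineFields.put("host", "collector-01"); // field added by the pipeline (example value)
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("host", "listing-owner"); // payload field with the same name
        payload.put("haCode", "H001");
        System.out.println(wrap(payload, pipelineFields));
    }
}
```

Both host values survive side by side, one as host and one as doc.host, which is why the nested fields are addressed as doc.xxx in the mapping and queries.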