To work with ES through the ES Java API, you first need a TransportClient object. From the TransportClient you obtain an IndicesAdminClient, whose methods operate on ES indices: create index, update index (update index settings, update index mapping), delete index, open index, and close index.
Preparation (creating the TransportClient and IndicesAdminClient)
Step 1: add the ES 6.4.2 dependencies:
<dependencies>
    <!-- Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>bijection-avro_2.11</artifactId>
        <version>0.9.5</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.4.2</version>
    </dependency>
</dependencies>
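Note: this list contains more than the ES API needs. The Spark, Kafka and Avro artifacts are for the rest of the project; for the Elasticsearch Java API calls in this article, the relevant dependency is org.elasticsearch.client:transport.
Step 2: obtain the TransportClient and IndicesAdminClient objects: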
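import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

// TransportClient is thread-safe and expensive to create. Reuse one instance instead of
// creating a new client (and leaking its threads and connections) on every call.
private static TransportClient sharedClient;

/**
 * Returns the shared ES client API object, built from getSparkESCommonOptions() on first use.
 */
public static synchronized TransportClient getClient() {
    if (sharedClient == null) {
        sharedClient = getClient(getSparkESCommonOptions());
    }
    return sharedClient;
}

/**
 * Creates a new ES client API object. The caller owns it and must close() it.
 */
public static TransportClient getClient(Map<String, String> esOptionsMap) {
    Settings settings = Settings.builder()
            .put("cluster.name", esOptionsMap.get("cluster.name"))
            .put("client.transport.sniff", true)
            .build();
    TransportClient client = new PreBuiltTransportClient(settings);
    // e.g. 192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
    String[] esNodeArr = esOptionsMap.get("es.nodes").split(",");
    try {
        for (String esNode : esNodeArr) {
            // 9300 is the transport port; es.port (9200) is the HTTP port and is not used here.
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(esNode.trim()), 9300));
        }
    } catch (UnknownHostException e) {
        client.close();
        throw new RuntimeException(e);
    }
    return client;
}

public static IndicesAdminClient getAdminClient() {
    return getClient().admin().indices();
}

/**
 * Note: builds a new TransportClient on every call that cannot be closed through the returned
 * IndicesAdminClient; prefer getClient(Map) and close the client when done.
 */
public static IndicesAdminClient getAdminClient(Map<String, String> esOptionsMap) {
    return getClient(esOptionsMap).admin().indices();
}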
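The getClient(Map) method splits es.nodes on commas and connects to port 9300 on every node. 9300 is the default port of the transport protocol that TransportClient speaks, whereas es.port=9200 in the configuration that follows is the HTTP port used by elasticsearch-spark, so the two should not be mixed up. The sketch below is a hypothetical, JDK-only helper (EsNodes.parse is not part of ES) that trims each entry, skips empty ones, and accepts an optional host:port suffix, defaulting to 9300. It assumes IPv4 addresses or host names; bracketed IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: parses an "es.nodes"-style string into host/port pairs. */
final class EsNodes {
    // Default port of the ES transport protocol used by TransportClient (not the HTTP port 9200).
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    private EsNodes() {}

    static List<InetSocketAddress> parse(String nodes) {
        List<InetSocketAddress> result = new ArrayList<>();
        if (nodes == null) {
            return result;
        }
        for (String raw : nodes.split(",")) {
            String node = raw.trim();
            if (node.isEmpty()) {
                continue; // tolerate "a,,b" and trailing commas
            }
            String host = node;
            int port = DEFAULT_TRANSPORT_PORT;
            int colon = node.lastIndexOf(':');
            if (colon > 0) {
                // Optional explicit port, e.g. "192.168.1.121:9301"
                host = node.substring(0, colon);
                port = Integer.parseInt(node.substring(colon + 1));
            }
            // createUnresolved does no DNS lookup; resolution happens when the TransportAddress is built.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```

Inside getClient(Map), each parsed address could then be added with `client.addTransportAddress(new TransportAddress(InetAddress.getByName(a.getHostString()), a.getPort()))`.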
Note: the options returned by getSparkESCommonOptions() include:
cluster.name=es-application
es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
es.port=9200
es.index.auto.create=true
pushdown=true
es.nodes.wan.only=true
# Read ES date fields as strings instead of date objects.
es.mapping.date.rich=false
es.scroll.size=10000
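getSparkESCommonOptions() itself is not shown. Assuming the options are kept in a file in java.util.Properties syntax (an assumption; they could just as well be hard-coded), a loader might look like the hypothetical EsOptions.load below. It also illustrates one pitfall of that syntax: # starts a comment only as the first non-blank character of a line, so a `# ...` placed after a value on the same line becomes part of the value.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical loader for ES options written in java.util.Properties syntax. */
final class EsOptions {
    private EsOptions() {}

    static Map<String, String> load(Reader reader) {
        Properties props = new Properties();
        try {
            // '#' or '!' is a comment only when it is the first non-blank character of a line.
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> options = new TreeMap<>(); // sorted keys for predictable iteration
        for (String key : props.stringPropertyNames()) {
            options.put(key, props.getProperty(key));
        }
        return options;
    }
}
```

In a real project this could be called with a reader over a (hypothetical) es.properties file and the resulting map passed to getClient(Map).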
ES API: Exists/Create Index
Before creating an index, check whether the index (and its type) already exists with these methods:
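import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;

/**
 * Checks whether an ES index contains a given type.
 *
 * @param indexName the index
 * @param indexType the type within the index
 */
public static boolean typeExists(String indexName, String indexType) {
    if (!indexExists(indexName)) {
        return false; // check the index first so that a missing index simply yields false
    }
    TypesExistsResponse typeResponse = getAdminClient().prepareTypesExists(indexName)
            .setTypes(indexType)
            .execute().actionGet();
    return typeResponse.isExists();
}

/**
 * Checks whether the given index (or indices) exists in ES.<br>
 * IndicesExistsRequest takes index names only; use typeExists() to check a type.
 */
public static boolean indexExists(String... indices) {
    IndicesExistsRequest request = new IndicesExistsRequest(indices);
    IndicesExistsResponse response = getAdminClient().exists(request).actionGet();
    return response.isExists();
}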
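There are two ways to create an index: create an empty index without specifying a mapping or settings, or create a complex index with an explicit mapping and settings.
Creating an empty index: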
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;

/**
 * Creates a simple index without an explicit mapping (see the note below).
 */
public static boolean indexCreate(String indexName) {
    CreateIndexResponse response = getAdminClient().prepareCreate(indexName).get();
    return response.isAcknowledged();
}
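Note: when documents are later inserted into such an index, ES reads their field names and values and creates the mapping fields automatically (dynamic mapping). The data types cannot be controlled well this way: dynamic mapping maps JSON floating-point numbers to float, so a double field may end up as float, and date strings in a custom format such as yyyy-MM-dd HH:mm:ss do not match the default date-detection formats, so they are mapped as text and the date type and its format are lost.
Creating a complex index: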
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Creates a complex index (type/mapping) with the settings below and the given mapping.
 *
 * @param indexName index name
 * @param indexType type name
 * @param builder   the type mapping
 */
public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder) {
    Settings settings = Settings.builder()
            .put("index.mapping.ignore_malformed", true)
            .put("index.refresh_interval", "60s")
            .put("index.number_of_shards", 4)
            .put("index.number_of_replicas", 0)
            .put("index.max_result_window", 500000)
            .put("index.translog.durability", "async")
            .put("index.translog.sync_interval", "120s")
            .put("index.translog.flush_threshold_size", "2gb")
            .put("index.merge.scheduler.max_thread_count", 1)
            .build();
    return indexCreate(indexName, indexType, builder, settings);
}

/**
 * Creates a complex index (type/mapping) with explicit settings and mapping.
 *
 * @param indexName index name
 * @param indexType type name
 * @param builder   the type mapping
 * @param settings  the index settings. For the settings above,
 *                  GET http://192.168.1.120:9200/twitter/_settings?pretty returns:
 * <pre>
 * "settings": {
 *   "index": {
 *     "mapping": { "ignore_malformed": "true" },
 *     "refresh_interval": "60s",
 *     "number_of_shards": "4",
 *     "translog": {
 *       "flush_threshold_size": "2048m",
 *       "sync_interval": "120s",
 *       "durability": "async"
 *     },
 *     "provided_name": "indexName",
 *     "merge": { "scheduler": { "max_thread_count": "1" } },
 *     "max_result_window": "500000",
 *     "creation_date": "1540781909323",
 *     "number_of_replicas": "0",
 *     "uuid": "5c079b5tQrGdX0fF23xtQA",
 *     "version": { "created": "6020499" }
 *   }
 * }
 * </pre>
 * @return false if the index already exists, otherwise whether the request was acknowledged
 */
public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder, Settings settings) {
    if (indexExists(indexName)) {
        return false;
    }
    // Without setSettings() the index is created with the default settings.
    CreateIndexResponse cIndexResponse = getAdminClient().prepareCreate(indexName)
            .setSettings(settings)
            .addMapping(indexType, builder) // type + mapping; this form was tested and works
            .get();
    return cIndexResponse.isAcknowledged();
}
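Settings.builder() uses flat, dot-separated keys, while GET _settings returns them nested, as in the Javadoc above (ES can also return the flat form with ?flat_settings=true). The two forms are equivalent: index.translog.durability is the durability key inside translog inside index. To make that correspondence concrete, here is a hypothetical JDK-only helper (SettingsView.nest, not an ES API) that expands flat keys into nested maps; it rejects input where one name is used both as a value and as a prefix.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: expands flat dotted setting keys into the nested form shown by GET _settings. */
final class SettingsView {
    private SettingsView() {}

    @SuppressWarnings("unchecked")
    static Map<String, Object> nest(Map<String, String> flat) {
        Map<String, Object> root = new TreeMap<>();
        for (Map.Entry<String, String> e : flat.entrySet()) {
            String[] parts = e.getKey().split("\\.");
            Map<String, Object> node = root;
            // Walk or create one nested map per key segment except the last.
            for (int i = 0; i < parts.length - 1; i++) {
                Object child = node.computeIfAbsent(parts[i], k -> new TreeMap<String, Object>());
                if (!(child instanceof Map)) {
                    throw new IllegalArgumentException("key conflict at " + e.getKey());
                }
                node = (Map<String, Object>) child;
            }
            String leaf = parts[parts.length - 1];
            if (node.containsKey(leaf)) {
                throw new IllegalArgumentException("key conflict at " + e.getKey());
            }
            node.put(leaf, e.getValue());
        }
        return root;
    }
}
```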
How can the mapping be generated dynamically from an Avro schema?
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import com.databricks.spark.avro.SchemaConverters;

import scala.Tuple3;

/**
 * (Re)creates the index with a mapping generated from the Avro schema plus two extra fields.
 *
 * @throws IOException if the mapping cannot be built
 */
protected void createIndex(String indexName, String indexType) throws IOException {
    Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> src = getTargetSchema(srcSchemaKey, true);

    Map<String, Map<String, String>> extFields = new HashMap<String, Map<String, String>>();
    Map<String, String> insertDateProperty = new HashMap<String, String>();
    insertDateProperty.put("type", "date");
    insertDateProperty.put("format", "yyyy-MM-dd");
    extFields.put("index_date", insertDateProperty);
    Map<String, String> typeProperty = new HashMap<String, String>();
    typeProperty.put("type", "keyword");
    extFields.put("type", typeProperty);

    XContentBuilder mappingSource = getMapping(indexType, src._2(), extFields);
    if (!indexCreate(indexName, indexType, mappingSource)) {
        throw new RuntimeException("Failed to re-create index " + indexName + " with the generated mapping!");
    }
}

/**
 * @param indexType          the index type
 * @param schemaColVsTypeMap fields read from the *.avsc schema file, as colName vs colType
 * @param extFields          extra fields that are not in the *.avsc schema file
 * @return the mapping; GET on the created index then returns, for example:
 * <pre>
 * {
 *   "mrs_rsrp_d_2018.10.26": {
 *     "aliases": {},
 *     "mappings": {
 *       "_doc": {
 *         "properties": {
 *           "cgi": {"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},
 *           "timestamp": {"type":"long"}
 *         }
 *       }
 *     },
 *     "settings": {}
 *   }
 * }
 * </pre>
 * @throws IOException if building the XContentBuilder fails
 */
public static XContentBuilder getMapping(String indexType, Map<String, String> schemaColVsTypeMap,
        Map<String, Map<String, String>> extFields) throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
            .startObject(indexType)
            // _all copies every field of a row into one big field searchable across all columns.
            // It is deprecated in 6.x and already disabled for indices created in 6.0+.
            .startObject("_all").field("enabled", false).endObject()
            // Do NOT disable _source, or queries return no fields. _source decides which fields are
            // stored (all by default; includes/excludes can narrow it).
            // .startObject("_source").field("enabled", false).endObject()
            // .startObject("_field_names").field("enabled", false).endObject()
            .startObject("properties");
    for (Map.Entry<String, String> kv : schemaColVsTypeMap.entrySet()) {
        String colName = kv.getKey();
        String colType = kv.getValue();
        if (colName.equalsIgnoreCase("scan_start_time")
                || colName.equalsIgnoreCase("scan_stop_time")
                || colName.equalsIgnoreCase("insert_time")) {
            // e.g. "insert_time":{"type":"date","format":"yyyy-MM-dd HH:mm:ss"}
            builder.startObject(colName)
                    .field("type", "date")
                    .field("format", "yyyy-MM-dd HH:mm:ss") // formats can be combined: yyyy/MM/dd||yyyy/MM/dd HH:mm:ss
                    .field("index", true) // true|false since ES 5 (not_analyzed|analyzed was ES 2.x)
                    .endObject();
        } else if (colName.equalsIgnoreCase("city_name")
                || colName.equalsIgnoreCase("region_name")
                || colName.equalsIgnoreCase("province_name")) {
            // keyword only, instead of the dynamic default text + "keyword" sub-field
            builder.startObject(colName).field("type", "keyword").endObject();
        } else if (colType.equalsIgnoreCase("string")) {
            builder.startObject(colName).field("type", "keyword").endObject();
        } else if (colType.equalsIgnoreCase("int")) {
            // ES has no "int" type; its 32-bit integer type is "integer".
            builder.startObject(colName).field("type", "integer").endObject();
        } else {
            // long, float, double and boolean have the same name in Avro and ES.
            builder.startObject(colName).field("type", colType.toLowerCase(Locale.ROOT)).endObject();
        }
    }
    // Append the extra fields to the mapping properties.
    for (Map.Entry<String, Map<String, String>> kv : extFields.entrySet()) {
        builder.startObject(kv.getKey());
        for (Map.Entry<String, String> kvProperty : kv.getValue().entrySet()) {
            builder.field(kvProperty.getKey(), kvProperty.getValue());
        }
        builder.endObject();
    }
    builder.endObject(); // end of properties
    builder.endObject(); // end of indexType
    builder.endObject(); // end of root object
    return builder;
}

/**
 * Returns the target column list, the column vs column type map, and the expression encoder.
 */
protected Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>> getTargetSchema(String schemaFilePath,
        boolean withTimestamp) {
    // The schema is only used on the driver, so it is parsed directly instead of being broadcast.
    Schema targetSchema = new Schema.Parser().parse(getHdfsFileContent(schemaFilePath));
    List<String> targetColumns = new ArrayList<String>();
    Map<String, String> targetKeyTypeItems = new LinkedHashMap<String, String>();
    for (Field field : targetSchema.getFields()) {
        targetColumns.add(field.name());
        targetKeyTypeItems.put(field.name(), avroTypeName(field.schema()));
    }
    ExpressionEncoder<Row> encoder = SchemaHelper.createSchemaEncoder(targetSchema, withTimestamp);
    return new Tuple3<List<String>, Map<String, String>, ExpressionEncoder<Row>>(targetColumns, targetKeyTypeItems, encoder);
}

/** Returns "long" for a nullable union such as ["null","long"], or the type name of a non-union schema. */
private static String avroTypeName(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
        for (Schema branch : schema.getTypes()) {
            if (branch.getType() != Schema.Type.NULL) {
                return branch.getName();
            }
        }
    }
    return schema.getName();
}

/**
 * Converts an Avro schema into a Spark Row encoder, optionally with a TimestampType "timestamp" column.
 */
protected static ExpressionEncoder<Row> createSchemaEncoder(Schema schema, boolean withTimestamp) {
    StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();
    if (withTimestamp) {
        List<String> fields = Arrays.asList(type.fieldNames());
        if (!fields.contains("timestamp")) {
            type = type.add("timestamp", DataTypes.TimestampType);
        } else {
            // Build a new StructType instead of mutating the array returned by fields().
            StructField[] structFields = type.fields().clone();
            int index = type.fieldIndex("timestamp");
            StructField field = structFields[index];
            structFields[index] = new StructField(field.name(), DataTypes.TimestampType, field.nullable(), field.metadata());
            type = DataTypes.createStructType(structFields);
        }
    }
    return RowEncoder.apply(type);
}

/**
 * Reads a file on HDFS, skipping blank lines and lines starting with '#', and concatenates the trimmed lines.
 */
protected static String getHdfsFileContent(String filePath) {
    StringBuilder content = new StringBuilder();
    try (BufferedReader reader = getHDFSFileReader(filePath)) {
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.startsWith("#") && line.trim().length() > 0) {
                content.append(line.trim());
            }
        }
    } catch (FileNotFoundException e) {
        throw new RuntimeException("file not found exception: " + filePath, e);
    } catch (IOException e) {
        throw new RuntimeException("reading file while an error was thrown: " + filePath, e);
    }
    return content.toString();
}

protected static BufferedReader getHDFSFileReader(String hdfsFile) throws IOException {
    System.out.println("hdfsfile: " + hdfsFile);
    Path configPath = new Path(hdfsFile);
    FileSystem fs = FileSystem.get(new Configuration());
    if (!fs.exists(configPath)) {
        throw new FileNotFoundException("file(" + configPath + ") not found.");
    }
    return new BufferedReader(new InputStreamReader(fs.open(configPath), StandardCharsets.UTF_8));
}
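That is the complete code, so I won't walk through every detail: createIndex() reads the Avro schema from HDFS (getTargetSchema), turns its columns plus the extra fields into a mapping (getMapping), and creates the index with that mapping (indexCreate).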
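The column rules are the part most worth testing without a cluster. ES has no field type named int (its 32-bit integer type is integer), so Avro type names cannot always be passed through verbatim. The sketch below is a hypothetical, JDK-only mirror of those rules (AvroToEsMapping is not part of the article's code): it renders the "properties" object as JSON text, covers only Avro primitive types, and rejects everything else explicitly. It omits "index": true on date fields because true is already the default. Avro names are restricted to [A-Za-z_][A-Za-z0-9_]*, which is why no JSON escaping is done.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

/** Hypothetical helper mirroring the getMapping() column rules, rendered as JSON text. */
final class AvroToEsMapping {
    private static final Set<String> DATE_COLUMNS =
            new HashSet<>(Arrays.asList("scan_start_time", "scan_stop_time", "insert_time"));
    private static final Set<String> KEYWORD_COLUMNS =
            new HashSet<>(Arrays.asList("city_name", "region_name", "province_name"));

    private AvroToEsMapping() {}

    /** Maps an Avro primitive type name to an ES field type. */
    static String esType(String avroType) {
        switch (avroType.toLowerCase(Locale.ROOT)) {
            case "string": return "keyword";
            case "int": return "integer"; // ES has no "int" type
            case "long": return "long";
            case "float": return "float";
            case "double": return "double";
            case "boolean": return "boolean";
            default: throw new IllegalArgumentException("unsupported Avro type: " + avroType);
        }
    }

    /** Builds {"properties":{...}} for the given column name vs Avro type map, in map order. */
    static String properties(Map<String, String> colVsType) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> kv : colVsType.entrySet()) {
            String col = kv.getKey();
            String key = col.toLowerCase(Locale.ROOT); // the article compares names case-insensitively
            String body;
            if (DATE_COLUMNS.contains(key)) {
                body = "{\"type\":\"date\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}";
            } else if (KEYWORD_COLUMNS.contains(key)) {
                body = "{\"type\":\"keyword\"}";
            } else {
                body = "{\"type\":\"" + esType(kv.getValue()) + "\"}";
            }
            fields.add("\"" + col + "\":" + body);
        }
        return "{\"properties\":" + fields + "}";
    }
}
```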
ES API: Update Index
Updating an index means updating its settings and its mapping:
import java.util.Map;

import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Updates the mapping of an ES index type.
 */
public static boolean indexUpdateMapping(String indexName, String indexType, XContentBuilder builder) {
    PutMappingRequest mapping = Requests.putMappingRequest(indexName).type(indexType).source(builder);
    PutMappingResponse pMappingResponse = getAdminClient().putMapping(mapping).actionGet();
    return pMappingResponse.isAcknowledged();
}

/**
 * Updates the settings of an existing ES index from a key/value map.<br>
 * Only some settings can be changed on an existing index; others are fixed once the index is
 * created, so choose the settings to change accordingly.
 */
public static boolean indexUpdatSettings(String indexName, Map<String, String> settingsMap) {
    Settings.Builder settings = Settings.builder();
    for (Map.Entry<String, String> kv : settingsMap.entrySet()) {
        settings.put(kv.getKey(), kv.getValue());
    }
    return indexUpdatSettings(indexName, settings.build());
}

/**
 * Updates the settings of an existing ES index (see the restriction noted above).
 */
public static boolean indexUpdatSettings(String indexName, Settings settings) {
    UpdateSettingsResponse uIndexResponse = getAdminClient().prepareUpdateSettings(indexName)
            .setSettings(settings)
            .execute().actionGet();
    return uIndexResponse.isAcknowledged();
}

/**
 * Updates the index settings (here: refresh_interval 30s and one replica).
 *
 * @param indexName the index name
 * <ul>
 * <li>If queries do not need exact real-time results, set index.refresh_interval to e.g. 30s.
 * When importing a large amount of data, set it to -1 first and restore it after the import.</li>
 * <li>For a large bulk import, consider index.number_of_replicas: 0. With replicas, imported data
 * must be synchronized to them, and each replica repeats the analysis, indexing and segment merging,
 * which slows the import. Import without replicas, then restore them.</li>
 * </ul>
 * <b>Note:</b> some settings cannot be changed once the index is created, e.g.
 * index.number_of_shards; trying to update it throws an exception.
 */
public static boolean indexUpdateSettings(String indexName) {
    Settings settings = Settings.builder()
            // .put("index.mapping.ignore_malformed", false)
            .put("index.refresh_interval", "30s")
            // .put("index.number_of_shards", 4) // cannot be changed after creation
            .put("index.number_of_replicas", 1)
            // .put("index.max_result_window", 500000)
            // .put("index.translog.durability", "async")
            // .put("index.translog.sync_interval", "120s")
            // .put("index.translog.flush_threshold_size", "2gb")
            // .put("index.merge.scheduler.max_thread_count", 1)
            .build();
    return indexUpdatSettings(indexName, settings);
}
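The Javadoc above recommends relaxing refresh_interval and number_of_replicas during a large import and restoring them afterwards. One way to make sure the restore also happens when the import fails is a try/finally around the import. The sketch below assumes a hypothetical SettingsUpdater interface, which in real code could delegate to indexUpdatSettings(String, Map<String, String>); the interface only exists so the flow can be exercised without a cluster. The restore values are chosen by the caller, for example "30s" and "1" as in indexUpdateSettings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical abstraction over an "update index settings" call such as indexUpdatSettings(String, Map). */
interface SettingsUpdater {
    boolean update(String indexName, Map<String, String> settings);
}

/** Hypothetical helper: relaxes settings for a bulk import and restores them afterwards. */
final class BulkLoad {
    private BulkLoad() {}

    static void run(SettingsUpdater updater, String indexName, Runnable importData,
                    String restoreRefreshInterval, String restoreReplicas) {
        Map<String, String> relaxed = new LinkedHashMap<>();
        relaxed.put("index.refresh_interval", "-1");  // -1 disables periodic refresh
        relaxed.put("index.number_of_replicas", "0"); // no replica work while importing
        if (!updater.update(indexName, relaxed)) {
            throw new IllegalStateException("settings update not acknowledged for " + indexName);
        }
        Map<String, String> normal = new LinkedHashMap<>();
        normal.put("index.refresh_interval", restoreRefreshInterval);
        normal.put("index.number_of_replicas", restoreReplicas);
        boolean imported = false;
        try {
            importData.run();
            imported = true;
        } finally {
            // Runs whether or not the import threw, so the index is not left without refresh/replicas.
            boolean restored = updater.update(indexName, normal);
            if (!restored && imported) {
                // Only raised after a successful import, so it never hides the import's own exception.
                throw new IllegalStateException("settings restore not acknowledged for " + indexName);
            }
        }
    }
}
```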
ES API: Delete/Open/Close Index
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;

/**
 * Deletes one or more ES indices.
 */
public static boolean indexDelete(String... indices) {
    DeleteIndexResponse dIndexResponse = getAdminClient().prepareDelete(indices).execute().actionGet();
    if (dIndexResponse.isAcknowledged()) {
        System.out.println("Index deleted");
        return true;
    }
    System.out.println("Failed to delete index");
    return false;
}

/**
 * Closes one or more ES indices.<br>
 * REST equivalent: curl -XPOST "http://127.0.0.1:9200/indexname/_close"
 */
public static boolean indexClose(String... indices) {
    CloseIndexResponse cIndexResponse = getAdminClient().prepareClose(indices).execute().actionGet();
    if (cIndexResponse.isAcknowledged()) {
        System.out.println("Index closed");
        return true;
    }
    return false;
}

/**
 * Opens one or more ES indices.<br>
 * REST equivalent: curl -XPOST "http://127.0.0.1:9200/indexname/_open"
 */
public static boolean indexOpen(String... indices) {
    OpenIndexResponse oIndexResponse = getAdminClient().prepareOpen(indices).execute().actionGet();
    if (oIndexResponse.isAcknowledged()) {
        System.out.println("Index opened");
        return true;
    }
    return false;
}
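Close and open also matter for settings that cannot be changed on an open index. According to the ES index-settings documentation, static settings (such as index.codec or analysis settings) can only be set at creation time or on a closed index, while index.number_of_shards cannot be changed even on a closed index. The sketch below assumes a hypothetical IndexAdmin interface that would wrap indexClose, indexUpdatSettings and indexOpen from this article; it closes the index, applies the settings, and reopens the index in a finally block so that a failed update does not leave it closed.

```java
import java.util.Map;

/** Hypothetical view of the indexClose / indexUpdatSettings / indexOpen helpers. */
interface IndexAdmin {
    boolean close(String... indices);
    boolean updateSettings(String indexName, Map<String, String> settings);
    boolean open(String... indices);
}

/** Hypothetical helper for settings that can only be changed on a closed index. */
final class StaticSettings {
    private StaticSettings() {}

    static boolean applyOnClosedIndex(IndexAdmin admin, String indexName, Map<String, String> settings) {
        if (settings.containsKey("index.number_of_shards")) {
            // Fixed at creation time; not changeable even on a closed index.
            throw new IllegalArgumentException("index.number_of_shards cannot be updated");
        }
        if (!admin.close(indexName)) {
            return false;
        }
        boolean updated = false;
        boolean reopened = false;
        try {
            updated = admin.updateSettings(indexName, settings);
        } finally {
            // Reopen even if the update failed or threw, so the index does not stay unavailable.
            reopened = admin.open(indexName);
        }
        return updated && reopened;
    }
}
```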