注:以下实现属于重复造轮子,同样的导入功能可以改用 Spring Batch 来完成。
说明
maven依赖
官方客户端 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.4/index.html
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.5.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.5.0</version>
<exclusions>
<exclusion>
<artifactId>commons-codec</artifactId>
<groupId>commons-codec</groupId>
</exclusion>
<!--此处要排除掉自带的,这个自带的版本低,会报错-->
<exclusion>
<artifactId>elasticsearch</artifactId>
<groupId>org.elasticsearch</groupId>
</exclusion>
</exclusions>
</dependency>
Util类
配置类
package com.crb.ocms.product.domain.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Holds the Elasticsearch-related settings, bound from configuration properties
 * that start with the {@code esconfig} prefix. Accessors are generated by Lombok's
 * {@code @Data}.
 *
 * @Project crb-product-domain
 * @ClassName ESConfiguration
 * @Author liqiang
 * @Date 2019/3/6 5:52 PM
 */
@ConfigurationProperties(prefix = "esconfig")
@Data
public class ESConfiguration {
    /**
     * Alias name of the index.
     */
    private String aliasName;
    /**
     * Index name.
     */
    private String indexName;
    /**
     * Maximum number of worker threads used by a full index import.
     */
    private int threadSize;
    /**
     * Amount of data each thread handles per execution during a parallel full import.
     */
    private int treadDataSize;
    /**
     * Type name.
     */
    private String typeName;
    /**
     * Elasticsearch URL.
     */
    private String esUrl;
    /**
     * Elasticsearch host / IP.
     */
    private String host;
    /**
     * Elasticsearch port.
     */
    private Integer port;
    /**
     * Redis key used to mark that an import is currently being processed.
     */
    private String redissKey;
    /**
     * Mapping JSON.
     */
    private String mappingJson;
}
util工具类
package com.crb.ocms.product.service.impl; import com.crb.ocms.product.domain.config.ESConfiguration; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.*; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import 
org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.elasticsearch.search.suggest.SuggestBuilders; import org.elasticsearch.search.suggest.SuggestionBuilder; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.*; import java.util.regex.Pattern; /** * @Project crb-product-service * @PackageName com.crb.ocms.demo.service.test.impl * @ClassName ESHLRestUtil * @Author liqiang * @Date 2019/2/25 13:45 * @Description es工具类 */ @Slf4j @Component @EnableConfigurationProperties(ESConfiguration.class) public class ESHLRestUtil { private RestHighLevelClient client = null; public RestHighLevelClient getClient() { return client; } public void setClient(RestHighLevelClient client) { this.client = client; } ESConfiguration esConfiguration; @Autowired public ESHLRestUtil(ESConfiguration configuration) { client = new SimpleRestHighLevelClient(RestClient.builder(new HttpHost(configuration.getHost(), configuration.getPort(), "http"))); this.esConfiguration=configuration; } public SimpleRestHighLevelClient getSimpleClient(){ return (SimpleRestHighLevelClient)client; } /** *根据indexname获得index名字 支持通配符匹配 * @param indexName * @return */ public String [] getIndexNames(String indexName) { GetIndexResponse getIndexResponse= null; try { getIndexResponse = getIndexInfo(indexName); } catch (IOException e) { e.printStackTrace(); } return getIndexResponse!=null?getIndexResponse.getIndices():null; } /** * 根据index名字获得index信息 * @param indexName * @return */ public GetIndexResponse getIndexInfo(String indexName) throws IOException { 
//GetIndexRequest request = new GetIndexRequest().indices(indexName); // try { // GetIndexResponse getIndexResponse =client.indices().get(request, RequestOptions.DEFAULT); // return getIndexResponse; // } catch (IOException e) { // e.printStackTrace(); // } //master_timeout /** * 因为现在是api使用6.5 线上是6.24 master_timeout 使用以下方式 替换掉 */ GetIndexRequest request = new GetIndexRequest().indices(indexName); String[] indices = request.indices() == null ? Strings.EMPTY_ARRAY : request.indices(); String endpoint =indexName; Request httpequest = new Request("GET", endpoint); httpequest.addParameter("ignore_unavailable", "false"); httpequest.addParameter("expand_wildcards", "open"); httpequest.addParameter("allow_no_indices", "true"); org.apache.http.HttpEntity entity = new NStringEntity("", ContentType.APPLICATION_JSON); httpequest.setEntity(entity); Response httpResponse= getClient().getLowLevelClient().performRequest(httpequest); GetIndexResponse getIndexResponse=getSimpleClient().parseGetIndexRespons(httpResponse); return getIndexResponse; } /** * 获得最大index名字 * @param names indexName_number 格式 * @return */ public Integer getMaxIndexNumber(String []names) { if(names==null||names.length<=0){ return null; } List<Integer> indexNumber=new ArrayList<Integer>(); for (String name: names) { String[] arrs=name.split("_"); if(arrs.length<=1){ continue; } indexNumber.add(Integer.valueOf(arrs[1])); } indexNumber.sort(new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } }); return CollectionUtils.isEmpty(indexNumber)?null:indexNumber.get(0); } /** * 验证索引是否存在 * * @param index 索引名称 * @return * @throws Exception */ public boolean indexExists(String index) throws Exception { GetIndexRequest request = new GetIndexRequest(); request.indices(index); boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); return exists; } /** * 创建index * * @param index * @param indexType * @param properties 结构: {name:{type:text}} 
{age:{type:integer}} * @return * @throws Exception */ public boolean indexCreate(String index, String indexType, Map properties) throws Exception { if (indexExists(index)) { return true; } CreateIndexRequest request = new CreateIndexRequest(index); request.settings(Settings.builder().put("index.number_of_shards", 3) .put("index.number_of_replicas", 2)); Map jsonMap = new HashMap<>(); Map mapping = new HashMap<>(); mapping.put("properties", properties); jsonMap.put(indexType, mapping); request.mapping(indexType, jsonMap); CreateIndexResponse createIndexResponse = client.indices().create( request,RequestOptions.DEFAULT); boolean acknowledged = createIndexResponse.isAcknowledged(); return acknowledged; } /** * 删除指定索引 * @param indexName * @return * @throws IOException */ public boolean deleteIndex(String indexName) throws IOException { DeleteIndexRequest request = new DeleteIndexRequest(indexName); AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); return deleteIndexResponse.isAcknowledged(); } /** * 创建索引 * @param index 索引名字 * @param settiongs settiongs * @return * @throws Exception * @author lqiang */ public boolean indexCreate(String index, String settiongs ) throws Exception { if (indexExists(index)) { return true; } CreateIndexRequest request = new CreateIndexRequest(index); // request.settings(Settings.builder().put("index.number_of_shards", 3) // .put("index.number_of_replicas", 2)); request.source(settiongs, XContentType.JSON); CreateIndexResponse createIndexResponse = client.indices().create( request,RequestOptions.DEFAULT); boolean acknowledged = createIndexResponse.isAcknowledged(); return acknowledged; } /** * 创建更新文档 * * @param index * @param indexType * @param documentId * @param josonStr * @return * @throws Exception */ public boolean documentCreate(String index, String indexType, String documentId, String josonStr) throws Exception { IndexRequest request = new IndexRequest(index, indexType, documentId); 
request.source(josonStr, XContentType.JSON); IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return true; } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return true; } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return false; } /** * 创建更新文档 * * @param index * @param indexType * @param documentId * @param map * @return * @throws Exception */ public boolean documentCreate(String index, String indexType, String documentId, Map map) throws Exception { IndexRequest request = new IndexRequest(index, indexType, documentId); request.source(map); IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return true; } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return true; } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return false; } /** * 批量创建更新文档 * * @param index * @param indexType * @param list 建索引时传入_id作为docuemntId * @return * @throws Exception */ public boolean documentCreateBulk(String index, String indexType, List<Map<String, Object>> list) throws Exception { if (list.size() > 0) { BulkRequest bulkRequest = new BulkRequest(); for (Map map : list) { IndexRequest indexRequest; Object idObj = map.get("_id"); if (idObj != null && StringUtils.isNotBlank(idObj.toString())) { map.remove("_id"); String documentId = idObj.toString(); indexRequest = new IndexRequest(index, indexType, 
documentId); } else { indexRequest = new IndexRequest(index, indexType); } indexRequest.source(map); bulkRequest.add(indexRequest); } BulkResponse bulkResponse = client.bulk(bulkRequest,RequestOptions.DEFAULT); if (bulkResponse.hasFailures()) { System.out.println("索引异常信息:" + bulkResponse.buildFailureMessage()); return false; } } return true; } /** * 创建更新文档 * * @param index * @param indexType * @param documentId * @param routing * @param map * @return * @throws Exception */ public boolean documentCreate(String index, String indexType, String documentId, String routing, Map map) throws Exception { IndexRequest request = new IndexRequest(index, indexType, documentId); request.routing(routing); request.source(map); IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return true; } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return true; } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return false; } /** * 创建索引 * * @param index * @param indexType * @param josonStr * @return * @throws Exception */ public String documentCreate(String index, String indexType, String josonStr) throws Exception { IndexRequest request = new IndexRequest(index, indexType); request.source(josonStr, XContentType.JSON); IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT); String id = indexResponse.getId(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return id; } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return id; } if (shardInfo.getFailed() > 0) { for 
(ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return null; } /** * 创建索引 * * @param index * @param indexType * @param map * @return * @throws Exception */ public String documentCreate(String index, String indexType, Map map) throws Exception { IndexRequest request = new IndexRequest(index, indexType); request.source(map); IndexResponse indexResponse = client.index(request,RequestOptions.DEFAULT); String id = indexResponse.getId(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED || indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { return id; } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return id; } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return null; } public boolean documentDelete(String index, String indexType, String documentId) throws Exception { DeleteRequest request = new DeleteRequest(index, indexType, documentId); DeleteResponse deleteResponse = client.delete(request,RequestOptions.DEFAULT); if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { return true; } ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { return true; } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo .getFailures()) { throw new Exception(failure.reason()); } } return false; } /** * 为指定index指定别名 * @param indexName index名字 * @param aliasesName 别名名字 * @return */ public boolean aliases(String indexName,String aliasesName){ IndicesAliasesRequest request=new IndicesAliasesRequest(); IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD) .index(indexName) .alias(aliasesName); 
request.addAliasAction(aliasAction); try { AcknowledgedResponse indicesAliasesResponse = getClient().indices().updateAliases(request, RequestOptions.DEFAULT); return indicesAliasesResponse.isAcknowledged(); } catch (IOException e) { log.info("指定别名失败"); return false; } } /** * 指定别名 并删除oldIndex的别名 * @param oldindex * @param newIndex * @param aliasesName * @return */ public boolean aliases(String oldindex, String newIndex,String aliasesName) { IndicesAliasesRequest request=new IndicesAliasesRequest(); //旧的删除 IndicesAliasesRequest.AliasActions aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE) .index(oldindex) .alias(aliasesName); request.addAliasAction(aliasAction); //新的绑定 aliasAction = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD) .index(newIndex) .alias(aliasesName); request.addAliasAction(aliasAction); try { AcknowledgedResponse indicesAliasesResponse = getClient().indices().updateAliases(request, RequestOptions.DEFAULT); return indicesAliasesResponse.isAcknowledged(); } catch (IOException e) { return false; } } /** * 根据id获得文档信息 * @param index indexName * @param type typeName * @param id id * @return * @throws IOException */ public GetResponse getDoucmentById(String index, String type, String id) throws IOException { GetRequest getRequest = new GetRequest( index,//索引 type,//类型 id);//文档ID GetResponse getResponse = client.get(getRequest,RequestOptions.DEFAULT); return getResponse; } /** * 简单的单表根据条件进行查询 * * @param index index * @param type type * @param parameters 参数 * @return */ public List<String> queryByMatch(String index, String type, Map<String, String> parameters) { List<String> jsons=new ArrayList<String>(); try { SearchRequest searchRequest=new SearchRequest(); BoolQueryBuilder booleanQueryBuilder= QueryBuilders.boolQuery(); SearchSourceBuilder searchSourceBuilder=new SearchSourceBuilder(); for (String key : parameters.keySet()) { 
booleanQueryBuilder.must().add(QueryBuilders.termQuery(key,parameters.get(key))); } searchSourceBuilder.query(booleanQueryBuilder); searchRequest.source(searchSourceBuilder); SearchResponse response = client.search(searchRequest,RequestOptions.DEFAULT); if(response.getHits().getHits()!=null){ for (SearchHit searchHit: response.getHits().getHits() ) { jsons.add(searchHit.getSourceAsString()); } } } catch (IOException e) { e.printStackTrace(); } return jsons; } /** * 只包含字母 * * @return 验证成功返回true,验证失败返回false */ public static boolean checkLetter(String cardNum) { String regex = "^[A-Za-z]+$"; return Pattern.matches(regex, cardNum); } /** * 验证中文 * * @param chinese 中文字符 * @return 验证成功返回true,验证失败返回false */ public static boolean checkChinese(String chinese) { String regex = "^[\u4E00-\u9FA5]+$"; return Pattern.matches(regex, chinese); } /** * Description:提示词,支持中文、拼音、首字母等 * <p> * 1、检测搜索词是中文还是拼音 * 2、若是中文,直接按照name字段提示 * 3、若是拼音(拼音+汉字),先按照name.keyword_pinyin获取,若是无结果按照首字母name.keyword_first_py获取 * @param index * @param type * @param field 提示字段名字 * @param text 文本 * @return * @author liqiang */ public Set<String> getSuggestWord(String index, String type, String field, String text) throws IOException { String postField=field; if (checkLetter(text)) { postField = field + ".keyword_pinyin"; } else if (checkChinese(text)) { postField = field; } else { postField = field + ".keyword_pinyin"; } Set<String> suggestTexts= postSuggestWord(index,type,postField,text); if(org.springframework.util.CollectionUtils.isEmpty(suggestTexts)){ return postSuggestWord(index,type,field+".keyword_first_py",text); } return suggestTexts; } /** * Description:提示词,支持中文、拼音、首字母等 * @param index * @param type * @param field 提示字段名字 * @param text 文本 * @return * @author liqiang */ private Set<String> postSuggestWord(String index, String type, String field, String text) throws IOException { // 1、创建search请求 SearchRequest searchRequest = new SearchRequest(index); searchRequest.types(type); // 
2、用SearchSourceBuilder来构造查询请求体 ,请仔细查看它的方法,构造各种查询的方法都在这。 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.fetchSource(false);//不返回_source数据 sourceBuilder.size(0);//忽略hits //做查询建议 //词项建议 SuggestionBuilder termSuggestionBuilder = SuggestBuilders.completionSuggestion(field).text(text); SuggestBuilder suggestBuilder = new SuggestBuilder(); suggestBuilder.addSuggestion("suggest_productName", termSuggestionBuilder); sourceBuilder.suggest(suggestBuilder); searchRequest.source(sourceBuilder); Set<String> suggestTextList = new HashSet<String>(); //3、发送请求 SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); //4、处理响应 //搜索结果状态信息 if (RestStatus.OK.equals(searchResponse.status())) { // 获取建议结果 Suggest suggest = searchResponse.getSuggest(); CompletionSuggestion termSuggestion = suggest.getSuggestion("suggest_productName"); for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) { for (CompletionSuggestion.Entry.Option option : entry) { suggestTextList.add(option.getText().string()); } } } return suggestTextList; } /** * @Project demo * @PackageName com.crb.ocms.product.service.impl * @ClassName SimpleRestHighLevelClient * @Author qiang.li * @Date 2019/4/2 10:45 AM * @Description es版本降级导致的高级api会生成部分属性 低版本不能识别 将解析结果的方法暴露在外面 */ public class SimpleRestHighLevelClient extends RestHighLevelClient { public SimpleRestHighLevelClient(RestClientBuilder restClientBuilder) { super(restClientBuilder); } /** * 父类解析的解析响应内容是受保护的 所以定义一个继承 * 解决版本不一致的 自定义请求 解析 * @param response * @param entityParser * @param <Req> * @param <Resp> * @return * @throws IOException */ public <Req extends Validatable, Resp> Resp parseEntity(Response response,CheckedFunction<XContentParser, Resp, IOException> entityParser)throws IOException { return this.parseEntity(response.getEntity(),entityParser); } /** * 解析获得idnex的响应 * @param response * @return * @throws IOException */ public GetIndexResponse parseGetIndexRespons(Response response) throws 
IOException { return parseEntity(response,GetIndexResponse::fromXContent); } /** * 解析查询响应 * @param response * @return * @throws IOException */ public SearchResponse parseSearchResponse(Response response) throws IOException { return parseEntity(response,SearchResponse::fromXContent); } /** * 解析索引迁移响应 * @param response * @return * @throws IOException */ public BulkByScrollResponse parseBulkByScrollResponse(Response response) throws IOException { return parseEntity(response,BulkByScrollResponse::fromXContent); } } }
封装的处理器
工作原理
将符合指定条件的数据迁移(reindex)到新索引,然后在新索引上进行导入(多线程并行导入);导入完毕后把别名从老索引切换到新索引,并删除老索引,从而实现不停机更新。
抽象接口
package com.crb.ocms.product.serviceTool;

import org.elasticsearch.index.reindex.ReindexRequest;

import java.io.IOException;
import java.util.concurrent.Future;

/**
 * Contract for Elasticsearch import handlers.
 *
 * @Project crb-product-service
 * @ClassName EsImportService
 * @Author liqiang
 * @Date 2019/3/29 1:28 PM
 */
public interface ESImportService {

    /**
     * Handles the index migration described by the given request.
     *
     * @param reindexRequest migration request (source, query condition and destination)
     * @return the outcome reported by the implementation
     * @throws IOException if communication with Elasticsearch fails
     */
    boolean reindex(ReindexRequest reindexRequest) throws IOException;

    /**
     * Starts the import asynchronously.
     *
     * @return a future holding the import result
     */
    Future<Boolean> importAllAsyn();
}
抽象的处理器
使用模板模式将通用代码抽出来
package com.crb.ocms.product.serviceTool; import com.crb.ocms.product.domain.config.ESConfiguration; import com.crb.ocms.product.domain.entity.MdProduct; import com.crb.ocms.product.domain.redisskey.MdEsProductRedisKeyEnum; import com.crb.ocms.product.domain.util.exceptions.OCmsExceptions; import com.crb.ocms.product.service.impl.ESHLRestUtil; import com.hazelcast.util.StringUtil; import lombok.extern.log4j.Log4j2; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.apache.lucene.util.BytesRef; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexRequest; import org.redisson.api.RedissonClient; import org.springframework.data.domain.PageRequest; import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * @Project crb-product-service * @PackageName com.crb.ocms.product.serviceTool * @ClassName ESAbstrctImpor * @Author liqiang * @Date 2019/3/29 1:35 PM * @Description 抽象的es导入处理器 */ @Log4j2 public abstract class ESAbstractImport implements ESImportService { private RestTemplate restTemplate; private ESConfiguration esConfiguration; private ESHLRestUtil eshlRestUtil; private RedissonClient redissonClient; /** * 当前index名字 */ private String templateIndexName; /** * 之前的index名字 */ private String beforeIndexName; public ESAbstractImport(RestTemplate restTemplate, ESConfiguration esConfiguration,RedissonClient redissonClient){ esConfiguration.setIndexName("cmsproudct"); esConfiguration.setAliasName("cmsproductAliasName"); this.restTemplate=restTemplate; 
this.esConfiguration=esConfiguration; this.eshlRestUtil=new ESHLRestUtil(esConfiguration); this.redissonClient=redissonClient; } /** * 获得创建索引的mapping抽象方法 * @return */ public abstract String getMapping(); /** * 获得所有迁移requestBody抽象方法 * @return */ public abstract ReindexRequest getReindex(); /** * es索引秦阿姨 * @return */ @Override public boolean reindex(ReindexRequest reindexRequest) throws IOException { // BulkByScrollResponse bulkByScrollResponse= eshlRestUtil.getClient().reindex(reindexRequest,RequestOptions.DEFAULT); /** * 因为现在是api使用6.5 线上是6.24 查询语句多生成zero_terms_query 使用以下方式 替换掉 */ String endpoint ="/_reindex"; Request httpequest = new Request("POST", endpoint); BytesRef source = XContentHelper.toXContent(reindexRequest, XContentType.JSON, false).toBytesRef(); org.apache.http.HttpEntity entity = new NStringEntity(new String(source.bytes).replaceAll("\"zero_terms_query\":\"NONE\","," "), ContentType.APPLICATION_JSON); httpequest.setEntity(entity); Response httpResponse=eshlRestUtil.getClient().getLowLevelClient().performRequest(httpequest); BulkByScrollResponse getIndexResponse=eshlRestUtil.getSimpleClient().parseBulkByScrollResponse(httpResponse); return true; } /** * 导入数据 根据设置的线程大小根据数据大小算出线程数量 通过多线程并行处理 * @return */ public boolean importAll() throws IOException { log.info("正在产品全量导入2:"+esConfiguration.getEsUrl()); if (redissonClient.getMap(esConfiguration.getRedissKey()).isExists()) { throw new OCmsExceptions("全量索引正在导入中...."); } String []indexNames=eshlRestUtil.getIndexNames(esConfiguration.getIndexName()+"*"); if (indexNames!=null&&indexNames.length>0) { Integer number= eshlRestUtil.getMaxIndexNumber(indexNames); beforeIndexName=number==null?esConfiguration.getIndexName():esConfiguration.getIndexName()+"_"+number; templateIndexName=esConfiguration.getIndexName()+"_"+(number==null?1:(number+1)); } else { templateIndexName = esConfiguration.getIndexName(); } boolean isSuccess = true; long count = getCount(); //根据线程处理大小获得页码 int index = (new Double(Math.ceil(count*1.0 / 
esConfiguration.getTreadDataSize()))).intValue(); if (index <= 0 && count > 0) { index = 1; } //获得最大处理线程大小 int threadSize = index > esConfiguration.getThreadSize() ? esConfiguration.getThreadSize() : index; //导入处理器 ESAbstractImport.ProcessImportIndex processImportIndex = new ESAbstractImport.ProcessImportIndex(esConfiguration.getTreadDataSize(), index, templateIndexName); ThreadPoolExecutor executorService=null; try { //第一次导入直接创建索引 if (beforeIndexName == null) { log.info("开始创建索引"); eshlRestUtil.indexCreate(templateIndexName,getMapping()); log.info("创建索引成功"); } else { log.info("开始创建索引1"); eshlRestUtil.indexCreate(templateIndexName,getMapping()); log.info("创建索引成功1"); log.info("开始创建索引2"); //非第一次导入将子集数据迁移进来 reindex(getReindex()); log.info("开始创建索引2"); } /** 线程池的自定义配置,IO密集型任务. */ executorService = new ThreadPoolExecutor( // 核心线程数 threadSize, // 最大线程数 100, // 存活时间,30s 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(150), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); redissonClient.getMap(esConfiguration.getRedissKey()).put("templateIndexName", templateIndexName); //统一保存异步处理结果 List<Future<Boolean>> processResult = new ArrayList<Future<Boolean>>(); for (int i = 0; i < threadSize; i++) { processResult.add(executorService.submit(processImportIndex)); } for (Future<Boolean> future : processResult) { if (!future.get()) { isSuccess = future.get(); break; } } //表示重建成功 if (isSuccess) { //表示第一次初始化 指定别名 if (StringUtil.isNullOrEmpty(beforeIndexName)) { boolean isFail = false; int i = 0; do { isFail = !eshlRestUtil.aliases(templateIndexName,esConfiguration.getAliasName()); if(i>0) { //休眠 Thread.sleep(i * 1000); } i++; } while (isFail && i < 10); if (isFail) { log.info("开始删除索引2"); eshlRestUtil.deleteIndex(templateIndexName); log.info("开始删除成功"); } } else { boolean isFail = false; int i = 0; do { //重新绑定别名 失败重试十次 isFail = !eshlRestUtil.aliases(beforeIndexName,templateIndexName,esConfiguration.getAliasName()); if(i>0) { //休眠 Thread.sleep(i * 1000); } 
i++; } while (isFail && i < 10); if (isFail) { //删除备份索引 eshlRestUtil.deleteIndex(templateIndexName); throw new OCmsExceptions("索引重做失败,请重试"); } else { //删除备份索引 eshlRestUtil.deleteIndex(beforeIndexName); } } } else { for (Future<Boolean> future : processResult) { if (!future.get()) { //没有重做成功删除创建的新索引 eshlRestUtil.deleteIndex(templateIndexName); future.isCancelled();//取消其他正在执行的线程 } } } } catch (Exception e) { String uuid = UUID.randomUUID().toString(); log.error(uuid + "\n" + e.getMessage()); eshlRestUtil.deleteIndex(templateIndexName); throw new OCmsExceptions("<ES文档导入失败>异常编码" + uuid + "\n" + e.getMessage()); } finally { if (executorService != null) { executorService.shutdown(); } redissonClient.getBucket(esConfiguration.getRedissKey()).delete(); } return isSuccess; } /** * 异步的导入方法 * @return */ @Override public Future<Boolean> importAllAsyn() { ExecutorService executorService=new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); return executorService.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { return importAll(); } }); } /** * 抽象的处理导入方法 * @param pageRequest * @return * @throws IOException */ public abstract boolean processImport(PageRequest pageRequest) throws IOException; public abstract Long getCount(); /** * 用于处理批量索引导入 */ class ProcessImportIndex implements Callable<Boolean> { //数据总页数 private int pageCount; //总条数 private int pageSize; private String indexName; private AtomicLong currentIndex = new AtomicLong(); //当前处理页数 AtomicInteger currentPage = new AtomicInteger(); public ProcessImportIndex(int pageSize, int pageCount, String indexName) { this.pageSize = pageSize; this.pageCount = pageCount; this.indexName = indexName; } /** * Computes a result, or throws an exception if unable to do so. 
* * @return computed result * @throws Exception if unable to compute a result */ @Override public Boolean call() throws Exception { try { return process(); } catch (Exception e) { e.printStackTrace(); return false; } } public boolean process() throws IOException { int page = currentPage.getAndIncrement(); redissonClient.getMap(MdEsProductRedisKeyEnum.IMPORT_INDEX).put("pageCount", pageCount); while (page < pageCount) { redissonClient.getMap(MdEsProductRedisKeyEnum.IMPORT_INDEX).put("currentPage", page+1); processImport(new PageRequest(page, pageSize)); page = currentPage.getAndIncrement(); } return true; } public AtomicInteger getCurrentPage() { return currentPage; } public void setCurrentPage(AtomicInteger currentPage) { this.currentPage = currentPage; } } public String getTemplateIndexName() { return templateIndexName; } public void setTemplateIndexName(String templateIndexName) { this.templateIndexName = templateIndexName; } public interface BulkCallback { public void process(MdProduct mdProduct); } public RestTemplate getRestTemplate() { return restTemplate; } public void setRestTemplate(RestTemplate restTemplate) { this.restTemplate = restTemplate; } public ESConfiguration getEsConfiguration() { return esConfiguration; } public void setEsConfiguration(ESConfiguration esConfiguration) { this.esConfiguration = esConfiguration; } public ESHLRestUtil getEshlRestUtil() { return eshlRestUtil; } public void setEshlRestUtil(ESHLRestUtil eshlRestUtil) { this.eshlRestUtil = eshlRestUtil; } public RedissonClient getRedissonClient() { return redissonClient; } public void setRedissonClient(RedissonClient redissonClient) { this.redissonClient = redissonClient; } public String getBeforeIndexName() { return beforeIndexName; } public void setBeforeIndexName(String beforeIndexName) { this.beforeIndexName = beforeIndexName; } }
使用例子
package com.crb.ocms.product.serviceTool; import com.crb.ocms.product.domain.config.ESConfiguration; import com.crb.ocms.product.domain.entity.MdProduct; import com.crb.ocms.product.domain.repository.MdProductRepository; import com.crb.ocms.product.domain.vo.req.FindProductCharacterStocksVo; import com.crb.ocms.product.domain.vo.resp.ProductCharacterStocksVo; import com.crb.ocms.product.service.MdProductService; import com.crb.ocms.product.service.feign.IcProductStoreAccountService; import com.crb.ocms.product.service.impl.MdESProductServiceImpl; import lombok.extern.log4j.Log4j2; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.ReindexRequest; import org.redisson.api.RedissonClient; import org.springframework.data.domain.Example; import org.springframework.data.domain.ExampleMatcher; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; /** * @Project crb-product-service * @PackageName com.crb.ocms.product.serviceTool * @ClassName ProductStockImport * @Author liqiang * @Date 2019/3/29 2:15 PM * @Description 处理库存全量索引导入 */ @Log4j2 @Deprecated public class ProductStockImport extends ProductIndexImport { MdProductRepository mdProductRepository; IcProductStoreAccountService icProductStoreAccountService; String regionCode; public ProductStockImport(RestTemplate restTemplate, ESConfiguration esConfiguration, RedissonClient redissonClient, MdProductRepository mdProductRepository, IcProductStoreAccountService icProductStoreAccountService, String regionCode) { 
super(restTemplate, esConfiguration, redissonClient); this.mdProductRepository=mdProductRepository; this.regionCode=regionCode; this.icProductStoreAccountService=icProductStoreAccountService; } /** * 索引迁移条件。将指定条件数据迁移到新的索引 * @return */ @Override public ReindexRequest getReindex() { ReindexRequest reindexRequest=new ReindexRequest(); reindexRequest.setSourceIndices(getBeforeIndexName()); reindexRequest.setSourceQuery(QueryBuilders.boolQuery().mustNot(QueryBuilders.matchPhraseQuery("info","ic_product_store_account"))); reindexRequest.setDestIndex(getTemplateIndexName()); return reindexRequest; } /** * 已经迁移到经销权 先禁用 * @param pageRequest * @return * @throws IOException */ @Deprecated @Override public boolean processImport(PageRequest pageRequest) throws IOException { MdProduct mdProduct = new MdProduct(); mdProduct.setRegionCode(regionCode); ExampleMatcher exampleMatcher = ExampleMatcher.matching().withMatcher("regionCode", ExampleMatcher.GenericPropertyMatchers.exact()).withIgnorePaths("optCounter","createdDate","updatedDate","id");; Example example=Example.of(mdProduct,exampleMatcher); Page<MdProduct> result = mdProductRepository.findAll(example,pageRequest); for (MdProduct md:result){ if (md.getFullPalletSaleFlag()==null){ md.setFullPalletSaleFlag(0); } } List<Long> productIds=result.getContent().stream().map(MdProduct::getMdProductId).collect(Collectors.toList()); FindProductCharacterStocksVo findProductCharacterStocksVo=new FindProductCharacterStocksVo(); findProductCharacterStocksVo.setProductIds(productIds); findProductCharacterStocksVo.setRegionCode(regionCode); List<ProductCharacterStocksVo> productCharacterStocksVoList= icProductStoreAccountService.findProductCharacterStocks(findProductCharacterStocksVo); BulkRequest bulkRequest=new BulkRequest(); for (ProductCharacterStocksVo productCharacterStocksVo: productCharacterStocksVoList) { if(productCharacterStocksVo.getCharacterId().longValue()==-1L){ continue; } XContentBuilder builder = 
XContentFactory.jsonBuilder(); builder.startObject(); builder.field("productId",productCharacterStocksVo.getProductId()); builder.field("factoryId",productCharacterStocksVo.getFactoryId()); builder.field("characterId",productCharacterStocksVo.getCharacterId()); builder.field("sumCount",productCharacterStocksVo.getSumCount()); builder.field("regionCode",regionCode); builder.field("info").startObject() .field("name","ic_product_store_account") .field("parent",productCharacterStocksVo.getProductId()+"_"+regionCode) .endObject(); builder.endObject(); String indexId="ic_product_store_account_"+productCharacterStocksVo.getProductId()+"_"+productCharacterStocksVo.getCharacterId()+"_"+productCharacterStocksVo.getFactoryId(); IndexRequest indexRequest=new IndexRequest(getTemplateIndexName(),getEsConfiguration().getTypeName(), indexId); indexRequest.source(builder); indexRequest.routing(productCharacterStocksVo.getProductId()+"_"+regionCode); bulkRequest.add(indexRequest); } log.info(bulkRequest.getDescription()); getEshlRestUtil().getClient().bulk(bulkRequest, RequestOptions.DEFAULT); return true; } /** * 数据总条数 * @return */ @Override public Long getCount() { MdProduct mdProduct = new MdProduct(); mdProduct.setRegionCode(regionCode); ExampleMatcher exampleMatcher = ExampleMatcher.matching(). withMatcher("regionCode", ExampleMatcher.GenericPropertyMatchers.exact()).withIgnorePaths("optCounter","createdDate","updatedDate","id"); Example example=Example.of(mdProduct,exampleMatcher); return mdProductRepository.count(example); } }
processImport 只需关心查询出指定分页的数据,然后导入到 ES。