Spring Boot 1.5.10 兼容高版本 Elasticsearch 6.1.1


1.引入依賴

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>

2.配置信息:

/**
 * 讀取client配置信息
 * @author 
 *
 */
@Configuration
@Getter
@Setter
public class ClientConfig {
    
    /** 
     * elk集群地址 
     */  
    @Value("${elasticsearch.ip}")
    private String esHostName;  
    /** 
     * 端口 
     */  
    @Value("${elasticsearch.port}")
    private Integer esPort;  
    /** 
     * 集群名稱 
     */  
    @Value("${elasticsearch.cluster.name}")
    private String esClusterName;  
  
    /** 
     * 連接池 
     */  
    @Value("${elasticsearch.pool}")
    private Integer esPoolSize;  
  
    
    /** 
     * 是否服務啟動時重新創建索引
     */  
    @Value("${elasticsearch.regenerateIndexEnabled}")
    private Boolean esRegenerateIndexFlag; 
    
    
    /** 
     * 是否服務啟動時索引數據同步
     */  
    @Value("${elasticsearch.syncDataEnabled}")
    private Boolean esSyncDataEnabled; 
}

3.es配置啟動類:

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;

/**
 * es配置啟動類
 * @author
 *
 */
@Configuration
public class ElasticsearchConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
    
    @Autowired
    ClientConfig clientConfig;
    
    @Bean
    public TransportClient init() {
        LOGGER.info("初始化開始。。。。。");  
        TransportClient transportClient = null;
  
        try {  
            /**
             *  配置信息 
             *  client.transport.sniff   增加嗅探機制,找到ES集群 
             *  thread_pool.search.size  增加線程池個數,暫時設為5  
             */
            Settings esSetting = Settings.builder()
                    .put("client.transport.sniff", true) 
                    .put("thread_pool.search.size", clientConfig.getEsPoolSize())
                    .build();  
            //配置信息Settings自定義
            transportClient = new PreBuiltTransportClient(esSetting);
            TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(clientConfig.getEsHostName()), clientConfig.getEsPort());
            transportClient.addTransportAddresses(transportAddress);  
  
  
        } catch (Exception e) {  
            LOGGER.error("elasticsearch TransportClient create error!!!", e);  
        }  
  
        return transportClient;  
    }  
    
    
}

4.操作工具類:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;


public class ElasticsearchUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchUtils.class);

    @Autowired
    private TransportClient transportClient;

    private static TransportClient client;

    @PostConstruct
    public void init() {
        client = this.transportClient;
    }

    /**
     * 創建索引以及設置其內容
     * @param index
     * @param indexType
     * @param filePath:json文件路徑
     */
    public static void createIndex(String index,String indexType,String filePath) throws RuntimeException {
        try {
                StringBuffer strBuf = new StringBuffer();
                //解析json配置
                ClassPathResource resource = new ClassPathResource(filePath);
                InputStream inputStream = resource.getInputStream();

                int len = 0;
                byte[] buf = new byte[1024];
                while((len=inputStream.read(buf)) != -1) {
                    strBuf.append(new String(buf, 0, len, "utf-8"));
                }
                inputStream.close();
                //創建索引
                createIndex(index);
                //設置索引元素
                putMapping(index, indexType, strBuf.toString());

        }catch(Exception e){
            throw new RuntimeException(e.getMessage());
        }
    }


        /**
         * 創建索引
         *
         * @param index 索引名稱
         * @return
         */
        public static boolean createIndex(String index){

            try {
                if (isIndexExist(index)) {
                    //索引庫存在則刪除索引
                    deleteIndex(index);
                }
                CreateIndexResponse indexresponse = client.admin().indices().prepareCreate(index).setSettings(Settings.builder().put("index.number_of_shards", 5)
                        .put("index.number_of_replicas", 1)
                )
                        .get();
                LOGGER.info("創建索引 {} 執行狀態 {}", index , indexresponse.isAcknowledged());

                return indexresponse.isAcknowledged();
            }catch (Exception e) {
                throw new RuntimeException(e.getMessage());
            }

        }


    /**
     * 創建索引
     *
     * @param index 索引名稱
     * @param indexType 索引類型
     * @param mapping 創建的mapping結構
     * @return
     */
    public static boolean putMapping(String index,String indexType,String mapping) throws RuntimeException {
        if (!isIndexExist(index)) {
            throw new RuntimeException("創建索引庫"+index+"mapping"+mapping+"結構失敗,索引庫不存在!");
        }
        try {
            PutMappingResponse indexresponse = client.admin().indices().preparePutMapping(index).setType(indexType).setSource(mapping, XContentType.JSON).get();

            LOGGER.info("索引 {} 設置 mapping {} 執行狀態 {}", index ,indexType, indexresponse.isAcknowledged());

            return indexresponse.isAcknowledged();
        }catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }


    }

    /**
     * 判斷索引是否存在
     *
     * @param index
     * @return
     */
    public static boolean isIndexExist(String index) {
        IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index))
                .actionGet();
        return inExistsResponse.isExists();
    }


    /**
     * 刪除索引
     *
     * @param index
     * @return
     */
    public static boolean deleteIndex(String index) throws RuntimeException{
        if (!isIndexExist(index)) {
            return true;
        }
        try {
            DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(index).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                LOGGER.info("delete index " + index + "  successfully!");
            } else {
                LOGGER.info("Fail to delete index " + index);
            }
            return dResponse.isAcknowledged();
        } catch (Exception e) {

            throw new RuntimeException(e.getMessage());
        }
    }


    /**
     * 數據添加
     *
     * @param jsonObject
     *            要增加的數據
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type) {
        return addData(jsonObject, index, type, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
    }

    /**
     * 數據添加,正定ID
     *
     * @param jsonObject
     *            要增加的數據
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @param id
     *            數據ID
     * @return
     */
    public static String addData(JSONObject jsonObject, String index, String type, String id)throws RuntimeException {
        try {
            IndexResponse response = client.prepareIndex(index, type, id).setSource(jsonObject).get();

            LOGGER.info("addData response status:{},id:{}", response.status().getStatus(), response.getId());

            return response.getId();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }


    /**
     * 批量數據添加,
     *
     * @param list
     *            要增加的數據
     * @param pkName
     *            主鍵id
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @return
     */
    public static <T> void addBatchData(List<T> list, String pkName, String index, String type) {
        if(list == null || list.isEmpty()) {
            return;
        }
        // 創建BulkPorcessor對象
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
                // TODO Auto-generated method stub
            }

            // 執行出錯時執行
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
                // TODO Auto-generated method stub
            }
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
                // TODO Auto-generated method stub
            }
        })
                // 1w次請求執行一次bulk
                .setBulkActions(1000)
                // 1gb的數據刷新一次bulk
                // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // 固定5s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 並發請求數量, 0不並發, 1並發允許執行
                .setConcurrentRequests(1)
                // 設置退避, 100ms后執行, 最大請求3次
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();

        for (T vo : list) {
            if(getPkValueByName(vo, pkName)!= null) {
                String id = getPkValueByName(vo, pkName).toString();
                bulkProcessor.add(new IndexRequest(index, type, id).source(JSON.toJSONString(vo), XContentType.JSON));
            }

        }
        bulkProcessor.close();
    }

    /**
     * 根據主鍵名稱獲取實體類主鍵屬性值
     *
     * @param clazz
     * @param pkName
     * @return
     */
    private static Object getPkValueByName(Object clazz, String pkName) {
        try {
            String firstLetter = pkName.substring(0, 1).toUpperCase();
            String getter = "get" + firstLetter + pkName.substring(1);
            Method method = clazz.getClass().getMethod(getter, new Class[] {});
            Object value = method.invoke(clazz, new Object[] {});
            return value;
        } catch (Exception e) {
            return null;
        }
    }


    /**
     * 通過ID 更新數據
     *
     * @param jsonObject
     *            要增加的數據
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @param id
     *            數據ID
     * @return
     */
    public static void updateDataById(JSONObject jsonObject, String index, String type, String id) throws RuntimeException {

        try{
            UpdateRequest updateRequest = new UpdateRequest();

            updateRequest.index(index).type(type).id(id).doc(jsonObject);

            client.update(updateRequest);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * 批量數據更新,
     *
     * @param list
     *            要增加的數據
     * @param pkName
     *            主鍵id
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @return
     */
    public static <T> void updateBatchData(List<T> list, String pkName, String index, String type) {
        // 創建BulkPorcessor對象
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
                // TODO Auto-generated method stub
            }

            // 執行出錯時執行
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
                // TODO Auto-generated method stub
            }
            @Override
            public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
                // TODO Auto-generated method stub
            }
        })
                // 1w次請求執行一次bulk
                .setBulkActions(1000)
                // 1gb的數據刷新一次bulk
                // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // 固定5s必須刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // 並發請求數量, 0不並發, 1並發允許執行
                .setConcurrentRequests(1)
                // 設置退避, 100ms后執行, 最大請求3次
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)).build();

        for (T vo : list) {
            String id = getPkValueByName(vo, pkName).toString();
            bulkProcessor.add(new UpdateRequest(index, type, id).doc(JSON.toJSONString(vo), XContentType.JSON));
        }
        bulkProcessor.close();
    }


    /**
     * 通過ID獲取數據
     *
     * @param index
     *            索引,類似數據庫
     * @param type
     *            類型,類似表
     * @param id
     *            數據ID
     * @param fields
     *            需要顯示的字段,逗號分隔(缺省為全部字段)
     * @return
     */
    public static Map<String, Object> searchDataById(String index, String type, String id, String fields) {

        GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id);

        if (StringUtils.isNotEmpty(fields)) {
            getRequestBuilder.setFetchSource(fields.split(","), null);
        }

        GetResponse getResponse = getRequestBuilder.execute().actionGet();

        return getResponse.getSource();
    }

    /**
     * 使用分詞查詢
     *
     * @param index
     *            索引名稱
     * @param type
     *            類型名稱,可傳入多個type逗號分隔
     * @param clz
     *            數據對應實體類
     * @param fields
     *            需要顯示的字段,逗號分隔(缺省為全部字段)
     * @param boolQuery
     *            查詢條件
     * @return
     */
    public static <T> List<T> searchListData(String index, String type, Class<T> clz, String fields,BoolQueryBuilder boolQuery) {
        return searchListData(index, type, clz, 0, fields, null,  null,boolQuery);
    }

    /**
     * 使用分詞查詢
     *
     * @param index
     *            索引名稱
     * @param type
     *            類型名稱,可傳入多個type逗號分隔
     * @param clz
     *            數據對應實體類
     * @param size
     *            文檔大小限制
     * @param fields
     *            需要顯示的字段,逗號分隔(缺省為全部字段)
     * @param sortField
     *            排序字段
     * @param highlightField
     *            高亮字段
     * @param boolQuery
     *            查詢條件
     * @return
     */
    public static <T> List<T> searchListData(String index, String type, Class<T> clz,
                                             Integer size, String fields, String sortField, String highlightField,BoolQueryBuilder boolQuery) throws RuntimeException{

        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        // 高亮(xxx=111,aaa=222)
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // 設置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }
        searchRequestBuilder.setQuery(boolQuery);
        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);

        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }
        if (size != null && size > 0) {
            searchRequestBuilder.setSize(size);
        }
        searchRequestBuilder.setScroll(new TimeValue(1000));
        searchRequestBuilder.setSize(10000);
        // 打印的內容 可以在 Elasticsearch head 和 Kibana 上執行查詢
        LOGGER.info("\n{}", searchRequestBuilder);

        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long totalHits = searchResponse.getHits().totalHits;
        if(LOGGER.isDebugEnabled()) {
            long length = searchResponse.getHits().getHits().length;

            LOGGER.info("共查詢到[{}]條數據,處理數據條數[{}]", totalHits, length);
        }


        if (searchResponse.status().getStatus() ==200) {
            // 解析對象
            return setSearchResponse(clz, searchResponse, highlightField);
        }

        return null;
    }


    /**
     * 高亮結果集 特殊處理
     *
     * @param clz
     *            數據對應實體類
     * @param searchResponse
     *
     * @param highlightField
     *            高亮字段
     */
    private static <T> List<T> setSearchResponse(Class<T> clz, SearchResponse searchResponse, String highlightField) {
        List<T> sourceList = new ArrayList<T>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
            searchHit.getSourceAsMap().put("id", searchHit.getId());
            StringBuffer stringBuffer = new StringBuffer();
            if (StringUtils.isNotEmpty(highlightField)) {

                // System.out.println("遍歷 高亮結果集,覆蓋 正常結果集" + searchHit.getSourceAsMap());
                HighlightField highlight = searchHit.getHighlightFields().get(highlightField);
                if(highlight == null) {
                    continue;
                }
                Text[] text = highlight.getFragments();
                if (text != null) {
                    for (Text str : text) {
                        stringBuffer.append(str.string());
                    }
                    // 遍歷 高亮結果集,覆蓋 正常結果集
                    searchHit.getSourceAsMap().put(highlightField, stringBuffer.toString());
                }
            }

            T t = JSON.parseObject(JSON.toJSONString(searchHit.getSourceAsMap()), clz);
            sourceList.add(t);
        }

        return sourceList;
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM