ElasticSearch+Springboot實際應用:索引同步建設,搜索過程


1、介紹

     springboot框架,眾多自動化的部署和約定配置,造成了springboot的着手麻煩,熟練后可以快速快捷進行開發,常用作快捷開發的java底層框架。各位看官都是大神,自行體會。

     elasticsearch很受歡迎的的一款擁有活躍社區開源的搜索解決方案,底層用的是luence。
     elasticsearch具有很豐富的插件庫,對於很多開源框架都支持使得ES很是受歡迎。
 
2、安裝配置過程
安裝elasticsearch參考本博客的另一篇文章:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_mysql.html
mysql通過logstash同步數據到elasticsearch參考文章:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
logstash同步數據寫的較為簡單,這里補充完成。
 
采用增量的方式導入mysql新增、修改的數據,前提是數據庫中的數據不進行刪除,只修改數據庫字段的狀態。
 
logstash jdbc的配置內容見下:
 
[zsz@VS-zsz conf]$ cd /usr/local/logstash-2.4.0/conf 
[zsz@VS-zsz conf]$ vi logstash-mysql-news.conf
 


input {
  jdbc {
    jdbc_driver_library => "/usr/local/logstash-2.4.0/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://******************:3306/******?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "*******"
    jdbc_password => "*******************"
    statement => "SELECT n.pk,n.media_pk as mediapk,n.user_pk as userpk,n.access_source_pk as accesssourcepk,updated_at   FROM tablename1  n LEFT JOIN v ON(n.pk=v.news_pk) LEFT JOIN tablename2   c ON(n.pk=c.news_pk) WHERE date_sub(n.updated_at,interval 8 hour)  > :sql_last_value"
    last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.news"
    use_column_value => true
    tracking_column => updated_at
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "*/5 * * * *"
    type => "news"
  }
jdbc {
    jdbc_driver_library => "/usr/local/logstash-2.4.0/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://*************:3306/*******?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "touchtv"
    jdbc_password => "op@touchtv"
    statement => "SELECT pk,name,avatar_url as avatarurl,`desc`,status,remark,identity_type as identitytype,updated_at   FROM tablename WHERE status=1 AND date_sub(updated_at,interval 8 hour)  > :sql_last_value"
    last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.media"
    use_column_value => true
    tracking_column => updated_at
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "*/20 * * * *"
    type => "media"
  }

}

filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "192.168.*******"
    index => "indexname"
    document_id => "%{pk}"
  }


注意:
last_run_metadata_path => "/usr/local/logstash-2.4.0/conf/lastRun.news" ,建議做這個配置,用來記錄上次更新的時間或者ID,達到重啟logstash而不會重新全量導入數據的目的。很重要的配置。
use_column_value => true    ,必須配置,開啟字段跟蹤
tracking_column => updated_at ,必須配置,指定跟蹤的字段名,必須在返回的SQL結果集中存在的字段,要不然會有WARN。
:sql_last_value 上次執行記錄的點,只能是數字類型或者時間類型,具體可以參考官方文檔。
date_sub(n.updated_at,interval 8 hour) 這個需要做時間的轉換,是個大坑。由於logstash取的時間@timestamp的時間比本地早8個小時,這個時間是UTC時間,日志應統一采用這個時間所以做的轉換,也可以修改logstash的配置來處理,但是logstash的日志及很多插件都是用了UTC時間,修改后需要對周邊的搭配的框架(如Kibana)也進行修改,很是麻煩,所以建議不要修改logstash的UTC時間配置。
document_id => "%{pk}" 必須配置,這里是對index進行唯一性的命名,這個配置可以避免同一條數據的修改可以更新到相應的記錄上,相當於關系型數據庫中的主鍵。由此可以看到,數據表建模時,最好可以所有的表都有一個自增字段來唯一識別一條記錄。
 
3、啟停logstash腳本
restart.sh
 
confFile="logstash-mysql-news.conf"
basepath=$(cd `dirname $0`; pwd)

directory=${basepath%/*}

cd $directory


pidString=`ps -ef | grep rg.jruby.Main | grep "${confFile}" | grep -v grep | awk '{print $2}'`


if [ -n "$pidString" ]; then

        kill -9 $pidString        
        echo -e "\nSTOP successfully! \n"
else
        echo -e "\nNo need to be stoped because it had already been stoped\n"

fi
folder="${directory}/logs"

if [ ! -d "$folder" ]; then
  mkdir "$folder"
fi
nohup bin/logstash -w 4 -f conf/"${confFile}" -l logs/logstash.log.news.`date -d today +"%Y-%m-%d"` > logs/nohup.log.news.`date -d today +"%Y-%m-%d"` 2>&1 &

echo -e "RESTARTING..."
sleep 3

pidString=`ps -ef | grep rg.jruby.Main | grep -v grep | awk '{print $2}'`

if [ -n "$pidString" ]; then
  echo -e "\nRESTART SUCCESSFUL!\n"
else
  echo -e "\nRESTART FAILED!\n"
fi
 
采用nohup啟動logstash同步,同時對線程啟動或者暫停,各位看官可以根據個人需要修改,記得點贊啊,如果覺得不錯。
 
解析:bin/logstash -w 4 這個指定是根據jdbc的進程來決定的,這里是允許同時運行四個線程
 
4、springboot的配置
 
springboot框架的使用這里就不說了,其實就是一個用spring管理elasticsearch連接和映射的東西,為了方便,你也可以直接用main函數作為客戶端連接elasticsearch進行測試。連接方式可能不一樣,但是檢索過程可以相互借鑒。
 
配置文件配置elasticsearc
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
 
application.yml
spring.data.elasticsearch.cluster-name: mycluster
spring.data.elasticsearch.cluster-nodes : 192.168.31.78:9300,192.168.31.79:9300
spring.data.elasticsearch.repositories.enabled : true
這里nodes配置了兩部機器,相當於配置了一個雙節點的ES的集群,當一部機器進程關閉,依舊保證服務。
 
 
定義實體類,配置與elasticsearch的字段映射
package cn.search.domain;

import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.elasticsearch.annotations.Document;
import java.sql.Timestamp;

/**
* Created by zhongshzh on 2016/10/17.
*/
@Document(indexName = "zsz", type = "news", shards = 10, replicas = 0, refreshInterval = "-1")
public class News {
@Id
private long pk;
@Version
private Long version;

private int mediapk;
private int userpk;
......}
配置了indexName和typeName,News.java都是些getter和setter方法,這里不一一列舉了。必須定義@Id的字段,要不然會報錯。
 
5、springboot的檢索
     5.1:繼承ElasticsearchRepository的方式檢索
package cn.search.domain.repository;

import cn.search.domain.Media;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
* Created by zhongshzh on 2016/10/19.
*/
public interface MediaRepository extends ElasticsearchRepository<Media, String> {

public Media findByPk(int pk);

}
上面定義了一個方法findByPk(pk),通過pk進行檢索,這種方法的使用詳見springboot jpas,相同的使用方法。
 
     5.1:構造QueryBuilder 的方式檢索
 
采用springdata的方式不太方便,對於含有and... (or...or......)的查詢就無能為力了,所以更通用的是使用QueryBuilder。 
public Page<News> searchNews(String keyword, int pageSize, int pageNum) {

QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.
termQuery("checkstatus", "0"))
.must(QueryBuilders.
multiMatchQuery(keyword, "title", "summary", "content"));

Pageable pageable = new PageRequest(pageNum, pageSize);
Page<News> pageNews = newsSearchRepository.search(queryBuilder, pageable);
if(pageNews==null || pageNews.getSize() < 1)
return null;
return pageNews;
}
 
QueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("checkstatus", "0")).must(QueryBuilders.multiMatchQuery(keyword, "title", "summary"));
 
跟SQL:select * from news where checkstatus=0 and (title like '%keyword%' or summary  like '%keyword%')的含義一樣。
獲得的結果集是根據相關度來排序的,自定義的排序,暫時我也不清楚,有了解的大神還請不吝賜教。
 
本文的原文地址:http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_springboot.html
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM