一、前言
上篇介紹了 ES 的基本概念及環境搭建,本篇將結合實際需求介紹整個實現過程及核心代碼。
二、安裝 ES ik 分析器插件
2.1 ik 分析器簡介
GitHub 地址:https://github.com/medcl/elasticsearch-analysis-ik
提供兩種分詞模式:「 ik_max_word 」及「 ik_smart 」
分詞模式 | 描述 |
---|---|
ik_max_word | 會將文本做最細粒度的拆分,比如會將“中華人民共和國國歌”拆分為“中華人民共和國,中華人民,中華,華人,人民共和國,人民,人,民,共和國,共和,和,國國,國歌”,會窮盡各種可能的組合 |
ik_smart | 會做最粗粒度的拆分,比如會將“中華人民共和國國歌”拆分為“中華人民共和國,國歌” |
2.2 安裝步驟
① 進入 ES 的 bin 目錄
$ cd /usr/local/elasticsearch/bin/
② 通過 elasticsearch-plugin 命令安裝 ik 插件
$ ./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v5.5.3/elasticsearch-analysis-ik-5.5.3.zip
③ 安裝成功后會在 plugins 目錄出現 analysis-ik 文件夾
三、數據同步
3.1 方案設計
通過 Logstash 實現 MySQL 數據庫中的數據同步到 ES 中,第一次全量同步,后續每分鍾增量同步一次
3.2 實現步驟
3.2.1 安裝 logstash-input-jdbc 插件
① 進入 Logstash 的 bin 目錄
$ cd /usr/local/logstash/bin/
② 使用 logstash-plugin 命令安裝 logstash-input-jdbc 插件
$ ./logstash-plugin install logstash-input-jdbc
3.2.2 MySQL 同步數據配置
① 首先在 Logstash 安裝目錄中新建「MySQL 輸入數據源
」相關目錄
/usr/local/logstash/mysql > MySQL 輸入數據源目錄
/usr/local/logstash/mysql/config > 配置文件目錄
/usr/local/logstash/mysql/metadata > 追蹤字段記錄文件目錄
/usr/local/logstash/mysql/statement > SQL 腳本目錄
② 其次上傳「MySQL JDBC 驅動
」至 /usr/local/logstash/mysql 目錄中
mysql-connector-java-5.1.40.jar
③ 然后新建「 SQL 腳本文件」,即 /usr/local/logstash/mysql/statement 目錄中新建 yb_knowledge.sql 文件,內容如下:
SELECT
id,
create_time AS createTime,
modify_time AS modifyTime,
is_deleted AS isDeleted,
knowledge_title AS knowledgeTitle,
author_name AS authorName,
knowledge_content AS knowledgeContent,
reference_count AS referenceCount
FROM
yb_knowledge
WHERE
modify_time >= :sql_last_value
④ 之后再新建「配置文件」,即 /usr/local/logstash/mysql/config 目錄中新建 yb_knowledge.conf 文件,內容如下:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://192.168.1.192/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/local/logstash/mysql/mysql-connector-java-5.1.40.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
jdbc_default_timezone => "UTC"
lowercase_column_names => false
# 使用其它字段追蹤,而不是用時間
use_column_value => true
# 追蹤的字段
tracking_column => "modifyTime"
record_last_run => true
# 上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值
last_run_metadata_path => "/usr/local/logstash/mysql/metadata/yb_knowledge.txt"
# 執行的sql文件路徑+名稱
statement_filepath => "/usr/local/logstash/mysql/statement/yb_knowledge.sql"
# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鍾都更新
schedule => "* * * * *"
# 索引類型
type => "knowledge"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type] == "knowledge" {
elasticsearch {
hosts => ["localhost:9200"]
index => "yb_knowledge"
document_id => "%{id}"
}
}
stdout {
# JSON格式輸出
codec => json_lines
}
}
⑤ 進入 Logstash 的 bin 目錄啟動,啟動時既可指定單個要加載的 conf 文件,也可以指定整個 config 目錄
$ ./logstash -f ../mysql/config/yb_knowledge.conf
注:啟動 Logstash 時,不管有多少個配置文件最后都會編譯成一個文件,也就是說無論有多少個 input 或 output ,最終只有一個 pipeline
注:每分鍾的 0 秒 Logstash 會自動去同步數據
elasticsearch-head 中可看到最終結果如下:
3.2.3 自定義模板配置
此時建立的索引中字符串字段是用的默認分析器 「standard」,會把中文拆分成一個個漢字,這顯然不滿足我們的需求,所以需要自定義配置以使用 ik 分析器
① 首先在 Logstash 安裝目錄中新建「自定義模板文件」目錄
/usr/local/logstash/template > 自定義模板文件目錄
② 其次在該目錄中新建 yb_knowledge.json 模板文件,內容如下:
{
"template": "yb_knowledge",
"settings": {
"index.refresh_interval": "5s",
"number_of_shards": "1",
"number_of_replicas": "1"
},
"mappings": {
"knowledge": {
"_all": {
"enabled": false,
"norms": false
},
"properties": {
"@timestamp": {
"type": "date",
"include_in_all": false
},
"@version": {
"type": "keyword",
"include_in_all": false
},
"knowledgeTitle": {
"type": "text",
"analyzer": "ik_max_word"
},
"knowledgeContent": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
},
"aliases": {
"knowledge": {}
}
}
③ 然后修改 yb_knowledge.conf 文件中的 output 插件,指定要使用的模板文件路徑
if [type] == "knowledge" {
elasticsearch {
hosts => ["localhost:9200"]
index => "yb_knowledge"
document_id => "%{id}"
template_overwrite => true
template => "/usr/local/logstash/template/yb_knowledge.json"
template_name => "yb_knowledge"
}
}
④ 之后停止 Logstash 並刪除 metadata 目錄下 sql_last_value 的存放文件
$ rm -rf /usr/local/logstash/mysql/metadata/yb_knowledge.txt
⑤ 最后刪除先前創建的 yb_knowledge 索引並重啟 Logstash
注:重建索引后可以通過「_analyze」測試分詞結果
3.2.4 自動重載配置文件
為了可以自動檢測配置文件的變動和自動重新加載配置文件,需要在啟動的時候使用以下命令
$ ./logstash -f ../mysql/config/ --config.reload.automatic
默認檢測配置文件的間隔時間是 3 秒,可以通過以下命令改變
--config.reload.interval <second>
配置文件自動重載工作原理:
- 檢測到配置文件變化
- 通過停止所有輸入停止當前 pipline (即管道)
- 用新的配置創建一個新的 pipeline
- 檢查配置文件語法是否正確
- 檢查所有的輸入和輸出是否可以初始化
- 檢查成功使用新的 pipeline 替換當前的 pipeline
- 檢查失敗,使用舊的繼續工作
- 在重載過程中, Logstash 進程沒有重啟
注:自動重載配置文件不支持 stdin 這種輸入類型
四、代碼實現
以下代碼實現基於 Spring Boot 2.0.4,通過 Spring Data Elasticsearch 提供的 API 操作 ES
4.1 搭建 Spring Boot 項目
4.2 引入核心依賴包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
4.3 添加 Spring Boot ES 相關配置項
在 application.properties 文件中添加 ES 相關配置項
spring.data.elasticsearch.cluster-name = compass
spring.data.elasticsearch.cluster-nodes = xxx.xxx.xxx.xxx:9300
spring.data.elasticsearch.repositories.enabled = true
4.4 核心代碼
Spring Data Elasticsearch 提供了類似數據庫操作的 repository 接口,可以使我們像操作數據庫一樣操作 ES
① 定義實體
@Data
@Document(indexName = "knowledge", type = "knowledge")
public class KnowledgeDO {
@Id
private Integer id;
private Integer isDeleted;
private java.time.LocalDateTime createTime;
private java.time.LocalDateTime modifyTime;
private String knowledgeTitle;
private String authorName;
private String knowledgeContent;
private Integer referenceCount;
}
② 定義 repository 接口
public interface KnowledgeRepository extends ElasticsearchRepository<KnowledgeDO, Integer> {
}
③ 構建查詢對象
private SearchQuery getKnowledgeSearchQuery(KnowledgeSearchParam param) {
Pageable pageable = PageRequest.of(param.getStart() / param.getSize(), param.getSize());
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
// 使用 filter 比使用 must query 性能要好
boolQuery.filter(QueryBuilders.termQuery("isDeleted", IsDeletedEnum.NO.getKey()));
// 多字段查詢
MultiMatchQueryBuilder multiMatchQuery = QueryBuilders.multiMatchQuery(param.getKeyword(), "knowledgeTitle", "knowledgeContent");
boolQuery.must(multiMatchQuery);
return new NativeSearchQueryBuilder()
.withPageable(pageable)
.withQuery(boolQuery)
.build();
}
注:上述查詢類似於 MySQL 中的 select 語句「select * from yb_knowledge where is_deleted = 0 and (knowledge_title like '%keyword%' or knowledge_content like '%keyword%')」
④ 獲取返回結果
SearchQuery searchQuery = getKnowledgeSearchQuery(param);
Page<KnowledgeDO> page = knowledgeRepository.search(searchQuery);
注:最終結果默認會按照相關性得分倒序排序,即每個文檔跟查詢關鍵詞的匹配程度
五、結語
至此一個簡易的搜索服務已經實現完畢,后續將繼續介紹一些附加功能,如同義詞搜索、拼音搜索以及搜索結果高亮等
六、其它
6.1 注意事項
① 在 Logstash 的 config 目錄執行啟動命令時會觸發以下錯誤,所以請移步 bin 目錄執行啟動命令
ERROR Unable to locate appender "${sys:ls.log.format}_console" for logger config "root"
② Logstash 中 last_run_metadata_path 文件中保存的 sql_last_value 值是最新一條記錄的 tracking_column 值,而不是所有記錄中最大的 tracking_column 值
③ 當 MySQL 中字段類型為 tinyint(1) 時,同步到 ES 后該字段會轉化成布爾類型,改為 tinyint(4) 可避免該問題
6.2 如何使 ES 中的字段名與 Java 實體字段名保持一致?
Java 實體字段通常是小駝峰形式命名,而我們數據庫表字段都是下划線形式的,所以需要將兩者建立映射關系,方法如下:
① 修改 statement_filepath 的 SQL 腳本,表字段用 AS
設置成小駝峰式的別名,與 Java 實體字段名保持一致
② Logstash 配置文件中的 jdbc 配置還需要加一個配置項 lowercase_column_names => false
,否則在 ES中字段名默認都是以小寫形式存儲,不支持駝峰形式
6.3 Logstash 自定義模板詳解
① 第一次啟動 Logstash 時默認會生成一個名叫 「logstash」 的模板到 ES 里,可以通過以下命令查看
curl -XGET 'http://localhost:9200/_template/logstash'
注:默認模板內容如下:
{
"logstash": {
"order": 0,
"version": 50001,
"template": "logstash-*",
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string"
}
},
{
"string_fields": {
"mapping": {
"norms": false,
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}
],
"_all": {
"norms": false,
"enabled": true
},
"properties": {
"@timestamp": {
"include_in_all": false,
"type": "date"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"@version": {
"include_in_all": false,
"type": "keyword"
}
}
}
},
"aliases": {}
}
}
② 使用默認模板,適合剛入門時快速驗證使用,但不滿足實際需求場景,此時可以在 Logstash 「配置文件」中「 output 」插件中指定自定義模板 覆蓋
默認模板
output {
if [type] == "knowledge" {
elasticsearch {
hosts => ["localhost:9200"]
index => "yb_knowledge"
document_id => "%{id}"
template_overwrite => true
template => "/usr/local/logstash/template/yb_knowledge.json"
template_name => "yb_knowledge"
}
}
stdout {
# JSON格式輸出
codec => json_lines
}
}
配置項 | 說明 |
---|---|
template_overwrite | 是否覆蓋默認模板 |
template | 自定義模板文件路徑 |
template_name | 自定義模板名 |
注意事項:
-
如果不指定「 template_name 」則會永久覆蓋默認的「 logstash 」模板,后續即使刪除了自定義模板文件,在使用默認模板的情況下創建的索引還是使用先前自定義模板的配置。所以使用自定義模板時建議指定「 template_name 」防止出現一些難以察覺的問題。
-
如果不小心覆蓋了默認模板,需要重置默認模板則執行以下命令后重啟 Logstash。
curl -XDELETE 'http://localhost:9200/_template/logstash'
-
ES 會按照一定的規則來嘗試自動 merge 多個都匹配上了的模板規則,最終運用到索引上。所以如果某些自定義模板不再使用記得使用上述命令及時刪除,避免新舊版本的模板規則同時作用在索引上引發問題。
例:「 t1 」為舊模板,「 t2 」為新模板,它們的匹配規則一致,唯一的區別是「 t2 」刪除了其中一個字段的規則,此時如果「 t1 」模板不刪除則新建的索引還是會應用已刪除的那條規則。
-
模板是可以設置 order 參數的,默認的 order 值就是 0。order 值越大,在 merge 模板規則的時候優先級越高。這也是解決新舊版本同一條模板規則沖突的一個解決辦法。
③ 自定義模板中設置索引別名
,增加「 aliases 」配置項,如 yb_knowledge => knowledge
"template": "yb_knowledge",
...省略中間部分...
"aliases": {
"knowledge": {}
}
6.4 Logstash 多個配置文件里的 input 、filter 、 output 是否相互獨立?
不獨立;Logstash 讀取多個配置文件只是簡單的將所有配置文件整合到了一起。如果要彼此獨立,可以通過 type
或 tags
區分,然后在 output 配置中用 if 語句判斷一下
6.5 如何不停機重建索引?
① 首先新建新索引 v2
② 其次將源索引 v1
的數據導入新索引 v2
中
③ 然后設置索引別名(刪除源索引 v1
別名,添加新索引 v2
別名)
④ 之后修改 Logstash 配置文件中 output 的 index 值為 v2
注:前提是 Logstash 啟動時指定
config.reload.automatic
設置項開啟配置文件自動重載
⑤ 再次執行步驟二增量同步源索引 v1
中已修改但沒同步到新索引 v2
中的數據
⑥ 最后刪除源索引 v1