用過 Solr 的朋友都知道,Solr 可以直接在配置文件中配置數據庫連接從而完成索引的同步創建,但是 ElasticSearch 本身並不具備這樣的功能,那如何建立索引呢?方法其實很多,可以使用 Java API 的方式建立索引,也可以通過 Logstash 的插件 logstash-input-jdbc 完成,今天來探討下如何使用 logstash-input-jdbc 完成全量同步以及增量同步。

環境

本文以及后續 es 系列文章都基於 5.5.3 這個版本的 elasticsearch ,這個版本比較穩定,可以用於生產環境。

默認你已經搭建好 es 的基礎環境,如還未搭建好,請參考前文。接下來只講解 logstash 的安裝使用。本文使用最新版本的 logstash ,版本號為 6.4.0。

系列文章

數據准備

如圖所示,如果要實現這個搜索,首先要創建相關的索引,篩選條件有男生/女生,還有分類,屬性,字數,連載狀態,品質等,排序條件有人氣,時間,字數,收藏,推薦,點擊。

這些數據正常都不會再同一張表當中,而是分布於不同的表中,但這些數據都與作品的 id 緊緊關聯。本文為了方便演示,把這些數據都放在同一張表當中,如果在實際使用的過程當中,如果遇到多張表的情況,可以寫 sql 進行聯合查詢,同樣也可以建立索引,實現方式詳見下文。

創建表結構

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE `book` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`create_time` datetime(0) NULL DEFAULT NULL,
`update_time` datetime(0) NULL DEFAULT NULL,
`status` tinyint(11) NULL DEFAULT NULL,
`name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`intro` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL,
`icon` int(11) NULL DEFAULT NULL,
`author` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
`words` int(11) NULL DEFAULT NULL,
`collection` int(11) NULL DEFAULT NULL,
`goods` int(11) NULL DEFAULT NULL,
`click` int(11) NULL DEFAULT NULL,
`site` tinyint(11) NULL DEFAULT NULL,
`sort` tinyint(11) NULL DEFAULT NULL,
`vip` tinyint(11) NULL DEFAULT NULL,
`popularity` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ;

導入測試數據

注:測試數據是通過爬蟲在盜版網站抓的,個別數據胡亂填寫的。

 INSERT INTO `book`(`id`, `create_time`, `update_time`, `status`, `name`, `intro`, `icon`, `author`, `words`, `collection`, `goods`, `click`, `site`, `sort`, `vip`, `popularity`) VALUES (1, '2017-12-27 20:53:01', '2018-09-10 20:53:46', 0, '火爆娛樂天王', '  有人說:“他是至高無上的音樂教皇!”有人說:“他是無人能及的影視國王!”還有人說:“他是神級作家、話劇大師、偉大的音樂家……”他使華夏元素狂暴沖擊西方文化,卻又能做出令西方人也不及的歐美音樂影視。他造就了無數的歌星影星,創作出無數的經典,建立了一個龐大的娛樂帝國。當所有人稱頌他膜拜他的時候,只有他自己知道,他不過是一個意外來自異時空的文化使者。  ', 1, '茶與酒之歌', 230000, 4234, 42315, 523, 2, 9, 1, 12423);
INSERT INTO `book`(`id`, `create_time`, `update_time`, `status`, `name`, `intro`, `icon`, `author`, `words`, `collection`, `goods`, `click`, `site`, `sort`, `vip`, `popularity`) VALUES (2, '2017-12-27 20:53:01', '2017-12-27 20:53:03', 0, '無限平行進化', '  日復一日的枯燥生活讓廖宇開始產生了難以抑制的厭倦。沉重龐大的經濟負擔使他的生活開始產生了變化。歷史的缺失更是成為了他心中不散的謎團。某一天突如其來的陌生信息終於給他打開了一扇門。性格各異的X戰警與復雜龐大的復仇者聯盟會再度爆發怎樣的戰爭。加勒比海上是否還會揚起第四艘傳奇戰艦的旗幟。鋪天蓋地的宇宙蟲族與冰冷機械的人工智能誰才是最具有侵略性的文明。最終廖宇會如何選擇,向左還是向右…… 各位書友要是覺得《無限平行進化》還不錯的話請不要忘記向您QQ群和微博里的朋友推薦哦!無限平行進化最新章節,無限平行進化無彈窗,無限平行進化全文閱讀.  ', 0, '雪色銀狼CS', 555555, 450000, 13412, 555, 2, 9, 1, 342);
INSERT INTO `book`(`id`, `create_time`, `update_time`, `status`, `name`, `intro`, `icon`, `author`, `words`, `collection`, `goods`, `click`, `site`, `sort`, `vip`, `popularity`) VALUES (3, '2017-12-27 20:53:02', '2018-09-15 20:53:55', 0, '逆天葯神', '  一個意外,讓他在星空的彼岸兩地相望。三人攜手,殺出了這片大陸的四方威名。他,張凡,既然獲得新生,就絕不再做弱者。既然要成為強者,就必須讓這片星空顫抖!“既然是我的未來,那便只能由我主宰”  ', 1, '楓葉', 340000, 213334, 3421, 42314, 2, 9, 1, 52315);
INSERT INTO `book`(`id`, `create_time`, `update_time`, `status`, `name`, `intro`, `icon`, `author`, `words`, `collection`, `goods`, `click`, `site`, `sort`, `vip`, `popularity`) VALUES (4, '2017-12-27 20:53:02', '2018-08-16 20:53:59', 0, '錦綉風華之第一農家女1', '  前世她是鐵血手腕的帝國集團總裁,卻被心愛之人設計,魂歸天國。 再次睜眼,眼前的三間茅草屋,一對小瘦猴。 就算是她定力再強悍,在他們喊出那聲“娘”的時候,還是讓她差點沒跳起來。 前世活到28還是清白如蘭,一個穿越就讓她勉強算是B的身材,孕育出一對兒女? 當然,這還是其次,最重要的是她丫的居然是未婚生子,這在現代都遭人白眼的事情,那個天殺的能告訴她,這個身體的原主,是不是太牛叉了,居然沒有被浸豬籠。 只是,當這對瘦的皮包骨的小包子在她跟前,腫着兩對眼泡忍者淚花跑前跑后,就算是她再不想面對現實,也無法坐視不理。 既然讓她再次重生,她勢必要左手揮舞鋤頭,右手執筆算盤,帶着一對可愛的包子,發家致富。 購田地,建豪宅,買下人,顧長工,一切都再朝着讓她滿意的方向前進,而那些眼紅嫉妒之輩,完全都是她業余之時的消遣,根本就不是一個等級。 但是,常在河邊走,哪有不濕鞋,這個道理她明白,卻沒想到那鞋子會濕的這么快,面對那個如同妖孽般,表面謙謙公子,風華無雙,實則腹黑狡詐,怎么坑死她怎么來,還讓她有火沒處發。 精彩小劇場 當一對粉雕玉琢的包子被一個風華絕代的男人一手一個抱進來,君瑤真心的黑面了,淚奔了。 他們這兩個沒良心的到底明不明白,什么是引狼入室啊。 “多謝寧公子送他們回來,您看如今天色已晚,為了寧公子的名聲,小婦人也不敢久留,寧公子請回吧。”她快步上前,一把一個把小包子從那個男人懷里蒿出來,快言快語的下了逐客令。 男人好看的眉毛微挑,隨后一模情緒從眸中迅速划過,快的難以捕捉。 “無妨,我來陪陪這兩個小家伙,君娘子不必多心。” “我…”她差點沒被噎死,她有什么好多心的,就沖着他覬覦她的孩子,就不能讓她留下。 只是她的話沒有說完,就被男人貼面而來的俊彥嚇得向后退去,而男人含笑的黑眸和清淡的話語,卻讓她差點怒火狂飆,“還是你想我把他們帶回去?” 君瑤大驚,帶回去?那是絕對不可能的,她的兒女,誰敢打主意,誰就沒活路,話雖然很直白,卻獨獨對他不管用。  ', 1, '席妖妖', 444902, 23, 5353, 21323, 2, 9, 1, 314);
INSERT INTO `book`(`id`, `create_time`, `update_time`, `status`, `name`, `intro`, `icon`, `author`, `words`, `collection`, `goods`, `click`, `site`, `sort`, `vip`, `popularity`) VALUES (5, '2017-12-27 20:53:02', '2018-09-05 20:54:05', 0, '伏魔', '  富家子弟墨浞因為發現了村子中的秘密,在良心與親情的折磨下,逃到了邊境小城。因為一個香-艷而又恐怖的夢,墨浞經歷了一些詭異的事,從而得知自己的前世與今生的使命。踏上藏地,歷經磨難,克服了自己的心魔,戰勝了...  ', 1, '一葉style', 490000, 526, 7687, 9, 2, 29, 1, 41516);
 

創建索引

使用 postman 等工具創建 es 索引,其中使用到了 ik 以及 pinyin 分詞器,具體配置可以參考我前面的文章。

PUT: http://192.168.200.192:9200/novel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
{
"settings":{
"number_of_shards":5,
"number_of_replicas":1,
"analysis":{
"analyzer":{
"pinyin_analyzer":{
"tokenizer":"ik_smart",
"filter":[
"full_pinyin_no_space",
"full_pinyin_with_space",
"first_letter_pinyin"
]
}
},
"filter":{
"full_pinyin_no_space":{
"type":"pinyin",
"first_letter":"none",
"padding_char":""
},
"full_pinyin_with_space":{
"type":"pinyin",
"first_letter":"none",
"padding_char":" "
},
"first_letter_pinyin":{
"type":"pinyin",
"first_letter":"only",
"padding_char":""
}
}
}
},
"mappings":{
"book":{
"properties":{
"id":{
"type":"integer"
},
"words":{
"type":"integer"
},
"intro":{
"analyzer":"ik_smart",
"search_analyzer":"ik_smart",
"type":"text"
},
"name":{
"analyzer":"pinyin_analyzer",
"search_analyzer":"pinyin_analyzer",
"type":"text"
},
"sort":{
"type":"integer"
},
"updatetime":{
"type":"date",
"format":"yyyy-MM-dd HH:mm:ss"
},
"vip":{
"type":"boolean"
},
"site":{
"type":"integer"
},
"author":{
"analyzer":"pinyin_analyzer",
"search_analyzer":"pinyin_analyzer",
"type":"text"
},
"collection":{
"type":"integer"
},
"click":{
"type":"integer"
},
"popularity":{
"type":"integer"
},
"goods":{
"type":"integer"
},
"status":{
"type":"integer"
}
}
}
}
}

注意,這里索引的所有字段都是小寫的,不要包含大寫,否則后續會出現問題。
索引創建好了之后,安裝 logstash。

安裝 logstash

Tips: 因為 logstash 啟動的時候,會占用較高的 CPU ,建議不要放在 es 集群的服務器上,最好換一台服務器進行安裝。

1
2
3
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.tar.gz

tar -xvf logstash-6.4.0.tar.gz

因為建立索引需要使用到 logstash-input-jdbc,所以先配置這個插件,這個插件在 logstash 5.0 之后就默認自帶了,無需再次安裝

配置 logstash同步之全量同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## 進入 logstash 根目錄
cd /usr/local/es/logstash-6.4.0

## 創建存放 logstash-input-jdbc 相關配置以及依賴的目錄
mkdir logstash-input-jdbc

cd logstash-input-jdbc

## 創建兩個目錄
mkdir lib & mkdir conf

## 下載 jdbc 所需的 mysql 驅動到 lib 目錄
wget http://central.maven.org/maven2/mysql/mysql-connector-java/6.0.6/mysql-connector-java-6.0.6.jar -P lib/

## 進入 conf/ 目錄
cd conf/

## 創建兩個文件,一個是 jdbc 的 sql,另一個是 logstash-input-jdbc 的配置文件

vim mysql2es.sql

首先編寫 sql, 構造索引所需的數據。由於是第一次同步到 es,所以進行全量同步,不過這里的全量也並非是一次性把查出庫內所有數據,而是偽全量的增量同步。

mysql2es.sql 內容為

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SELECT
`id`,
IFNULL( `update_time`, '1970-01-01 08:00:00' ) AS `updatetime`,
`status`,
`name`,
IFNULL( `intro`, '' ) AS `intro`,
`author`,
`words`,
`collection`,
`goods`,
`click`,
`site`,
`sort`,
`vip`,
`popularity`
FROM
book
WHERE
id >= :sql_last_value and id < :sql_last_value + 11

注意為了避免一次性查詢,對數據庫造成太大壓力,因此這里使用了增量的方式來完成初始化同步到 es,這里的 :sql_last_value 是 logstash 上一步同步的最后的值,這里為 id,也可以是時間。因為演示用的測試數據較少,所以每次只同步了 10 條記錄,如果實際使用,一次同步 1000 條比較合適,即 :sql_last_value + 1001

sql 查詢出的字段名要與索引字段名稱對應上,否則無法映射同步。

這里的 mysql2es.sql 將在 logstash-input-jdbc 中用到,創建文件 mysql2es.conf

1
vim mysql2es.conf

mysql2es.conf 的內容是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
input {
stdin {
}
jdbc {
# 數據庫
jdbc_connection_string => "jdbc:mysql://192.168.199.192:3306/novel"
# 用戶名密碼
jdbc_user => "root"
jdbc_password => "123456"
# jar包的位置
jdbc_driver_library => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/lib/mysql-connector-java-6.0.6.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 讀取這個sql
statement_filepath => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/mysql2es.sql"
# 指定時區
jdbc_default_timezone => "Asia/Shanghai"
# 每分鍾執行一次同步(分 時 天 月 年),比如每十分鍾(*/10 * * * *)
schedule => "* * * * *"
#索引的類型
type => "book"

use_column_value => "true"
#tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
tracking_column_type => "numeric"
# 遞增字段
tracking_column => "id"
# 保存每次同步時遞增字段的最后一個值到這個文件
last_run_metadata_path => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/full_sync"
}
}

filter {

json {
source => "message"
remove_field => ["message"]
}
}

output {
elasticsearch {
hosts => "192.168.199.192:9200"
# index 索引名
index => "novel"
# 需要關聯的數據庫中有有一個id字段,對應索引的id號
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}

啟動logstash同步

1
/usr/local/es/logstash-6.4.0/bin/logstash -f /usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/mysql2es.conf

啟動后稍等一會兒,如果配置正確會打印出執行的 sql:

1
SELECT `id`,IFNULL(`update_time`,'1970-01-01 08:00:00') AS `updatetime`,`status`,`name`,IFNULL(`intro`,'') AS `intro`,`author`,`words`,`collection`,`goods`,`click`,`site`,`sort`,`vip`,`popularity` FROM book WHERE id> 0 AND id< 0+11

查看 ElasticSearchHead 控制台,發現也已經有了 10 條索引數據。
由於每次同步 10 條,每分鍾同步一次,5分鍾后,測試的 50條記錄也已經全部被同步到 es 里了。

ctrl + c 停止同步進程,查看文件

1
cat /usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/full_sync

文件中的數字正是上次最后同步的 id

logstash增量同步

根據 id 進行同步,如果對數據庫中已被同步到 es 的數據進行了修改,這個數據也不會被同步更新到 es 當中去。

根據 id 每次同步 1000 條,這種同步方式也只適合第一次全量進行初始化時候使用,后續增量同步最好根據時間戳的方式完成。

修改 mysql2es.sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SELECT
`id`,
IFNULL( `update_time`, '1970-01-01 08:00:00' ) AS `updatetime`,
`status`,
`name`,
IFNULL( `intro`, '' ) AS `intro`,
`author`,
`words`,
`collection`,
`goods`,
`click`,
`site`,
`sort`,
`vip`,
`popularity`
FROM
book
WHERE
update_time >= convert_tz(:sql_last_value, '+00:00','-08:00')
order by update_time asc

注意這里使用到了 convert_tz 這個函數,原因是 logstash 會在同步時在最后同步時間增加 8 個小時,因此需要使用函數,減去 8 個小時才是正確的時間。 因為logstash 記錄最后一次同步的值是最后一條記錄的,所以,最好根據 update_time 進行升序排序,即取的值是離現在最近的時間。

修改 mysql2es.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
input {
stdin {
}
jdbc {
# 數據庫
jdbc_connection_string => "jdbc:mysql://192.168.199.192:3306/novel"
# 用戶名密碼
jdbc_user => "root"
jdbc_password => "123456"
# jar包的位置
jdbc_driver_library => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/lib/mysql-connector-java-6.0.6.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 讀取這個sql
statement_filepath => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/mysql2es.sql"
# 指定時區
jdbc_default_timezone => "Asia/Shanghai"
# 每分鍾
schedule => "* * * * *"
#索引的類型
type => "book"

use_column_value => "true"
#tracking_column_type: 遞增字段的類型,numeric 表示數值類型, timestamp 表示時間戳類型
tracking_column_type => "timestamp"
# 遞增字段
tracking_column => "updatetime"
# 保存每次同步時遞增字段的最后一個值到這個文件
last_run_metadata_path => "/usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/incr_sync"
}
}

filter {

json {
source => "message"
remove_field => ["message"]
}
}

output {
elasticsearch {
hosts => "192.168.199.192:9200"
# index 索引名
index => "novel"
# 需要關聯的數據庫中有有一個id字段,對應索引的id號
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}

修改增量的字段為 updatetime,且字段類型設置為 timestamp,
指定記錄最后同步時間的文件為 incr_sync ,同時在 conf 目錄下創建這個文件 incr_sync,設置一下初始值,即同步大於設置這個值的數據。

1
2
3
4
5
6
7
8
9
cd /usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/
vim incr_conf

# 文件內容如下,這里具體時間根據你的情況自行修改

--- !ruby/object:DateTime '2018-09-04 16:47:14.000000000 Z'


## 注意修改只需要改 2018-09-04 16:47:14 這一部分,其他地方就不要改,改了格式可能會不對。

保存退出,執行同步命令

1
/usr/local/es/logstash-6.4.0/bin/logstash -f /usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/mysql2es.conf

過一會兒,正常輸出 sql 語句

1
SELECT `id`,IFNULL(`update_time`,'1970-01-01 08:00:00') AS `updatetime`,`status`,`name`,IFNULL(`intro`,'') AS `intro`,`author`,`words`,`collection`,`goods`,`click`,`site`,`sort`,`vip`,`popularity` FROM book WHERE update_time> '2018-09-04 16:47:14' order by update_time asc

incr_sync 文件中的值,會因為每次同步同步取的是最后一條記錄的值,所以最好對時間進行排序

配置文件中,當在input的jdbc下,增加type屬性時,會導致該索引下增加type字段。所以sql查詢出的字段不要用type,如果有,as成其他的名字,不然的話,這里判斷會有異常

logstash 同步后台運行

上面的運行命令如果推出終端或者按下 ctrl + c,同步就會終止,所以我們要讓同步任務在后台運行。

1
nouhup /usr/local/es/logstash-6.4.0/bin/logstash -f /usr/local/es/logstash-6.4.0/logstash-input-jdbc/conf/mysql2es.conf &

使用 nohup 和 * 號將要執行的語句包裹起來,就可以實現后台運行了,同時會在當前執行命令的目錄生成一個 nohup.out 的文件,這里 logstash 的運行日志會被寫入到這個文件當中。

1
tail -fn300 nohup.out

參考