For a recent data migration I had to export data from MySQL, process it, and import it into new tables and into ES. This is a short write-up for future reference.
Note: for ease of writing and for privacy, the actual content is simplified: table structures are trimmed down, database connection details are all shown as xxx, and directory and file names are aliases.
The process
Original table:
book_db database
- b_book(id,create_time,update_time,price,title,intro)
New tables:
book database
- book(id,price,title,create_time,update_time)
- book_ext(id,book_id,intro,create_time)
Exporting from MySQL
mkdir -p /tmp/
# export the raw data
mysql -hxxx -uxxx -pxxx book_db --default-character-set=utf8 -e 'select id,create_time,update_time,price,title,intro from b_book' | sed 's/NULL//g' > /tmp/b_book.csv
The sed 's/NULL//g' is there because some exported fields contain the literal string NULL, which the new tables should not store, so it is stripped. The exported rows are separated by \t by default, and the first line contains the column names, so we delete that line:
sed -i '1d' /tmp/b_book.csv
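An aside: mysqldump can produce a similar delimiter-separated dump (a sketch I did not use here; --tab requires the FILE privilege and writes to a directory on the database server itself):
mysqldump -hxxx -uxxx -pxxx --tab=/tmp --fields-terminated-by='\t' book_db b_book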
Data processing
cd /tmp/
# process create_time, update_time and price, producing book.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{gsub(/[-:]/," ",$2); $2=mktime($2); gsub(/[-:]/," ",$3); $3=mktime($3); $4=$4*100; print $1,$2,$3,$4,$5}' > book.csv
# produce book_ext.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{print $1,$6}' > book_ext.csv
# produce book_es.csv
cat b_book.csv | awk -F '\t' -v OFS=' @@@ ' '{$4=$4*100;print $0}' > book_es.csv
The original table stores times as datetime while the new tables store Unix timestamps, so the times are converted; gsub turns 2018-09-10 12:30:00 into 2018 09 10 12 30 00, which is exactly the form mktime expects. Prices are in yuan in the original table, and the *100 converts them to cents for the new table.
-v OFS=' @@@ ' makes awk separate output columns with @@@. The reason is that the intro column stores HTML, which may contain tabs, quotes and other common separator characters, so @@@ keeps the column boundaries unambiguous.
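A quick way to sanity-check the datetime conversion (mktime is a gawk extension, so this assumes GNU awk; the resulting timestamp depends on the local timezone):
echo '2018-09-10 12:30:00' | awk '{gsub(/[-:]/," "); print mktime($0)}'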
Importing into MySQL
mysql -hxxx -uxxx -pxxx book
Load Data LOCAL InFile '/tmp/book.csv' Into Table book
character set utf8
Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By '\n'
(id,create_time,update_time,price,title);
Load Data LOCAL InFile '/tmp/book_ext.csv' Into Table book_ext
character set utf8
Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By '\n'
(book_id,intro);
Notes:
- Fields Terminated By: the column separator; typically a space or \t
- Enclosed By: the character values are wrapped in; the empty string if there is none
- Escaped By: the escape character; the empty string if there is none
- Lines Terminated By: the record separator (line terminator)
Into Table does a plain insert: if a record already exists (unique-key conflict), the statement fails and stops. Replace Into Table overwrites: an existing record is replaced as a whole, and columns not listed get their default values. Ignore Into Table simply skips records that already exist.
Note that depending on server and client configuration, LOAD DATA LOCAL may need to be enabled explicitly, e.g. by starting the client with --local-infile=1.
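For illustration, the same load in Replace mode would look like this (a sketch; it assumes id is the unique key on book, and Ignore Into Table is used the same way):
Load Data LOCAL InFile '/tmp/book.csv' Replace Into Table book
character set utf8
Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By '\n'
(id,create_time,update_time,price,title);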
Importing into ES
The generated book_es.csv is fairly large, so it is split into chunks of 20000 lines to keep any single file from being too big and making the ES import fail.
cd /tmp/
awk '{filename = "book_es.csv." int((NR-1)/20000) ".csv"; print > filename}' book_es.csv
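GNU coreutils split does the same chunking in one go (a sketch; --additional-suffix needs a reasonably recent coreutils, and the numeric suffixes keep the book_es.csv.*.csv pattern intact):
split -l 20000 -d --additional-suffix=.csv book_es.csv book_es.csv.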
ConvertBookToEs.php is a PHP script that builds the files for the ES bulk import; see the appendix. Running it produces a series of book_es.csv.*.csv.json files.
php ConvertBookToEs.php
importToEs.sh is the ES bulk-import script:
#!/bin/bash
for file in /tmp/book_es.csv.*.csv.json
do
echo $file;
curl -XPOST http://xxx:9200/book/doc/_bulk -H "Content-Type: application/json" --data-binary "@$file" >> importToEs.log
done
Run the script:
sh importToEs.sh
After a few minutes, it is done.
Batch-updating fields with CASE WHEN
Example format:
Updating a single column:
UPDATE categories SET
display_order = CASE id
WHEN 1 THEN 3
WHEN 2 THEN 4
WHEN 3 THEN 5
END
WHERE id IN (1,2,3)
Updating multiple columns:
UPDATE categories SET
display_order = CASE id
WHEN 1 THEN 3
WHEN 2 THEN 4
WHEN 3 THEN 5
END,
title = CASE id
WHEN 1 THEN 'New Title 1'
WHEN 2 THEN 'New Title 2'
WHEN 3 THEN 'New Title 3'
END
WHERE id IN (1,2,3)
A PHP wrapper:
/**
 * Batch-update helper
 * @param array $data rows to update, as a two-dimensional array
 * @param string $field the pivot column whose value differs per row, typically id
 * @param string $table the table to update
 * @param array $params extra equality conditions shared by all rows, as key => value pairs
 * @return bool|string the SQL string, or false on invalid input
 */
function batchUpdate($data, $field, $table, $params = [])
{
if (!is_array($data) || !$field || !is_array($params)) {
return false;
}
//values for the IN (...) clause
$in_fields = array_column($data, $field);
$in_fields = implode(',', array_map(function ($value) {
return "'" . $value . "'";
}, $in_fields));
$updates = parseUpdate($data, $field);
$where = parseParams($params);
$sql = sprintf("UPDATE `%s` SET %s WHERE `%s` IN (%s) %s;\n", $table, $updates, $field, $in_fields, $where);
return $sql;
}
/**
 * Turn a two-dimensional array into CASE WHEN THEN batch-update clauses
 * @param array $data the rows
 * @param string $field the pivot column name
 * @return string SQL fragment
 */
function parseUpdate($data, $field)
{
$sql = '';
$keys = array_keys(current($data));
foreach ($keys as $column) {
if ($column == $field) {// skip the pivot column itself
continue;
}
$sql .= sprintf("`%s` = CASE `%s` \n", $column, $field);
foreach ($data as $line) {
$sql .= sprintf("WHEN '%s' THEN '%s' \n", $line[$field], $line[$column]);
}
$sql .= "END,";
}
return rtrim($sql, ',');
}
/**
 * Build the extra WHERE conditions
 * @param array $params
 * @return string
 */
function parseParams($params)
{
$where = [];
foreach ($params as $key => $value) {
$where[] = sprintf("`%s` = '%s'", $key, $value);
}
return $where ? ' AND ' . implode(' AND ', $where) : '';
}
Example call:
$data = [
['id' => 1, 'parent_id' => 100, 'title' => 'A', 'sort' => 1],
['id' => 2, 'parent_id' => 100, 'title' => 'A', 'sort' => 3],
['id' => 3, 'parent_id' => 100, 'title' => 'A', 'sort' => 5],
['id' => 4, 'parent_id' => 100, 'title' => 'B', 'sort' => 7],
['id' => 5, 'parent_id' => 101, 'title' => 'A', 'sort' => 9],
];
echo batchUpdate($data, 'id', "post");
The generated SQL:
UPDATE `post` SET `parent_id` = CASE `id`
WHEN '1' THEN '100'
WHEN '2' THEN '100'
WHEN '3' THEN '100'
WHEN '4' THEN '100'
WHEN '5' THEN '101'
END,`title` = CASE `id`
WHEN '1' THEN 'A'
WHEN '2' THEN 'A'
WHEN '3' THEN 'A'
WHEN '4' THEN 'B'
WHEN '5' THEN 'A'
END,`sort` = CASE `id`
WHEN '1' THEN '1'
WHEN '2' THEN '3'
WHEN '3' THEN '5'
WHEN '4' THEN '7'
WHEN '5' THEN '9'
END WHERE `id` IN ('1','2','3','4','5') ;
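batchUpdate only builds the SQL string; executing it is left to the caller. A minimal sketch with PDO (the DSN and credentials are placeholders, and since the helper merely wraps values in quotes without real escaping, it must only ever see trusted data):
$pdo = new PDO('mysql:host=xxx;dbname=book;charset=utf8', 'xxx', 'xxx');
$affected = $pdo->exec(batchUpdate($data, 'id', 'post'));
echo $affected . ' rows updated' . PHP_EOL;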
Per-field updates with MySQL LOAD DATA
For loading large amounts of data into MySQL, LOAD DATA INFILE is by far the fastest option. However, while it can run in INSERT IGNORE or REPLACE mode, it currently does not support ON DUPLICATE KEY UPDATE.
If we want to batch-update a specific field, how can ON DUPLICATE KEY UPDATE be simulated with LOAD DATA INFILE?
An answer on Stack Overflow lays out the steps:
1) Create a new temporary table.
CREATE TEMPORARY TABLE temporary_table LIKE target_table;
2) Drop all indexes from the temporary table to speed things up. (Optional)
SHOW INDEX FROM temporary_table;
DROP INDEX `PRIMARY` ON temporary_table;
DROP INDEX `some_other_index` ON temporary_table;
3) Load the CSV into the temporary table.
LOAD DATA INFILE 'your_file.csv'
INTO TABLE temporary_table
Fields Terminated By '\t' Enclosed By '' Escaped By '' Lines Terminated By '\n'
(field1, field2);
4) Copy the data across with ON DUPLICATE KEY UPDATE.
SHOW COLUMNS FROM target_table;
INSERT INTO target_table
SELECT * FROM temporary_table
ON DUPLICATE KEY UPDATE field1 = VALUES(field1), field2 = VALUES(field2);
MySQL assumes that the part before the = refers to the columns named in the INSERT INTO clause, and that the second part refers to the SELECT columns.
5) Drop the temporary table.
DROP TEMPORARY TABLE temporary_table;
Using SHOW INDEX FROM and SHOW COLUMNS FROM, the procedure can be automated for any given table.
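Applied to the book table from earlier, the whole procedure might look like this (a sketch; it assumes id is the primary key and that only price and title need refreshing):
CREATE TEMPORARY TABLE tmp_book LIKE book;
LOAD DATA LOCAL INFILE '/tmp/book.csv' INTO TABLE tmp_book
Fields Terminated By ' @@@ ' Enclosed By '' Escaped By '' Lines Terminated By '\n'
(id,create_time,update_time,price,title);
INSERT INTO book SELECT * FROM tmp_book
ON DUPLICATE KEY UPDATE price = VALUES(price), title = VALUES(title);
DROP TEMPORARY TABLE tmp_book;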
Note: the official documentation marks INSERT ... SELECT ON DUPLICATE KEY UPDATE statements as unsafe for statement-based replication, so test this approach thoroughly before rolling it out. See:
https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
Appendix
ConvertBookToEs.php
<?php
/**
 * Convert b_book rows into the ES bulk format (JSON)
 */
//id,create_time,update_time,price,title,intro
function dealBook($file)
{
$fp = fopen($file, 'r');
while (!feof($fp)) {
$line = explode(' @@@ ', fgets($fp, 65535));
if ($line && isset($line[1])) {
$arr_head = [
'index' => [
'_id' => (int)$line[0]
]
];
$arr = [
'id' => (int)$line[0],
'create_time' => strtotime($line[1]),
'update_time' => strtotime($line[2]),
'price' => intval($line[3]),
'title' => (string)$line[4],
'intro' => (string)$line[5],
];
file_put_contents($file . '.json', json_encode($arr_head, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
file_put_contents($file . '.json', json_encode($arr, JSON_UNESCAPED_UNICODE) . PHP_EOL, FILE_APPEND);
}
}
fclose($fp);
}
try {
//convert the CSV files into the ES bulk JSON format
//see https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
$files = glob("/tmp/book_es.csv.*.csv");
if (false === $files) {
exit("can not find csv file");
}
$pids = [];
foreach ($files as $i => $file) {
$pid = pcntl_fork();
if ($pid < 0) {
exit("could not fork");
}
if ($pid > 0) {
$pids[$pid] = $pid;
} else {
echo time() . " new process, pid:" . getmypid() . PHP_EOL;
dealBook($file);
exit();
}
}
while (count($pids)) {
foreach ($pids as $key => $pid) {
$res = pcntl_waitpid($pid, $status, WNOHANG);
if ($res == -1 || $res > 0) {
echo 'Child process exit,pid ' . $pid . PHP_EOL;
unset($pids[$key]);
}
}
sleep(1);
}
} catch (Exception $e) {
$message = $e->getFile() . ':' . $e->getLine() . ' ' . $e->getMessage();
echo $message;
}
References
1. Linux命令行文本工具 - 飛鴻影~ - 博客園: https://www.cnblogs.com/52fhy/p/5836429.html
2. mysqldump 導出 csv 格式 - superhosts的專欄 - CSDN博客: https://blog.csdn.net/superhosts/article/details/26054997
3. Batch Processing | Elasticsearch Reference [6.4] | Elastic: https://www.elastic.co/guide/en/elasticsearch/reference/current/_batch_processing.html
4. mysql導入數據load data infile用法整理 - conanwang - 博客園: https://www.cnblogs.com/conanwang/p/5890753.html
5. MySQL LOAD DATA INFILE with ON DUPLICATE KEY UPDATE - Stack Overflow: https://stackoverflow.com/questions/15271202/mysql-load-data-infile-with-on-duplicate-key-update
6. INSERT INTO ... SELECT FROM ... ON DUPLICATE KEY UPDATE - Stack Overflow: https://stackoverflow.com/questions/2472229/insert-into-select-from-on-duplicate-key-update
7. MySQL 5.6 Reference Manual: 13.2.5.2 INSERT ... ON DUPLICATE KEY UPDATE Syntax: https://dev.mysql.com/doc/refman/5.6/en/insert-on-duplicate.html
8. 復制表結構和數據SQL語句 - becket - 博客園: https://www.cnblogs.com/zhengxu/articles/2206894.html
9. MySQL批量更新數據 - 夢想_行人 - 博客園: https://www.cnblogs.com/ldj3/p/9288187.html