轉載原文地址:
https://anjia0532.github.io/2019/07/17/mysql-to-clickhouse/
1
create table engin mysql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);
官方文檔:
https://clickhouse.yandex/docs/en/operations/table_engines/mysql/
注意,實際數據存儲在遠端mysql數據庫中,可以理解成外表。
可以通過在mysql增刪數據進行驗證。
2
insert into select from
-- 先建表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = engine
-- 導入數據
INSERT INTO [db.]table [(c1, c2, c3)] select 列或者* from mysql('host:port', 'db', 'table_name', 'user', 'password')
可以自定義列類型,列數,使用clickhouse函數對數據進行處理,比如
select toDate(xx) from mysql("host:port","db","table_name","user_name","password")
3
create table as select from
CREATE TABLE [IF NOT EXISTS] [db.]table_name
ENGINE =Log
AS
SELECT *
FROM mysql('host:port', 'db', 'article_clientuser_sum', 'user', 'password')
網友文章:
http://jackpgao.github.io/2018/02/04/ClickHouse-Use-MySQL-Data/
不支持自定義列,參考資料里的博主寫的ENGIN=MergeTree 測試失敗。
可以理解成 create table 和 insert into select 的組合
4
Altinity/clickhouse-mysql-data-reader
Altinity公司開源的一個python工具,用來從mysql遷移數據到clickhouse(支持binlog增量更新和全量導入),但是官方readme和代碼脫節,根據quick start跑不通。
## 創建表
clickhouse-mysql \
--src-host=127.0.0.1 \
--src-user=reader \
--src-password=Qwerty1# \
--table-templates-with-create-database \
--src-table=airline.ontime > create_clickhouse_table_template.sql
## 修改腳本
vim create_clickhouse_table_template.sql
## 導入建表
clickhouse-client -mn < create_clickhouse_table_template.sql
## 數據導入
clickhouse-mysql \
--src-host=127.0.0.1 \
--src-user=reader \
--src-password=Qwerty1# \
--table-migrate \
--dst-host=127.0.0.1 \
--dst-table=logunified \
--csvpool
官方文檔:
https://github.com/Altinity/clickhouse-mysql-data-reader#mysql-migration-case-1—migrate-existing-data
注意,上述三種都是從mysql導入clickhouse,如果數據量大,對於mysql壓力還是挺大的。下面介紹兩種離線方式(streamsets支持實時,也支持離線)
csv
## 忽略建表
clickhouse-client \
-h host \
--query="INSERT INTO [db].table FORMAT CSV" < test.csv
但是如果源數據質量不高,往往會有問題,比如包含特殊字符(分隔符,轉義符),或者換行。被坑的很慘。
- 自定義分隔符, --format_csv_delimiter="|"
- 遇到錯誤跳過而不中止, --input_format_allow_errors_num=10 最多允許10行錯誤, --input_format_allow_errors_ratio=0.1 允許10%的錯誤
- csv 跳過空值(null) ,報
Code: 27. DB::Exception: Cannot parse input: expected , before: xxxx: (at row 69) ERROR: garbage after Nullable(Date): "8,002<LINE FEED>0205" sed ' :a;s/,,/,\\N,/g;ta' |clickhouse-client -h host --query "INSERT INTO [db].table FORMAT CSV"
將 ,, 替換成 ,\N,
python clean_csv.py --src=src.csv --dest=dest.csv --chunksize=50000 --cols --encoding=utf-8 --delimiter=,
clean_csv.py參考我另外一篇《032-csv文件容錯處理》
https://anjia0532.github.io/2019/07/16/clean-csv/
5
StreamSets
streamsets支持從mysql或者讀csv全量導入,也支持訂閱binlog增量插入,參考我另外一篇《025-大數據ETL工具之StreamSets安裝及訂閱mysql binlog》。
https://anjia0532.github.io/2019/06/10/cdh-streamsets/
本文只展示從mysql全量導入clickhouse 本文假設你已經搭建起streamsets服務

啟用並重啟服務

上傳mysql和clickhouse的jdbc jar和依賴包
便捷方式,創建pom.xml,使用maven統一下載
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.anjia</groupId>
<artifactId>demo</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>demo</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
</project>
如果本地裝有maven,執行如下命令
mvn dependency:copy-dependencies -DoutputDirectory=lib -DincludeScope=compile
所有需要的jar會下載並復制到lib目錄下

然后拷貝到 streamsets /opt/streamsets-datacollector-3.9.1/streamsets-libs-extras/streamsets-datacollector-jdbc-lib/lib/ 目錄下

重啟streamsets服務







