很多情況大數據集群需要獲取業務數據,用於分析。通常有兩種方式:
- 業務直接或間接寫入的方式
- 業務的關系型數據庫同步到大數據集群的方式
第一種可以是在業務中編寫代碼,將覺得需要發送的數據發送到消息隊列,最終落地到大數據集群。
第二種則是通過數據同步的方式,將關系型數據同步到大數據集群,可以是存儲在 hdfs 上,使用 hive 進行分析,或者是直接存儲到 hbase 中。
其中數據同步又可以大致分為兩種:增量同步、CRUD 同步。
增量同步是只將關系型數據庫中新增的數據進行同步,對於修改、刪除操作不進行同步,這種同步方式適用於那些一旦生成就不會變動的數據。 CRUD 同步則是數據的增、刪、改都需要進行同步,保證兩個庫中的數據一致性。
本文不講 binlog + Canal + 消息隊列 + JAR 實現數據實時同步的方案,也不講使用 Sqoop 進行離線同步。而是講解如何使用 Streamsets 零代碼完成整個實時同步流程。關於 Streamsets 具體是什么,以及能做哪些其他的事情,大家可以前往 Streamsets 官網進行了解。從筆者了解的信息,在數據同步方面 Streamsets 十分好用。 很多初學者,對大數據的概念都是模糊不清的,大數據是什么,能做什么,學的時候,該按照什么線路去學習,學完往哪方面發展,想深入了解,想學習的同學歡迎加入大數據學習扣扣君:四一零加上三九壹連起來七四四,有大量干貨(零基礎以及進階的經典實戰)分享給大家,並且有清華大學畢業的資深大數據講師給大家免費授課
要實現 mysql 數據的實時同步,首先我們需要打開其 binlog 模式,具體怎么操作網上有很多教程,這里就不進行闡述了。
那么,現在就直接進入正題吧。
安裝
下載
Streamsets 可以直接從官網下載: archives.streamsets.com
這里安裝的是 Core Tarball 格式,當然你也可以直接選擇下載 Full Tarball、Cloudera Parcel 或者其他格式。下載 Core Tarball 的好處是體積小,后期需要什么庫的時候可以自行在 Streamsets Web 頁進行下載。相對於 Core Tarball,Full Tarball 默認幫你下載了很多庫,但是文件體積相對較大(>4G),並且可能很多庫我們暫時使用不到。
解壓啟動
Streamsets Core Tarball 下載好后,直接解壓就可以使用,非常方便。
tar xvzf streamsets-datacollector-core-3.7.1.tgz
cd streamsets-datacollector-3.7.1/bin/ ./streamsets dc 復制代碼
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS Configuration of maximum open file limit is too low: 1024 (expected at least 32768). Please consult https://goo.gl/6dmjXd 復制代碼
如果在運行的時候遇到上面的報錯,修改操作系統的 open files 限制數量即可。
#vi /etc/security/limits.conf 復制代碼
添加兩行內容:
- soft nofile 65536
- hard nofile 65536
運行 'ulimit -n' 既可以看到 open files 設置值已生效。
Web 頁
Streamsets 擁有一個 Web 頁,默認端口是 18630。瀏覽器中輸入 ip:18630 即可進入 streamsets 的頁面,默認用戶名、密碼都是 admin。
Pipeline
准備工作
因為需要將 mysql 的數據實時同步到 hbase 中,但是下載的 Core Tarball 中沒有 MySQL Binary Log 以及 hbase 兩個 stage library,所以在 create new pipeline 之前需要先安裝它們。
安裝 MySQL Binary Log 庫
安裝 Hbase 庫,這里注意一下,hbase 庫位於 CDH 中,所以選擇一個 CDH 版本進行安裝
安裝好后在 Installed Stage Libraries 中就能看到已經安裝了 MySQL Binary Log 和 Hbase
創建 Pipeline
MySQL Binary Log
創建一個 MySQL Binary Log
設置 mysql 的連接參數(Hostname, Port 以及 Server ID),這里的 Server ID 與 mysql 配置文件(一般是 /etc/my.cnf)中的 server-id 保持一致
設置 mysql 的用戶名、密碼
其他設置:我們在 Include Tables 欄設置了兩張表(表與表之間用逗號隔開),意思是監控這兩張表的數據變化,其他表不關心。
Stream Selector
創建一個 Stream Selector,並將剛剛創建的 MySQL Binary Log 指向這個 Stream Selector。 設置過濾條件, 比如說 ${record:value("/Table")=='cartype'} 就是過濾 cartype 表。
可以看到 Stream Selector 有兩個出口(1 和 2),后面我們將會看到: 1 輸出到 Hbase, 2 數據到 Trash
Hbase & Trash
分別創建 Hbase 和 Trash,連接到 Stream Selector 上
配置 Hbase
Trash 無需進行配置
驗證 & 啟動
驗證
點擊右上角的“眼鏡”,驗證整個流程是否有問題。
這里報錯:"java.lang.RuntimeException:Unable to get driver instance for jdbcUrl"。這個報錯的原因是缺少 mysql 連接的 jar 包。解決起來也很簡單,下載一個 jar 包然后放到 streamsets 指定的目錄下。我這邊的完整目錄是:/opt/streamsets/streamsets-datacollector-3.7.1/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib/mysql-connector-java-5.1.26-bin.jar, mysql-connector-java-5.1.26-bin.jar 就是我下載的 jar 包。
還有一點就是事先要將 Hbase 中相對應的表創建好,不然驗證會提示錯誤。
接着在頁面上重啟 streamsets 即可。
重新驗證,發現成功了。
點擊右上角播放標簽,啟動流程,這樣整個流程就已經完成了(數據已經在進行實時同步),查看各個 Stage 既可以看到有多少數據流入,多少數據流出。也可以直接進入 hbase 數據庫中查看是否有數據生成。
以上就是如何使用 Streamsets 實時同步 mysql 數據到 hbase 中的整個操作流程。大家肯定發現了,整個流程沒有編寫任何的代碼,相對於 binlog + Canal + 消息隊列 + JAR 的方案是不是高效一些呢。當然任何方案都會有優缺點,Streamsets 這種方案的更多實際體驗還需要更多的觀察。