基於datax的數據同步平台


一、需求

         由於公司各個部門對業務數據的需求,比如進行數據分析、報表展示等等,且公司沒有相應的系統、數據倉庫滿足這些需求,最原始的辦法就是把數據提取出來生成excel表發給各個部門,這個功能已經由腳本轉成了平台,交給了DBA使用,而有些數據分析部門,則需要運維把生產庫的數據同步到他們自己的庫,並且需要對數據進行脫敏,比如客戶的身份證號、手機號等等,且數據來源分散在不同的機器,不同的數據庫實例里,這樣就無法使用MySQL的多源復制,只能用寫腳本通過SQL語句實現,隨着業務的發展,導致堆積到運維部門的同步數據任務越來越多,一個任務對應一個腳本,有的腳本多達20多張表,腳本超過10個以后,每次同步失敗、或者對腳本里的參數進行增刪改查,都要從10多個腳本里的10多個SQL去找,這是一件非常痛苦的事情,耗費時間、沒有效率,且容易改錯,是一件吃力不討好的事。為此開發了一個數據同步平台,將同步任務的增刪改查、執行的歷史日志全部放到平台里,然后交給DBA去自己去操作。

         市面上也有一些ETL工具,比如kettle,但是為了練手決定重新造輪子。

二、平台簡介

          平台主要用於數據同步、數據處理等等ETL操作。

          平台基於阿里的開源同步工具datax3.0開發。

          開發語言:Python、Django、celery、bootstrap、jquery

          系統:Centos 7  64位

          注意:時間緊迫,平台只支持MySQL數據庫,其它的sqlserver等等后期再開發。

          datax3.0 介紹:https://yq.aliyun.com/articles/59373

          datax3.0 github 地址:https://github.com/alibaba/DataX

          項目地址:https://github.com/hanson007/FirstBlood

三、功能模塊

         1、數據同步

               主要用於數據同步

         2、SQL腳本(后期開發,包括備份模塊等等。)

               保存並執行各種增刪改查SQL語句。

         3、批處理作業

               將數據同步、SQL腳本等等各個模塊的子任務組合成一個批處理作業。借鑒了數據庫客戶端工具Navicat Premium 的批處理作業功能。

               支持作業定時調度。

         4、數據庫管理工具web界面后期開發

              主要用於管理生產數據庫的IP、用戶名、密碼等等信息,供其它模塊調用。

              目前模塊的表已建好,生產庫的信息需要通過其它平台同步或者用數據庫客戶端工具導入,web界面的增刪改查后期開發。目前生產環境里是將其它平台保存的所有生產庫IP、用戶名、密碼等等信息同步到此平台里。

         5、接口

               提供查詢批處理作業執行歷史的接口,供其它部門使用。(主要還是大數據部門,他們寫了一個程序,根據我這邊每次同步后的結果,是成功還是失敗,再進行下一步的操作。)

               后續接口按業務部門的需求再開發。

         6、權限(Django自帶)

               平台管理員賬號擁有模塊的所有權限,僅供運維部門使用。

               普通人員賬號只能查看數據同步、批處理作業,以及執行歷史,不能新增、修改、執行作業或任務。主要提供給業務部門使用。

               查看批處理作業的執行歷史接口沒有權限控制,普通人員也能調用。

四、表結構設計

          1、生產數據庫信息

                功能:主要用於保存各種生產庫的 ip、用戶名、密碼等等信息。

                表名:databaseinfo

           

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
name varchar 不允許為空、不允許重復 生產庫英文標識。
description varchar 不允許為空 生產庫的業務信息描述
host varchar 不允許為空、不允許重復 生產庫的IP地址。
user varchar 不允許為空 生產數據庫的用戶名
passwd varchar 不允許為空 生產數據庫的密碼
db varchar 不允許為空 生產數據庫中的某一個庫
type varchar 不允許為空 生產數據庫類型。 比如MySQL、sqlserver
create_time datetime 不允許為空 創建時間,默認為當前時間
modify_time datetime 不允許為空 修改時間,默認為當前時間,數據變化時自動改為當前時間。

          2.數據庫同步任務

            功能:用於保存數據庫同步任務的各種參數,主要為datax的json配置文件里的各種參數。

            表名:datax_job

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
name varchar 不允許為空,不允許重復 數據同步任務的英文標識
description varchar 不允許為空 任務的詳細描述
querySql longtext 不允許為空 提取數據時的查詢SQL
reader_databaseinfo_id int 不允許為空 讀取數據庫(從哪個生產庫執行SQL提取數據,對應databaseinfo表的主鍵)
writer_table varchar 不允許為空 寫入表名(提取的數據插入到哪張表里)
writer_databaseinfo_id int 不允許為空 寫入數據庫(提數據的數據插入到哪個數據庫里)
writer_preSql longtext 允許為空 寫入前執行的SQL(比如同步數據前需要清空寫入的表)
writer_postSql longtext 允許為空 寫入后執行的SQL(比如同步完數據后需要再結合其它表執行數據分析)
create_time datetime 不允許為空 創建時間,默認為當前時間
modify-time datetime 不允許為空 修改時間,默認為當前時間,數據變化時自動改為當前時間。

        3.寫入表的列信息

          功能:保存同步任務時寫入到表的哪些列。比如寫入表有20個字段,此時只需要往其中的10個字段寫入信息,就需要保存這10個列名。

                     注意:* 星號代碼寫入到表的所有字段。

          表名:datax_job_writer_column

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
name varchar 不允許為空 列名
datax_job_id int 不允許為空 數據同步任務ID,關聯datax_job表的主鍵。
create_time datetime 不允許為空 創建時間,默認為當前時間
modify_time datetime 不允許為空 修改時間,默認為當前時間,隨着數據的變化而變為當前時間。

       4.數據同步任務實例

          功能:用於保存數據同步任務的執行歷史。

                    方便自己及業務部門進行任務的分析和排錯,省的每次同步失敗后還得幫他們查日志。現在直接將日志記錄表里,在平台開個賬號后,讓業務部門自己去查。

                    每一個數據同步任務執行后,可以看成是一個實例,類似面向對象里實例化。將任務的執行時間、執行結果等等保存起來。借鑒了騰訊藍鯨的作業平台表結構設計思想。(麻花藤啊麻花藤,給你沖了幾十年的點卡,終於是回了一點點利息。)

          表名:datax_job_instance

          說明:instance_id也對應datax生成的日志文件名,當需要在頁面查看datax生成的日志時就通過instance_id去查找日志文件,並將其實時輸出到頁面。

名稱 類型 約束條件 說明 
id int 不允許為空  自增主鍵 
instance_id  bigint 任務實例ID ,不允許重復 任務實例ID(由datax_job的id號+13位時間戳組成)
name varchar 不允許為空 任務名稱 (執行時,datax_job表的name,同下面的字段一樣) 
description varchar 不允許為空 任務描述
querySql longtext 不允許為空  查詢SQL語句
reader_databaseinfo_host varchar 不允許為空 讀取數據庫IP
reader_databaseinfo_description varchar 不允許為空 讀取數據庫描述
writer_table varchar 不允許為空 寫入表
writer_databaseinfo_host varchar 不允許為空 寫入數據庫IP
writer_databaseinfo_description varchar 不允許為空 寫入數據庫描述
writer_preSql longtext 允許為空 寫入數據前執行的SQL語句
writer_postSql longtext 允許為空 寫入數據后執行的SQL語句
trigger_mode int 不允許為空 觸發模式 1 自動 2 手動(默認自動)
status int 不允許為空 狀態 0 正在執行 1 執行完成
result int 不允許為空 執行結果 0 成功 1 失敗 2 未知
start_time datetime 不允許為空 開始時間
end_time datetime 允許為空 結束時間

        5.批處理作業

         功能:保存批處理作業。

         表名:batch_job

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
name varchar 不允許為空,不允許重復 名稱
description varchar 不允許為空 描述
create_time datetime 不允許為空 創建時間
modify_time datetime 不允許為空 修改時間

        6.批處理作業詳情

         功能:保存批處理作業的各個子任務。

                    比如一個批處理作業包含8個數據同步任務,一個SQL腳本任務,則將這幾個任務的id保存起來。

         表名:batch_job_details

         說明:字段subjob_id,對應其它子任務的ID。比如,類型為數據同步,則對應datax_job表的主鍵。類型為SQL腳本,則對應SQL腳本表的主鍵。(SQL腳本后期開發)

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
batch_job_id int 不允許為空 批處理作業ID,對應batch_job表的主鍵
subjob_id int 不允許為空 子作業ID,對應其它子任務的主鍵
type int 不允許為空 類型 1 數據同步 2 SQL腳本 3 備份。 主要用於后期擴展
create_time datetime 不允許為空 創建時間
modify_time datetime 不允許為空 修改時間

        7.批處理作業執行實例

        功能:保存批處理作業的執行歷史日志。功能同數據同步實例一樣。

        表名:batch_job_instance

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
instance_id bigint 不允許為空、不允許重復 實例ID(由batch_job表的id號+13位時間戳組成)
name varchar 不允許為空 名稱
description varchar 不允許為空 描述
trigger_mode int 不允許為空 觸發模式 1 自動 2 手動(默認自動)
status int 不允許為空 狀態 0 正在執行 1 執行完成
result int 不允許為空 執行結果 0 成功 1 失敗 2 未知
start_time datetime 不允許為空 開始時間
end_time datetime 不允許為空 結束時間

         8.批處理作業執行實例詳情

         功能:保存批處理作業執行實例的各個子任務實例

         表名:batch_job_instance_details

         說明:每個批處理作業執行時,實際是執行各個其它功能模塊的子任務,而每個子任務都會保存子任務實例ID。

                   比如一個批處理作業有8個數據同步任務,1個備份任務(后期開發),執行后,datax_job_instance表會保存這8個數據同步任務的實例,備份實例表則保存備份實例ID。然后再將8個同步任務實例的ID及1個備份實例ID保存到batch_job_instance_details表里,查詢時只要通過各個子任務的實例ID關聯查詢。

名稱 類型 約束條件 說明
id int 不允許為空 自增主鍵
instance_id bigint 不允許為空 實例ID,對應batch_job_instance表的instance_id
subjob_instance_id bigint 不允許為空 子作業實例ID,比如datax_job_instance表的instance_id
type int 不允許為空 類型 1 數據同步 2 SQL腳本 3 備份。 主要用於后期擴展

      9.建表語句

          

/*
* 創建數據庫
*/
create database FirstBlood default character set utf8 collate utf8_bin;


/*
* 數據庫信息
*/
CREATE TABLE `databaseinfo` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL COMMENT '名稱',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `host` varchar(255) DEFAULT NULL COMMENT '主機',
  `user` varchar(255) DEFAULT NULL COMMENT '用戶',
  `passwd` varchar(255) DEFAULT NULL COMMENT '密碼',
  `db` varchar(255) DEFAULT NULL COMMENT '數據庫',
  `type` varchar(255) DEFAULT NULL COMMENT '類型',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `databaseinfo_host_c254f05e_uniq` (`host`),
  UNIQUE KEY `databaseinfo_name_a3bc8190_uniq` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='數據庫信息';


/*
* 數據同步任務
*/
drop table if exists `datax_job`;
CREATE TABLE `datax_job` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL COMMENT '名稱',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `querySql` longtext COLLATE utf8_bin NOT NULL COMMENT '查詢SQL語句',
  `reader_databaseinfo_id` int(11) NOT NULL COMMENT '讀取數據庫',
  `writer_table` varchar(255) DEFAULT NULL COMMENT '寫入表名',
  `writer_databaseinfo_id` int(11) NOT NULL COMMENT '寫入數據庫',
  `writer_preSql` longtext COLLATE utf8_bin NOT NULL COMMENT '寫入數據前執行的SQL語句',
  `writer_postSql` longtext COLLATE utf8_bin NOT NULL COMMENT '寫入數據后執行的SQL語句',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `datax_job_name_uniq` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='datax數據同步任務';


/*
* 寫入表的列信息
*/
drop table if exists `datax_job_writer_column`;
CREATE TABLE `datax_job_writer_column` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL COMMENT '列名',
  `datax_job_id` int(11) NOT NULL COMMENT '數據同步任務ID',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='寫入表的列信息';


/*
* 數據同步任務實例
*/
drop table if exists `datax_job_instance`;
CREATE TABLE `datax_job_instance` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `instance_id` bigint(20) NOT NULL COMMENT '任務實例ID',
  `name` varchar(255) DEFAULT NULL COMMENT '名稱',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `querySql` longtext COLLATE utf8_bin NOT NULL COMMENT '查詢SQL語句',
  `reader_databaseinfo_host` varchar(255) NOT NULL COMMENT '讀取數據庫IP',
  `reader_databaseinfo_description` varchar(255) NOT NULL COMMENT '讀取數據庫描述',
  `writer_table` varchar(255) DEFAULT NULL COMMENT '寫入表名',
  `writer_databaseinfo_host` varchar(255) NOT NULL COMMENT '寫入數據庫IP',
  `writer_databaseinfo_description` varchar(255) NOT NULL COMMENT '寫入數據庫描述',
  `writer_preSql` longtext COLLATE utf8_bin NOT NULL COMMENT '寫入數據前執行的SQL語句',
  `writer_postSql` longtext COLLATE utf8_bin NOT NULL COMMENT '寫入數據后執行的SQL語句',
  `trigger_mode` int(2) DEFAULT '1' COMMENT '觸發模式 1 自動 2 手動(默認自動)',
  `status` int(2) DEFAULT '0' COMMENT '狀態 0 正在執行 1 執行完成',
  `result` int(2) DEFAULT '2' COMMENT '執行結果 0 成功 1 失敗 2 未知',
  `start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '開始時間',
  `end_time` datetime DEFAULT NULL COMMENT '結束時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `datax_job_instance_id_uniq` (`instance_id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='datax數據同步任務實例';


/*
* 批處理作業
*/
drop table if exists `batch_job`;
CREATE TABLE `batch_job` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL COMMENT '名稱',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `batch_job_name_uniq` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='批處理作業';


/*
* 批處理作業詳情
*/
drop table if exists `batch_job_details`;
CREATE TABLE `batch_job_details` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `batch_job_id` int(11) NOT NULL COMMENT '批處理作業ID',
  `subjob_id` int(11) NOT NULL COMMENT '子作業ID',
  `type` int(2) NOT NUll COMMENT '類型 1 數據同步 2 SQL腳本 3 備份。 主要用於后期擴展',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
  `modify_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='批處理作業詳情';


/*
* 批處理作業執行實例
*/
drop table if exists `batch_job_instance`;
CREATE TABLE `batch_job_instance` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `instance_id` bigint(20) NOT NULL COMMENT '實例ID',
  `name` varchar(255) DEFAULT NULL COMMENT '名稱',
  `description` varchar(255) DEFAULT NULL COMMENT '描述',
  `trigger_mode` int(2) DEFAULT '1' COMMENT '觸發模式 1 自動 2 手動(默認自動)',
  `status` int(2) DEFAULT '0' COMMENT '狀態 0 正在執行 1 執行完成',
  `result` int(2) DEFAULT '2' COMMENT '執行結果 0 成功 1 失敗 2 未知',
  `start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '開始時間',
  `end_time` datetime DEFAULT NULL COMMENT '結束時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `batch_job_instance_id_uniq` (`instance_id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='批處理作業執行實例';


/*
* 批處理作業執行實例詳情
*/
drop table if exists `batch_job_instance_details`;
CREATE TABLE `batch_job_instance_details` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `instance_id` bigint(20) NOT NULL COMMENT '實例ID',
  `subjob_instance_id` bigint(20) NOT NULL COMMENT '子作業實例ID',
  `type` int(2) NOT NUll COMMENT '類型 1 數據同步 2 SQL腳本 3 備份。 主要用於后期擴展',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=28 DEFAULT CHARSET=utf8 COMMENT='批處理作業執行實例詳情';

 

  

 

五、功能詳解

2、數據同步

            功能:底層使用阿里的datax3.0工具進行同步。可以新增、修改同步任務。每個任務對應一張表。在頁面添加任務后,執行時就在后台生成基於datax3.0的json配置文件。並且可以實時查看datax生成的同步日志,也可以查看任務的執行歷史。

                 衍生:增量同步

                                 需要源表里增加時間戳字段,兩種方案。

                              (1)如果歷史數據不變,每次只同步前一天的數據。

                              (2)如果歷史數據變化,需要在目標庫里加一張臨時表,每次同步時將前一天或前一個小時的時間戳有變化的數據插入到臨時表里。再將臨時表里的數據更新或插入到目標表里。

            操作

                (1)首頁           

                         點擊“數據同步->作業”,進入數據同步首頁,可以查看所有的數據同步任務

 

 

               (2)新增同步任務

                        點擊首頁的“新增”按鈕,進入新增任務頁面,填完表單后點擊保存。

 

(3)     更新、運行同步任務

              在數據同步首頁點擊“任務名稱”,進入任務更新頁面。可以對任務的SQL、數據庫等等信息進行修改。

(1)     執行任務

              在更新頁面點擊“Run”按鈕,可以執行任務。

(1)執行歷史

點擊“數據同步->執行歷史”,在執行歷史首頁可以查看數據同步任務的執行歷史,並且可以按照任務名稱、描述、讀取數據庫、執行狀態等等進行搜索。

衍生:由於執行歷史是一個日志記錄,隨着時間推移,數據量會越來越多,為了減小平台數據庫的壓力,按照業務量大小可以只保存一年、或者半年的數據。

(1)同步日志

在執行歷史首頁點擊“任務名稱”,可以實時查看同步日志。

日志是由工具datax生成的日志文件,文件名為執行時任務的ID號+13位時間戳組成。平台只保存文件名,查看日志時,后台通過文件名將日志文件內容實時輸出到頁面。

2.批處理作業

   功能描述:

         將數據同步、SQL腳本(3.0版本后期開發)等等子任務組合成一個批處理作業,並發執行。並且支持linux crontab格式的定時執行。

         時間緊迫,暫時不支持任務串行,或者任務之間的依賴,比如A執行完成,並且成功后才能執行B,類似功能后期3.0版本開發。

  操作

(1)批處理作業首頁

          點擊“批處理作業->作業列表”,進入批處理作業首頁

(2)新增批處理作業

點擊“新增”按鈕,進入新增批處理作業頁面。

選擇“執行時間”、勾選“是否啟用”等等參數,填好表單后點擊保存。后台會根據執行時間自動執行。

(3)更新、運行批處理作業

在批處理作業首頁點擊“任務名稱”后,進入更新頁面,可以修改批處理作業參數。

點擊“Save”按鈕,保存更新后的批處理作業。

在更新頁面點擊“Run”按鈕可手動執行批處理作業。

(4)執行歷史

點擊“批處理作業->執行歷史”,即可進入批處理作業 - 執行歷史首頁。

可以按照任務名稱、執行結果等等搜索歷史的執行作業。

點擊“任務名稱”進入批處理作業詳情 - 執行歷史,可查看批處理作業執行時它的子任務。

(5)執行日志

在“批處理作業詳情 - 執行歷史”頁面,點擊“任務名稱”可查看每個子任務的日志。如類型為數據同步的子任務,它的日志就是調的datax的日志文件內容。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM