ETL工具 DataX數據同步,LINUX CRONTAB 定時調度


DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平台,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各種異構數據源之間高效的數據同步功能。

例子:

全量從MYSQL 同步到MYSQL

{
  "job": {
  "content":[
  {
    "reader":{
    "name":"mysqlreader",
    "parameter":{
    "connection":[
      {
        "querySql":["SELECT * FROM TABEL"],
        "jdbcUrl":["jdbc:mysql://127.0.0.1:3306/dsjglpt"],
      }
    ],

    "username":"root",
    "password":"123456",
    "mandatoryEncoding":"UTF-8"
    }
    },
    "writer":{
      "name":"mysqlwriter",
      "parameter":{
      "column": ["*"],
      "connection":[
        {
          "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/dsjglpt_tmp",
          "table":["TABEL"],
        }
      ],

    "username":"root",
    "password":"123456",
    "mandatoryEncoding":"UTF-8",
    "preSql": ["truncate table TABEL"],
    "writeMode": "insert"
    }
    }
   }
  ],
  "setting":{
  "errorLimit":{
  "record":0
  },
  "speed":{
  "channel":"1"
  }
  }
  }
}

從 MYSQL 同步到 PostgreSQL , 部分配置如下:

"name": "postgresqlwriter"

"jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]",

 

編寫    ty_commit_datax_sjyc.sh   (並發任務數)

if [ $# -lt 1 ] ; then
echo "請輸入正確的參數:並發任務數 "
exit 1;
fi
processNum=$1
##基礎路徑
base=/home/admin
##新版本的datax
dataxHome=/home/admin/datax/datax3
##配置文件目錄
confBase=`ls ${base}/sjyc/*.json`
##全量json目錄
qlJsonPath="${base}/sjyc"
##如果統計表記錄數文件不存在,需先執行統計表記錄數腳本
#if [ ! -f ${shellPath}/log/count/${syscode}_CountTable.conf ] ;then
# echo "表記錄數文件${syscode}_CountTable.conf不存在,請先執行統計記錄數腳本!"
# exit 1;
#fi
startTime=`date '+%Y%m%d%H%M%S'`
##輸出日志路徑
outpath="${base}/log/commit/sjyc/${startTime}"
if [ -d ${outpath} ] ;then
rm -rf ${outpath}
fi
mkdir -p ${outpath}
touch ${outpath}/commitJson.log
touch ${outpath}/succJson.log
touch ${outpath}/failJson.log
mkdir -p ${outpath}/succLog
mkdir -p ${outpath}/failLog
jsonNum=`ls ${qlJsonPath}/*.json | wc -l`
if [ ${processNum} -gt ${jsonNum} ] ;then
echo "並發任務數必須小於等於json文件數:${jsonNum}"
exit 1;
fi

##以當前進程號命名
fifoName="/tmp/$$.fifo"
mkfifo ${fifoName}
##定義文件描述符(fd是一個整數,可理解為索引或指針),以讀寫的方式綁定到文件描述符3中
exec 3<>"${fifoName}"
##此時可以刪除管道文件,保留fd即可
rm -rf ${fifoName}
##定義進程數量,向管道文件中寫入進程數量個空行
for ((i=1;i<=${processNum};i++)) do
echo >&3
done

##組裝dataX 動態參數

##根據配置文件ty_createJson.conf中的表匹配json目錄下的json文件,如果一個表匹配到多個json文件(分區表的情況),則依次提交
for file in `ls ${qlJsonPath}/*.json` ;do
##讀入一個空行
read -u3
{
jsonName="${file##*/}"
fileName="${jsonName%%.*}"
echo ${jsonName}
echo "${file}" >> ${outpath}/commitJson.log
python ${dataxHome}/bin/datax.py ${file} 2>&1 >> ${outpath}/${fileName}.log
result=$?
if [ ${result} -eq 0 ] ;then
echo "${fileName}" >> ${outpath}/succJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/succLog
else
echo "${fileName}" >> ${outpath}/failJson.log
mv -f ${outpath}/${fileName}.log ${outpath}/failLog
##記錄失敗的表配置
#echo "${line}" >> ${confBase}/ty_createJson_bsj.conf
fi
#sleep 1
##最后向管道文件中寫入一個空行
echo >&3
}&
done
done
wait
echo "開始時間:${startTime}" >> ${outpath}/commitJson.log
echo "開始時間:${startTime}"
endTime=`date '+%Y%m%d%H%M%S'`
echo "結束時間:${endTime}" >> ${outpath}/commitJson.log
echo "結束時間:${endTime}"
#關閉文件描述符的讀
exec 3<&-
#關閉文件描述符的寫
exec 3>&-
exit 0

 

編寫CRONTAB 腳本(每天6點半執行 每次執行10個JSON文件)

crontab-e

30 6 * * * /home/admin/ty_commit_datax_sjyc.sh 10

 

------------------------------------------------------------------

要實現增量更新, 首先要 PostgresqlReader 從目標數據庫讀取最大日期, 並用 TextFileWriter 寫入到一個 csv 文件, 這一步我的配置如下所示:

{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:postgresql://[target_server]:5432/[target_db]"
],
"querySql": [
"SELECT max(data_time) FROM public.minute_data"
]
}
],
"password": "...",
"username": "..."
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"fileName": "minute_data_max_time_result",
"fileFormat": "csv",
"path": "/scripts/",
"writeMode": "truncate"
}
}
}
],
"setting": { }
}
}

 

最后編寫增量同步的 shell 文件, 內容如下:

#!/bin/bash
### every exit != 0 fails the script
set -e

# 獲取目標數據庫最大數據時間,並寫入一個 csv 文件
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_max_time.json
if [ $? -ne 0 ]; then
echo "minute_data_sync.sh error, can not get max_time from target db!"
exit 1
fi
# 找到 DataX 寫入的文本文件,並將內容讀取到一個變量中
RESULT_FILE=`ls minute_data_max_time_result_*`
MAX_TIME=`cat $RESULT_FILE`
# 如果最大時間不為 null 的話, 修改全部同步的配置,進行增量更新;
if [ "$MAX_TIME" != "null" ]; then
# 設置增量更新過濾條件
WHERE="DataTime > '$MAX_TIME'"
sed "s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json
# 將第 45 行的 truncate 語句刪除;
sed '45d' minute_data_tmp.json > minute_data_inc.json
# 增量更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_inc.json
# 刪除臨時文件
rm ./minute_data_tmp.json ./minute_data_inc.json
else
# 全部更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data.json
fi


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM