數據同步之DataX


       目前業務中需要進行數據同步, 考慮使用datax數據同步方式替換掉現有的同步方式

 

業務場景:

    即將業務中每天生成的日志表中的數據部分字段同步到自己的庫中,進行后台數據的查詢

 

起因:

  之前“大神”寫的邏輯中使用每三分鍾更新一次的策略進行數據同步,在redis中進行計數和打標記的方式進行數據的增量同步,但是最近發現經常數據會發生丟失的問題,於是進行問題的修復

 

解決:

      了解到運營對於這些數據的查詢實時性並沒有這么高,今天查詢昨天的數據這種場景比較多,於是打算使用datax凌晨同步昨天的數據到本地即可,節省資源,減少調用業務的數據庫頻次

      DataX: https://github.com/alibaba/DataX

      基於java的同步開源項目,基本使用起來較為容易,簡單配置即可,完成之后 3萬多的數據 只需要不到10秒中就完成了數據同步工作

      數據庫-mysql相關的reader和writer相關配置參考:

  • https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
  • https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

     當然他還支持很多的數據庫

 

安裝)

  1)下載源碼包

wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

基於java, 保證已經安裝了Java,推薦8版本,安裝方式,可見本博客,

cd /usr/local/src/

wget https://repo.huaweicloud.com/java/jdk/8u201-b09/jdk-8u201-linux-x64.tar.gz
tar -zxvf jdk-8u201-linux-x64.tar.gz
mv jdk1.8.0_201 /usr/local/
vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_201
export PATH=${JAVA_HOME}/bin:${PATH}

source /etc/profile
java -version 查看版本

 

   2)解壓datax

tar -zxvf datax.tar.gz

執行一個demo
python datax/bin/datax.py datax/job/job.json


注意:這里使用的是python2 版本

 

 

2)示例

   1.MySQL同步到MySQL

{
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "querySql": [
                                    "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__20210422"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/demo"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password":"123456",
                        "column": [
                            "player_id",
                            "create_time"
                        ],
                        "session": [
                            "set names utf8mb4"
                        ],
                        "connection": [
                            {
                                "encoding": "UTF-8",
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                "table": [
                                    "tt_player_log"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

  還支持傳入參數的方式動態的配置(python bin/datax.py 支持的參數:-p),例如:

{
    "job": {
        "setting": {
            "speed": {
                "byte":10485760
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "${username}",
                        "password": "${password}",
                        "connection": [
                            {
                                "querySql": [
                                    "select player_id,UNIX_TIMESTAMP(create_time) AS create_time from t_player_log__${dateNum}"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://${ip}:${port}/${db}"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password":"123456",
                        "column": [
                            "player_id",
                            "create_time"
                        ],
                        "session": [
                            "set names utf8mb4"
                        ],
                        "connection": [
                            {
                                "encoding": "UTF-8",
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                                "table": [
                                    "${table}"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

  同步命令:

/bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=xxxx -Dpassword=xxx -Dip=xx.xx.xx.xx -Dport=3306 -Ddb=xx -Dtable=xx -DdateNum=20210423"

注意:如果密碼中有特殊字符,需要使用單引號將其引起來

 

      相關的job任務會放到job目錄下, 新建一個shell腳本放到script下, run_jobs.sh,將相關數據庫連接配置到一個數據表中, 添加上定時任務即可

#!/bin/bash

#說明:
#    執行同步游戲庫玩家日志表數據

day_num=`date -d "-1 day" +%Y%m%d`
#day_num=`date +%Y%m%d`


main(){
    mysql -hxx.xx.xx -Pxx -uxx -pxx -N -e "select game_name,app_id,ip,port,username,password,db_name,player_log_table from t
xx.tt_game_db where type=2 and status=1" | while read game_name app_id ip port username password db_name player_log_table
    do
        echo "當前游戲:$game_name[$app_id] 數據庫信息:$username:$password@$ip:$port/$db_name 目標表:$player_log_table"
        #echo "-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=$db_name -Dtable=$player_log_table -DdateNum=$day_num"
        /bin/python /opt/datax/bin/datax.py /opt/datax/job/game_job.json -p"-Dusername=$username -Dpassword=$password -Dip=$ip -Dport=$port -Ddb=
$db_name -Dtable=$player_log_table -DdateNum=$day_num" 

    done
}

main

 

  2)MongoDB同步到MySQL

{
  "job": {
      "setting": {
          "speed": {
              "channel": 2
          }
      },
      "content": [
          {
              "reader": {
                  "name": "mongodbreader",
                  "parameter": {
                      "address": ["127.0.0.1:27017"],
                      "userName": "dataxUser1",
                      "userPassword": "xxxxx",
                      "dbName": "log_1",
                      "collectionName": "20210101",
                      "column": [
                          {
                              "name": "uid",
                              "type": "int"
                          },
                          {
                              "name": "match_id",
                              "type": "string"
                          }
                      ],
                      "query": {  // 添加篩選條件
                          "func_name":"play",
                          "play_type": {
                              "$in": [2,3]
                          }
                      }
                  }
              },
              "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "gameUser",
                        "password": "xxxxx",
                        "column": [
                            "player_index",
                            "table_id"
                        ],
                        "session": [
                            "set names utf8mb4"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/game_log",
                                "table": [
                                    "game_record_log"
                                ]
                            }
                        ]
                    }
                }
          }
      ]
  }
}


注意點: 這里的MongoDB的用戶名和密碼 要開一個這個庫的所屬人的權限
db.createUser({user:"dataxUser1",pwd:"xxxx",roles:[{role:"dbOwner", db:"log_1"}]})

  后面為了通用, 需要將這里配置的MongoDB的信息和MySQL的信息都通過變量的方式傳遞進來

 

在script目錄下添加shell執行腳本,例如:run_jobs_for_mongo2mysql.sh

#!/bin/bash
## 傳入開始和結束時間將MongoDB(10十個庫)中每個庫指定日期范圍內的數據同步到MySQL
begin_date=$1
end_date=$2
begin_time=$begin_date

mongo_user_prefix="dataxUser"
mongo_pwd=xxxx
mongo_db_name_prefix="log_"
mongo_table_name_prefix="game_"

mysql_user="mysqlUser"
mysql_pwd="xxx"
mysql_host="127.0.0.1"
mysql_port=3306
mysql_db_name="game_log"
mysql_table_name="game_record_log"

main(){
    for n in $(seq 1 10)
    do
        while [ "$begin_date" != "$end_date" ]
        do
            echo $begin_date
            mongo_user=$mongo_user_prefix$n
            mongo_db_name=$mongo_db_name_prefix$n
            mongo_table_name=$mongo_table_name_prefix$begin_date
            echo "-Dmongo_user=$mongo_user -Dmongo_pwd=$mongo_pwd -Dmongo_db_name=$mongo_db_name -Dmongo_table_name=$mongo_table_name -Dmysql_user=$mysql_user -Dmysql_pwd=$mysql_pwd -Dmysql_host=$mysql_host -Dmysql_port=$mysql_port -Dmysql_db_name=$mysql_db_name -Dmysql_table_name=$mysql_table_name"
        /bin/python /opt/data/datax/bin/datax.py /opt/data/datax/job/game_mongo2mysql_job.json -p"-Dmongo_user=$mongo_user -Dmongo_pwd=$mongo_pwd -Dmongo_db_name=$mongo_db_name -Dmongo_table_name=$mongo_table_name -Dmysql_user=$mysql_user -Dmysql_pwd='$mysql_pwd' -Dmysql_host=$mysql_host -Dmysql_port=$mysql_port -Dmysql_db_name=$mysql_db_name -Dmysql_table_name=$mysql_table_name"
            let begin_date=`date -d "-1 days ago ${begin_date}" +%Y%m%d`
        done
        begin_date=$begin_time
        echo $end_date
    done
}
main

 

 

 

 

常見問題:

   1.如果數據表中有特殊表情的時候, 可能會出現字符錯誤的報錯信息, 這個時候需要進行設置

"session": [
      "set names utf8mb4"
],

 

 

 

 

 

 

 

 

 

 

 

 

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM