Dockerizing canal-adapter


A work requirement came up today: sync MySQL changes into Kafka in real time. Later we also needed to replicate one table's changes to another MySQL instance in real time, and to push the same data into es. canal plus canal-adapter solved this on short notice, with quite a few pitfalls along the way; what follows is a summary.

Official documentation: https://github.com/alibaba/canal/wiki

Prerequisite: es is already installed; this article uses version 6.8.8.

The es index and mapping must be created first. The two request bodies below define the settings and mappings used:


{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 2
    },
    "mappings": {
        "_doc": {
            "properties": {
                "personnel_name": {
                    "type": "text"
                },
                "personnel_num": {
                    "type": "keyword"
                }
            }
        }
    }
}

{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 2
    },
    "mappings": {
        "_doc": {
            "properties": {
                "clock_record_id": {
                    "type": "keyword"
                },
                "personnel_name": {
                    "type": "text"
                },
                "personnel_num": {
                    "type": "keyword"
                }
            }
        }
    }
}
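The mapping bodies above can be pushed to ES 6.x with a plain HTTP PUT. A minimal sketch, with assumptions not in the original post: the cluster listens on `193.168.1.39:9200`, the first index is named `test` (matching `Es_index` later in this article), and the JSON body has been saved to a hypothetical `test-mapping.json`. By default the script only prints the curl command; set `DRY_RUN=0` to actually execute it.

```shell
#!/bin/sh
# Sketch: create an ES 6.x index from a settings+mappings JSON file.
# ES_HOST and the file name are assumptions; adjust to your environment.
ES_HOST="${ES_HOST:-193.168.1.39:9200}"
DRY_RUN="${DRY_RUN:-1}"   # default: print the command only; DRY_RUN=0 runs it

create_index() {
  # $1 = index name, $2 = file holding the settings+mappings JSON
  cmd="curl -s -X PUT http://${ES_HOST}/$1 -H 'Content-Type: application/json' -d @$2"
  if [ "${DRY_RUN}" = "1" ]; then echo "${cmd}"; else eval "${cmd}"; fi
}

create_index test test-mapping.json
```

Repeat `create_index` with a second index name and the second mapping body for the other table; the post does not give that index's name, so choose your own.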
I. Building the canal-server image and its docker-compose file

1. Dockerfile contents

FROM openjdk:8-jre-alpine
# ADD does not auto-extract a remote archive, so unpack it explicitly
ADD https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz /tmp/
RUN mkdir -p /opt/canal-server \
 && tar -xzf /tmp/canal.deployer-1.1.4.tar.gz -C /opt/canal-server \
 && rm /tmp/canal.deployer-1.1.4.tar.gz
WORKDIR /opt/canal-server
EXPOSE 11110 11112
COPY entrypoint.sh /
RUN apk add --no-cache bash tzdata \
 && cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
 && apk del tzdata \
 && chmod +x /entrypoint.sh
VOLUME ["/opt/canal-server/logs", "/opt/canal-server/conf"]
ENTRYPOINT ["/entrypoint.sh"]

2. entrypoint.sh contents

#!/bin/bash

Base_dir=/opt/canal-server/conf
Log_dir=/opt/canal-server/logs

set -e
# Configure the canal-server run mode; this image supports tcp and kafka
if [ "${canal_serverMode}" = "tcp" ] || [ "${canal_serverMode}" = "kafka" ]; then
  sed -i "/^canal.serverMode/ s/serverMode.*/serverMode = ${canal_serverMode}/" ${Base_dir}/canal.properties
else
  echo "Invalid mode '${canal_serverMode}'; this image only supports tcp and kafka"
  exit 1
fi

if [ -n "${instances}" ]; then
  # Turn the space-separated instance list into canal.destinations
  destinations=$(echo ${instances} | sed 's/ /,/g')
  sed -i "/^canal.destinations/ccanal.destinations = ${destinations}" ${Base_dir}/canal.properties
  for instance in ${instances}
  do
    # Reset the per-instance option dictionary on every iteration,
    # otherwise values leak from the previous instance
    unset dict
    declare -A dict
    # env_file values keep their literal double quotes; strip them with awk
    ins_dic=$(eval echo '$'"{${instance}_dict}" | awk -F'"' '{print $2}')
    for kv in ${ins_dic}
    do
      k=$(echo $kv | awk -F'=' '{print $1}')
      v=$(echo $kv | awk -F'=' '{print $2}')
      dict[$k]=$v
    done
    if [ "${instance}" != "example" ]; then
      mkdir ${Base_dir}/${instance} && cp ${Base_dir}/example/* ${Base_dir}/${instance}/
      if [ "${canal_serverMode}" = "kafka" ]; then
        sed -i "/^canal.mq.servers/ccanal.mq.servers=${canal_mq_servers}" ${Base_dir}/canal.properties
        if [ -n "${dict[canal_mq_topic]}" ]; then
          sed -i "/.*canal.mq.topic/ccanal.mq.topic=${dict[canal_mq_topic]}" ${Base_dir}/${instance}/instance.properties
        else
          # No fixed topic: drop canal.mq.topic and rely on the dynamic-topic rule
          sed -i "/^canal.mq.topic/d" ${Base_dir}/${instance}/instance.properties
          sed -i "/.*canal.mq.dynamicTopic=/ccanal.mq.dynamicTopic=${dict[canal_mq_dynamicTopic]}" ${Base_dir}/${instance}/instance.properties
        fi
      fi

      # Address of the source MySQL this instance follows
      if [ -n "${dict[canal_instance_master_address]}" ]; then
        sed -i "/^canal.instance.master.address=/ccanal.instance.master.address=${dict[canal_instance_master_address]}" ${Base_dir}/${instance}/instance.properties
      fi

      # Tables this instance subscribes to
      if [ -n "${dict[canal_instance_filter_regex]}" ]; then
        sed -i "/^canal.instance.filter.regex/ccanal.instance.filter.regex=${dict[canal_instance_filter_regex]}" ${Base_dir}/${instance}/instance.properties
      fi
    fi
  done
fi

/bin/sh /opt/canal-server/bin/startup.sh
sleep 3
tail -F /opt/canal-server/logs/canal/canal.log
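One subtlety in the loop above: values loaded through docker-compose's `env_file` keep their double quotes as literal characters, which is why the script strips them with `awk -F'"'` before splitting the key=value pairs. A standalone sketch of that expansion, using a hypothetical `db01:3306` address:

```shell
#!/bin/sh
# Mimic an env_file entry: the double quotes are part of the value,
# exactly as docker-compose's env_file passes them through.
instance=wave1
wave1_dict='"canal_mq_topic=test canal_instance_master_address=db01:3306"'

# Indirect expansion as in entrypoint.sh, then strip the literal quotes
ins_dic=$(eval echo '$'"{${instance}_dict}" | awk -F'"' '{print $2}')

for kv in ${ins_dic}; do
  k=$(echo "$kv" | awk -F'=' '{print $1}')
  v=$(echo "$kv" | awk -F'=' '{print $2}')
  echo "$k=$v"
done
```

The `eval echo` re-expands `${wave1_dict}`, and because expansion results are not re-parsed as quoting syntax, the literal quote characters survive into the output, where awk removes them.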


3. docker-compose file contents

version: "3"
services:
  canal-server:
    image: canal-server:1.1.4
    container_name: canal-server
    env_file:
      - ./wave-canal-server.env
    ports:
      - "11110:11110"
      - "11112:11112"
    volumes:
      - "/etc/hosts:/etc/hosts"
    # these limits only take effect under swarm or `docker-compose --compatibility`
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 2G
    restart: always
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

4. Contents of the wave-canal-server.env file used by the compose file

# canal-server run mode: kafka or tcp
canal_serverMode=kafka
# canal instance names, space-separated (the entrypoint converts spaces to commas);
# wave1 syncs to es, wave2 feeds the dashboard
instances=wave1 wave2
# Per-instance options live in a "<instance-name>_dict" variable:
#   canal_mq_topic                 fixed topic name; when unset, canal_mq_dynamicTopic is used
#   canal_instance_master_address  address of the source MySQL to sync from
#   canal_instance_filter_regex    tables to watch, comma-separated
#   canal_mq_dynamicTopic          topic naming rule (note the doubled escapes)
wave1_dict="canal_mq_topic=test canal_instance_master_address=mysql01.inside.wavewisdom-bj.com:3306 canal_instance_filter_regex=wavewisdom-bj-develop.odin_device_position,wavewisdom-bj-develop.odin_device_camera,wavewisdom-bj-develop.odin_device_device_position_associate canal_mq_dynamicTopic=.*\\\\..*"
wave2_dict="canal_mq_topic=task canal_instance_master_address=mysql02.inside.wavewisdom-bj.com:3306 canal_instance_filter_regex=wavewisdom-bj-develop.odin_business_emergency_record,wavewisdom-bj-develop.odin_business_capture_record,wavewisdom-bj-develop.work_flow_mission,wavewisdom-bj-develop.odin_device_camera,wavewisdom-bj-develop.odin_device_device_position_associate,wavewisdom-bj-develop.odin_device_position,wavewisdom-bj-develop.odin_business_alarm_record,wavewisdom-bj-develop.odin_advise_info canal_mq_dynamicTopic=.*\\\\..*"
canal_mq_servers=kafka01.inside.wavewisdom-bj.com:9092
# The value below contains two escaped backslashes
canal_mq_dynamicTopic=.*\\\\..*
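The quadruple backslash in `canal_mq_dynamicTopic` is there because sed's `c` command consumes one level of escaping when the entrypoint writes the value into instance.properties, and canal expects `.*\\..*` in the final file. A sketch of that round trip (GNU sed assumed):

```shell
#!/bin/sh
# Round trip of the escaping: env value -> sed c command -> properties file.
workdir=$(mktemp -d)
printf 'canal.mq.dynamicTopic=placeholder\n' > "${workdir}/instance.properties"

canal_mq_dynamicTopic='.*\\\\..*'   # the literal value as written in the env file

# Same sed pattern the entrypoint uses; the c-command text halves the backslashes
sed -i "/.*canal.mq.dynamicTopic=/ccanal.mq.dynamicTopic=${canal_mq_dynamicTopic}" \
  "${workdir}/instance.properties"

cat "${workdir}/instance.properties"
```

docker-compose's `env_file` performs no escape processing itself, so the four backslashes reach sed intact and come out as the two that canal needs.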

5. Start it up and watch the logs.

II. Building the canal-adapter image and its compose file

1. Dockerfile contents

FROM openjdk:8-jre-alpine
RUN echo "Asia/Shanghai" > /etc/timezone
# ADD does not auto-extract a remote archive, so unpack it explicitly
ADD https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz /tmp/
RUN mkdir -p /opt/adapter \
 && tar -xzf /tmp/canal.adapter-1.1.4.tar.gz -C /opt/adapter \
 && rm /tmp/canal.adapter-1.1.4.tar.gz
WORKDIR /opt/adapter
COPY conf /opt/adapter/conf/
COPY entrypoint.sh /
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

2. entrypoint.sh contents

#!/bin/sh
set -e
Conf_Dir=/opt/adapter/conf
# Configure which canal-server mode the adapter consumes from
if [ "${Canal_mode}" = "kafka" ]; then
  sed -i "/^.*mode:/ s/:.*/: ${Canal_mode}/" ${Conf_Dir}/application.yml
  sed -i "/mqServers:/ s/:.*/: ${Mq_Servers}/" ${Conf_Dir}/application.yml
elif [ "${Canal_mode}" = "tcp" ]; then
  sed -i "/^.*mode:/ s/:.*/: ${Canal_mode}/" ${Conf_Dir}/application.yml
  sed -i "/mqServers:/ s/mqServers:.*/canalServerHost: ${Canal_Servers}/" ${Conf_Dir}/application.yml
else
  echo "Invalid mode '${Canal_mode}'; this image only supports tcp and kafka"
  exit 1
fi

# Source MySQL address
if [ -n "${Src_Data_Server}" ]; then
  sed -i "/^.*url: jdbc:mysql:/ s/mysql:.*/mysql:\/\/${Src_Data_Server}\/${Src_Database}?useUnicode=true/" ${Conf_Dir}/application.yml
fi

# Source MySQL user
if [ -n "${Src_User}" ]; then
  sed -i "/^.*username:/ s/:.*/: ${Src_User}/" ${Conf_Dir}/application.yml
fi

# Source MySQL password
if [ -n "${Src_Password}" ]; then
  sed -i "/^.*password:/ s/:.*/: ${Src_Password}/" ${Conf_Dir}/application.yml
fi

# Instance name: in tcp mode it must match the canal-server instance name,
# in kafka mode it must match the topic name
if [ -n "${Adapter_instance}" ]; then
  sed -i "/- instance:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/application.yml
  sed -i "/destination:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/rdb/mytest_user.yml
  sed -i "/destination:/ s/:.*/: ${Adapter_instance}/g" ${Conf_Dir}/es/mytest_user.yml
fi

for Out in ${Out_adapters}
do
  echo ${Out}
  if [ "${Out}" = "rdb" ]; then
    if [ -n "${Src_Database}" ]; then
      sed -i "/^.*database:/ s/:.*/: ${Src_Database}/" ${Conf_Dir}/rdb/mytest_user.yml
    fi
    if [ -n "${Src_Table}" ]; then
      sed -i "/^.*table:/ s/:.*/: ${Src_Table}/" ${Conf_Dir}/rdb/mytest_user.yml
    fi

    if [ -n "${Dest_User}" ]; then
      sed -i "/^.*jdbc.username:/ s/:.*/: ${Dest_User}/" ${Conf_Dir}/application.yml
    fi
    if [ -n "${Dest_Password}" ]; then
      sed -i "/^.*jdbc.password:/ s/:.*/: ${Dest_Password}/" ${Conf_Dir}/application.yml
    fi
    if [ -n "${Dest_Database}" ] && [ -n "${Dest_Table}" ]; then
      sed -i "/^.*targetTable:/ s/:.*/: ${Dest_Database}.${Dest_Table}/" ${Conf_Dir}/rdb/mytest_user.yml
    fi
    if [ -n "${Target_Pk}" ]; then
      # Turn "target:source" into valid YAML "target: source"
      R_Target_Pk=$(echo $Target_Pk | sed -e 's/:/: /g')
      sed -i "/^.*targetPk:/{n;s/[a-z].*/${R_Target_Pk}/g}" ${Conf_Dir}/rdb/mytest_user.yml
    fi
    if [ -n "${Dest_Data_Server}" ]; then
      sed -i "/^.*jdbc.url: jdbc:mysql:/ s/mysql:.*/mysql:\/\/${Dest_Data_Server}\/${Dest_Database}/" ${Conf_Dir}/application.yml
    fi
    if [ "${Map_All}" = "true" ]; then
      sed -i "/mapAll:/c\  mapAll: true" ${Conf_Dir}/rdb/mytest_user.yml
      sed -i "/targetColumns:/c\#  targetColumns:" ${Conf_Dir}/rdb/mytest_user.yml
    else
      sed -i "/mapAll:/c\#  mapAll: true" ${Conf_Dir}/rdb/mytest_user.yml
      sed -i "/targetColumns:/c\  targetColumns:" ${Conf_Dir}/rdb/mytest_user.yml
      for column in ${Mapping_Columes}
      do
        R_column=$(echo $column | sed -e 's/:/: /g')
        sed -i "/^.*targetColumns:/a\    ${R_column}" ${Conf_Dir}/rdb/mytest_user.yml
      done
    fi
  elif [ "${Out}" = "es" ]; then
    if [ -n "${Es_hosts}" ]; then
      sed -i "/^.*hosts:/ s/hosts:.*/hosts: ${Es_hosts}/" ${Conf_Dir}/application.yml
    fi
    if [ -n "${Es_cluster}" ]; then
      sed -i "/^.*cluster.name:/ s/name:.*/name: ${Es_cluster}/" ${Conf_Dir}/application.yml
    fi
    if [ -n "${Es_index}" ]; then
      sed -i "/^.*_index:/ s/index:.*/index: ${Es_index}/" ${Conf_Dir}/es/mytest_user.yml
    fi
    if [ -n "${Es_type}" ]; then
      sed -i "/^.*_type:/ s/type:.*/type: ${Es_type}/" ${Conf_Dir}/es/mytest_user.yml
    fi
    if [ -n "${Es_id}" ]; then
      sed -i "/^.*_id:/ s/id:.*/id: ${Es_id}/" ${Conf_Dir}/es/mytest_user.yml
    fi
    sed -i "/^.*sql:/ s/sql:.*/sql: ${Sql_map}/" ${Conf_Dir}/es/mytest_user.yml
  else
    echo "Invalid outerAdapters '${Out}'; this image only supports es and rdb"
    exit 1
  fi
done

sh /opt/adapter/bin/startup.sh
tail -F /opt/adapter/logs/adapter/adapter.log

3. docker-compose file contents

version: "3"
services:
  canal-adapter:
    image: adapter:v1.1.4
    container_name: canal-adapter
    ports:
      - 8081:8081
    #volumes:
    #  - ./conf:/opt/adapter/conf
    env_file:
      - ./canal-adapter.env
    restart: always
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

4. Contents of the canal-adapter.env file used by the compose file

# canal-server run mode; this image supports kafka and tcp
Canal_mode=kafka
# canal-server instance name; in kafka mode this is the kafka topic
Adapter_instance=test.odin_business_clock_record
# adapter output modes; this image supports es and rdb, alone or together, space-separated
Out_adapters=es rdb
# Variables used when the adapter writes to es
# es host address(es); comma-separated for a cluster
Es_hosts=193.168.1.39:9300
# cluster name of the es cluster or single node
Es_cluster=wave-es
# es index to sync into
Es_index=test
# index type
Es_type=_doc
# index id
Es_id=_id
# SQL mapping
Sql_map='select a.clock_record_id as _id, a.personnel_name, a.personnel_num from odin_business_clock_record a'
# kafka broker address, used when canal-server runs in kafka mode
Mq_Servers=193.168.1.136:9092
# Source MySQL address
Src_Data_Server=193.168.1.136:3306
# Source MySQL user
Src_User=root
# Source MySQL password
Src_Password=waveDevelop123
# Source database to sync
Src_Database=test
# Source table to sync
Src_Table=odin_business_clock_record
# Variables used when the adapter writes to rdb
# Target MySQL address
Dest_Data_Server=193.168.1.167:3306
# Target MySQL user
Dest_User=root
# Target MySQL password
Dest_Password=wa123
# Target database
Dest_Database=test
# Target table
Dest_Table=odin_business_clock_record
# Primary key mapping
Target_Pk=clock_record_id:clock_record_id
# Whether to map the whole table; true enables it
Map_All=false
# When full-table mapping is off, the columns to map,
# as "target_column:source_column" pairs separated by spaces
Mapping_Columes=clock_record_id:clock_record_id personnel_name:personnel_name personnel_num:personnel_num
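To see what the adapter entrypoint does with `Target_Pk` and `Mapping_Columes`, here is the same transformation in isolation: each `target:source` pair gains a space after the colon so it becomes a valid YAML mapping line.

```shell
#!/bin/sh
# Values as in the env file above (shortened for the demo)
Target_Pk='clock_record_id:clock_record_id'
Mapping_Columes='clock_record_id:clock_record_id personnel_name:personnel_name'

# The entrypoint inserts a space after each colon to produce valid YAML
R_Target_Pk=$(echo "$Target_Pk" | sed -e 's/:/: /g')
echo "targetPk -> ${R_Target_Pk}"

for column in ${Mapping_Columes}; do
  echo "  $(echo "$column" | sed -e 's/:/: /g')"
done
```

The resulting lines are what `sed .../a\ ...` appends under `targetColumns:` in rdb/mytest_user.yml.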

Start it and watch the logs.

PS: the Dockerfile and entrypoint.sh above build the adapter image; the compose file and env file run that image.

III. With the environment variables configured correctly, everything is ready to run

Then insert a record into the source MySQL, check that the change shows up in es, and check that it also arrives in the target MySQL. If it does, the configuration is complete.

This setup syncs MySQL changes to es and to another MySQL at the same time. canal-server here supports the kafka and tcp modes; other modes are out of scope for this article, though you can write your own entrypoint.sh. The es version used is 6.8.8.

References:
https://www.jianshu.com/p/5bcf97335e71
https://blog.csdn.net/q936889811/article/details/95745721
https://blog.csdn.net/singgel/article/details/86166154

IV. The source table must be fully synced into the target database before the adapter is used; otherwise updates may fail. Here the adapter works in kafka mode: it pulls data from kafka and applies updates to the target table. This adapter image only covers single-database, single-table sync; for multiple databases or tables, rewrite entrypoint.sh.
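For that initial full load, a one-off `mysqldump` pipe is one option. This sketch only builds and prints the command; `SRC_PW`/`DEST_PW` are placeholders, not the real credentials:

```shell
#!/bin/sh
# Sketch: build and print a one-off full-load command for the table above.
# Hosts/db/table come from the env file; passwords are placeholders here.
SRC=193.168.1.136; DEST=193.168.1.167
DB=test; TABLE=odin_business_clock_record

full_sync_cmd="mysqldump -h ${SRC} -P 3306 -uroot -p\${SRC_PW} ${DB} ${TABLE} | mysql -h ${DEST} -P 3306 -uroot -p\${DEST_PW} ${DB}"

# Print rather than run; execute manually once before starting the adapter
echo "${full_sync_cmd}"
```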

V. Notes on the dynamic-topic parameter "canal.mq.dynamicTopic"

test\\.test — match a single table; its changes are sent to a topic named after that table
test — match a whole database; all of its tables' changes go to a topic named after the database
test\\..* — a matching expression; each matched table's changes go to that table's own topic
.*\\..* — match every table; each one is sent to a topic named dbname.tablename

test,test1\\.test1 — multiple expressions: every table in database test goes to the test topic, table test1.test1 goes to its own test1.test1 topic, and all remaining tables fall back to the canal.mq.topic value
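These patterns are Java regular expressions applied to the `database.table` string. As a rough illustration (using `grep -E` as a stand-in for Java's regex engine), the doubled backslash written in the properties file is a single escaped dot in the regex that actually runs:

```shell
#!/bin/sh
# In the properties file the pattern is written test\\..* ; the regex canal
# actually evaluates is test\..* — an escaped literal dot, then the table name.
match() { echo "$1" | grep -Eq "$2" && echo yes || echo no; }

match "test.user"  '^test\..*$'   # yes: table user in database test
match "test1.user" '^test\..*$'   # no: database is test1, the dot is literal
match "test.user"  '^.*\..*$'     # yes: .*\..* matches every db.table
```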

 

