一、前言
隨着業務的發展,以往的離線批量計算方式,因為延遲太長已經不能滿足需求,隨着flink這種實時計算工具的出現,實時采集也成為大數據工作中非常重要的一環。
現今企業的數據來源大體分為兩種:存儲在各種關系數據庫中的業務數據、網站或APP產生的用戶行為日志數據
日志數據通過flume、kafka等工具已經可以實現實時采集,但關系數據庫的同步仍然以批量為主。
當關系數據庫的表數據達到一定程度,批量同步耗時太久,增量同步不能解決實時性的要求
mysql可以通過binlog進行實時同步,技術也比較成熟,但無法解決SQLserver、Oracle、postgresql等數據庫的問題。
即便有kafka這種流數據分發訂閱平台、flink這種實時計算平台,redis這種高效讀寫數據庫,如果不能解決實時采集問題,那么一個整體的實時鏈路就無法實現。
幸好,國外有一款開源工具實現了市面上各種常見數據庫的數據更新日志的獲取,它就是debezium
插件模式
二、簡介
Debezium是一組分布式服務,用於捕獲數據庫中的更改,以便您的應用程序可以查看這些更改並對它們做出響應。Debezium在更改事件流中記錄每個數據庫表中的所有行級更改,應用程序只需讀取這些流,即可按更改事件發生的順序查看更改事件。
debezium有兩種運行方式,一種是以插件的方式繼承在kafka connect中,另外一種是以獨立服務的方式運行(孵化中)
服務器模式
今天我們要介紹的就是插件模式。
三、部署
插件模式首先要求集群上已經安裝好zookeeper和kafka,kafka可以連通上游數據庫,我這里用flink消費kafka里的日志實時寫入mysql
因此還需要部署好flink集群和mysql數據庫
以上都具備之后,就可以開始部署debezium
1.下載安裝包
#以mysql為例,下載debezium-connector-mysql-1.4.2.Final-plugin.tar.gz wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
在kafka安裝文件夾創建一個connectors文件夾,將下載的debezium插件解壓到connectors
2.創建topic
創建kafka connect所需要的三個topic: connect-offsets,connect-configs,connect-status
kafka-topics --zookeeper ip1:2181,ip2:2181,ip3:2181 --create --topic connect-status --replication-factor 2 --partitions 3
3.編寫kafka connect配置文件
創建connect-distributed.properties,分發到所有節點
#kafka-connect配置文件 # kafka集群地址 bootstrap.servers=ip1:9092,ip2:9092,ip3:9092 # Connector集群的名稱,同一集群內的Connector需要保持此group.id一致 group.id=connect-cluster # 存儲到kafka的數據格式 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false # 內部轉換器的格式,針對offsets、config和status,一般不需要修改 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # 用於保存offsets的topic,應該有多個partitions,並且擁有副本(replication) # Kafka Connect會自動創建這個topic,但是你可以根據需要自行創建 offset.storage.topic=connect-offsets offset.storage.replication.factor=2 offset.storage.partitions=3 # 保存connector和task的配置,應該只有1個partition,並且有多個副本 config.storage.topic=connect-configs config.storage.replication.factor=2 # 用於保存狀態,可以擁有多個partition和replication status.storage.topic=connect-status status.storage.replication.factor=2 status.storage.partitions=3 # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # RESET主機名,默認為本機 #rest.host.name= # REST端口號 rest.port=18083 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers. #rest.advertised.host.name= #rest.advertised.port= # 保存connectors的路徑 #plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/opt/cloudera/parcels/CDH/lib/kafka/connectors
4.啟動kafka-connect
備注:全部節點都要執行
cd /opt/cloudera/parcels/CDH/lib/kafka bin/connect-distributed.sh -daemon config/connect-distributed.properties ###jps 可看到 ConnectDistributed 進程
5.以POST URL方式提交連接請求
多個表名以逗號分隔,格式為db.table,參數中指定的topic是元數據topic,真正的topic名以server_name.db_name.table_name構成
POST:http://ip:18083/connectors Headers:Content-Type: application/json Body:{ "name" : "debezium-mysql", "config":{ "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "host", "database.port": "3306", "database.user": "username", "database.password": "password", "database.server.id" :"1739", "database.server.name": "mysql", "database.history.kafka.bootstrap.servers": "ip1:9092,ip2:9092,ip3:9092", "database.history.kafka.topic": "mysql.test", "database.whitelist": "test", "table.whitelist":"test.test_table2", "include.schema.changes" : "true" , "mode" : "incrementing", "incrementing.column.name" : "id", "database.history.skip.unparseable.ddl" : "true" } }
提交完成后以GET http://ip:18083/connectors獲取連接器信息
由於debezium沒有建topic的邏輯,因此kafka需要開放自動產生topic的配置
查看kafka是否產生相應的topic,更高源表的內容,如果topic中有對應的更改日志記錄,任務就配置成功了
剩下的從kafka消費數據就有很多方式可以實現了