MySQL數據實時增量同步到Kafka - Flume


轉載自:https://www.cnblogs.com/yucy/p/7845105.html

MySQL數據實時增量同步到Kafka - Flume

 
  • 寫在前面的話

  需求,將MySQL里的數據實時增量同步到Kafka。接到活兒的時候,第一個想法就是通過讀取MySQL的binlog日志,將數據寫到Kafka。不過對比了一些工具,例如:Canel,Databus,Puma等,這些都是需要部署server和client的。其中server端是由這些工具實現,配置了就可以讀binlog,而client端是需要我們動手編寫程序的,遠沒有達到我即插即用的期望和懶人的標准。

  再來看看flume,只需要寫一個配置文件,就可以完成數據同步的操作。官網:http://flume.apache.org/FlumeUserGuide.html#flume-sources。它的數據源默認是沒有讀取binlog日志實現的,也沒有讀數據庫表的官方實現,只能用開源的自定義source:https://github.com/keedio/flume-ng-sql-source

  • 同步的格式

  原作者的插件flume-ng-sql-source只支持csv的格式,如果開始同步之后,數據庫表需要增減字段,則會給開發者造成很大的困擾。所以我添加了一個分支版本,用來將數據以JSON的格式,同步到kafka,字段語義更加清晰。

  sql-json插件包下載地址:https://github.com/yucy/flume-ng-sql-source-json/releases/download/1.0/flume-ng-sql-source-json-1.0.jar

  將此jar包下載之后,和相應的數據庫驅動包,一起放到flume的lib目錄之下即可。

  • 處理機制

flume-ng-sql-source在【status.file.name】文件中記錄讀取數據庫表的偏移量,進程重啟后,可以接着上次的進度,繼續增量讀表。

 

  • 啟動說明

說明:啟動命令里的【YYYYMM=201711】,會傳入到flume.properties里面,替換${YYYYMM}

 
           
  1. [test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &

 -c:表示配置文件的目錄,在此我們配置了flume-env.sh,也在conf目錄下;

 -f:指定配置文件,這個配置文件必須在全局選項的--conf參數定義的目錄下,就是說這個配置文件要在前面配置的conf目錄下面;

 -n:表示要啟動的agent的名稱,也就是我們flume.properties配置文件里面,配置項的前綴,這里我們配的前綴是【sync】;

 

  • flume的配置說明

  • flume-env.sh
 
           
  1. # 配置JVM堆內存和java運行參數,配置-DpropertiesImplementation參數是為了在flume.properties配置文件中使用環境變量
  2. export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties"

 關於propertiesImplementation參數的官方說明:http://flume.apache.org/FlumeUserGuide.html#using-environment-variables-in-configuration-files

 

  • flume.properties

 

復制代碼
 
           
  1. # 數據來源
  2. sync.sources = s-1
  3. # 數據通道
  4. sync.channels = c-1
  5. # 數據去處,這里配置了failover,根據下面的優先級配置,會先啟用k-1,k-1掛了后再啟用k-2
  6. sync.sinks = k-1 k-2
  7.  
  8. #這個是配置failover的關鍵,需要有一個sink group
  9. sync.sinkgroups = g-1
  10. sync.sinkgroups.g-1.sinks = k-1 k-2
  11. #處理的類型是failover
  12. sync.sinkgroups.g-1.processor.type = failover
  13. #優先級,數字越大優先級越高,每個sink的優先級必須不相同
  14. sync.sinkgroups.g-1.processor.priority.k-1 = 5
  15. sync.sinkgroups.g-1.processor.priority.k-2 = 10
  16. #設置為10秒,當然可以根據你的實際狀況更改成更快或者很慢
  17. sync.sinkgroups.g-1.processor.maxpenalty = 10000
  18.  
  19. ########## 數據通道的定義
  20. # 數據量不大,直接放內存。其實還可以放在JDBC,kafka或者磁盤文件等
  21. sync.channels.c-1.type = memory
  22. # 通道隊列的最大長度
  23. sync.channels.c-1.capacity = 100000
  24. # putList和takeList隊列的最大長度,sink從capacity中抓取batchsize個event,放到這個隊列。所以此參數最好比capacity小,比sink的batchsize大。
  25. # 官方定義:The maximum number of events the channel will take from a source or give to a sink per transaction.
  26. sync.channels.c-1.transactionCapacity = 1000
  27. sync.channels.c-1.byteCapacityBufferPercentage = 20
  28. ### 默認值的默認值等於JVM可用的最大內存的80%,可以不配置
  29. # sync.channels.c-1.byteCapacity = 800000
  30.  
  31. #########sql source#################
  32. # source s-1用到的通道,和sink的通道要保持一致,否則就GG了
  33. sync.sources.s-1.channels=c-1
  34. ######### For each one of the sources, the type is defined
  35. sync.sources.s-1.type = org.keedio.flume.source.SQLSource
  36. sync.sources.s-1.hibernate.connection.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false
  37. ######### Hibernate Database connection properties
  38. sync.sources.s-1.hibernate.connection.user = test
  39. sync.sources.s-1.hibernate.connection.password = 123456
  40. sync.sources.s-1.hibernate.connection.autocommit = true
  41. sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
  42. sync.sources.s-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
  43. sync.sources.s-1.run.query.delay=10000
  44. sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status
  45. # 用上${YYYYMM}環境變量,是因為我用的測試表示一個月表,每個月的數據會放到相應的表里。使用方式見上面的啟動說明
  46. sync.sources.s-1.status.file.name = test_${YYYYMM}.status
  47. ######## Custom query
  48. sync.sources.s-1.start.from = 0
  49. sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc
  50. sync.sources.s-1.batch.size = 100
  51. sync.sources.s-1.max.rows = 100
  52. sync.sources.s-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
  53. sync.sources.s-1.hibernate.c3p0.min_size=5
  54. sync.sources.s-1.hibernate.c3p0.max_size=20
  55.  
  56. ######### sinks 1
  57. # sink k-1用到的通道,和source的通道要保持一致,否則取不到數據
  58. sync.sinks.k-1.channel = c-1
  59. sync.sinks.k-1.type = org.apache.flume.sink.kafka.KafkaSink
  60. sync.sinks.k-1.kafka.topic = sync-test
  61. sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092
  62. sync.sinks.k-1.kafka.producer.acks = 1
  63. # 每批次處理的event數量
  64. sync.sinks.k-1.kafka.flumeBatchSize  = 100
  65.  
  66. ######### sinks 2
  67. # sink k-2用到的通道,和source的通道要保持一致,否則取不到數據
  68. sync.sinks.k-2.channel = c-1
  69. sync.sinks.k-2.type = org.apache.flume.sink.kafka.KafkaSink
  70. sync.sinks.k-2.kafka.topic = sync-test
  71. sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092
  72. sync.sinks.k-2.kafka.producer.acks = 1
  73. sync.sinks.k-2.kafka.flumeBatchSize  = 100
復制代碼

 

關於putList和takeList與capacity的關系,引用:http://blog.csdn.net/u012948976/article/details/51760546

flume各部分參數含義

flume架構詳情

  • batchData的大小見參數:batchSize
  • PutList和TakeList的大小見參數:transactionCapactiy
  • Channel總容量大小見參數:capacity

 

  •   問題記錄

異常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError: org.keedio.flume.source.SQLSource.getMaxBackOffSleepInterval()J

分析:由於我用的是flume1.8,而flume-ng-sql-1.4.3插件對應的flume-ng-core版本是1.5.2,1.8版本里的PollableSource接口多了兩個方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失敗補償暫停線程處理時,需要用到這個方法。

解決方法:更新flume-ng-sql-1.4.3里依賴的flume-ng-core版本為1.8.0,並在源代碼【SQLSource.java】里添加這兩個方法即可。

復制代碼
 
           
  1. @Override
  2. public long getBackOffSleepIncrement() {
  3.     return 1000;
  4. }
  5. @Override
  6. public long getMaxBackOffSleepInterval() {
  7.     return 5000;
  8. }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM