對於flume的增量抽取,首先想到的就是常用的那幾種方法,監控日志,觸發器,版本號,時間戳等等,雖然可以實現,但都對數據源有了一定限制,假如客戶的系統內部表沒有那些東西,這就是一件很難搞的事了,畢竟對方數據庫不能隨便動。
這個時候可以采用 $@$,它可以表示增量列上一次查詢的值。,將它加入sql語句中所查詢的數據就實現了增量,當然在navicat中使用是不支持這個符號的,flume可能封裝了一些方法對$@$進行了解析,在這方面並沒有太多了解。
a1.channels=ch1 a1.channels.ch1.type=memory a1.sources = src-1 a1.sources.src-1.channels=ch1 a1.sources.src-1.type = org.keedio.flume.source.SQLSource a1.sources.src-1.run.query.delay=60000 a1.sources.sql-source.start.from=0 #所采集數據庫的地址和數據庫名 a1.sources.src-1.hibernate.connection.url= #數據庫用戶名 a1.sources.src-1.hibernate.connection.user = #數據庫密碼 a1.sources.src-1.hibernate.connection.password = a1.sources.src-1.hibernate.connection.autocommit = true a1.sources.src-1.hibernate.dialect=org.hibernate.dialect.SQLServerDialect #驅動類名 a1.sources.src-1.hibernate.connection.driver_class=com.microsoft.sqlserver.jdbc.SQLServerDriver #通過sql語句進行抽取,當需要實現增量抽取 $@$ 表示增量列上一次查詢的 #值,記錄在status文件中,所以查詢值中也必須有該值以及需要有一個主鍵ID。#其他條件可根據業務//情況作更改。 a1.sources.src-1.custom.query=select test1.id,test1.name,test2.address from test1 full join test2 on test1.id=test2.id where test1.id> $@$ or test2.id>$@$ #status文件的存放路徑,當執行flume該文件會在路徑下自動生成 a1.sources.src-1.status.file.path=/home/bigdata/ #status文件名 a1.sources.src-1.status.file.name = src-1.ss.status a1.sources.src-1.batch.size = 6000 a1.sources.src-1.max.rows = 1000 a1.channels.ch1.capacity = 10000 a1.channels.ch1.transactionCapacity = 1000 a1.sinks=k1 #自定義下沉jar包名 a1.sinks.k1.type=MysqlSink #所下沉到的數據庫地址及數據庫名 a1.sinks.k1.url= #下沉到的數據庫表名 a1.sinks.k1.tableName= #數據庫用戶名 a1.sinks.k1.user= #數據庫密碼 a1.sinks.k1.password= #字段名和上面的sql查詢結果要一致 a1.sinks.k1.column_name=id,name,address a1.sinks.k1.channel=ch1 a1.sinks.k1.batchSize=100
以上是我做過的一個案例實現了flume鏈表的增量抽取。