flume增量采集數據


對於flume的增量抽取,首先想到的就是常用的那幾種方法,監控日志,觸發器,版本號,時間戳等等,雖然可以實現,但都對數據源有了一定限制,假如客戶的系統內部表沒有那些東西,這就是一件很難搞的事了,畢竟對方數據庫不能隨便動。

這個時候可以采用 $@$,它可以表示增量列上一次查詢的值。,將它加入sql語句中所查詢的數據就實現了增量,當然在navicat中使用是不支持這個符號的,flume可能封裝了一些方法對$@$進行了解析,在這方面並沒有太多了解。

 

a1.channels=ch1                         
a1.channels.ch1.type=memory

a1.sources = src-1

a1.sources.src-1.channels=ch1

a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.run.query.delay=60000
a1.sources.sql-source.start.from=0
#所采集數據庫的地址和數據庫名
a1.sources.src-1.hibernate.connection.url=
#數據庫用戶名
a1.sources.src-1.hibernate.connection.user =
#數據庫密碼
a1.sources.src-1.hibernate.connection.password =
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect=org.hibernate.dialect.SQLServerDialect

#驅動類名
a1.sources.src-1.hibernate.connection.driver_class=com.microsoft.sqlserver.jdbc.SQLServerDriver
#通過sql語句進行抽取,當需要實現增量抽取 $@$ 表示增量列上一次查詢的
#值,記錄在status文件中,所以查詢值中也必須有該值以及需要有一個主鍵ID。#其他條件可根據業務//情況作更改。
a1.sources.src-1.custom.query=select test1.id,test1.name,test2.address from test1 full join test2 on test1.id=test2.id where test1.id> $@$ or test2.id>$@$ 
#status文件的存放路徑,當執行flume該文件會在路徑下自動生成
a1.sources.src-1.status.file.path=/home/bigdata/
#status文件名
a1.sources.src-1.status.file.name = src-1.ss.status

a1.sources.src-1.batch.size = 6000
a1.sources.src-1.max.rows = 1000

a1.channels.ch1.capacity = 10000
a1.channels.ch1.transactionCapacity = 1000

a1.sinks=k1
#自定義下沉jar包名
a1.sinks.k1.type=MysqlSink
#所下沉到的數據庫地址及數據庫名
a1.sinks.k1.url=
#下沉到的數據庫表名
a1.sinks.k1.tableName=
#數據庫用戶名
a1.sinks.k1.user=
#數據庫密碼
a1.sinks.k1.password=
#字段名和上面的sql查詢結果要一致
a1.sinks.k1.column_name=id,name,address
a1.sinks.k1.channel=ch1

a1.sinks.k1.batchSize=100

  以上是我做過的一個案例實現了flume鏈表的增量抽取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM