The MongoDB input plugin for Logstash (logstash-input-mongodb) is a third-party plugin. It is configured like this:
input {
  mongodb {
    uri => 'mongodb://mongo_server:27017/db'
    placeholder_db_dir => '/path/to/db_dir/'
    placeholder_db_name => 'table.db'
    collection => 'table'
    batch_size => 5000
  }
}
Installation
./logstash-plugin install logstash-input-mongodb
The implementation is very simple: the whole plugin is a single Ruby file.
https://github.com/phutchins/logstash-input-mongodb/blob/master/lib/logstash/inputs/mongodb.rb
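The plugin keeps its state in SQLite. The database file is placeholder_db_name inside placeholder_db_dir, and it can be inspected and modified directly with the sqlite3 command: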
# sqlite3 /path/to/db_dir/table.db
Database schema:
sqlite> .schema
CREATE TABLE `since_table` (`table` varchar(255), `place` Int);
sqlite> select * from since_table order by place desc limit 1;
logstash_since_table|5d0b2c2682b7d74de069ce4d
The plugin code that reads place:
public
def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  since = sqlitedb[SINCE_TABLE]
  x = since.where(:table => "#{since_table}_#{mongo_collection_name}")
  if x[:place].nil? || x[:place] == 0
    first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
    @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
    return first_entry_id
  else
    @logger.debug("placeholder already exists, it is #{x[:place]}")
    return x[:place][:place]
  end
end
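The key that get_placeholder looks up explains why the row in since_table is named logstash_since_table. Below is a simplified, dependency-free sketch of that lookup. A plain Hash stands in for the SQLite table and a lambda stands in for init_placeholder. The helper names `placeholder_key` and `lookup_placeholder` are hypothetical. The prefix "logstash_since" is an assumption inferred from the stored key logstash_since_table and the collection name table. The sketch also assumes the initial value gets saved, which is a simplification of the plugin's behaviour.

```ruby
# Simplified model of get_placeholder (hypothetical names): a Hash stands in
# for the SQLite since_table and a lambda stands in for init_placeholder.

# The row key combines the since_table prefix and the collection name.
def placeholder_key(since_table, collection_name)
  "#{since_table}_#{collection_name}"
end

# Return the stored place for the collection; if none exists (nil or 0),
# compute an initial one, save it under the key, and return it.
def lookup_placeholder(store, since_table, collection_name, init_placeholder)
  key = placeholder_key(since_table, collection_name)
  place = store[key]
  if place.nil? || place == 0
    place = init_placeholder.call
    store[key] = place
  end
  place
end

store = {}
first_id = -> { "5b48cd2382b7d752b802de31" } # pretend this is the first _id in the collection
puts lookup_placeholder(store, "logstash_since", "table", first_id)
# → 5b48cd2382b7d752b802de31
puts store.keys.first
# → logstash_since_table
```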
The value of place is taken from the MongoDB _id field (an ObjectId):
> db.table.find().limit(1).pretty()
{
        "_id" : ObjectId("5b48cd2382b7d752b802de31"),
...
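Because place is an ObjectId, its first 4 bytes (the first 8 hex characters) are the document's creation time in seconds since the Unix epoch. Decoding the stored place therefore shows roughly how far the sync has progressed. The following stdlib-only sketch does this. The helper name `object_id_time` is hypothetical.

```ruby
# Decode the creation timestamp embedded in a MongoDB ObjectId hex string.
# The first 8 hex characters are a big-endian count of seconds since the epoch.
def object_id_time(hex_id)
  raise ArgumentError, "expected 24 hex characters" unless hex_id.match?(/\A\h{24}\z/)
  Time.at(hex_id[0, 8].to_i(16)).utc
end

# The place value stored in since_table, as shown earlier:
puts object_id_time("5d0b2c2682b7d74de069ce4d")
# → 2019-06-20 06:48:06 UTC
```

The result is in UTC. Converting it to the server's local time zone is left to the reader.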
Since the progress is nothing more than this _id stored in SQLite, it can be moved by hand with an SQLite UPDATE statement.
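The sketch below rewinds the sync to a point in time. It builds the smallest ObjectId for that second, which is the 8-hex-digit timestamp followed by 16 zeros. It then produces an UPDATE statement to paste into the sqlite3 shell. The helper names are hypothetical. The table and column names come from the schema above, and the key logstash_since_table comes from the row shown earlier. Because `table` is an SQL keyword, it has to be quoted. Stopping Logstash before editing is the safe choice, since a running pipeline writes its own place back after each batch.

```ruby
# Smallest ObjectId for a given second: 4-byte timestamp, remaining 8 bytes zero.
# Practically every document created at or after `time` has a larger _id.
def lower_bound_object_id(time)
  format("%08x", time.to_i) + "0" * 16
end

# Build the UPDATE statement for the sqlite3 shell (hypothetical helper).
def rewind_sql(key, object_id)
  raise ArgumentError, "not a 24-char hex ObjectId" unless object_id.match?(/\A\h{24}\z/)
  quoted_key = key.gsub("'", "''") # escape single quotes for an SQL string literal
  "UPDATE since_table SET place = '#{object_id}' WHERE `table` = '#{quoted_key}';"
end

id = lower_bound_object_id(Time.utc(2019, 6, 20))
puts id
# → 5d0acc800000000000000000
puts rewind_sql("logstash_since_table", id)
# → UPDATE since_table SET place = '5d0acc800000000000000000' WHERE `table` = 'logstash_since_table';
```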
Log output during synchronization:
D, [2019-06-20T16:21:31.938302 #28968] DEBUG -- : MONGODB | 47.92.149.159:27017 | db.find | STARTED | {"find"=>"table", "filter"=>{"_id"=>{"$gt"=>BSON::ObjectId('5d0b420782b7d74de069db7b')}}, "limit"=>10000}
D, [2019-06-20T16:21:31.941658 #28968] DEBUG -- : MONGODB | 47.92.149.159:27017 | db.find | SUCCEEDED | 0.002s
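The STARTED line carries both the current place (the `$gt` ObjectId) and the batch limit, so scanning the debug log is a simple way to watch progress. This is a minimal regex-based sketch. The names `LOG_PATTERN` and `parse_find_line` are hypothetical, and the pattern assumes exactly the log format shown above.

```ruby
# Extract the $gt ObjectId and the limit from a "db.find | STARTED" debug line.
LOG_PATTERN = /BSON::ObjectId\('(\h{24})'\).*"limit"=>(\d+)/

# Returns { since_id:, limit: } for a matching line, nil otherwise.
def parse_find_line(line)
  m = LOG_PATTERN.match(line)
  m && { since_id: m[1], limit: m[2].to_i }
end

line = <<~'LOG'.chomp
  D, [2019-06-20T16:21:31.938302 #28968] DEBUG -- : MONGODB | 47.92.149.159:27017 | db.find | STARTED | {"find"=>"table", "filter"=>{"_id"=>{"$gt"=>BSON::ObjectId('5d0b420782b7d74de069db7b')}}, "limit"=>10000}
LOG
info = parse_find_line(line)
puts info[:since_id]
# → 5d0b420782b7d74de069db7b
puts info[:limit]
# → 10000
```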
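So the cycle is as follows. The plugin reads place and fetches up to batch_size documents whose _id is greater than place. The log above shows a limit of 10000, presumably the batch_size used in that run rather than the 5000 in the sample config. It then writes the new place back and repeats.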
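The read, fetch, and write cycle can be modelled without MongoDB or SQLite. In this sketch, an array of hex _id strings plays the collection and a Hash plays since_table. The method `sync_once`, the batch size of 2, and the sample ids are made up for illustration. Comparing equal-length lowercase hex strings gives the same order as comparing the ObjectIds themselves, and that ordering is what makes the `$gt` filter resumable. Unlike the real plugin, which keeps polling, the demo loop stops once a round returns nothing new.

```ruby
# One polling round: read place, take up to batch_size ids greater than place
# in ascending order, hand each to the block, then write the last id back.
# `collection` is an array of hex _id strings; `store` stands in for since_table.
def sync_once(collection, store, key, batch_size)
  place = store.fetch(key)
  batch = collection.select { |id| id > place }.sort.first(batch_size)
  batch.each { |id| yield id } # the plugin would push an event here
  store[key] = batch.last unless batch.empty?
  batch.size
end

collection = (1..5).map { |i| format("5d0b%020x", i) } # 24-char fake ObjectIds
store = { "logstash_since_table" => collection.first }
emitted = []
loop do
  n = sync_once(collection, store, "logstash_since_table", 2) { |id| emitted << id }
  break if n.zero? # caught up; the real plugin would sleep and poll again
end
p emitted.map { |id| id[-1] }
# → ["2", "3", "4", "5"]
```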
Reference: https://github.com/phutchins/logstash-input-mongodb