來自:http://blog.xlvector.net/2014-01/flume-spooldir-source-problem/
(
自己寫的插件,數據序列化,格式化拋出的異常都會導致flume停止,不能繼續取數據,異常可以自己處理
)
最近在用Flume做數據的收集。用到了里面的Spooldir的源在使用中有如下的問題:
- 如果文件的某一行有亂碼,不符合指定的編碼規范,那么flume會拋出一個exception,然后就停在那兒了。
- spooldir指定的文件夾中的文件一旦被修改,flume就會拋出一個exception,然后停在那兒了。
其實,flume的最大問題就是不夠魯棒。一旦出現問題,不能跳過,只能死在那兒。不知道flume為什么要這么設計。理論上,它應該允許我們在配置文件中指定在遇到錯誤的行時,是停止還是跳過,不過它目前並不支持這個。所以,我們只能寫一個自己的flume的插件了。
https://github.com/xlvector/flume https://github.com/ponyma/flume
這個插件主要修復了前面提到的兩個問題:
- 如果某一行有亂碼,flume會忽略這一行
- flume只會check最近N分鍾沒有修改過的文件
具體修改方法如下。首先,我們繼承了SpoolDirectorySource,實現了一個叫做RobustSpoolDirectorySource的類。這個類的代碼基本是拷貝了SpoolDirectorySource的代碼。但做了如下的修改。
在getNextFile()的函數中,我們發現了一個filter,做了如下的修改
FileFilter filter = new FileFilter() { public boolean accept(File candidate) { String fileName = candidate.getName(); if ((candidate.isDirectory()) || (fileName.endsWith(completedSuffix)) || (fileName.startsWith(".")) || ignorePattern.matcher(fileName).matches() || (System.currentTimeMillis() - candidate.lastModified() < 600000)) { return false; } return true; } };
這里,我們加入了一個條件
(System.currentTimeMillis() - candidate.lastModified() < 600000)
也就是說10分鍾之內修改過的文件我們不會處理。
第二個修改是關於編碼的,你可以在ReliableSpoolingFileEventReader.java的代碼中找到如下的代碼:
ResettableInputStream in = new ResettableFileInputStream(nextFile, tracker, ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, DecodeErrorPolicy.FAIL);
這里,我們只需要將DecodeErrorPolicy 改成 DecodeErrorPolicy.IGNORE 即可。
