Flume Spooldir 源的一些問題


來自:http://blog.xlvector.net/2014-01/flume-spooldir-source-problem/

自己寫的插件,數據序列化,格式化拋出的異常都會導致flume停止,不能繼續取數據,異常可以自己處理

最近在用Flume做數據的收集。用到了里面的Spooldir的源在使用中有如下的問題:

  • 如果文件的某一行有亂碼,不符合指定的編碼規范,那么flume會拋出一個exception,然后就停在那兒了。
  • spooldir指定的文件夾中的文件一旦被修改,flume就會拋出一個exception,然后停在那兒了。

其實,flume的最大問題就是不夠魯棒。一旦出現問題,不能跳過,只能死在那兒。不知道flume為什么要這么設計。理論上,它應該允許我們在配置文件中指定在遇到錯誤的行時,是停止還是跳過,不過它目前並不支持這個。所以,我們只能寫一個自己的flume的插件了。

https://github.com/xlvector/flume https://github.com/ponyma/flume

這個插件主要修復了前面提到的兩個問題:

  • 如果某一行有亂碼,flume會忽略這一行
  • flume只會check最近N分鍾沒有修改過的文件

具體修改方法如下。首先,我們繼承了SpoolDirectorySource,實現了一個叫做RobustSpoolDirectorySource的類。這個類的代碼基本是拷貝了SpoolDirectorySource的代碼。但做了如下的修改。

在getNextFile()的函數中,我們發現了一個filter,做了如下的修改

FileFilter filter = new FileFilter() { public boolean accept(File candidate) { String fileName = candidate.getName(); if ((candidate.isDirectory()) || (fileName.endsWith(completedSuffix)) || (fileName.startsWith(".")) || ignorePattern.matcher(fileName).matches() || (System.currentTimeMillis() - candidate.lastModified() < 600000)) { return false; } return true; } };

這里,我們加入了一個條件

(System.currentTimeMillis() - candidate.lastModified() < 600000)

也就是說10分鍾之內修改過的文件我們不會處理。

第二個修改是關於編碼的,你可以在ReliableSpoolingFileEventReader.java的代碼中找到如下的代碼:

ResettableInputStream in = new ResettableFileInputStream(nextFile, tracker, ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, DecodeErrorPolicy.FAIL);

這里,我們只需要將DecodeErrorPolicy 改成 DecodeErrorPolicy.IGNORE 即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM