Flume-ng-1.4.0 spooling source的方式增加了對目錄的遞歸檢測的支持



 因為flume的spooldir不支持子目錄文件的遞歸檢測,並且業務需要,所以修改了源碼,重新編譯


代碼修改參考自:http://blog.csdn.net/yangbutao/article/details/8835563

不過在1.4中已經不是修改SpoolingFileLineReader類了,而是apache-flume-1.4.0-src\flume-ng-core\src\main\java\org\apache\flume\client\avro\ReliableSpoolingFileEventReader.java

並且變量directory應該改為spoolDirectory


 1     /*
 2      * @author admln
 3      * 
 4      * @date 2015年4月8日 上午9:37:20
 5      */
 6     private void listDirFiles(List<File> files, File dir, FileFilter filter) {
 7         File[] childs = dir.listFiles(filter);
 8         for (int i = 0; i < childs.length; i++) {
 9             if (childs[i].isFile()) {
10                 files.add(childs[i]);
11             } else {
12                 if (childs[i].isDirectory()) {
13                     listDirFiles(files, childs[i], filter);
14                 }
15             }
16         }
17     }
18 
19     /**
20      * Find and open the oldest file in the chosen directory. If two or more
21      * files are equally old, the file name with lower lexicographical value is
22      * returned. If the directory is empty, this will return an absent option.
23      */
24     private Optional<FileInfo> getNextFile() {
25         /* Filter to exclude finished or hidden files */
26         FileFilter filter = new FileFilter() {
27          public boolean accept(File pathName) {
28           if ((pathName.getName().endsWith(completedSuffix))
29             || (pathName.getName().startsWith("."))) {
30             return false;
31           }
32           return true;
33         }
34      };
35      // List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
36      List<File> candidateFiles = new ArrayList<File>();
37      listDirFiles(candidateFiles, spoolDirectory, filter);

 

很多沒必要的版本就不改,如果少包或者版本不對,即使編譯通過了,使用的時候也會報java.lang.Error: Unresolved compilation problem,就要修改重新編譯


重新編譯的時候可以參考:http://www.iteblog.com/archives/1032
編譯命令:

mvn install -Phadoop-2 -DskipTests -Dtar

會下很多各種jar包,很浪費時間


已經編譯好的:http://pan.baidu.com/s/1eQxUDxC    5x9l


還有個比較簡單的一點的辦法,因為改的flume-ng-core里面的代碼,而flume-ng-core是flume-ng的一個子項目,所以直接在flume-ng-core里面執行單獨編譯,得到target/flume-ng-core-1.4.0.jar,然后替換現有flume-bin/lib/里面的flume-ng-core-1.4.0.jar,也可以實現想要的功能。這個編譯基本上不會有什么問題。

現成的:http://pan.baidu.com/s/1CVR3K  989v


在1.5.0中的doc說已經支持子目錄的遞歸讀取了但是只有patch,沒集成到代碼中,也沒有kafka sink,所以個人覺得干脆改CDH的flume算了,加上遞歸,本身有kafka sink

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.5.0-cdh5.2.5.tar.gz


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM