flume-ng-taildirectory-source 修改調試可用


由於flume-ng至1.5版本仍舊沒有穩定可用的類似flume-og中的taildir的功能,所以從git中https://github.com/jinoos/flume-ng-extends找了一個別人針對flume-ng實現的的taildir這個按照github上他自己說明,是沒法正常使用的。查看了源碼后,做了一些相應修改

1. 默認的DirectoryTailParserModulable類修改

他實現了2種DirectoryTailParserModulable

  第一種是SingleLineParserModule,適用日志里只有單條記錄的。並且代碼中默認就是使用的這個,顯然很不靠譜。提供了配置項,但是說明里沒有寫出來,配置項為 ‘parser’.

   第二種是MultiLineParserModule,適用多行的日志文件的。這里我們大部分情況肯定是要用這個的。

DirectoryTailSource類中如下行

  private static final String DEFAULT_PARSER_MODULE_CLASS = "com.jinoos.flume.SingleLineParserModule";

修改為。包名根據實際情況來更改

  private static final String DEFAULT_PARSER_MODULE_CLASS = "org.apache.flume.source.taildirectory.MultiLineParserModule";

2. first-line-pattern配置

這是MultiLineParserModule中的一個屬性,用來驗證讀進來的行是否為第一行。這個說明中也沒提到

如果沒有配置這個配置,那么就無法正常執行,會報“wrong log format”。

主要代碼如下:

 1         private void readMessage(FileSet fileSet) {
 2             try {
 3                 String buffer;
 4 
 5                 synchronized (fileSet) {
 6 
 7                     while ((buffer = fileSet.readLine()) != null) {
 8                         if (buffer.length() == 0) {
 9                             continue;
10                         }
11 
12                         boolean isFirstLine = parserModule.isFirstLine(buffer);
13                         if (isFirstLine) {
14                             sendEvent(fileSet);
15                             fileSet.appendLine(buffer);
16                             parserModule.parse(buffer, fileSet);
17 
18                         } else {
19                             if (fileSet.getLineSize() == 0) {
20                                 logger.debug("Wrong log format, " + buffer);
21                                 continue;
22                             } else {
23                                 fileSet.appendLine(buffer);
24                                 parserModule.parse(buffer, fileSet);
25                             }
26                         }
27 
28                         if (parserModule.isLastLine(buffer)) {
29                             sendEvent(fileSet);
30                         }
31                     }
32                 }
33             } catch (IOException e) {
34                 logger.warn(e.getMessage(), e);
35             }
36         }

根據我們的實際需求,我們不需要判斷是否第一行,只要有change事件,全部寫入到channel中即可

修改為如下方式

 1         // 파일을 읽고 Event를 생성한다.
 2         private void readMessage(FileSet fileSet) {
 3             try {
 4                 String buffer;
 5 
 6                 synchronized (fileSet) {
 7 
 8                     while ((buffer = fileSet.readLine()) != null) {
 9                         if (buffer.length() == 0) {
10                             continue;
11                         }
12 
13                         fileSet.appendLine(buffer);
14                         sendEvent(fileSet);
15                     }
16                 }
17             } catch (IOException e) {
18                 logger.warn(e.getMessage(), e);
19             }
20         }

改為這種方式后,只要來一行就會send到channel中。如果需要批量的,可以按自己要求更改。

現在就不再需要關注first-line-pattern這個配置了。

注意:但是配置在配置文件中還是配的,雖然它沒有起到任何作用。如果想不配置,請修改MultiLineParserModule的configure(Context context)方法

 

3.監控文件中有中文,編碼的配置添加

 目前這個版本是無法支持中文的文件的。

正式讀取數據的方法:位置FileSet類中

  public String readLine() throws IOException {
    return rReader.readLine();
  }

這個rReader是個RandomAccessFile對象

  public FileSet(AbstractSource source, FileObject fileObject)
      throws IOException {
    this.source = source;
    this.fileObject = fileObject;

    this.bufferList = new ArrayList<String>();

    //File f = new File(fileObject.getName().getPath());
    File f = new File("d:/tmp/log_compare/test1.txt");

    rReader = new RandomAccessFile(f, "r"); rReader.seek(f.length());

    bufferList = new ArrayList<String>();
    headers = new HashMap<String, String>();
    logger.debug("FileSet has been created " + fileObject.getName().getPath());
    this.seq = 0L;
  }

在FileSet類實例化時創建。

下面開始修改操作,源代碼中是直接使用了RandomAccessFile的readline()方法,修改為按byte讀取的方式

  /**
   * 
   * @Title: readLine
   * @Description: TODO(讀取文件中的一行)
   * @param @throws IOException    設定文件
   * @return String    返回類型
   * @throws
   */
  public String readLine() throws IOException {
    if(rReader.getFilePointer() < rReader.length()) {
        byte b = rReader.readByte();//讀取一個byte
        int i = 0;
        byte[] buf = new byte[10240];//創建大小為1M的數據,如果你的單行超過1M,那么會出錯
        //如果讀到換行符,或者讀到文件最后就停止。表示已經讀完一行
        while(b != '\n' && rReader.getFilePointer() < rReader.length()) {
            buf[i++] = b;
            b = rReader.readByte();
            
        }
        return new String(buf,0,i);
    }else{
        return "";
    }
  }

改完后重新打包再次測試,發現已經可以支持中文了。

4.每次新文件剛被創建時會丟失第一條數據

代碼如下

        public void run() {
            while (true) {
                try {
                    // DirectoryTailEvent event = eventQueue.poll(
                    // eventQueueWorkerTimeoutMiliSecond,
                    // TimeUnit.MILLISECONDS);
                    DirectoryTailEvent event = eventQueue.take();

                    if (event == null) {
                        continue;
                    }

                    if (event.type == FileEventType.FILE_CHANGED) {
                        fileChanged(event.event);
     } else if (event.type == FileEventType.FILE_CREATED) { fileCreated(event.event);
                    } else if (event.type == FileEventType.FILE_DELETED) {
                        fileDeleted(event.event);
                    } else if (event.type == FileEventType.FLUSH) {
                        if (event.fileSet != null)
                            sendEvent(event.fileSet);
                    }
                } catch (InterruptedException e) {
                    logger.debug(e.getMessage(), e);
                } catch (FileSystemException e) {
                    logger.info(e.getMessage(), e);
                }
            }
        }

上面這段代碼為監測的文件夾有新的事件時的處理。這里我們要看的是FILE_CREATE事件,他調用了fileCreated(event.event);

 1     private void fileCreated(FileChangeEvent event)
 2                 throws FileSystemException {
 3             String path = event.getFile().getName().getPath();
 4             String dirPath = event.getFile().getParent().getName().getPath();
 5 
 6             logger.debug(path + " has been created.");
 7 
 8             DirPattern dirPattern = null;
 9             dirPattern = pathMap.get(dirPath);
10 
11             if (dirPattern == null) {
12                 logger.warn("Occurred create event from un-indexed directory. "
13                         + dirPath);
14                 return;
15             }
16 
17             // 파일명이 대상인지 검사한다.
18             if (!isInFilePattern(event.getFile(), dirPattern.getFilePattern())) {
19                 logger.debug(path + " is not in file pattern.");
20                 return;
21             }
22 
23             FileSet fileSet;
24 
25             fileSet = fileSetMap.get(event.getFile().getName().getPath());
26             //fileSet = fileSetMap.get(path);
27             if (fileSet == null) {
28                 try {
29                     logger.info(path
30                             + " is not in monitoring list. It's going to be listed.");
31                     
32                     fileSet = new FileSet(source, event.getFile());
33                     // a little synchronized bug here.fixed by tqli,2014-08-07
34                     // ,E-mail:tiangang1126@126.com
35                     synchronized (fileSetMap) {
36                         fileSetMap.put(path, fileSet);
37                     }
38                 } catch (IOException e) {
39                     logger.error(e.getMessage(), e);
40                     return;
41                 }
42             }
43         }

看第27行,當新的文件進來,需要創建一個fileSet對象。將這個fileSet對象存入fileSetMap中

看fileSet實例化的方法,上面已經貼過了

 1   public FileSet(AbstractSource source, FileObject fileObject)
 2       throws IOException {
 3     this.source = source;
 4     this.fileObject = fileObject;
 5 
 6     this.bufferList = new ArrayList<String>();
 7 
 8     File f = new File(fileObject.getName().getPath());
 9     //File f = new File("d:/tmp/log_compare/test1.txt");
10     rReader = new RandomAccessFile(f, "r");
11     rReader.seek(f.length());
12     bufferList = new ArrayList<String>();
13     headers = new HashMap<String, String>();
14     logger.debug("FileSet has been created " + fileObject.getName().getPath());
15     logger.debug("file length now is : " + f.length());
16     this.seq = 0L;
17   }

注意看第11行,將游標移到到f.length的位置,這樣的問題就是跟着文件新建時寫入的內容,全部被忽略了。這樣就造成了數據丟失

那怎么解決這個問題呢,簡單的改為

rReader.seek(0);
肯定是不行的,具體的原因,大家自己思考下吧。

我們目的的就是在有監控新的事件時,創建的fileSet,游標位置能在文件原來的的位置。
需求明確了,下面就知道該做哪些事了。
1 首先在DirectoryTailSource中start方法執行時,將配置監控文件下符合正則條件文件的length都保存在一個Map里
2 在監聽到新事件新建fileSet時,判斷這個文件是新建的還是之前就存在的,如果是之前就存在的,那么就可以直接取之前記下的這個文件的大小。如果不存在,說明這個文件是個新文件,則從0位置開始讀
注意:這個不支持文件更改的情況,只能適應只對文件做增加的場景
下面是代碼修改的部分

DirectoryTailSource類
添加 fileInitLengthMap 屬性
1     private Map<String, DirPattern> dirMap;
2     private Map<String, DirPattern> pathMap;
3     private Map<String,Long> fileInitLengthMap;//文件初始大小記錄,用來定位新建fileSet時的游標初始位置
在configure方法中實例化fileInitLengthMap
    public void configure(Context context) {
        logger.info("Source Configuring..");

        dirMap = new HashMap<String, DirPattern>();
        pathMap = new HashMap<String, DirPattern>();
        fileInitLengthMap = new HashMap<String,Long>();

在start方法中初始化fileInitLengthMap。保存全部符合正則條件的文件大小。紅色部分為添加的代碼

 1     public void start() {
 2         logger.info("Source Starting..");
 3 
 4         if (sourceCounter == null) {
 5             sourceCounter = new SourceCounter(getName());
 6         }
 7 
 8         fileSetMap = new Hashtable<String, FileSet>();
 9 
10         try {
11             fsManager = VFS.getManager();
12         } catch (FileSystemException e) {
13             logger.error(e.getMessage(), e);
14             return;
15         }
16 
17         monitorRunnable = new MonitorRunnable();
18 
19         fileMonitor = new DefaultFileMonitor(monitorRunnable);
20         fileMonitor.setRecursive(false);
21 
22         FileObject fileObject;
23 
24         logger.debug("Dirlist count " + dirMap.size());
25         for (Entry<String, DirPattern> entry : dirMap.entrySet()) {
26             logger.debug("Scan dir " + entry.getKey());
27 
28             DirPattern dirPattern = entry.getValue();
29 
30             try {
31                 fileObject = fsManager.resolveFile(dirPattern.getPath());
32             } catch (FileSystemException e) {
33                 logger.error(e.getMessage(), e);
34                 continue;
35             }
36 
37             try {
38                 if (!fileObject.isReadable()) {
39                     logger.warn("No have readable permission, "
40                             + fileObject.getURL());
41                     continue;
42                 }
43 
44                 if (FileType.FOLDER != fileObject.getType()) {
45                     logger.warn("Not a directory, " + fileObject.getURL());
46                     continue;
47                 }
48 
49                 // 폴더를 Monitoring 대상에 추가한다.
50                 fileMonitor.addFile(fileObject);
51                 logger.debug(fileObject.getName().getPath()
52                         + " directory has been add in monitoring list");
53                 pathMap.put(fileObject.getName().getPath(), entry.getValue());
54                 //pathMap.put("d:/tmp/log_compare", entry.getValue());
55                 //新增部分,文件初始化大小保存
56                 FileObject[] allChiledfile = fileObject.getChildren();
57                 for(FileObject chiledFileobject : allChiledfile) {
58                     if(dirPattern.getFilePattern().matcher(chiledFileobject.getName().getBaseName()).find()) {
59                         String chiledFildPath = chiledFileobject.getName().getPath();
60                         //String chiledFildPath = "d:/tmp/log_compare/test1.txt";
61                         File chiledfile = new File(chiledFildPath);
62                         fileInitLengthMap.put(chiledFildPath, 
63                                 chiledfile.length());
64                         logger.debug(chiledFildPath + " init length is :" + chiledfile.length());
65                     }
66                 }
67             } catch (FileSystemException e) {
68                 logger.warn(e.getMessage(), e);
69                 continue;
70             } catch (Exception e) {
71                 logger.debug(e.getMessage(), e);
72             }
73 
74         }
75 
76         executorService = Executors
77                 .newFixedThreadPool(eventQueueWorkerSize + 1);
78         monitorFuture = executorService.submit(monitorRunnable);
79 
80         for (int i = 0; i < eventQueueWorkerSize; i++) {
81             workerFuture[i] = executorService.submit(new WorkerRunnable(this));
82         }
83 
84         sourceCounter.start();
85         super.start();
86     }

FileSet類

 

 1   public FileSet(AbstractSource source, FileObject fileObject,Map<String,Long> fileInitLengthMap)
 2       throws IOException {
 3     this.source = source;
 4     this.fileObject = fileObject;
 5 
 6     this.bufferList = new ArrayList<String>();
 7 
 8     File f = new File(fileObject.getName().getPath());
 9     rReader = new RandomAccessFile(f, "r");
10     /*
11      *判斷在初始化taildirSource時,這個文件是否存在,如果存在則游標定位當時記錄下的文件長度開始
12      *如果不存在,則說明這是一個新建的文件,游標從0開始
13      */
14     if(fileInitLengthMap.containsKey(fileObject.getName().getPath())) {
15         rReader.seek(fileInitLengthMap.get(fileObject.getName().getPath()));
16     }else{
17         rReader.seek(0);
18     }
19     
20     bufferList = new ArrayList<String>();
21     headers = new HashMap<String, String>();
22     logger.debug("FileSet has been created " + fileObject.getName().getPath());
23     logger.debug("file length now is : " + f.length());
24     this.seq = 0L;
25   }

 

修改類實例化的方法。並修改DirectoryTailSource類中調用FileSet實例化方法的地方。

至此修改全部全部完成。

 

沒找到能上傳附件的地方,改完的jar包就不提供了。

此為一個使用這個jar的例子

a.sources = sources
a.sinks = sinks
a.channels = c

#configure sources
a.sources.sources.type = org.apache.flume.source.taildirectory.DirectoryTailSource
a.sources.sources.dirs = s0
#a.sources.sources.dirs.s0.path = /usr/local/nginx/logs/
a.sources.sources.dirs.s0.path = /home/flume/testTailDir
a.sources.sources.dirs.s0.file-pattern = ^access_.*log$
a.sources.sources.first-line-pattern = ^(.*)$
#congfigure sinks
a.sinks.sinks.type = file_roll
a.sinks.sinks.sink.directory = /home/flume/testTailDir2
a.sinks.sinks.sink.rollInterval = 30
a.sinks.sinks.channel = c
#configure channals
a.channels.c.type = memory
#bind channel
a.sources.sources.channels = c

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM