由於flume-ng至1.5版本仍舊沒有穩定可用的類似flume-og中的taildir的功能,所以從git中https://github.com/jinoos/flume-ng-extends找了一個別人針對flume-ng實現的的taildir這個按照github上他自己說明,是沒法正常使用的。查看了源碼后,做了一些相應修改
1. 默認的DirectoryTailParserModulable類修改
他實現了2種DirectoryTailParserModulable
第一種是SingleLineParserModule,適用日志里只有單條記錄的。並且代碼中默認就是使用的這個,顯然很不靠譜。提供了配置項,但是說明里沒有寫出來,配置項為 ‘parser’.
第二種是MultiLineParserModule,適用多行的日志文件的。這里我們大部分情況肯定是要用這個的。
DirectoryTailSource類中如下行
private static final String DEFAULT_PARSER_MODULE_CLASS = "com.jinoos.flume.SingleLineParserModule";
修改為。包名根據實際情況來更改
private static final String DEFAULT_PARSER_MODULE_CLASS = "org.apache.flume.source.taildirectory.MultiLineParserModule";
2. first-line-pattern配置
這是MultiLineParserModule中的一個屬性,用來驗證讀進來的行是否為第一行。這個說明中也沒提到
如果沒有配置這個配置,那么就無法正常執行,會報“wrong log format”。
主要代碼如下:
1 private void readMessage(FileSet fileSet) { 2 try { 3 String buffer; 4 5 synchronized (fileSet) { 6 7 while ((buffer = fileSet.readLine()) != null) { 8 if (buffer.length() == 0) { 9 continue; 10 } 11 12 boolean isFirstLine = parserModule.isFirstLine(buffer); 13 if (isFirstLine) { 14 sendEvent(fileSet); 15 fileSet.appendLine(buffer); 16 parserModule.parse(buffer, fileSet); 17 18 } else { 19 if (fileSet.getLineSize() == 0) { 20 logger.debug("Wrong log format, " + buffer); 21 continue; 22 } else { 23 fileSet.appendLine(buffer); 24 parserModule.parse(buffer, fileSet); 25 } 26 } 27 28 if (parserModule.isLastLine(buffer)) { 29 sendEvent(fileSet); 30 } 31 } 32 } 33 } catch (IOException e) { 34 logger.warn(e.getMessage(), e); 35 } 36 }
根據我們的實際需求,我們不需要判斷是否第一行,只要有change事件,全部寫入到channel中即可
修改為如下方式
1 // 파일을 읽고 Event를 생성한다. 2 private void readMessage(FileSet fileSet) { 3 try { 4 String buffer; 5 6 synchronized (fileSet) { 7 8 while ((buffer = fileSet.readLine()) != null) { 9 if (buffer.length() == 0) { 10 continue; 11 } 12 13 fileSet.appendLine(buffer); 14 sendEvent(fileSet); 15 } 16 } 17 } catch (IOException e) { 18 logger.warn(e.getMessage(), e); 19 } 20 }
改為這種方式后,只要來一行就會send到channel中。如果需要批量的,可以按自己要求更改。
現在就不再需要關注first-line-pattern這個配置了。
注意:但是配置在配置文件中還是配的,雖然它沒有起到任何作用。如果想不配置,請修改MultiLineParserModule的configure(Context context)方法
3.監控文件中有中文,編碼的配置添加
目前這個版本是無法支持中文的文件的。
正式讀取數據的方法:位置FileSet類中
public String readLine() throws IOException { return rReader.readLine(); }
這個rReader是個RandomAccessFile對象
public FileSet(AbstractSource source, FileObject fileObject) throws IOException { this.source = source; this.fileObject = fileObject; this.bufferList = new ArrayList<String>(); //File f = new File(fileObject.getName().getPath()); File f = new File("d:/tmp/log_compare/test1.txt"); rReader = new RandomAccessFile(f, "r"); rReader.seek(f.length()); bufferList = new ArrayList<String>(); headers = new HashMap<String, String>(); logger.debug("FileSet has been created " + fileObject.getName().getPath()); this.seq = 0L; }
在FileSet類實例化時創建。
下面開始修改操作,源代碼中是直接使用了RandomAccessFile的readline()方法,修改為按byte讀取的方式
/** * * @Title: readLine * @Description: TODO(讀取文件中的一行) * @param @throws IOException 設定文件 * @return String 返回類型 * @throws */ public String readLine() throws IOException { if(rReader.getFilePointer() < rReader.length()) { byte b = rReader.readByte();//讀取一個byte int i = 0; byte[] buf = new byte[10240];//創建大小為1M的數據,如果你的單行超過1M,那么會出錯 //如果讀到換行符,或者讀到文件最后就停止。表示已經讀完一行 while(b != '\n' && rReader.getFilePointer() < rReader.length()) { buf[i++] = b; b = rReader.readByte(); } return new String(buf,0,i); }else{ return ""; } }
改完后重新打包再次測試,發現已經可以支持中文了。
4.每次新文件剛被創建時會丟失第一條數據
代碼如下
public void run() { while (true) { try { // DirectoryTailEvent event = eventQueue.poll( // eventQueueWorkerTimeoutMiliSecond, // TimeUnit.MILLISECONDS); DirectoryTailEvent event = eventQueue.take(); if (event == null) { continue; } if (event.type == FileEventType.FILE_CHANGED) { fileChanged(event.event); } else if (event.type == FileEventType.FILE_CREATED) { fileCreated(event.event); } else if (event.type == FileEventType.FILE_DELETED) { fileDeleted(event.event); } else if (event.type == FileEventType.FLUSH) { if (event.fileSet != null) sendEvent(event.fileSet); } } catch (InterruptedException e) { logger.debug(e.getMessage(), e); } catch (FileSystemException e) { logger.info(e.getMessage(), e); } } }
上面這段代碼為監測的文件夾有新的事件時的處理。這里我們要看的是FILE_CREATE事件,他調用了fileCreated(event.event);
1 private void fileCreated(FileChangeEvent event) 2 throws FileSystemException { 3 String path = event.getFile().getName().getPath(); 4 String dirPath = event.getFile().getParent().getName().getPath(); 5 6 logger.debug(path + " has been created."); 7 8 DirPattern dirPattern = null; 9 dirPattern = pathMap.get(dirPath); 10 11 if (dirPattern == null) { 12 logger.warn("Occurred create event from un-indexed directory. " 13 + dirPath); 14 return; 15 } 16 17 // 파일명이 대상인지 검사한다. 18 if (!isInFilePattern(event.getFile(), dirPattern.getFilePattern())) { 19 logger.debug(path + " is not in file pattern."); 20 return; 21 } 22 23 FileSet fileSet; 24 25 fileSet = fileSetMap.get(event.getFile().getName().getPath()); 26 //fileSet = fileSetMap.get(path); 27 if (fileSet == null) { 28 try { 29 logger.info(path 30 + " is not in monitoring list. It's going to be listed."); 31 32 fileSet = new FileSet(source, event.getFile()); 33 // a little synchronized bug here.fixed by tqli,2014-08-07 34 // ,E-mail:tiangang1126@126.com 35 synchronized (fileSetMap) { 36 fileSetMap.put(path, fileSet); 37 } 38 } catch (IOException e) { 39 logger.error(e.getMessage(), e); 40 return; 41 } 42 } 43 }
看第27行,當新的文件進來,需要創建一個fileSet對象。將這個fileSet對象存入fileSetMap中
看fileSet實例化的方法,上面已經貼過了
1 public FileSet(AbstractSource source, FileObject fileObject) 2 throws IOException { 3 this.source = source; 4 this.fileObject = fileObject; 5 6 this.bufferList = new ArrayList<String>(); 7 8 File f = new File(fileObject.getName().getPath()); 9 //File f = new File("d:/tmp/log_compare/test1.txt"); 10 rReader = new RandomAccessFile(f, "r"); 11 rReader.seek(f.length()); 12 bufferList = new ArrayList<String>(); 13 headers = new HashMap<String, String>(); 14 logger.debug("FileSet has been created " + fileObject.getName().getPath()); 15 logger.debug("file length now is : " + f.length()); 16 this.seq = 0L; 17 }
注意看第11行,將游標移到到f.length的位置,這樣的問題就是跟着文件新建時寫入的內容,全部被忽略了。這樣就造成了數據丟失
那怎么解決這個問題呢,簡單的改為
rReader.seek(0);
肯定是不行的,具體的原因,大家自己思考下吧。
我們目的的就是在有監控新的事件時,創建的fileSet,游標位置能在文件原來的的位置。
需求明確了,下面就知道該做哪些事了。
1 首先在DirectoryTailSource中start方法執行時,將配置監控文件下符合正則條件文件的length都保存在一個Map里
2 在監聽到新事件新建fileSet時,判斷這個文件是新建的還是之前就存在的,如果是之前就存在的,那么就可以直接取之前記下的這個文件的大小。如果不存在,說明這個文件是個新文件,則從0位置開始讀
注意:這個不支持文件更改的情況,只能適應只對文件做增加的場景
下面是代碼修改的部分
DirectoryTailSource類
添加 fileInitLengthMap 屬性
1 private Map<String, DirPattern> dirMap; 2 private Map<String, DirPattern> pathMap; 3 private Map<String,Long> fileInitLengthMap;//文件初始大小記錄,用來定位新建fileSet時的游標初始位置
在configure方法中實例化fileInitLengthMap
public void configure(Context context) { logger.info("Source Configuring.."); dirMap = new HashMap<String, DirPattern>(); pathMap = new HashMap<String, DirPattern>(); fileInitLengthMap = new HashMap<String,Long>();
在start方法中初始化fileInitLengthMap。保存全部符合正則條件的文件大小。紅色部分為添加的代碼
1 public void start() { 2 logger.info("Source Starting.."); 3 4 if (sourceCounter == null) { 5 sourceCounter = new SourceCounter(getName()); 6 } 7 8 fileSetMap = new Hashtable<String, FileSet>(); 9 10 try { 11 fsManager = VFS.getManager(); 12 } catch (FileSystemException e) { 13 logger.error(e.getMessage(), e); 14 return; 15 } 16 17 monitorRunnable = new MonitorRunnable(); 18 19 fileMonitor = new DefaultFileMonitor(monitorRunnable); 20 fileMonitor.setRecursive(false); 21 22 FileObject fileObject; 23 24 logger.debug("Dirlist count " + dirMap.size()); 25 for (Entry<String, DirPattern> entry : dirMap.entrySet()) { 26 logger.debug("Scan dir " + entry.getKey()); 27 28 DirPattern dirPattern = entry.getValue(); 29 30 try { 31 fileObject = fsManager.resolveFile(dirPattern.getPath()); 32 } catch (FileSystemException e) { 33 logger.error(e.getMessage(), e); 34 continue; 35 } 36 37 try { 38 if (!fileObject.isReadable()) { 39 logger.warn("No have readable permission, " 40 + fileObject.getURL()); 41 continue; 42 } 43 44 if (FileType.FOLDER != fileObject.getType()) { 45 logger.warn("Not a directory, " + fileObject.getURL()); 46 continue; 47 } 48 49 // 폴더를 Monitoring 대상에 추가한다. 50 fileMonitor.addFile(fileObject); 51 logger.debug(fileObject.getName().getPath() 52 + " directory has been add in monitoring list"); 53 pathMap.put(fileObject.getName().getPath(), entry.getValue()); 54 //pathMap.put("d:/tmp/log_compare", entry.getValue()); 55 //新增部分,文件初始化大小保存 56 FileObject[] allChiledfile = fileObject.getChildren(); 57 for(FileObject chiledFileobject : allChiledfile) { 58 if(dirPattern.getFilePattern().matcher(chiledFileobject.getName().getBaseName()).find()) { 59 String chiledFildPath = chiledFileobject.getName().getPath(); 60 //String chiledFildPath = "d:/tmp/log_compare/test1.txt"; 61 File chiledfile = new File(chiledFildPath); 62 fileInitLengthMap.put(chiledFildPath, 63 chiledfile.length()); 64 logger.debug(chiledFildPath + " init length is :" + chiledfile.length()); 65 } 66 } 67 } catch (FileSystemException e) { 68 logger.warn(e.getMessage(), e); 69 continue; 70 } catch (Exception e) { 71 logger.debug(e.getMessage(), e); 72 } 73 74 } 75 76 executorService = Executors 77 .newFixedThreadPool(eventQueueWorkerSize + 1); 78 monitorFuture = executorService.submit(monitorRunnable); 79 80 for (int i = 0; i < eventQueueWorkerSize; i++) { 81 workerFuture[i] = executorService.submit(new WorkerRunnable(this)); 82 } 83 84 sourceCounter.start(); 85 super.start(); 86 }
FileSet類
1 public FileSet(AbstractSource source, FileObject fileObject,Map<String,Long> fileInitLengthMap) 2 throws IOException { 3 this.source = source; 4 this.fileObject = fileObject; 5 6 this.bufferList = new ArrayList<String>(); 7 8 File f = new File(fileObject.getName().getPath()); 9 rReader = new RandomAccessFile(f, "r"); 10 /* 11 *判斷在初始化taildirSource時,這個文件是否存在,如果存在則游標定位當時記錄下的文件長度開始 12 *如果不存在,則說明這是一個新建的文件,游標從0開始 13 */ 14 if(fileInitLengthMap.containsKey(fileObject.getName().getPath())) { 15 rReader.seek(fileInitLengthMap.get(fileObject.getName().getPath())); 16 }else{ 17 rReader.seek(0); 18 } 19 20 bufferList = new ArrayList<String>(); 21 headers = new HashMap<String, String>(); 22 logger.debug("FileSet has been created " + fileObject.getName().getPath()); 23 logger.debug("file length now is : " + f.length()); 24 this.seq = 0L; 25 }
修改類實例化的方法。並修改DirectoryTailSource類中調用FileSet實例化方法的地方。
至此修改全部全部完成。
沒找到能上傳附件的地方,改完的jar包就不提供了。
此為一個使用這個jar的例子
a.sources = sources a.sinks = sinks a.channels = c #configure sources a.sources.sources.type = org.apache.flume.source.taildirectory.DirectoryTailSource a.sources.sources.dirs = s0 #a.sources.sources.dirs.s0.path = /usr/local/nginx/logs/ a.sources.sources.dirs.s0.path = /home/flume/testTailDir a.sources.sources.dirs.s0.file-pattern = ^access_.*log$ a.sources.sources.first-line-pattern = ^(.*)$ #congfigure sinks a.sinks.sinks.type = file_roll a.sinks.sinks.sink.directory = /home/flume/testTailDir2 a.sinks.sinks.sink.rollInterval = 30 a.sinks.sinks.channel = c #configure channals a.channels.c.type = memory #bind channel a.sources.sources.channels = c
