以下內容都為自己淺顯的理解,用作備忘的流水賬,所以寫的比較混亂。如理解有錯誤,請幫忙指正
FLUME-NG中沒有之前的對文件的實時流SOURCE,只提供了spoolDir的source,這個source的功能監控指定文件夾,放入文件夾內的文件不能再做任何修改(包括修改時間和文件大小),這2個錯誤正是對應這2個
在代碼中體現為
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile()方法內
1 File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath()); 2 3 currentFile.get().getDeserializer().close(); 4 5 // Verify that spooling assumptions hold 6 if (fileToRoll.lastModified() != currentFile.get().getLastModified()) { 7 String message = "File has been modified since being read: " + fileToRoll +"\n" 8 + "fileToRoll.lastModified() : " + fileToRoll.lastModified() + 9 "currentFile.get().getLastModified() : " + currentFile.get().getLastModified(); 10 throw new IllegalStateException(message); 11 } 12 if (fileToRoll.length() != currentFile.get().getLength()) { 13 String message = "File has changed size since being read: " + fileToRoll +"\n" 14 + "fileToRoll.length() : " + fileToRoll.length() + 15 "currentFile.get().getLength() : " + currentFile.get().getLength(); 16 throw new IllegalStateException(message); 17 }
但是問題是我們確實沒有對文件做出過任何修改啊,會什么還是會報這個錯誤。查看了代碼后,發現他的這個線程頻率為500ms,當我們拷貝一個大些的文件的時候,500ms還沒有拷貝完成,所以就會出現這樣的錯誤。當然flume被設計成500MS,是因為默認大家都是傳很小的文件,每幾分鍾或者每幾秒就做寫一個日志文件,就不會存在這樣的問題。
org.apache.flume.source.SpoolDirectorySource
//這個檢驗的太快,當文件比較大的時候,需要拷貝的時間比較超過500毫秒,就會報文件更改或者文件大小的變化,改為5000 private static final int POLL_DELAY_MS = 15000;
默認為500MS,我給改成15000ms。
那么問題既然出來了,我想就把這個值調大點吧,調成15秒總行了吧。但是經過測試后發現,還是不可以。
那這又是為什么呢?原因很簡單,即使我們把這個值調成1萬秒,當一個大點的文件正好在第9999秒的時候往里面拷貝了,一秒后線程啟動了發現一個新文件並記下了這時候的這個文件的修改時間與大小,但是因為文件並沒有拷貝完成,所以這2個值都是錯誤。當剛才的那部分讀完后,程序中檢查文件的修改時間與大小看看是否被改變,剛才那段時間文件又被copy了一些進來,此時就會報上面的錯誤了。
知道了根本的原因,就像從根本上解決這個問題,最好的方式就是我們等文件完全拷貝完成,我們再開始讀這個文件。那找到代碼中獲取要讀文件的這部分
1 /** 2 * Find and open the oldest file in the chosen directory. If two or more 3 * files are equally old, the file name with lower lexicographical value is 4 * returned. If the directory is empty, this will return an absent option. 5 */ 6 private Optional<FileInfo> getNextFile() { 7 /* Filter to exclude finished or hidden files */ 8 FileFilter filter = new FileFilter() { 9 public boolean accept(File candidate) { 10 String fileName = candidate.getName(); 11 if ((candidate.isDirectory()) || 12 (fileName.endsWith(completedSuffix)) || 13 (fileName.startsWith(".")) || 14 ignorePattern.matcher(fileName).matches()) { 15 return false; 16 } 17 return true; 18 } 19 }; 20 List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); //獲取spoolDirectory下滿足條件的文件 21 if (candidateFiles.isEmpty()) { 22 return Optional.absent(); 23 } else { 24 Collections.sort(candidateFiles, new Comparator<File>() { //按最后修改時間排序文件 25 public int compare(File a, File b) { 26 int timeComparison = new Long(a.lastModified()).compareTo( 27 new Long(b.lastModified())); 28 if (timeComparison != 0) { 29 return timeComparison; 30 } 31 else { 32 return a.getName().compareTo(b.getName()); 33 } 34 } 35 }); 36 File nextFile = candidateFiles.get(0); //因為每次獲取到的文件處理完都會被標記為已完成,所以直接取拍完序的第一個 37 //修復傳輸大文件報錯文件被修改的BUG 38 this.checkFileCpIsOver(nextFile);//此處被阻塞,直到文件拷貝文件或者超過20秒 39 40 try { 41 // roll the meta file, if needed 42 String nextPath = nextFile.getPath(); 43 PositionTracker tracker = 44 DurablePositionTracker.getInstance(metaFile, nextPath); 45 if (!tracker.getTarget().equals(nextPath)) { 46 tracker.close(); 47 deleteMetaFile(); 48 tracker = DurablePositionTracker.getInstance(metaFile, nextPath); 49 } 50 51 // sanity check 52 Preconditions.checkState(tracker.getTarget().equals(nextPath), 53 "Tracker target %s does not equal expected filename %s", 54 tracker.getTarget(), nextPath); 55 56 ResettableInputStream in = 57 new ResettableFileInputStream(nextFile, tracker, 58 ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, 59 decodeErrorPolicy); 60 EventDeserializer deserializer = EventDeserializerFactory.getInstance 61 (deserializerType, deserializerContext, in); 62 63 return Optional.of(new FileInfo(nextFile, deserializer)); 64 } catch (FileNotFoundException e) { 65 // File could have been deleted in the interim 66 logger.warn("Could not find file: " + nextFile, e); 67 return Optional.absent(); 68 } catch (IOException e) { 69 logger.error("Exception opening file: " + nextFile, e); 70 return Optional.absent(); 71 } 72 } 73 }
在方法的第36行是獲取准備要讀的文件的部分,之前就是直接拿到文件不會檢查文件是否拷貝完成,第38行為自己添加的方法
方法如下:
1 /** 2 * 3 * @Title: checkFileCpIsOver 4 * @Description: TODO(用來檢查文件拷貝是否完成) 5 * @param @param currentFile 設定文件 6 * @return void 返回類型 7 * @throws 8 */ 9 private void checkFileCpIsOver(File file) { 10 long modified = file.lastModified();//目前文件的修改時間 11 long length = file.length();//目前文件的大小 12 try { 13 Thread.sleep(1000);//等待1秒鍾 14 } catch (InterruptedException e) { 15 // TODO Auto-generated catch block 16 e.printStackTrace(); 17 } 18 File currentFile = new File(file.getAbsolutePath()); 19 int count = 0;//記錄循環次數,超過20次,也就是10秒后拋出異常 20 while(currentFile.lastModified() != modified || currentFile.length() != length) { 21 if(count > 20) { 22 String message = "File Copy time too long. please check copy whether exception!" + "\n" 23 + "File at :" + file.getAbsolutePath() + "\n" 24 + "File current length is:" + currentFile.lastModified(); 25 new IllegalStateException(message); 26 } 27 count++; 28 modified = currentFile.lastModified(); 29 length = currentFile.length(); 30 try { 31 Thread.sleep(500);//等待500毫秒 32 } catch (InterruptedException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } 36 currentFile = new File(file.getAbsolutePath()); 37 38 39 } 40 //一直到文件傳輸完成就可以退出 41 }
修復完成,將修改涉及到的部分重新打包,替換線上JAR包,再次驗證。