HDFSEventSink是flume中一個很重要的sink,配置文件中type=hdfs。與此sink相關的類都在org.apache.flume.sink.hdfs包中。
HDFSEventSink算是一個比較復雜的sink,包下涉及的源代碼文件數多達13個。。。可配置的參數眾多。。。希望我能講清楚。
一、首先依然是看configure(Context context)方法,用來獲取配置文件中的配置信息,及初始化一些重要參數
1 public void configure(Context context) {
2 this.context = context; 3 //HDFS目錄路徑,必需(eg hdfs://namenode/flume/webdata/) 4 filePath = Preconditions.checkNotNull( 5 context.getString("hdfs.path"), "hdfs.path is required"); 6 //在Hdfs目錄中生成的文件名字的前綴 7 fileName = context.getString("hdfs.filePrefix", defaultFileName); 8 //文件后綴,例如.avro,一般不用 9 this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix); 10 //內部寫文件的時候表示正在寫的文件的前綴和后綴 11 inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix); 12 inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);//默認是.tmp 13 String tzName = context.getString("hdfs.timeZone"); 14 timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName); 15 //當前寫入的文件滾動間隔,默認30秒生成一個新的文件,0表示不基於時間間隔來滾動 16 rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); 17 //以文件大小觸發文件滾動,單位字節,0表示不基於文件大小間隔來滾動 18 rollSize = context.getLong("hdfs.rollSize", defaultRollSize); 19 //以寫入的事件數觸發文件滾動, 0表示不基於事件數大小間隔來滾動 20 rollCount = context.getLong("hdfs.rollCount", defaultRollCount); 21 //事件刷新到HDFS之前的數量 22 batchSize = context.getLong("hdfs.batchSize", defaultBatchSize); 23 //控制文件打開時間,單位:s 24 idleTimeout = context.getInteger("hdfs.idleTimeout", 0); 25 //壓縮編碼類型. one of following : gzip, bzip2, lzo, snappy 26 String codecName = context.getString("hdfs.codeC"); 27 //文件格式:當前為SequenceFile, DataStream or CompressedStream。 28 //(1)DataStream不壓縮輸出文件,不能設置codeC選項,(2)CompressedStream需要設置hdfs.codeC的一個可用的編解碼器 29 fileType = context.getString("hdfs.fileType", defaultFileType); 30 //允許打開的文件數。如果超過這個數字,最早的文件被關閉。 31 maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles); 32 //HDFS的操作允許的毫秒數,如打開,寫,刷新,關閉。這個數字應該增加,如果正在發生許多HDFS超時操作。 33 callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout); 34 //每個HDFS sink用於HDFS io操作的線程數,如打開、寫入等操作。 35 threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", 36 defaultThreadPoolSize); 37 //每個HDFS sink用於調度定時文件滾動的線程數 38 rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", 39 defaultRollTimerPoolSize); 40 //安全認證時使用Kerberos user principal for accessing secure HDFS 41 kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", ""); 42 //安全認證時使用Kerberos keytab for accessing secure HDFS 43 kerbKeytab = context.getString("hdfs.kerberosKeytab", ""); 44 proxyUserName = context.getString("hdfs.proxyUser", ""); //代理用戶 45 46 Preconditions.checkArgument(batchSize > 0, 47 "batchSize must be greater than 0"); 48 if (codecName == null) { //不壓縮數據 49 codeC = null; 50 compType = CompressionType.NONE; 51 } else { //壓縮數據 52 codeC = getCodec(codecName); 53 // TODO : set proper compression type 54 compType = CompressionType.BLOCK; 55 } 56 57 // Do not allow user to set fileType DataStream with codeC together 58 // To prevent output file with compress extension (like .snappy) 59 if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)//如果fileType是DataStream,則不允許壓縮 60 && codecName != null) { 61 throw new IllegalArgumentException("fileType: " + fileType + 62 " which does NOT support compressed output. Please don't set codeC" + 63 " or change the fileType if compressed output is desired."); 64 } 65 66 if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {//如果fileType是壓縮類型,則codeC不允許為空 67 Preconditions.checkNotNull(codeC, "It's essential to set compress codec" 68 + " when fileType is: " + fileType); 69 } 70 71 if (!authenticate()) { //認證 72 LOG.error("Failed to authenticate!"); 73 } 74 //時間戳是否四舍五入(如果為true,會影響所有基於時間的轉義序列%t除外) 75 needRounding = context.getBoolean("hdfs.round", false); 76 77 if(needRounding) { 78 //The unit of the round down value - second, minute or hour. 79 String unit = context.getString("hdfs.roundUnit", "second"); //滾動時間單位 80 if (unit.equalsIgnoreCase("hour")) { 81 this.roundUnit = Calendar.HOUR_OF_DAY; 82 } else if (unit.equalsIgnoreCase("minute")) { 83 this.roundUnit = Calendar.MINUTE; 84 } else if (unit.equalsIgnoreCase("second")){ 85 this.roundUnit = Calendar.SECOND; 86 } else { 87 LOG.warn("Rounding unit is not valid, please set one of" + 88 "minute, hour, or second. Rounding will be disabled"); 89 needRounding = false; 90 } 91 //Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. 92 this.roundValue = context.getInteger("hdfs.roundValue", 1); //滾動時間大小 93 if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){//檢查是否符合分、秒數值,0<v<=60 94 Preconditions.checkArgument(roundValue > 0 && roundValue <= 60, 95 "Round value" + 96 "must be > 0 and <= 60"); 97 } else if (roundUnit == Calendar.HOUR_OF_DAY){ 98 Preconditions.checkArgument(roundValue > 0 && roundValue <= 24, //檢查是否符合時數值0<v<=24 99 "Round value" + 100 "must be > 0 and <= 24"); 101 } 102 } 103 104 if (sinkCounter == null) {//構造計數器 105 sinkCounter = new SinkCounter(getName()); 106 } 107 }
上面比較常用的參數有:rollInterval以固定時間間隔滾動文件,rollSize以文件大小為單位滾動文件,rollCount以行數來滾動文件,fileType(有3種SequenceFile(二進制)、DataStream(不能壓縮)、CompressedStream(壓縮文件))
二、接下來是start()方法。
1 public void start() { 2 String timeoutName = "hdfs-" + getName() + "-call-runner-%d"; 3 callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize, 4 new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); //這個線程池用來將event寫入HDFS文件 5 6 String rollerName = "hdfs-" + getName() + "-roll-timer-%d"; 7 timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize, 8 new ThreadFactoryBuilder().setNameFormat(rollerName).build()); //這個線程池用來滾動文件 9 10 this.sfWriters = new WriterLinkedHashMap(maxOpenFiles); //用來存儲文件的絕對路徑以及對應的BucketWriter 11 sinkCounter.start(); 12 super.start(); 13 }
start方法主要是初始化兩個線程池。
三、process()方法,是用來處理channel中的event的,非線程安全的,要確保HDFS中的文件是打開的。
1 public Status process() throws EventDeliveryException { 2 Channel channel = getChannel(); //獲取對應的channel 3 Transaction transaction = channel.getTransaction();//獲得事務 4 List<BucketWriter> writers = Lists.newArrayList(); //BucketWriter列表 5 transaction.begin(); 6 try { 7 int txnEventCount = 0; 8 for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//批量處理 9 Event event = channel.take(); //獲取event 10 if (event == null) { 11 break; 12 } 13 14 // reconstruct the path name by substituting place holders 15 String realPath = BucketPath.escapeString(filePath, event.getHeaders(), 16 timeZone, needRounding, roundUnit, roundValue); //格式化后的HDFS目錄 17 String realName = BucketPath.escapeString(fileName, event.getHeaders(), 18 timeZone, needRounding, roundUnit, roundValue); //格式化后的文件名 19 20 String lookupPath = realPath + DIRECTORY_DELIMITER + realName; //要寫入的文件的HDFS絕對路徑 21 BucketWriter bucketWriter = sfWriters.get(lookupPath); //獲取文件的BucketWriter 22 23 // we haven't seen this file yet, so open it and cache the handle 24 if (bucketWriter == null) { //如果沒有這個文件 25 //根據fileType類型構造HDFSWriter(三種:SequenceFile、DataStream、CompressedStream) 26 HDFSWriter hdfsWriter = writerFactory.getWriter(fileType); 27 28 WriterCallback idleCallback = null; 29 if(idleTimeout != 0) { 30 idleCallback = new WriterCallback() { 31 @Override 32 public void run(String bucketPath) { 33 sfWriters.remove(bucketPath); 34 } 35 }; 36 } 37 bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, 38 batchSize, context, realPath, realName, inUsePrefix, inUseSuffix, 39 suffix, codeC, compType, hdfsWriter, timedRollerPool, 40 proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath); 41 42 sfWriters.put(lookupPath, bucketWriter); //將文件路徑和BucketWriter組成K-V,放入sfWriters 43 } 44 45 // track the buckets getting written in this transaction 46 if (!writers.contains(bucketWriter)) {//如果BucketWriter列表沒有正在寫的文件——bucketWriter,則加入 47 writers.add(bucketWriter); 48 } 49 50 // Write the data to HDFS 51 append(bucketWriter, event); //將event寫入bucketWriter對應的文件中 52 } 53 54 if (txnEventCount == 0) { //這次事務沒有處理任何event 55 sinkCounter.incrementBatchEmptyCount(); 56 } else if (txnEventCount == batchSize) {//一次處理batchSize個event 57 sinkCounter.incrementBatchCompleteCount(); 58 } else {//channel中剩余的events不足batchSize 59 sinkCounter.incrementBatchUnderflowCount(); 60 } 61 62 // flush all pending buckets before committing the transaction 63 for (BucketWriter bucketWriter : writers) { //將所有BucketWriter數據刷新到HDFS中 64 flush(bucketWriter); 65 } 66 67 transaction.commit(); //提交事務 68 69 if (txnEventCount < 1) { 70 return Status.BACKOFF; 71 } else { 72 sinkCounter.addToEventDrainSuccessCount(txnEventCount); 73 return Status.READY; 74 } 75 } catch (IOException eIO) { 76 transaction.rollback();//異常后回滾 77 LOG.warn("HDFS IO error", eIO); 78 return Status.BACKOFF; 79 } catch (Throwable th) { 80 transaction.rollback();//異常后回滾 81 LOG.error("process failed", th); 82 if (th instanceof Error) { 83 throw (Error) th; 84 } else { 85 throw new EventDeliveryException(th); 86 } 87 } finally { 88 transaction.close();//關閉事務 89 } 90 }
1、獲取sink的channel和transaction,transaction.begin()是必要的步驟;
2、循環處理批量的event,如果event==null,說明channel已無數據,則退出循環;
3、realPath和realName都是格式化后的文件HDFS存儲路徑及文件名;lookupPath則是要寫入的文件完整HDFS路徑(目錄+文件名);獲取該文件對應的BucketWriter對象,要寫入的文件及對應的BucketWriter對象需要存入sfWriters這個LinkedHashMap結構中,表示正在寫的文件,BucketWriter類用來滾動文件、處理文件格式以及數據的序列化等操作,其實就是負責數據的寫的;
4、如果文件對應的bucketWriter不存在,則文件需要滾動,創建一個BucketWriter對象,只有public方法才是線程安全的。
創建BucketWriter對象之前需要先構建一個HDFSWriter對象負責寫文件,有三種類型:HDFSSequenceFile、HDFSDataStream、HDFSCompressedDataStream。
WriterCallback idleCallback是用來超時后滾動文件的時候調用的,前提得是配置文件中有配置hdfs.idleTimeout且不為0;
然后是new 一個BucketWriter對象,這有點復雜稍后說;
sfWriters.put(lookupPath, bucketWriter)然后就是將文件及對應的bucketWriter對象存入sfWriters中,表示正在寫的文件。
5、這里要說下new BucketWriter對象的事。BucketWriter的構造函數首先是對眾多參數賦值,然后isOpen = false,最后是this.writer.configure(context),即對writer對象進行配置。復雜就在這,這個writer對象是什么?它是上面4中所說的HDFSWriter。
HDFSWriterFactory工廠類會根據配置文件中設置的類型返回相應的HDFSWriter對象,沒有配置文件類型的話默認是HDFSSequenceFile。
HDFSSequenceFile:configure(context)方法會首先獲取寫入格式writeFormat即參數"hdfs.writeFormat",默認格式是二進制的Writable(HDFSWritableSerializer.Builder.class),還有一個是Text(HDFSTextSerializer.Builder.class),第三個是null;再獲取是否使用HDFS本地文件系統"hdfs.useRawLocalFileSystem",默認是flase不使用;然后獲取writeFormat的所有配置信息serializerContext;然后根據writeFormat和serializerContext構造SequenceFileSerializer的對象serializer。在serializer中並無serializerContext配置的方法,在1.3.0中此處的serializerContext沒有任何作用,可能是為以后做的預留。
HDFSDataStream:configure(context)方法先獲取serializerType類型,默認是TEXT(BodyTextEventSerializer.Builder.class),此外還有HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class)、OTHER(null)、AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class)共四種類型;再獲取是否使用HDFS本地文件系統"hdfs.useRawLocalFileSystem",默認是flase不使用;然后獲取serializer的所有配置信息serializerContext。serializer的實例化是在HDFSDataStream.open(String filePath)方法中實現的。此處的serializerContext在BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未用到,可能是做預留,但是FlumeEventAvroEventSerializer在其Builder中用到了,並進行了配置。
HDFSCompressedDataStream:configure(context)方法和HDFSDataStream.configure(context)是一樣的,serializerType的類型是一樣的;其他也是一樣。serializer的實例化是在HDFSCompressedDataStream.open(String filePath)方法中實現的,調用open(String filePath, CompressionCodec codec,CompressionType cType)來實例化。
6、如果存儲着正在寫的bucketWriter的writers列表中沒有此bucketWriter,則添加進去,writers的存在是為了統一flush方便,后面會有介紹。
7、append(bucketWriter, event)這個是讓bucketWriter處理event的方法,會使用bucketWriter.append(event)處理。這個方法的代碼如下:
1 public synchronized void append(Event event) 2 throws IOException, InterruptedException { 3 checkAndThrowInterruptedException(); 4 if (!isOpen) { 5 if(idleClosed) { 6 throw new IOException("This bucket writer was closed due to idling and this handle " + 7 "is thus no longer valid"); 8 } 9 open();//已經寫完一個文件,新建新文件 10 } 11 12 // check if it's time to rotate the file 13 if (shouldRotate()) {//檢查行數、大小是否改完成一個文件 14 close(); 15 open();//新建新文件 16 } 17 18 // write the event 19 try { 20 sinkCounter.incrementEventDrainAttemptCount(); 21 writer.append(event); // could block寫數據 22 } catch (IOException e) { 23 LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + 24 bucketPath + ") and rethrowing exception.", 25 e.getMessage()); 26 try { 27 close(); 28 } catch (IOException e2) { 29 LOG.warn("Caught IOException while closing file (" + 30 bucketPath + "). Exception follows.", e2); 31 } 32 throw e; 33 } 34 35 // update statistics 36 processSize += event.getBody().length; 37 eventCounter++; 38 batchCounter++; 39 40 if (batchCounter == batchSize) { 41 flush(); 42 } 43 }
A、首先會檢查當前線程是否中斷checkAndThrowInterruptedException();
B、BucketWriter初次運行時,isOpen=false(表示文件未打開不能寫),idleClosed=false,會運行open()——doOpen()。fullFileName是"前綴.時間戳"組成的文件名,從這也可以看出時間戳部分不能更改,也就是HDFS中文件名無法自定義,除非自己定制HDFSSink;另外后綴名和壓縮不能同時兼得,即如果沒有配置壓縮則可以在fullFileName后面添加自定義的后綴(比如后綴為.avro),否則只能添加壓縮類型的后綴;bucketPath表示在HDFS中正在寫的文件完整名字,這個名字中有標示正在寫的文件的前后綴(inUsePrefix、inUseSuffix);targetPath這個是文件寫完后的要更改成的完整名字,和bucketPath的區別是沒有inUsePrefix、inUseSuffix;然后是根據有無壓縮配置信息open此witer,沒有壓縮:writer.open(bucketPath),有壓縮:writer.open(bucketPath, codeC, compType)。需要注意的是當使用Kerberos時,hadoop的RPC操作是非線程安全的包括getFileSystem()操作,open()操作在同一個JVM的同一時刻只能由一個線程使用,因為有可能導致死鎖,見FLUME-1231。所以對open進行了同步。另外當在運行flume過程中出現類似異常“java.io.IOException: Callable timed out after 10000 ms on file”和"java.util.concurrent.TimeoutException"時,需要增大callTimeout
writer包含的三類均有兩個open方法,一個是對應不壓縮的open(String filePath) ,一個是對應壓縮的open(String filePath, CompressionCodec codec,CompressionType cType)。
首先writer若為HDFSSequenceFile,是支持壓縮的,open(String filePath)會調用open(filePath, null, CompressionType.NONE)壓縮方法,只不過沒有壓縮類型。壓縮open方法先判斷是否使用了本地文件系統,然后根據hadoop的配置信息是否支持追加"hdfs.append.support",構造相應的SequenceFile即writer。其中的serializer若為HDFSWritableSerializer則writer的Key為LongWritable類型,Value為BytesWritable二進制類型;若為HDFSTextSerializer,writer的Key為LongWritable類型,Value為Text文本類型。
其次writer若為HDFSDataStream,是不支持壓縮的。它的壓縮方法open(String filePath, CompressionCodec codec,CompressionType cType)直接調用非壓縮方法open(filePath)。open(filePath)判斷是否使用了本地文件系統;然后根據是否支持append操作(獲取配置的"hdfs.append.support"參數),構造對應的輸出流outStream;然后構造serializer,有三種類型BodyTextEventSerializer、HeaderAndBodyTextEventSerializer、FlumeEventAvroEventSerializer,前兩種支持追加,最后一種不支持追加,所以FlumeEventAvroEventSerializer不能將"hdfs.append.support"設置為true。如果支持追加就執行serializer.afterReopen()前兩種serializer未實現這個方法(1.3.0),不支持就serializer.afterCreate()前兩種也未實現這個方法,第三種則是dataFileWriter.create(getSchema(), getOutputStream())。
最后writer若為HDFSCompressedDataStream,就是針對壓縮的,其open(String filePath)會使用默認的DefaultCodec以及CompressionType.BLOCK來調用壓縮open(String filePath, CompressionCodec codec,CompressionType cType)。壓縮方法和HDFSDataStream的壓縮方法類似,區別有兩點一個是serializer的輸出流變成壓縮輸出流了;另一個就是最后加了isFinished = false表示壓縮流是否完畢。
回到BucketWriter,如果rollInterval(按時間滾動文件)不為0,則創建一個Callable,放入timedRollFuture中rollInterval秒之后關閉文件,默認是30s寫一個文件,這只是控制文件滾動的3個條件之一;
isOpen = true表示文件已打開,可以write了。
C、回到上面7中,shouldRotate()方法會判斷文件中的行數和文件的大小是否達到配置文件中的配置,如果任何一個滿足條件則可以關閉文件,這是控制文件滾動的3個條件中的兩個。close()方法會關閉文件,再清理倆線程池及一些其他的清理工作,及改名(將.tmp文件改名),再open()就又到了上面B中所說的。
D、writer.append(event)這是向HDFS中寫數據的地方。這里又要分很多討論了,因為writer有三類。
writer為HDFSSequenceFile:append(event)方法,會先通過serializer.serialize(e)把event處理成一個Key和一個Value。
(1)serializer為HDFSWritableSerializer時,則Key會是event.getHeaders().get("timestamp"),如果沒有"timestamp"的Headers則使用當前系統時間System.currentTimeMillis(),然后將時間封裝成LongWritable;Value是將event.getBody()封裝成BytesWritable,代碼是bytesObject.set(e.getBody(), 0, e.getBody().length);
(2)serializer為HDFSTextSerializer時,Key和上述HDFSWritableSerializer一樣;Value會將event.getBody()封裝成Text,代碼是textObject.set(e.getBody(), 0, e.getBody().length)。
writer.append(event)中會將Key和Value,writer.append(record.getKey(), record.getValue())。
writer為HDFSDataStream:append(event)方法直接調用serializer.write(e)。
(1)serializer為BodyTextEventSerializer,則其write(e)方法會將e.getBody()寫入輸出流,並根據配置再寫入一個"\n";
(2)serializer為HeaderAndBodyTextEventSerializer,則其write(e)方法會將e.getHeaders() + " "(注意此空格)和e.getBody()寫入輸出流,並根據配置再寫入一個"\n";
(3)serializer為FlumeEventAvroEventSerializer,則其write(e)方法會將event整體寫入dataFileWriter。
writer為HDFSCompressedDataStream:append(event)方法會首先判斷是否完成一個階段的壓縮isFinished,如果是則更新壓縮輸出流的狀態,並isFinished=false,否則剩下的執行和HDFSDataStream.append(event)相同。
E、是做一些統計工作processSize是統計文件大小的;eventCounter是統計文件行數的;batchCounter是統計最近一次flush之后的處理的event數;
F、如果處理的event數達到batchSize則刷新到HDFS中,flush()。flush()方法會首先執行writer.sync()即寫入HDFS,然后清空batchCounter表明這次batch已經完成,可以准備下次的。涉及到writer就會涉及很多寫入類型:
writer為HDFSSequenceFile:sync()方法執行SequenceFile.Writer.syncFs()將數據寫入HDFS中;
writer為HDFSDataStream:sync()方法執行
writer為HDFSCompressedDataStream:sync()方法先執行serializer.flush():只有FlumeEventAvroEventSerializer的flush()方法也有實現dataFileWriter.flush(),其他倆BodyTextEventSerializer和HeaderAndBodyTextEventSerializer均未實現flush()方法。然后執行outStream.flush()和outStream.sync()將數據刷新至HDFS中。
如果idleTimeout>0,表示文件最長打開時間,超時后就成為無效文件需要關閉(默認是0不允許關閉的),構造一個Callable對象idleAction執行內容是:close()方法,idleClosed = true表示超時關閉了這個bucketwriter,而且onIdleCallback.run(onIdleCallbackPath)會將onIdleCallbackPath從HDFSEventSink.sfWriters中刪除對應對應的bucketwriter,表示這個文件已經寫完了。然后將這個idleAction放入timedRollerPool中idleTimeout秒后執行。
8、回到HDFSEventSink.process()方法中,會根據這次事務處理的event數量更新相應的計數器;
9、遍歷writers,挨個刷新BucketWriter至HDFS;
10、transaction.commit();//提交事務
11、transaction.rollback();//異常后回滾
12、transaction.close();//關閉事務
四、stop()方法。首先會遍歷sfWriters,挨個close(BucketWriter):BucketWriter.close()方法,如果isOpen=true表示文件還處於打開狀態,則writer.close()(這里的writer就不分情況了,HDFSSequenceFile就直接writer.close();其他倆都是先flush(好些都沒實現)再beforClose(好些都沒實現)輸出流再flush、sync、close),BucketWriter.close()方法方法接下來關閉倆線程池以及改名等清理操作。HDFSEventSink的stop()方法接下來是關閉倆線程池,清理一些數據比如sfWriters.clear()。
ps:1、BucketWriter中的public方法都是線程安全的,包括append、close、flush三個均是同步方法,會調用相應的do方法,做具體的操作。
2、callWithTimeout方法需要注意,在HDFSEventSink中多次用到這個方法:append、flush、close,這個方法會將對應的Callable放入callTimeoutPool線程池中執行,並等待callTimeout(默認是10000) ms返回結果。
問題1:WriterLinkedHashMap的sfWriters除了設置hdfs.idleTimeout且>0時才會從sfWriters中remove掉超時的bucketwriter,其它地方並沒有發現remove操作,那么以后隨着寫入文件的增多sfWriters會不會始終增大?
解:肯定不會啊。別忘了還有一個"hdfs.maxOpenFiles"參數默認是5000,追蹤發現HDFSEventSink內部靜態類WriterLinkedHashMap繼承了LinkedHashMap,並重寫了removeEldestEntry方法,這個方法在sfWriters.put時總會調用,當sfWriters.size()>maxOpenFiles時就是自動清理之時了。maxOpenFiles就是sfWriters得最大容量。
這次的sink比較復雜,希望我寫的大伙能夠看懂,期間還有一些細節不太清楚,不過不影響整體的理解。
不解1:bucketwriter類中的doOpen方法中hadoop的RPC線程非安全,說是可以從FLUME-1231這得到解釋
不解2:同樣doOpen方法中有說“Need to get reference to FS using above config before underlying writer does in order to avoid shutdown hook & IllegalStateExceptions”這里也表示疑問,為什么這么說?
不解3:為什么HDFSWriter的3個實現類的open()方法中,均考慮了conf.getBoolean("hdfs.append.support", false) == true?一個是可追加的一個是不可追加的。但是都是一個SequenceFile.Writer或者FSDataOutputStream,尤其是在HDFSSequenceFile中的writer能不能追加似乎根本沒什么區別,充其量是一個writer的參數是FSDataOutputStream,另外一個則不是,其他倆好歹還有需要設置appending=true用來判斷是否可重復打開但也是有點牽強,都可以合二為一,但是為什么不那么做呢?
不解4:BucketPath.escapeString這個方法還沒搞懂,導致格式化的結果不甚明了。。。哎
解4:escapeString這個會將目錄和文件名前綴進行格式化,如果這兩個變量中存在正則表達式,則event.headers中必須要有timestamp,也就是source的攔截器使用TimestampInterceptor或者自己寫進headers中,會將正則部分格式化成時間對應的部分。realName是對配置文件中的前綴名格式化后的前綴名。會隨着event的時間戳來進行變動,從而控制着新文件的滾動。另外由於三個控制文件滾動的參數使得文件滾動較快,寫完一個文件后后續的還指向這個realName,則在bucketWriter.append中會根據文件是否寫完從新創建一個前綴相同但是時間戳不同的文件;如果三個參數控制文件滾動較慢,realName變化比較快的話不能滿足大小和行數就只能等超時的或者時間滾動這個參數設置了,最甚者文件永遠不關閉等待滿足條件,所有控制文件滾動的三個參數最好不要都為0,尤其是時間參數。這個HDFSEventSink還可以設置時區"hdfs.timeZone",會在格式化HDFS目錄和前綴的時候用到,用的是java自帶的java.util.TimeZone類,根據需要可以配置,這個功能應該是收集距離較遠異地或者異國時用到。
歡迎大伙交流!!