Flume-NG中的Channel與Transaction關系(原創)


  在sink和source中(不管是內置還是自定義的),基本都有如下代碼,這些代碼在sink中的process方法中,而在source中自己不需要去寫,在source中getChannelProcessor().processEventBatch(events)方法中會自動創建下面類似的:  

    ...
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    Status result = Status.READY;
    transaction.begin();
    ...
    event = channel.take();//getChannelProcessor().processEvent(event);,前者用於sink后者用於source
    ...
    transaction.commit();
    transaction.rollback()
    transaction.close();
    ...

  那么有些人就要問了?從上述代碼中似乎只需要獲取channel就可以了,因為獲取數據時只需要event = channel.take()或者

getChannelProcessor().processEvent(event)?這樣對嗎?你可以去掉transaction試試,結果顯示是不行的,出錯!

  那么為什么呢?這確實有點讓人疑惑,但實際上channel.take()操作是transaction.doTake()。也就是實際的put和take等操作都是在transaction中進行的,因此要用channel必須要先創建transcation才可以使用。而channel.getTransaction()方法就是獲取(已經創建)或創建(還沒有)transcation,BasicChannelSemantics的相對應代碼如下:  

@Override
  public Transaction getTransaction() {

    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }

    BasicTransactionSemantics transaction = currentTransaction.get();//獲取transcation
    if (transaction == null || transaction.getState().equals(//如果transaction不存在或者已關閉就創建
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();//創建
      currentTransaction.set(transaction);//賦值給currentTransaction
    }
    return transaction;
  }

  該方法在所有channel的父類BasicChannelSemantics中,然后在具體實現的channel類中需要實現protected abstract BasicTransactionSemantics createTransaction()這個抽象方法來獲取相應的transaction對象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法進一步封裝成take()和put(event)方法,這倆方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。

 @Override
  public void put(Event event) throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }

  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }

  由此,可以看出工作行程了吧!

  Transaction transaction = channel.getTransaction();這一句至少要執行一次,因為執行一次之后就會將transcation對象緩存到currentTransaction中,后續就不會再創建transaction了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM