在sink和source中(不管是內置還是自定義的),基本都有如下代碼,這些代碼在sink中的process方法中,而在source中自己不需要去寫,在source中getChannelProcessor().processEventBatch(events)方法中會自動創建下面類似的:
... Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); Event event = null; Status result = Status.READY; transaction.begin(); ... event = channel.take();//getChannelProcessor().processEvent(event);,前者用於sink后者用於source ... transaction.commit(); transaction.rollback() transaction.close(); ...
那么有些人就要問了?從上述代碼中似乎只需要獲取channel就可以了,因為獲取數據時只需要event = channel.take()或者
getChannelProcessor().processEvent(event)?這樣對嗎?你可以去掉transaction試試,結果顯示是不行的,出錯!
那么為什么呢?這確實有點讓人疑惑,但實際上channel.take()操作是transaction.doTake()。也就是實際的put和take等操作都是在transaction中進行的,因此要用channel必須要先創建transcation才可以使用。而channel.getTransaction()方法就是獲取(已經創建)或創建(還沒有)transcation,BasicChannelSemantics的相對應代碼如下:
@Override public Transaction getTransaction() { if (!initialized) { synchronized (this) { if (!initialized) { initialize(); initialized = true; } } } BasicTransactionSemantics transaction = currentTransaction.get();//獲取transcation if (transaction == null || transaction.getState().equals(//如果transaction不存在或者已關閉就創建 BasicTransactionSemantics.State.CLOSED)) { transaction = createTransaction();//創建 currentTransaction.set(transaction);//賦值給currentTransaction } return transaction; }
該方法在所有channel的父類BasicChannelSemantics中,然后在具體實現的channel類中需要實現protected abstract BasicTransactionSemantics createTransaction()這個抽象方法來獲取相應的transaction對象。BasicChannelSemantics把transaction.take()和transaction.put(event)方法進一步封裝成take()和put(event)方法,這倆方法就是暴露在sink或者source中的channel.take()和channel.put(event)方法。
@Override public void put(Event event) throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); transaction.put(event); } @Override public Event take() throws ChannelException { BasicTransactionSemantics transaction = currentTransaction.get(); Preconditions.checkState(transaction != null, "No transaction exists for this thread"); return transaction.take(); }
由此,可以看出工作行程了吧!
Transaction transaction = channel.getTransaction();這一句至少要執行一次,因為執行一次之后就會將transcation對象緩存到currentTransaction中,后續就不會再創建transaction了。
