不得不說storm是一個特別棒的實時計算框架。為了對后文理解的方便,先說幾個storm中的術語:
Topology:拓撲圖或者拓撲結構。在storm中它通過消息分組的分式連接Spout和Bolt節點定義了運算處理的拓撲結構。如下圖:
那什么是Spout呢?
在計算任務需要的數據其實就是由Spout提供的,所以它可以說是Storm中的消息源,一般是從外部數據源(日志文件、數據庫、消息隊列等等)不間斷地讀取數據然后發送給tuple元組的。
那它是通過誰發送的呢?又是如何發送的呢?
這里我們先回答第一個問題,第二個問題以后解答。
好了上面說了那么多就是為了引出今天的任務:閱讀SpoutOutputCollector源碼。
在閱讀之前,我們先明確一下SpoutOutputCollector到底是什么?其實從類名就能說出大概(不得不說老外寫的代碼的可讀性真是好的沒法說。這里啰嗦一句,
個人覺得這也是他們分享精神的體現,時刻記住方便給別人看。),它就是Spout輸出收集器。
那它到底能干些啥呢?請看代碼:
1.ISpoutOutputCollector:是SpoutOutputCollector的接口
1 public interface ISpoutOutputCollector { 2 /** 3 發送tuple消息,並返回起發送任務的task的序列號集合 4 */ 5 List<Integer> emit(String streamId, List<Object> tuple, Object messageId); 6 /** 7 *與上述發送方法類似,只不過emitDirect方法是要指定接收端的task,讓接收端特定的task接收消息。 8 */ 9 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); 10 /** 11 *處理異常 12 */ 13 void reportError(Throwable error); 14 }
從上述接口ISpoutOutputCollector源碼可以看出ISpoutOutputCollector中聲明了3個方法,兩個屬於發送tuple元組的方法,他們之間的差異在上述注釋中已說的很清楚,還有一個處理異常的方法。
2.SpoutOutputCollector:它實現了接口ISpoutOutputCollector
1 public class SpoutOutputCollector implements ISpoutOutputCollector { 2 ISpoutOutputCollector _delegate; 3 4 public SpoutOutputCollector(ISpoutOutputCollector delegate) { 5 _delegate = delegate; 6 } 7 8 /** 9 * 指定一個streamid和message發射tuple消息並返回起發送消息的task的序號。當tuple消息完全處理了,就會回調ack方法,否則會回調fail方法。 10 */ 11 public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { 12 return _delegate.emit(streamId, tuple, messageId); 13 } 14 15 /** 16 * emit(String streamId, List<Object> tuple, Object messageId)的重載方法,這沒有指定streamid,故采用默認的streamid 17 */ 18 public List<Integer> emit(List<Object> tuple, Object messageId) { 19 return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); 20 } 21 22 /** 23 * emit(String streamId, List<Object> tuple, Object messageId) 24 *的重載方法,這沒有指定streamid,故采用默認的streamid,因為沒有messageid,故ack方法和fail方法不會被調用 25 */ 26 public List<Integer> emit(List<Object> tuple) { 27 return emit(tuple, null); 28 } 29 30 /** 31 * emit(String streamId, List<Object> tuple, Object messageId)的重載方法,因為沒有messageid,故ack方法和fail方法不會被調用 32 */ 33 public List<Integer> emit(String streamId, List<Object> tuple) { 34 return emit(streamId, tuple, null); 35 } 36 37 /** 38 * 發射tuple消息,不過需要指定接收端的task來接收,並且輸出必須聲明為直接流,同時指定用來接收消息的task必須采用直接分組的方式來接收消息. 39 * 40 */ 41 public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { 42 _delegate.emitDirect(taskId, streamId, tuple, messageId); 43 } 44 45 /** 46 * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重載方法,采用默認的streamid 47 */ 48 public void emitDirect(int taskId, List<Object> tuple, Object messageId) { 49 emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId); 50 } 51 52 /** 53 * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重載方法,因為沒有指定的消息id,所以ack和fail方法就不會調用. 54 */ 55 public void emitDirect(int taskId, String streamId, List<Object> tuple) { 56 emitDirect(taskId, streamId, tuple, null); 57 } 58 59 /** 60 * 該類提供的重載方法,因為沒有指定的消息id,所以ack和fail方法就不會調用. 61 */ 62 public void emitDirect(int taskId, List<Object> tuple) { 63 emitDirect(taskId, tuple, null); 64 } 65 /** 66 * 接口ISpoutOutputCollector中reportError的實現. 67 */ 68 @Override 69 public void reportError(Throwable error) { 70 _delegate.reportError(error); 71 } 72 }
在SpoutOutputCollector類中,實現了消息發射的方法,並且還提供了多個重載方法方便用戶使用。