Storm源碼閱讀之SpoutOutputCollector


不得不說storm是一個特別棒的實時計算框架。為了對后文理解的方便,先說幾個storm中的術語:

Topology:拓撲圖或者拓撲結構。在storm中它通過消息分組的分式連接Spout和Bolt節點定義了運算處理的拓撲結構。如下圖:

那什么是Spout呢?

在計算任務需要的數據其實就是由Spout提供的,所以它可以說是Storm中的消息源,一般是從外部數據源(日志文件、數據庫、消息隊列等等)不間斷地讀取數據然后發送給tuple元組的。

那它是通過誰發送的呢?又是如何發送的呢?

這里我們先回答第一個問題,第二個問題以后解答。

好了上面說了那么多就是為了引出今天的任務:閱讀SpoutOutputCollector源碼。

在閱讀之前,我們先明確一下SpoutOutputCollector到底是什么?其實從類名就能說出大概(不得不說老外寫的代碼的可讀性真是好的沒法說。這里啰嗦一句,

個人覺得這也是他們分享精神的體現,時刻記住方便給別人看。),它就是Spout輸出收集器。

那它到底能干些啥呢?請看代碼:

1.ISpoutOutputCollector:是SpoutOutputCollector的接口

 1 public interface ISpoutOutputCollector {
 2     /**
 3         發送tuple消息,並返回起發送任務的task的序列號集合
 4     */
 5     List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
 6     /**
 7     *與上述發送方法類似,只不過emitDirect方法是要指定接收端的task,讓接收端特定的task接收消息。
 8     */
 9     void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
10     /**
11     *處理異常
12     */
13     void reportError(Throwable error);
14 }

從上述接口ISpoutOutputCollector源碼可以看出ISpoutOutputCollector中聲明了3個方法,兩個屬於發送tuple元組的方法,他們之間的差異在上述注釋中已說的很清楚,還有一個處理異常的方法。

2.SpoutOutputCollector:它實現了接口ISpoutOutputCollector

 1 public class SpoutOutputCollector implements ISpoutOutputCollector {
 2     ISpoutOutputCollector _delegate;
 3 
 4     public SpoutOutputCollector(ISpoutOutputCollector delegate) {
 5         _delegate = delegate;
 6     }
 7 
 8     /**
 9      * 指定一個streamid和message發射tuple消息並返回起發送消息的task的序號。當tuple消息完全處理了,就會回調ack方法,否則會回調fail方法。
10      */
11     public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
12         return _delegate.emit(streamId, tuple, messageId);
13     }
14 
15     /**
16      * emit(String streamId, List<Object> tuple, Object messageId)的重載方法,這沒有指定streamid,故采用默認的streamid
17      */
18     public List<Integer> emit(List<Object> tuple, Object messageId) {
19         return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
20     }
21 
22     /**
23      * emit(String streamId, List<Object> tuple, Object messageId)
24      *的重載方法,這沒有指定streamid,故采用默認的streamid,因為沒有messageid,故ack方法和fail方法不會被調用   
25      */
26     public List<Integer> emit(List<Object> tuple) {
27         return emit(tuple, null);
28     }
29 
30     /**
31      * emit(String streamId, List<Object> tuple, Object messageId)的重載方法,因為沒有messageid,故ack方法和fail方法不會被調用
32      */
33     public List<Integer> emit(String streamId, List<Object> tuple) {
34         return emit(streamId, tuple, null);
35     }
36 
37     /**
38      * 發射tuple消息,不過需要指定接收端的task來接收,並且輸出必須聲明為直接流,同時指定用來接收消息的task必須采用直接分組的方式來接收消息.
39      *
40      */
41     public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
42         _delegate.emitDirect(taskId, streamId, tuple, messageId);
43     }
44 
45     /**
46      * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重載方法,采用默認的streamid
47      */
48     public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
49         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
50     }
51     
52     /**
53      * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重載方法,因為沒有指定的消息id,所以ack和fail方法就不會調用.
54      */
55     public void emitDirect(int taskId, String streamId, List<Object> tuple) {
56         emitDirect(taskId, streamId, tuple, null);
57     }
58 
59     /**
60      * 該類提供的重載方法,因為沒有指定的消息id,所以ack和fail方法就不會調用.
61      */
62     public void emitDirect(int taskId, List<Object> tuple) {
63         emitDirect(taskId, tuple, null);
64     }
65     /**
66      * 接口ISpoutOutputCollector中reportError的實現.
67      */
68     @Override
69     public void reportError(Throwable error) {
70         _delegate.reportError(error);
71     }
72 }

SpoutOutputCollector類中,實現了消息發射的方法,並且還提供了多個重載方法方便用戶使用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM