Storm學習筆記


1.如何讓一個spout並行讀取多個流?
方法:任何spout、bolts組件都可以訪問TopologyContext。利用這個特性可以讓Spouts的實例之間划分流。
示例:獲取到storm集群spouts並行度的大小,和這個spout實例的Task Index,讓輸入的tracks[]的多個流合理地分到某一個spout實例中。這樣就可以實現一個spout並行讀取多個流。
//open()方法中
int
spoutsSize = context.getComponentTasks(context.getThisComponentId()).size(); int myIdx = context.getThisTaskIndex(); String[] tracks = ((String) conf.get("track")).split(",");

一個完整的Spout code:輸入參數track代表了多個流,在open()方法中用取模%初始化track,在execute()方法讀取track的數據,發送。由於spout的多個實例的myIdx不同,它們可以獲得各自的一個track,可以實現一個spout讀取多個流。

  1 //ApiStreamingSpout.java
  2 package twitter.streaming;
  3 
  4 import java.io.BufferedReader;
  5 import java.io.IOException;
  6 import java.io.InputStream;
  7 import java.io.InputStreamReader;
  8 import java.util.Map;
  9 import java.util.concurrent.LinkedBlockingQueue;
 10 
 11 import org.apache.http.HttpResponse;
 12 import org.apache.http.StatusLine;
 13 import org.apache.http.auth.AuthScope;
 14 import org.apache.http.auth.UsernamePasswordCredentials;
 15 import org.apache.http.client.methods.HttpGet;
 16 import org.apache.http.impl.client.BasicCredentialsProvider;
 17 import org.apache.http.impl.client.DefaultHttpClient;
 18 import org.apache.log4j.Logger;
 19 import org.json.simple.parser.JSONParser;
 20 import org.json.simple.parser.ParseException;
 21 
 22 import backtype.storm.spout.SpoutOutputCollector;
 23 import backtype.storm.task.TopologyContext;
 24 import backtype.storm.topology.OutputFieldsDeclarer;
 25 import backtype.storm.topology.base.BaseRichSpout;
 26 import backtype.storm.tuple.Fields;
 27 import backtype.storm.tuple.Values;
 28 
 29 public class ApiStreamingSpout extends BaseRichSpout {
 30 
 31     static String STREAMING_API_URL = "https://stream.twitter.com/1/statuses/filter.json?track=";
 32     private String track;
 33     private String user;
 34     private String password;
 35     private DefaultHttpClient client;
 36     private SpoutOutputCollector collector;
 37     private UsernamePasswordCredentials credentials;
 38     private BasicCredentialsProvider credentialProvider;
 39 
 40     LinkedBlockingQueue<String> tweets = new LinkedBlockingQueue<String>();
 41 
 42     static Logger LOG = Logger.getLogger(ApiStreamingSpout.class);
 43     static JSONParser jsonParser = new JSONParser();
 44 
 45     @Override
 46     public void nextTuple() {
 47         /*
 48          * Create the client call
 49          */
 50         client = new DefaultHttpClient();
 51         client.setCredentialsProvider(credentialProvider);
 52         HttpGet get = new HttpGet(STREAMING_API_URL + track); // 每個spout實例track是唯一的。
 53         HttpResponse response;
 54         try {
 55             // Execute
 56             response = client.execute(get);
 57             StatusLine status = response.getStatusLine();
 58             if (status.getStatusCode() == 200) {
 59                 InputStream inputStream = response.getEntity().getContent();
 60                 BufferedReader reader = new BufferedReader(
 61                         new InputStreamReader(inputStream));
 62                 String in;
 63                 // Read line by line
 64                 while ((in = reader.readLine()) != null) {
 65                     try {
 66                         // Parse and emit
 67                         Object json = jsonParser.parse(in);
 68                         collector.emit(new Values(track, json));
 69                     } catch (ParseException e) {
 70                         LOG.error("Error parsing message from twitter", e);
 71                     }
 72                 }
 73             }
 74         } catch (IOException e) {
 75             LOG.error("Error in communication with twitter api ["
 76                     + get.getURI().toString() + "]");
 77             try {
 78                 Thread.sleep(10000);
 79             } catch (InterruptedException e1) {
 80             }
 81         }
 82     }
 83 
 84     /**
 85      * spoutsSize、myIdx實現了一個spout讀取多個流tracks。
 86      */
 87     @Override
 88     public void open(Map conf, TopologyContext context,
 89             SpoutOutputCollector collector) {
 90         int spoutsSize = context
 91                 .getComponentTasks(context.getThisComponentId()).size();
 92         int myIdx = context.getThisTaskIndex();
 93         String[] tracks = ((String) conf.get("track")).split(",");
 94         StringBuffer tracksBuffer = new StringBuffer();
 95         for (int i = 0; i < tracks.length; i++) {
 96             if (i % spoutsSize == myIdx) {
 97                 tracksBuffer.append(",");
 98                 tracksBuffer.append(tracks[i]);
 99             }
100         }
101 
102         if (tracksBuffer.length() == 0)
103             throw new RuntimeException("No track found for spout"
104                     + " [spoutsSize:" + spoutsSize + ", tracks:"
105                     + tracks.length + "] the amount"
106                     + " of tracks must be more then the spout paralellism");
107 
108         this.track = tracksBuffer.substring(1).toString();
109 
110         user = (String) conf.get("user");
111         password = (String) conf.get("password");
112 
113         credentials = new UsernamePasswordCredentials(user, password);
114         credentialProvider = new BasicCredentialsProvider();
115         credentialProvider.setCredentials(AuthScope.ANY, credentials);
116         this.collector = collector;
117     }
118 
119     @Override
120     public void declareOutputFields(OutputFieldsDeclarer declarer) {
121         declarer.declare(new Fields("criteria", "tweet"));
122     }
123 }
View Code

 通過這種技術,可以在數據源間分布收集器。相同的技術可以被應用在其他的場景-例如,從web服務器收集日志文件。PS:沒有試過。

 

2.Bolt可以使用emit(streamId, tuple)發射元組到多條流,每條流由字符串streamId來識別。然后,在TopologyBuilder 中,你可以決定訂閱哪條流。

  沒有試過。2個疑問:如何declare呢?spout有這個功能么?

解答:1.declareOutputFields()方法中聲明多條流,不就可以了。

1 public void declareOutputFields(OutputFieldsDeclarer declarer) {
2         declarer.declare(new Fields("line"));
3         declarer.declareStream("second", new Fields("line2"));
4     }

        2.Bolt和spout的實現來看,應該都是可以的。

 

3.BaseRichSpout是否是自動調用ack方法的,實現IBasicBolt接口可以自動ack?

    BaseBasicBolt,  is used to do the acking automatically.意思就是說,這個是自動調用ack的 (測試結果:使用BaseBasicBolt,不主動使用input.ack(),那么storm ui界面上看不到ack的個數。)so,最好使用input.ack().。PS:目前的項目編程是如下方法。

collector.emit();
input.ack();

 通過IBasicBolt可以自動ack,用法如下。Storm UI可以看到該bolt的ack數目。

 1 public class TotalBolt implements IBasicBolt{
 2     
 3     private static final long serialVersionUID = 1L;
 4     static Integer Total = new Integer(0);
 5   //必須要實現的方法,使用BasicOutputCollector
 6     public void execute(Tuple input,BasicOutputCollector collector) {
 7         try {
 8             String clear  = input.getStringByField("PROVINCE_ID");
 9             Total++;
10             collector.emit(new Values(Total));
11         } catch (IllegalArgumentException e) {
12             if (input.getSourceStreamId().equals("signals24Hour")) {
13                 Total = 0;
14             }
15         }
16     }
17 }

 

4.關於bolt的錨定

記錄原始的spout實例的最好方式是在消息元組中包含一個原始spout的引用。這個技術叫做錨定。

collector.emit(tuple,new Values(word)); 
什么是錨定?(如果這樣可以自動重發的話,確實是不錯的選擇。????測試:測試無法模擬消息失敗,我采用主動調用collector.fail(input),但是沒看到重發消息的現象,原因是因為重發消息是不調用nextTuple()方法的,因此日志中看不到自己寫的日志輸出,實際上是重發了的。)
為tuple tree中指定的節點增加一個新的節點,我們稱之為錨定(anchoring)。
采用_collector. emit( tuple , new Values(word) )就是錨定了。錨定是分段的,可以一部分bolt錨定。

每個消息都通過這種方式被錨定:把輸入消息作為emit方法的第一個參數。因為word消息被錨定在了輸入消息上,這個輸入消息是spout發送過來的tuple tree的根節點,如果任意一個word消息處理失敗,派生這個tuple tree那個spout 消息將會被重新發送。(錨定的好處是什么)

但是,這樣會不會導致 元組被重發,計數重復? 會。

系統使用一種哈希算法來根據spout消息的messageId確定由哪個acker跟蹤此消息派生出來的tuple tree。

下面這張圖是OutputCollector(Bolt)中的emit方法:

 

 

 測試結果:做了anchors錨定后,沒有看到實際的效果。

 自動錨定和ack的接口:IBasicBolt

A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called IBasicBolt that encapsulates this pattern for you. The SplitSentence example can be written as a IBasicBolt like follows:

public class SplitSentence implements IBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollectorare automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.

In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of IBasicBolt.

 

5 集群的各級容錯

5. 疑問:主動調用input.ack(),日志中沒有看到調用了spout的ack方法?這是為什么呢?

    原因:Spout中SpoutOutputCollector.emit()方法中,沒有加入messageID,也就是說采用了以下第一種emit方法,用第二種方法便可以看到。加入后在storm ui界面看到。

SpoutOutputCollector的emit方法:(messageID用long或者Integer型都是可以的。If the messageId was null, Storm will not track the tuple and no callback will be received. The emitted values must be immutable.)

1 collector.emit(new Values(str), messageID++);//messageID is type long.

一個消息默認被成功處理的timeOut是30s,超過30s就會觸發spout的fail方法。這個值可以根據實際的集群情況進行調整。在Topology中 Conf conf.setMessageTimeoutSecs(int secs) 設置。

storm ui界面:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM