// In the open() method:
int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();
int myIdx = context.getThisTaskIndex();
String[] tracks = ((String) conf.get("track")).split(",");
A complete Spout example: the input parameter track encodes multiple streams. In open(), each instance selects its tracks with a modulo (%) over its task index; in nextTuple() it reads the data for those tracks and emits it. Because each spout instance has a different myIdx, each one gets its own subset of the tracks, so a single spout component can read multiple streams.

//ApiStreamingSpout.java
package twitter.streaming;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ApiStreamingSpout extends BaseRichSpout {

    static String STREAMING_API_URL = "https://stream.twitter.com/1/statuses/filter.json?track=";
    private String track;
    private String user;
    private String password;
    private DefaultHttpClient client;
    private SpoutOutputCollector collector;
    private UsernamePasswordCredentials credentials;
    private BasicCredentialsProvider credentialProvider;

    LinkedBlockingQueue<String> tweets = new LinkedBlockingQueue<String>();

    static Logger LOG = Logger.getLogger(ApiStreamingSpout.class);
    static JSONParser jsonParser = new JSONParser();

    @Override
    public void nextTuple() {
        /*
         * Create the client call
         */
        client = new DefaultHttpClient();
        client.setCredentialsProvider(credentialProvider);
        HttpGet get = new HttpGet(STREAMING_API_URL + track); // The track is unique per spout instance.
        HttpResponse response;
        try {
            // Execute
            response = client.execute(get);
            StatusLine status = response.getStatusLine();
            if (status.getStatusCode() == 200) {
                InputStream inputStream = response.getEntity().getContent();
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(inputStream));
                String in;
                // Read line by line
                while ((in = reader.readLine()) != null) {
                    try {
                        // Parse and emit
                        Object json = jsonParser.parse(in);
                        collector.emit(new Values(track, json));
                    } catch (ParseException e) {
                        LOG.error("Error parsing message from twitter", e);
                    }
                }
            }
        } catch (IOException e) {
            LOG.error("Error in communication with twitter api ["
                    + get.getURI().toString() + "]");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e1) {
            }
        }
    }

    /**
     * spoutsSize and myIdx are what let one spout component read
     * multiple streams (tracks).
     */
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        int spoutsSize = context
                .getComponentTasks(context.getThisComponentId()).size();
        int myIdx = context.getThisTaskIndex();
        String[] tracks = ((String) conf.get("track")).split(",");
        StringBuffer tracksBuffer = new StringBuffer();
        for (int i = 0; i < tracks.length; i++) {
            if (i % spoutsSize == myIdx) {
                tracksBuffer.append(",");
                tracksBuffer.append(tracks[i]);
            }
        }

        if (tracksBuffer.length() == 0)
            throw new RuntimeException("No track found for spout"
                    + " [spoutsSize:" + spoutsSize + ", tracks:"
                    + tracks.length + "] the amount"
                    + " of tracks must be more than the spout parallelism");

        this.track = tracksBuffer.substring(1).toString();

        user = (String) conf.get("user");
        password = (String) conf.get("password");

        credentials = new UsernamePasswordCredentials(user, password);
        credentialProvider = new BasicCredentialsProvider();
        credentialProvider.setCredentials(AuthScope.ANY, credentials);
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("criteria", "tweet"));
    }
}
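For context, here is a sketch of how this spout might be wired into a topology (my own, not from the original post; the topic strings and component names are illustrative). The conf keys "track", "user", and "password" match what open() reads, and with a parallelism hint of 2 and three tracks, task 0 gets "storm" and "kafka" while task 1 gets "hadoop":

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterTopology {
    public static void main(String[] args) {
        Config conf = new Config();
        conf.put("track", "storm,hadoop,kafka"); // split on "," in open()
        conf.put("user", "myUser");              // hypothetical credentials
        conf.put("password", "myPassword");

        TopologyBuilder builder = new TopologyBuilder();
        // Two instances: each takes the tracks whose index % 2 == its task index.
        builder.setSpout("twitter-spout", new ApiStreamingSpout(), 2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("twitter-test", conf, builder.createTopology());
    }
}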
With this technique you can distribute a set of collectors across data sources. The same technique applies to other scenarios as well, for example collecting log files from a group of web servers. PS: I have not tried this.
2. A bolt can emit tuples to multiple streams using emit(streamId, tuple), where each stream is identified by the string streamId. Then, in the TopologyBuilder, you decide which stream to subscribe to.
I have not tried this either. Two questions: how do you declare the streams? And does a spout have the same capability?
Answers: 1. Just declare multiple streams in the declareOutputFields() method:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
    declarer.declareStream("second", new Fields("line2"));
}
2. Judging from the bolt and spout implementations, both should support it (see the sketch below).
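A minimal sketch of the round trip, assuming the stream name "second" declared above (the class name MultiStreamBolt and the trim() transform are illustrative, not from the original post):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Illustrative bolt that fans each input out to the default stream and a named one.
public class MultiStreamBolt extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        collector.emit(new Values(line));                   // default stream
        collector.emit("second", new Values(line.trim()));  // named stream "second"
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
        declarer.declareStream("second", new Fields("line2"));
    }
}

Downstream, the TopologyBuilder subscribes to the named stream by id, e.g. builder.setBolt("secondConsumer", new SecondBolt()).shuffleGrouping("multiStream", "second"); (component names hypothetical). Tuples emitted on the default stream are not delivered to that bolt.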
3. Does BaseRichSpout call the ack method automatically? Does implementing the IBasicBolt interface give automatic acking?
"BaseBasicBolt is used to do the acking automatically." In other words, this class calls ack for you. (Test result: using BaseBasicBolt without explicitly acking the input tuple, the ack count did not show up in the Storm UI.) So it is safest to ack the input tuple explicitly. PS: the current project codes it the following way.
collector.emit(...);
collector.ack(input); // explicit ack; note that acking goes through the collector
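Expanded into a complete bolt, the manual pattern might look like this (a sketch of my own; the uppercase transform and the field name "word" are illustrative):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// With BaseRichBolt, acking is the programmer's responsibility.
public class ManualAckBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        collector.emit(input, new Values(word.toUpperCase())); // anchored emit
        collector.ack(input);                                  // explicit ack, after the emit
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}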
Automatic acking comes with IBasicBolt; usage is shown below, and the Storm UI then shows an ack count for the bolt.
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TotalBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;
    static Integer Total = new Integer(0);

    // Must be implemented; note the BasicOutputCollector. The input tuple
    // is acked automatically when execute() returns.
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            String clear = input.getStringByField("PROVINCE_ID");
            Total++;
            collector.emit(new Values(Total));
        } catch (IllegalArgumentException e) {
            // A tuple arriving on the "signals24Hour" stream resets the counter.
            if (input.getSourceStreamId().equals("signals24Hour")) {
                Total = 0;
            }
        }
    }

    // Remaining IBasicBolt methods, added here so the class compiles.
    public void prepare(Map stormConf, TopologyContext context) {
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("total")); // field name is illustrative
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
4. Anchoring in bolts
The best way to keep track of the originating spout tuple is to include a reference to it in each tuple you emit. This technique is called anchoring.
collector.emit(tuple, new Values(word));
Every tuple is anchored this way: the input tuple is passed as the first argument to emit. Because each word tuple is anchored to the input tuple, and that input tuple is the root of the tuple tree sent from the spout, if any word tuple fails downstream, the spout tuple from which the tree was derived will be re-sent. (That replay guarantee is exactly the benefit of anchoring.)
But won't this cause tuples to be re-sent and counts to be duplicated? Yes, it will.
Storm uses a hash of the spout tuple's messageId to decide which acker task tracks the tuple tree derived from that tuple.
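Conceptually (my own simplification, not Storm's actual source), the assignment reduces to something like:

// Hypothetical sketch: map a spout tuple's messageId to one of N acker tasks.
int ackerTask = Math.abs(messageId.hashCode()) % numAckerTasks;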
The original post shows a screenshot here of the emit method overloads of the bolt-side OutputCollector.
Test result: after adding anchoring, I did not observe any visible difference.
A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called IBasicBolt that encapsulates this pattern for you. The SplitSentence example can be written as an IBasicBolt like follows:
public class SplitSentence implements IBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    // Empty lifecycle methods, added so the example compiles as an IBasicBolt.
    public void prepare(Map stormConf, TopologyContext context) {
    }

    public void cleanup() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.

In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of IBasicBolt.
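For the multi-anchoring mentioned above, here is a minimal sketch of my own (the batch size, field names, and summing logic are illustrative): the emit passes a collection of input tuples as anchors, so a failure anywhere in the downstream tree replays all of them.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Aggregating bolt: delay acking until a result has been computed from many tuples.
public class SumBatchBolt extends BaseRichBolt {

    private OutputCollector collector;
    private List<Tuple> anchors = new ArrayList<Tuple>();
    private int sum = 0;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        anchors.add(input);
        sum += input.getInteger(0);
        if (anchors.size() == 10) {                    // hypothetical batch size
            collector.emit(anchors, new Values(sum));  // multi-anchored emit
            for (Tuple t : anchors) {
                collector.ack(t);                      // ack only after the result is out
            }
            anchors.clear();
            sum = 0;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}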
5. Fault tolerance at each level of the cluster
6. Question: the input tuples are acked explicitly, yet the logs never show the spout's ack method being called. Why is that?
Reason: the spout's SpoutOutputCollector.emit() call did not include a messageId, i.e. it used the emit overload without one. Use the overload that takes a messageId and the callback fires; after adding it, the acks also show up in the Storm UI.
The emit method of SpoutOutputCollector (the messageId can be a Long or an Integer; as the Javadoc notes, "If the messageId was null, Storm will not track the tuple and no callback will be received. The emitted values must be immutable."):
collector.emit(new Values(str), messageID++); // messageID is of type long.
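To actually observe the callbacks, the spout can override ack() and fail(). A minimal sketch of my own (the pending map and the payload are illustrative, not from the original post):

import java.util.HashMap;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class AckingSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private long messageID = 0;
    // Keep emitted values around so fail() can replay them (illustrative).
    private Map<Object, String> pending = new HashMap<Object, String>();

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        String str = "some payload";                  // hypothetical data source
        pending.put(messageID, str);
        collector.emit(new Values(str), messageID++); // messageId => Storm tracks the tuple
    }

    public void ack(Object msgId) {
        pending.remove(msgId);                        // fully processed: forget it
    }

    public void fail(Object msgId) {
        String str = pending.get(msgId);              // failed or timed out: replay it
        if (str != null) {
            collector.emit(new Values(str), msgId);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("str"));
    }
}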
By default a tuple must be fully processed within a 30 s timeout; once that is exceeded, the spout's fail method is triggered. The value can be tuned to the actual cluster via Config.setMessageTimeoutSecs(int secs) on the topology configuration, for example:
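A sketch (my own; assumes an existing TopologyBuilder named builder, and the topology name and 60 s value are illustrative):

Config conf = new Config();
conf.setMessageTimeoutSecs(60); // fail (and allow replay of) tuples not fully acked within 60 s
new LocalCluster().submitTopology("timeout-demo", conf, builder.createTopology());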
Storm UI: (the original post ends with a Storm UI screenshot here.)