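A spout runs inside each executor. Let's start from the initialization of SpoutExecutors and work downward. SpoutExecutors lives in a worker and manages the tasks assigned to it. Its constructor initializes several components: taskId, topologyId, the spout, and so on. In this thread, besides these common attributes, the constructor also creates and sets two objects, passing in the information about the task to be executed: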
1. TaskTransfer
2. TaskHeartbeatTrigger
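Once construction is finished, the init method performs further initialization. This is where the spout's open method is called and the event is registered: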
    this.spout.open(storm_conf, userTopologyCtx, outputCollector);
    LOG.info("Successfully open SpoutExecutors " + idStr);
    taskHbTrigger.register();
    int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
    // wait for the other bolts to be ready
    JStormUtils.sleepMs(delayRun * 1000);
    if (taskStatus.isRun()) {
        spout.activate();
    } else {
        spout.deactivate();
    }
    LOG.info(idStr + " is ready ");
}
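After open has initialized the spout, the spout needs to read data every 10 seconds according to the configuration file. How is this implemented? Right after open is called, taskHbTrigger.register() is invoked. TaskHeartbeatTrigger is a subclass of TimerTrigger; based on the configuration, it uses a ScheduledExecutorService to run the task at a fixed interval.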
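The periodic scheduling described above can be sketched as follows. This is a minimal illustration, not JStorm's actual TimerTrigger: the class PeriodicTriggerSketch, its fields, and the use of a plain BlockingQueue as the event queue are all assumptions made for the example. It only shows the general idea of a Runnable that registers itself with a ScheduledExecutorService and publishes an event on every tick.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a timer trigger; not JStorm's real TimerTrigger.
class PeriodicTriggerSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final BlockingQueue<String> eventQueue; // assumed event queue
    private final String eventName;
    private final long periodMs;

    PeriodicTriggerSketch(ScheduledExecutorService scheduler,
                          BlockingQueue<String> eventQueue,
                          String eventName,
                          long periodMs) {
        this.scheduler = scheduler;
        this.eventQueue = eventQueue;
        this.eventName = eventName;
        this.periodMs = periodMs;
    }

    // Plays the role of register(): schedule this trigger at a fixed rate.
    void register() {
        scheduler.scheduleAtFixedRate(this, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Called by the scheduler on every tick; publishes one event.
    @Override
    public void run() {
        eventQueue.offer(eventName);
    }

    // Registers a trigger and waits until `count` events arrive (or a timeout hits).
    static int collectEvents(long periodMs, int count) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int received = 0;
        try {
            new PeriodicTriggerSketch(scheduler, queue, "heartbeat", periodMs).register();
            for (int i = 0; i < count; i++) {
                // Generous timeout so a slow machine does not lose events.
                String event = queue.poll(periodMs * 20, TimeUnit.MILLISECONDS);
                if (event != null) {
                    received++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("events received: " + collectEvents(50, 3));
    }
}
```

The consumer side (here the polling loop in collectEvents) stands in for whatever thread drains the task's queue and reacts to the timer event.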
The Spout.emit process:
The method that actually performs the emit is SpoutCollector.sendMsg:
public List<Integer> sendMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id, ICollectorCallback callback) {
    final long startTime = emitTotalTimer.getTime();
    try {
        boolean needAck = (message_id != null) && (ackerNum > 0); // needAck requires both conditions to hold
        Long root_id = getRootId(message_id); // if an ack is needed, generate a random rootId and check it for duplicates
        java.util.List<Integer> out_tasks;
        if (out_task_id != null) {
            out_tasks = sendTargets.get(out_task_id, out_stream_id, values, null, root_id);
        } else {
            out_tasks = sendTargets.get(out_stream_id, values, null, root_id);
        }
        if (out_tasks.size() == 0) {
            // don't need send tuple to other task
            return out_tasks;
        }
        List<Long> ackSeq = new ArrayList<Long>();
        for (Integer t : out_tasks) {
            MessageId msgid;
            if (needAck) {
                // Long as = MessageId.generateId();
                Long as = MessageId.generateId(random);
                msgid = MessageId.makeRootId(root_id, as);
                ackSeq.add(as);
            } else {
                msgid = MessageId.makeUnanchored();
            }
            TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
            tp.setTargetTaskId(t);
            transfer_fn.transfer(tp);
        }
        sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck);
        if (callback != null)
            callback.execute(out_tasks);
        return out_tasks;
    } finally {
        emitTotalTimer.updateTime(startTime);
    }
}
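The ack-related bookkeeping in sendMsg can be illustrated in isolation with a small sketch. The class EmitAckSketch and its method names are hypothetical and exist only for this example. The needAck condition and the "one random id per target task" loop mirror the code above. Combining the ids with XOR is an assumption about what sendMsgToAck does with ackSeq, based on the XOR scheme Storm's acker is known to use; the code above does not show it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper class; not part of JStorm.
class EmitAckSketch {
    // Same condition as in sendMsg: a message id was given and ackers exist.
    static boolean needAck(Object messageId, int ackerNum) {
        return messageId != null && ackerNum > 0;
    }

    // One random id per target task, like the ackSeq list built in sendMsg.
    static List<Long> generateAckSeq(List<Integer> outTasks, Random random) {
        List<Long> ackSeq = new ArrayList<>();
        for (Integer ignoredTask : outTasks) {
            ackSeq.add(random.nextLong());
        }
        return ackSeq;
    }

    // Assumption: the ids are folded into a single value with XOR,
    // so that XOR-ing each id in again later brings the value back to 0.
    static long xorAll(List<Long> ids) {
        long acc = 0L;
        for (long id : ids) {
            acc ^= id;
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> outTasks = Arrays.asList(3, 5, 8); // made-up task ids
        if (needAck("msg-1", 1)) {
            List<Long> ackSeq = generateAckSeq(outTasks, new Random());
            long pending = xorAll(ackSeq);
            // Simulate each downstream task acking its tuple.
            for (long id : ackSeq) {
                pending ^= id;
            }
            System.out.println("fully acked: " + (pending == 0L));
        }
    }
}
```

Under this scheme the tracker only has to keep one long per root id, regardless of how many tuples are in flight for that root.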