I have recently been working on a real-time computation project with Storm, where the spout needs to read data from a Kafka cluster. To speed up development I used the Kafka plugin that Storm provides directly. Today I took some time to read through the KafkaSpout source code, and this post records my notes.
KafkaSpout implements the consumer-side functionality on top of kafka.javaapi.consumer.SimpleConsumer, including partition assignment and maintenance of consumption state (offsets). It also uses Storm's reliability API and implements the spout's ack and fail mechanisms. The basic processing flow of KafkaSpout is as follows:
1. Create a ZooKeeper client and read the partition list from the ZooKeeper path zk_root + "/topics/" + _topic + "/partitions"
2. For each partition, read the broker id of the partition leader from the path zk_root + "/topics/" + _topic + "/partitions" + "/" + partition_id + "/state"
3. Read the broker's host and port from the path /brokers/ids/broker_id and store them in a map of partition_id -> leader broker
4. Get the number of spout tasks and the index of the current task, then assign the list of partitions this spout task will consume based on the number of partitions
5. Create one SimpleConsumer per broker that is consumed from, used to fetch data from Kafka
6. Commit the consumption state of the current partitions to ZooKeeper for persistence
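Before digging into the source, here is a minimal sketch of how a KafkaSpout is usually wired into a topology with the storm-kafka API. The ZooKeeper address, topic name, zkRoot and consumer id below are placeholders, not values from the project described here:

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaSpoutSetupExample {
    public static TopologyBuilder buildTopology() {
        // ZooKeeper ensemble used by the Kafka cluster (placeholder address)
        BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");
        // topic name, zkRoot (where the spout commits offsets) and consumer id are placeholders
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my_topic", "/kafka_spout", "my-consumer-id");
        // decode each Kafka message into a single string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // parallelism 3: three spout tasks will split the topic's partitions among themselves
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 3);
        return builder;
    }
}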
Below, a few key points are analyzed in detail.
I. Partition assignment strategy
1. In KafkaSpout, get the number of spout tasks, which is also the number of consumers. The code is:
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
2. In KafkaSpout, get the task index of the current spout task. Note that the task index is different from the task id: the task id identifies the task within the whole topology, while the task index identifies it within its component and ranges over [0, spout_task_number - 1]. The code is:
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
3. Build the mapping from each partition to the broker hosting its leader. The call sequence is:
ZkCoordinator:
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
DynamicBrokersReader:
/**
 * Get all partitions with their current leaders
 */
public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
    GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
    try {
        int numPartitionsForTopic = getNumPartitions();
        String brokerInfoPath = brokerPath();
        for (int partition = 0; partition < numPartitionsForTopic; partition++) {
            int leader = getLeaderFor(partition);
            String path = brokerInfoPath + "/" + leader;
            try {
                byte[] brokerData = _curator.getData().forPath(path);
                Broker hp = getBrokerHost(brokerData);
                globalPartitionInformation.addPartition(partition, hp);
            } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                LOG.error("Node {} does not exist ", path);
            }
        }
    } catch (SocketTimeoutException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
    return globalPartitionInformation;
}
4. Compute the partitions consumed by the current spout task
KafkaUtils:
public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
    Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
    // get all partitions, sorted by partition id
    List<Partition> partitions = partitionInformation.getOrderedPartitions();
    int numPartitions = partitions.size();
    if (numPartitions < totalTasks) {
        LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
    }
    List<Partition> taskPartitions = new ArrayList<Partition>();
    // This loop is the core assignment algorithm. For example, if the spout parallelism is 3,
    // the current task index is 1, and there are 5 partitions in total, then this task consumes partitions 1 and 4.
    for (int i = taskIndex; i < numPartitions; i += totalTasks) {
        Partition taskPartition = partitions.get(i);
        taskPartitions.add(taskPartition);
    }
    logPartitionMapping(totalTasks, taskIndex, taskPartitions);
    return taskPartitions;
}
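To make the assignment rule concrete, the following standalone sketch reproduces just the modulo loop, with partition ids as plain ints purely for illustration:

import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentExample {
    // Round-robin rule: task taskIndex gets partitions taskIndex, taskIndex + totalTasks, taskIndex + 2 * totalTasks, ...
    static List<Integer> assign(int numPartitions, int totalTasks, int taskIndex) {
        List<Integer> assigned = new ArrayList<Integer>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            assigned.add(i);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 5 partitions, 3 spout tasks, current task index 1 -> prints [1, 4]
        System.out.println(assign(5, 3, 1));
    }
}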
II. Partition refresh strategy
If a broker goes down or a spout task dies, the partitions have to be reassigned. KafkaSpout does not watch the state of brokers, partitions, or other spout tasks in ZooKeeper, so it is not notified when such a failure happens. Instead, it uses two mechanisms to refresh the partition assignment.
1. Periodic refresh
The partition list is refreshed periodically according to the refreshFreqSecs field of ZkHosts, so the refresh interval can be changed through configuration (see the sketch after the code below). Every call to KafkaSpout's nextTuple method first calls ZkCoordinator's getMyManagedPartitions method to get the partition list currently consumed by this spout task:
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    ...
}

// getMyManagedPartitions checks whether the refresh interval has elapsed; if it has, the partitions are reassigned
public List<PartitionManager> getMyManagedPartitions() {
    if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
        refresh();
        _lastRefreshTime = System.currentTimeMillis();
    }
    return _cachedList;
}
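Since refreshFreqSecs is a public field on ZkHosts, changing the interval is just a field assignment; a minimal sketch, with a placeholder ZooKeeper address and an example value:

import storm.kafka.ZkHosts;

public class RefreshIntervalExample {
    public static ZkHosts hostsWithCustomRefresh() {
        // ZooKeeper address is a placeholder
        ZkHosts hosts = new ZkHosts("zk1:2181,zk2:2181,zk3:2181");
        // shorten the partition-assignment refresh interval to 30 seconds
        hosts.refreshFreqSecs = 30;
        return hosts;
    }
}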
2. Refresh on failure
When an exception occurs inside KafkaSpout's nextTuple method, the partition list consumed by the current spout task is refreshed forcibly:
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            ...
        } catch (FailedFetchException e) {
            _coordinator.refresh();
        }
    }
    ...
}
III. Maintaining consumption state
1. First, let's look at how the initial offset is determined when the spout starts. Once a spout task has obtained its partition list, it creates one PartitionManager object per partition. Here is the PartitionManager initialization:
public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
    _partition = id;
    _connections = connections;
    _spoutConfig = spoutConfig;
    _topologyInstanceId = topologyInstanceId;
    // Register the partition and the host of its leader broker with the connection pool.
    // If the pool already has a connection to that broker it is returned directly;
    // otherwise a new connection is established and returned.
    _consumer = connections.register(id.host, id.partition);
    _state = state;
    _stormConf = stormConf;
    numberAcked = numberFailed = 0;

    String jsonTopologyId = null;
    Long jsonOffset = null;
    // the ZooKeeper path where this partition's offset is committed
    String path = committedPath();
    try {
        // Read the committed consumption offset for this partition from that path.
        // If the path does not exist, this topic has never been consumed by the spout.
        Map<Object, Object> json = _state.readJSON(path);
        LOG.info("Read partition information from: " + path + "  --> " + json );
        if (json != null) {
            jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
            jsonOffset = (Long) json.get("offset");
        }
    } catch (Throwable e) {
        LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
    }

    // Ask the broker for this partition's offset. By default the latest offset is used;
    // if the user sets forceFromStart (KafkaConfig), the earliest offset of the partition is used,
    // i.e. consume from the beginning.
    Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

    // Case 1: no topology/consumption info was found in ZooKeeper, so use the offset obtained from the broker
    if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
        _committedTo = currentOffset;
        LOG.info("No partition information found, using configuration to determine offset");
    // Case 2: the topology id has changed and the user asked to re-consume from the start, so fetch the offset from Kafka.
    // This could be merged with case 1: KafkaUtils.getOffset already checks forceFromStart, so the extra check here is redundant.
    } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
        _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
        LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
    // Case 3: resume from the offset saved in ZooKeeper
    } else {
        _committedTo = jsonOffset;
        LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
    }

    // If the last committed offset has fallen too far behind (past maxOffsetBehind), skip ahead and consume fresh data
    if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
        LOG.info("Last commit offset from zookeeper: " + _committedTo);
        _committedTo = currentOffset;
        LOG.info("Commit offset " + _committedTo + " is more than " + spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
    }

    LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
    _emittedToOffset = _committedTo;
}
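In practice the starting position is controlled through SpoutConfig. A minimal sketch, assuming the usual KafkaConfig fields (forceFromStart, startOffsetTime) and the kafka.api.OffsetRequest helpers; the connection parameters are placeholders:

import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class StartOffsetExample {
    public static SpoutConfig fromBeginning() {
        // ZooKeeper address, topic, zkRoot and consumer id are placeholders
        SpoutConfig cfg = new SpoutConfig(new ZkHosts("zk1:2181"), "my_topic", "/kafka_spout", "my-consumer-id");
        // ignore the committed offset and start from the earliest offset of each partition
        cfg.forceFromStart = true;
        cfg.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        return cfg;
    }
}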
2. Next, let's see how the consumption offset of a partition is tracked and maintained.
_emittedToOffset in PartitionManager holds the offset consumed so far; it is updated every time a message is fetched:
private void fill() {
    ...
    for (MessageAndOffset msg : msgs) {
        ...
        if (!had_failed || failed.contains(cur_offset)) {
            numMessages += 1;
            _pending.add(cur_offset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
            // update _emittedToOffset
            _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
            if (had_failed) {
                failed.remove(cur_offset);
            }
        }
    }
    _fetchAPIMessageCount.incrBy(numMessages);
    ...
}
3. Committing offsets to ZooKeeper
Offsets are committed periodically; the commit interval is configured via stateUpdateIntervalMs in SpoutConfig (see the sketch after the code below). At the end of each call to KafkaSpout's nextTuple method, the spout checks whether a commit is due:
public void nextTuple() {
    ...
    long now = System.currentTimeMillis();
    if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
        commit();
    }
}
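stateUpdateIntervalMs is, like refreshFreqSecs, a public configuration field, so the commit period can be tuned directly on the SpoutConfig; a sketch with placeholder values:

import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class CommitIntervalExample {
    public static SpoutConfig withCommitInterval() {
        // ZooKeeper address, topic, zkRoot and consumer id are placeholders
        SpoutConfig cfg = new SpoutConfig(new ZkHosts("zk1:2181"), "my_topic", "/kafka_spout", "my-consumer-id");
        // commit consumed offsets to ZooKeeper every 5 seconds
        cfg.stateUpdateIntervalMs = 5000;
        return cfg;
    }
}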
If a commit is due, KafkaSpout's commit method is called, which walks through each consumed partition in turn and commits its consumption state:
private void commit() {
    _lastUpdateMs = System.currentTimeMillis();
    for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
        manager.commit();
    }
}
The actual commit is delegated to PartitionManager:
public void commit() {
    // Determine the offset to commit: if there are pending offsets, some messages have not finished processing yet,
    // so commit the smallest pending offset; otherwise commit the offset consumed so far.
    long lastCompletedOffset = lastCompletedOffset();
    // only write to ZooKeeper when there is a new offset to commit
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId, "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host, "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);
        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
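For reference, the lastCompletedOffset helper used above boils down to the rule in the comment; a sketch of that logic, using the same field names that appear elsewhere in PartitionManager:

// Sketch: commit the smallest pending offset, otherwise the offset consumed so far
public long lastCompletedOffset() {
    if (_pending.isEmpty()) {
        // nothing in flight: everything emitted so far has been acked
        return _emittedToOffset;
    } else {
        // _pending is a sorted set of emitted-but-not-yet-acked offsets; the first element is the oldest
        return _pending.first();
    }
}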
IV. KafkaSpout ack and fail handling
1. First, a quick look at how KafkaSpout emits messages.
When KafkaSpout's nextTuple method is called, it delegates to PartitionManager's next method to emit data:
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            ...
        } catch (FailedFetchException e) {
            _coordinator.refresh();
        }
    }
    ...
}

public EmitState next(SpoutOutputCollector collector) {
    // if the waiting-to-emit queue is empty, call fill() to fetch data from the broker
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        // decode the Kafka message into tuples
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                // If the tuples are not null, emit them with messageId = new KafkaMessageId(_partition, toEmit.offset),
                // so that on ack or fail the corresponding PartitionManager can be found via _partition.
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
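The decoding step (KafkaUtils.generateTuples) is driven by the scheme configured in SpoutConfig. As an illustration, assuming the older byte[]-based backtype.storm.spout.Scheme interface, a custom decoder could look like this (the class and field names are made up for the example):

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical decoder: turns each Kafka message payload into a single-field tuple
public class PlainTextScheme implements Scheme {
    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            return new Values(new String(ser, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("line");
    }
}

It would be plugged in with spoutConfig.scheme = new SchemeAsMultiScheme(new PlainTextScheme()), the same way StringScheme was used in the setup sketch earlier.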
2. PartitionManager maintains a pending set, holding the offsets of messages that have been emitted but not yet successfully processed, and a failed set, holding the offsets of messages that have failed.
3. When a message is processed successfully, the spout's ack method is called. KafkaSpout uses the partition id contained in the message id to delegate to the corresponding PartitionManager:
public void ack(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.ack(id.offset);
    }
}

// On ack, PartitionManager checks whether the oldest pending message has fallen past maxOffsetBehind;
// if so, every pending offset that is too far behind is dropped. The acked offset is then removed from the pending set.
public void ack(Long offset) {
    if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
        // Too many things pending!
        _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
    }
    _pending.remove(offset);
    numberAcked++;
}
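To see what the _pending.headSet(...).clear() line does, here is a tiny standalone example with a TreeSet; the offsets and maxOffsetBehind value are made up:

import java.util.SortedSet;
import java.util.TreeSet;

public class PendingSetExample {
    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<Long>();
        for (long offset : new long[]{10, 20, 30, 40, 50}) {
            pending.add(offset);
        }
        long maxOffsetBehind = 25;
        long ackedOffset = 50;
        // drop every pending offset strictly below ackedOffset - maxOffsetBehind (i.e. below 25)
        pending.headSet(ackedOffset - maxOffsetBehind).clear();
        pending.remove(ackedOffset);
        System.out.println(pending); // prints [30, 40]
    }
}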
4. When a message fails, the spout's fail method is called. Again, KafkaSpout uses the partition id in the message id to delegate to the corresponding PartitionManager:
public void fail(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.fail(id.offset);
    }
}

// On fail, PartitionManager checks whether the failed offset has already fallen past maxOffsetBehind; if so it is ignored.
public void fail(Long offset) {
    if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
        LOG.info(
                "Skipping failed tuple at offset=" + offset +
                        " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                        " behind _emittedToOffset=" + _emittedToOffset
        );
    }
    // If it is still within range, add it to the failed set. If nothing has ever been acked and the number of failures
    // exceeds maxOffsetBehind, nothing is being processed successfully and something is wrong, so throw an exception.
    else {
        LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
        failed.add(offset);
        numberFailed++;
        if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
            throw new RuntimeException("Too many tuple failures");
        }
    }
}

// Failed messages are re-emitted by fill()
private void fill() {
    // if there are failed offsets, restart fetching from the smallest failed offset
    final boolean had_failed = !failed.isEmpty();
    if (had_failed) {
        offset = failed.first();
    } else {
        offset = _emittedToOffset;
    }
    ByteBufferMessageSet msgs = null;
    try {
        msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
    } catch (UpdateOffsetException e) {
        _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
        LOG.warn("Using new offset: {}", _emittedToOffset);
        // fetch failed, so don't update the metrics
        return;
    }
    if (msgs != null) {
        int numMessages = 0;
        for (MessageAndOffset msg : msgs) {
            final Long cur_offset = msg.offset();
            if (cur_offset < offset) {
                // Skip any old offsets.
                continue;
            }
            // If the message's offset is in the failed set (or nothing has failed), emit it again and remove it from the failed set.
            if (!had_failed || failed.contains(cur_offset)) {
                numMessages += 1;
                _pending.add(cur_offset);
                _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                if (had_failed) {
                    failed.remove(cur_offset);
                }
            }
        }
        _fetchAPIMessageCount.incrBy(numMessages);
    }
}