基於0.93版本Storm
首先,如果自己寫KafkaSpout,該怎么辦?有哪些地方需要考慮呢
1. 得實現Storm指定的接口。這樣Storm才能夠使用它。那么需要實現什么接口?需要提供什么功能給Storm調用呢?
2. 需要給spout的每個task指定任務,也就是把Kafka里的消息分配給spout task去讀取。這時候,就會有以下問題:
- 是否一個KafkaSpout需要支持多個topic?鑒於每個topology里可以有多個spout,這樣做沒有必要,而且會帶來較大的復雜性。
- 如何把partitions分給tasks? 這時候存在如下問題:
- 是否每個partition最多只分給一個task?理論上,我們可以將一個partition的消息給多個task處理,只要這些task區分自己負責的那部分消息就行,比如一個task讀偶offset的消息,一個讀奇offset的消息。
- 如何分才好。考慮到負載的平衡,而且要使得不同task間的任務不會沖突?比如,不會出現兩個task讀相同的消息的情況。
- 每個partition從何處開始讀取?如何記錄對當前Kafka topic的消費進度,使得在topology下線以后,這部分消息不會丟失,以便以后可以接着上次的消費過度處理。
3. 如何讀取?怎么使用Kafka API讀取消息?每次讀多大量的消息?需要預讀和緩沖嗎?
4. 無法從Kafka讀取消息時如何處理?在spout里重試?認為spout出現異常,交由Storm重新調度?
5. 當KafkaSpout的進度嚴重落后於Kafka消息的數量時該如何處理?當spout讀取的速度太小,使得Kafka里未被處理的消息越來越多時如何處理?
6. 需要讀取的消息不存在該如何處理?比如從Kafka取消息時,想要獲取的消息已經由於存儲時間過久,被Kafka刪除了,該如何處理?
7. 一個啟用了log compaction的topic會有何不同?
先列一下KafkaSpout的實現里的關鍵類,以便接下來分析代碼時更好理解
| GlobalPartitionInformation |
存儲partition和leader broker的映射 |
Private Map<Integer,Broker> partitionMap; |
| Partition |
存儲某個partition和它的leader broker組成的元組 |
Public final Broker host; Public final int partition; |
| KafkaSpout |
實現IRichSpout接口 |
|
| BrokerReader |
獲取分區信息。包括partition,以及partition的leader broker |
GlobalPartitionInformation getCurrentBrokers(); |
| PartitionManager |
一個partition manager負責讀取一個partition中的消息,並執行ack, fail, commit等操作 |
|
| PartitionCoordinator |
獲取當前task所使用的PartitionManager集合 刷新當前task所使用的PartitionManager集合(以應於leader變更) 何時刷新? |
List<PartitionManager> getMyManagedPartitions(); PartitionManager getManager(Partition partition); void refresh(); |
| StaticCoordinator |
根據SpoutConfig中對於partition和leader的靜態配置信息,決定當前task所使用的PartitionManager集合。 不刷新,只根據配置一次性決定partition和leader的映射 |
|
| DynamicParitionConnections |
存儲broker, SimpleConsumer和partition的對應關系。 管理SimpleConsumer集合,包括建立,關閉SimpleConsumer 根據partition獲取對應的SimpleConsumer,以復用SimpleConsumer |
public SimpleConsumer register(Partition partition)
public SimpleConsumer register(Broker host, int partition)
public SimpleConsumer getConnection(Partition partition) |
KafkaSpout的open方法
每個Spout task會有一個KafkaSpout的實例。當這個task初始化時,Storm會調用KafkaSpout的open方法,初始化這個spout task的運行環境,包括
- a. 分配partiton給這個task
- b. 為分到的每個partition生成一個PartitionManager。PartitionManager對於每個partition的消息實現了Spout接口的ack, nextTuple, fail等主要功能。
關鍵代碼如下:KafkaSpout的open方法主要用來為當前的spout task提供一個Coordinator.
//創建一個DynamicPartitionConnections,用於獲取partition對應的SimpleConsumer _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack //總共有多少task int totalTasks = context.getComponentTasks(context.getThisComponentId()).size(); if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); }
其中,在KafkaConfig中使用StaticHosts還是ZkHosts對DynamicParitionConnections和Coordinator的行為都有影響。
DynamicPartitonConnections 為Partition提供SimpleConsumer
因為Kafka的每個SimpleConsumer都可以用於與一個broker通信,不管是否這些請求是針對同一個topic或partition。當一個broker作為多個partition的leader時,只需要為這一個broker建立一個SimpleConsumser,就可以用於消費這多個partition。所以需要DynamicPartitionConnection來管理partition與SimpleConsumser之間的對應關系,更好地復用。
- 當使用StaticHosts時,KafkaUtils.makeBrokerReader(conf, _spoutConfig)會生成一個StaticBrokerReader. 這個BrokerReader只會提供StaticHosts實例化時使用的分區信息。使得DynamicPartitionConnection的register(Partition partition)方法被調用時,只會返回同樣的SimpleConsumer。
- 當使用ZkHosts時,KafkaUtils.makeBrokerReader(conf, _spoutConfig)會生成一個ZkBrokerReader。這個BrokerReader帶有自動刷新功能,當兩次對它的的getCurrentBrokers的調用間隔較長,它就會重新獲取這個topic的GlobalParitionInformation,即重新獲取分區和分區的leader。使得DynamicPartitionConnection的register(Partition partition)方法被調用時,有可能會重新獲取最新的分區信息。
Coordinator 為task分配partition,並且為每個partition建立PartitionManager
Coordinator如何為task分配Partition?
無論是StaticCoordinator還是ZkCoordinator都是使用KafkaUtilsCalculatorPartitionsForTask方法來給task分配partitions
public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) { Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); List<Partition> partitions = partitionInformation.getOrderedPartitions(); int numPartitions = partitions.size(); if (numPartitions < totalTasks) { LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle"); } List<Partition> taskPartitions = new ArrayList<Partition>(); for (int i = taskIndex; i < numPartitions; i += totalTasks) { Partition taskPartition = partitions.get(i); taskPartitions.add(taskPartition); } logPartitionMapping(totalTasks, taskIndex, taskPartitions); return taskPartitions; }
若一個task的index為a, 那么分給它的partition在所有partition中的index(如果用StaticHosts,並且只提供了部分partition,那么可能partition的index並不是partition id)為:
partitionIndex = a + k*totalTasks, k是正整數,且partitionIndex < numPartitions
- 當使用StaticHosts時,KafkaSpout會使用StaticCoordinator,這種Cooridnator的refresh方法什么都不會做。
- 當使用ZkHosts時,KafkaSpout會使用ZkCoordinator。這種Coordinator的refresh方法被調用時,它會通過BrokerReader獲取最新的分區信息,重新為當前的task計算分區,然后為新的分區提供PartitionManager,從當前task的分區表時移除舊的分區,關閉舊的分區。注意,當某個分區的leader變更后,它對應的Partition實例的broker字段會和以前的不同,因此會認為是新的Partition。當這種Coordinator的getMyManagedPartitions方法被調用時,如果過太久沒刷新,它就會調用refresh()方法,重新獲取這個task對應的PartitionManager集合。
public List<PartitionManager> getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; }
-
那么何時getMyManagedPartition會被調用呢?是在KafkaSpout的nextTuple方法被調用時。也就是每次nextTuple被調用, ZkCoordinator都會檢查是否需要更新PartitionManager集合。
- 如果partition的leader發生成了變更,而Coordinator沒有刷新呢?此時,按照舊的leader獲取消息,就拋出異常。而KafkaSpout的nextTuple方法會捕獲異常,然后主動調用coordinator的refresh()方法獲取新的PartitionManager集合。
-
KafkaSpout對於IRichSpout接口的實現
nextTuple方法的實現
public void nextTuple() { List<PartitionManager> managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { try { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } catch (FailedFetchException e) { LOG.warn("Fetch failed", e); _coordinator.refresh(); } } long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } }
首先,它會從coordinator處獲取當前所管理的所有partition.然后試着從這些partition的消息中emit tuple, 由於可以采用schema解析Kafka的消息,使得一個消息對應多個tuple,所以這里每次試用nextTuple,可能實際上會emit多個tuple。這就帶來了一個問題,如果一個 Kafka message生成多個tuple,那么是否這些tuple都被ack了,才認為這個Kafka消息處理完了呢?實際上,現在的KafkaSpout的實現里,只要其中有一個tuple失敗了,就認為message失敗了。
可以看到,代碼里的for循環最多會循環manager.size()次,也就是它管理多少個partition,就最多循環幾次。但實際上,只要有一個消息產生了tuple,for循環就會終止。也就是nextTuple被調用后,只要有一條消息被成功解析為tuple,它就不再繼續處理消息,在按配置時間間隔記錄下進度后,方法就執行完畢。nextTuple方法調用PartitionManager來emit tuple,根據PartitionManager的next方法返回的狀態nextTuple的控制流程。PartitionManager的next方法最多只emit一條消息產生的所有tuple,先說一下這個next方法返回的狀態的意義:
- NO_EMITTED 表示此次調用沒有emit任何tuple。其它狀態都是已經從一條消息emit了tuple,有可能處理了多條消息,但可能最初的消息沒能解析成tuple,但只有一條消息解析成tuple,next方法就不會再處理消息。
- EMITTED_MORE_LEFT 表示已經處理了一個消息emit了一個或一些tuple, 但是這個partition還有消息已經被讀取卻還沒有處理。
- EMITTED_END 表示已經從一個消息emit了一個或一些tuple,並且這個partition所有已經獲取的消息都已經被處理了。
根據這些狀態,KafkaSpout做出以下處理:
- 如果不是NO_EMITTED,也就是EMITTED_MORE_LEFT或者EMITTED_END,表示已經emit了tuple,所以就退出for循環,不再emit新的tuple.
- 如果不是EMITTED_MORE_LEFT,說明這個PartitionManager已讀的消息都已進行了處理,下次就從另一個PartitionManager處獲取消息,所以更新_currentPartitionIndex
不管是emit了tuple而退出循環, 或者把當前管理的partition循環了一遍之后還卻沒有emit任何消息而退出循環。nextTuple的最后都會檢查是否需要在Zookeeper里記錄進度。
KafkaSpout的ack, commit, fail方法的具體邏輯都由PartitionManager來實現。下一篇會詳細進行分析。
