本文主要介紹了Storm Spout,並以KafkaSpout為例,進行了說明。
概念
數據源(Spout)是拓撲中數據流的來源。一般 Spout 會從一個外部的數據源讀取元組然后將他們發送到拓撲中。根據需求的不同,Spout 既可以定義為可靠的數據源,也可以定義為不可靠的數據源。一個可靠的 Spout 能夠在它發送的元組處理失敗時重新發送該元組,以確保所有的元組都能得到正確的處理;相對應的,不可靠的 Spout 就不會在元組發送之后對元組進行任何其他的處理。
一個 Spout 可以發送多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream 方法來聲明定義不同的數據流,然后在發送數據時在 SpoutOutputCollector 的 emit 方法中將數據流 id 作為參數來實現數據發送的功能。
Spout 中的關鍵方法是 nextTuple。顧名思義,nextTuple 要么會向拓撲中發送一個新的元組,要么會在沒有可發送的元組時直接返回。需要特別注意的是,由於 Storm 是在同一個線程中調用所有的 Spout 方法,nextTuple 不能被 Spout 的任何其他功能方法所阻塞,否則會直接導致數據流的中斷(關於這一點,阿里的 JStorm 修改了 Spout 的模型,使用不同的線程來處理消息的發送,這種做法有利有弊,好處在於可以更加靈活地實現 Spout,壞處在於系統的調度模型更加復雜,如何取舍還是要看具體的需求場景吧——譯者注)。
Spout 中另外兩個關鍵方法是 ack 和 fail,他們分別用於在 Storm 檢測到一個發送過的元組已經被成功處理或處理失敗后的進一步處理。注意,ack 和 fail 方法僅僅對上述“可靠的” Spout 有效。
實現
在實現Spout的時候,有兩種常用的方式:
- implements IRichSpout
- extends BaseRichSpout
IRichSpout
從上圖看出,IRchSpout 繼承了ISpout
和 IComponent
這兩個接口,所以一共有9個函數需要實現。
- open: 環境初始化,調用open函數。
- close: 當
ISpout
停止的時候,進行調用,但是並不一定保證成功,因為集群是調用kill -9
來停止程序的. - active: 激活spout,將spout的狀態從
deactive
轉變為active
,緊接着spout就會調用nextTuple。 - deactive: 關閉spout。
- nextTuple: Spout向下游發射一組tuple。
- ack: storm確認spout發射出的id為msgId的tuple已經被處理完畢。
- fail: storm 確認spout發射出的id為msgId的tuple在下游處理失敗了。
- declareOutputFields(OutputFieldsDeclarer declarer): 聲明當前Spout要發送的stram的field的名字。
- getComponentConfiguration:在component中聲明配置信息。只有一些以"topology."開頭的配置才會被重寫。並且這些配置信息也可以通過
TopologyBuilder
來覆蓋。
BaseRichSpout
BaseRichSpout
是一個抽象類,它實現了IRichSpout
的部分接口。如果業務不要求實現這些接口的時候,可以使用BaseRichSpout
。
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
}
舉例KafkaSpout
KafkaSpoout的作用是從kafka中讀取數據,然后發送給下游進行處理。
回憶一下,一般消費kafka的流程是:
- 創建一個consumer實例。
- 訂閱tpoic。
- 消費數據。
- 進行處理。
- commit offset。
具體的使用方式看這里。
在Storm中,它的使用方式如下:
String bootstrapServers = projectProperties.getProperty("kafkaBootstrapServers");
String[] topic = projectProperties.getProperty("kafkaConsumerTopic").split(",");
String consumerGroupId = projectProperties.getProperty("kafkaConsumerGroupId");
KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
.setGroupId(consumerGroupId)
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
.build();
....
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafkaSpout", new KafkaSpout<String, String>(kafkaSpoutConfig));
首先創建一個KafkaSpoutConfig
,這個里面包含了相關的配置。
然后創建KafkaSpout實例。
簡要介紹下KafkaSpoutConfig
, 它不僅包含了kafka consumer的配置,還包含了 Kafka spout 的一些配置。
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Subscription subscription;
private final SerializableDeserializer<K> keyDes;
private final Class<? extends Deserializer<K>> keyDesClazz;
private final SerializableDeserializer<V> valueDes;
private final Class<? extends Deserializer<V>> valueDesClazz;
private final long pollTimeoutMs;
// Kafka spout configuration
private final RecordTranslator<K, V> translator;
private final long offsetCommitPeriodMs;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final KafkaSpoutRetryService retryService;
private final long partitionRefreshPeriodMs;
private final boolean emitNullTuples;
KafkaSpoout 除了實現了從kafka中獲取數據,然后emit之外,還實現了retry機制。
- 如果要從kafka中獲取數據,就要初始化一個KafkaConsumer對象。這個過程是在構造函數中實現的:
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
}
KafkaConsumerFactoryDefault 這個類的作用就是創建一個KafkaConsumer 對象。
-
在Spout中,第一個調用的是open函數。這個函數對所有需要的變量進行初始化。這里的變量太多了,如果列舉會太專注細節,而忽視了流程。
-
在 調用 open 函數之后,需要調用 active 函數啟動spout,並訂閱topic。
@Override
public void activate() {
try {
subscribeKafkaConsumer();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
- active 之后,就是調用nextTuple函數,從kafka中poll數據,然后進行發送。每次發送完一個tuple后,就會在emitted 這個set中添加對應的記錄,方便后續的追蹤。
@Override
public void nextTuple() {
try {
//如果設置了自動提交,或者距離上次提交時間已經過了指定時間
if (commit()) {
commitOffsetsForAckedTuples();
}
if (poll()) {
try {
setWaitingToEmit(pollKafkaBroker());
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}
if (waitingToEmit()) {
emit();
}
....
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
- 后續的bolt按照預期處理完對應的tuple后,會進行ack。這時會調用ack函數。ack函數會將對應msg從emitted中去除。
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
.....
} else {
.....
emitted.remove(msgId);
}
}
- 如果后續的bolt處理消息失敗了,就會調用fail函數.而fail函數會進行重試。
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
.....
}
emitted.remove(msgId);
msgId.incrementNumFails();
if (!retryService.schedule(msgId)) {
.....
ack(msgId);
}
}
- 等到消息處理完了之后,就需要調用 deactive 函數,deactive函數就是commit offset 並 close comsumer。 close 函數的實現和 deactive函數的實現一模一樣。
@Override
public void close() {
try {
shutdown();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
private void shutdown() {
try {
if (!consumerAutoCommitMode) {
commitOffsetsForAckedTuples();
}
} finally {
//remove resources
kafkaConsumer.close();
}
}
一些細節
上面描述了KafkaSpout的大體流程,這里記錄下其它的實現細節。
KafkaConsumerFactory
在創建kafka的comsumer對象的時候,使用了接口KafkaConsumerFactory
,而這個接口只有一個實現KafkaConsumerFactoryDefault
。
FirstPollOffsetStrategy
在第一次重kafka中讀取數據的時候,spout提供了4種不同的策略。
/**
* <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
* <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
* <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
* If no offset has been committed, it behaves as EARLIEST.</li>
* <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
* If no offset has been committed, it behaves as LATEST.</li>
*/
public static enum FirstPollOffsetStrategy {
EARLIEST,
LATEST,
UNCOMMITTED_EARLIEST,
UNCOMMITTED_LATEST }
Subscription
Subscription 封裝了 kafka consumer 的 subscribe,然后提供了兩個實現:
- NamedSubscription
- PatternSubscription
兩者的區別就是subscribe 的方式不同。
public class PatternSubscription extends Subscription {
.....
@Override
public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
consumer.subscribe(pattern, listener);
LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
// Initial poll to get the consumer registration process going.
// KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
consumer.poll(0);
}
.....
}
為什么要做這層封裝,我猜的是要通過 consumer.poll(0)
來觸發KafkaSpoutConsumerRebalanceListener
。
KafkaSpoutConsumerRebalanceListener
Kafka 在增減consumer, partition, broker的時候會觸發rebalance。 rebalance 之后, consumer對應的partition就會發生變化。這個時候要確保兩件事情,第一是rebalance之前要commit 當前partition 消費的offset,第二是從新的 partition 獲取當前的offset。而這些,都是通過實現ConsumerRebalanceListener
達到目的。
在Storm中,就是:
private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (!consumerAutoCommitMode && initialized) {
initialized = false;
//提交所有的已經ack的tuple的offset
commitOffsetsForAckedTuples();
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//根據partitions進行初始化,包括獲取當前的offset,更新其他變量..
initialize(partitions);
}
KafkaSpoutMessageId
private transient TopicPartition topicPart;
private transient long offset;
private transient int numFails = 0;
private boolean emitted;
FailedMsgRetryManager
Storm對失敗的消息如何處理,有下面的幾個問題?
- 消息處理失敗了,應該怎么弄?
- 消息重復處理失敗了,應該怎么弄?
- 消息重復處理成功了,應該怎么弄?
- 怎么獲取將要處理的消息?
- 怎么知道這條消息是否需要發送?
KafkaSpoutRetryService
接口和它的實現KafkaSpoutRetryExponentialBackoff
就是用來解決這個問題的。
在KafkaSpoutRetryService
設計了幾種方法,包括添加、去掉處理失敗的消息,查看當前的消息是否已經被添加和當前的消息是否達到了重新發送的條件。
KafkaSpoutRetryExponentialBackoff
實現了一種消息失敗的處理方式。如果某條消息處理失敗了,就會重試一定的次數,並且每次重試的時間按照指數時間增加。當然如果超過了最大的重試次數,KafkaSpou默認會將它ACK掉。
那么怎么實現的呢?
- 使用
RetrySchedule
表示每條處理失敗了消息,里面包含了:private final KafkaSpoutMessageId msgId; private long nextRetryTimeNanos;
這里面nextRetryTimeNanos
表示下次重試的時間,如果這個時間小於當前的時間,就說明這條消息可以重試了。
-
KafkaSpoutRetryExponentialBackoff
使用了兩個數據結構來保存每條要重試的數據,其中一個retrySchedules
是一個treeSet,里面按照nextRetryTimeNanos
從小到大進行排序。toRetryMsgs
是一個HashSet,查找某條重試數據的狀態。private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
-
消息處理失敗了,應該怎么弄?
會增加這條消息的失敗次數,然后調用
schedule
函數,這個函數會將消息添加到上面兩個數據結構中。如果添加失敗,就說明這個消息已經超過最大處理次數。否則的話,就會更新數據結構。 -
消息重復處理失敗了,應該怎么弄?
處理過程同上,只不過會將
retrySchedules
和toRetryMsgs
中對應的數據先刪掉,然后再添加更新后的數據。 -
怎么獲取將要處理的消息?
先通過
retriableTopicPartitions
來獲取需要重試的消息的TopicPartition
集合,然后重新從這些TopicPartition
中獲取數據。 -
怎么知道這條消息是否需要發送?
遍歷
retrySchedules
進行查找。 -
consumer Poll 的時候,是拉取多條消息的,怎么保證某條消息處理失敗了,重新拉取后,只向bolts中發送這條處理失敗的消息?
- nextTuple函數中,consumer poll一堆消息后,它會逐條發送,並且將已經發送的消息保存到 emitted 這個 Set 中。
- 如果這條消息處理成功了,會將這條消息從emitted 中去掉,並保存到acked中。
- 如果這條消息處理失敗了, 會將這條消息從emitted中去掉,添加到retryService中。
- 在nextTuple重新拉取數據的時候,它會優先從需要retry的offset處開始拉取消息,這樣就會重復拉取一些消息,所以,在emit的時候,會先從emitted 和acked 中查看是否包含了這條消息,如果不包含,就會發送。這樣子發送的消息就只會有那條處理失敗的消息了。
- 當然這條消息如果多次失敗,也會被標記為處理成功了。