SparkStreaming-Kafka集成
文章基本是官方的翻譯, 最多再加入了一小部分自己的思考在內, 如果能看懂官方文檔, 也可以自行查看官網。
另外就是提供了自己實現的 zk + kafka + spark 獲取offset。 offset的存儲在 獲取偏移量 與 存儲偏移量的 第三小節 有描述。
基於版本:
Kafka broker version 0.10.0 or higher
0.10.0 版本的 SparkStreaming kafka 與 0.8版本的 DirectStream比較接近。
它支持比較簡單的並行性,包括 Kafka 分區 和Spark 分區之間 是 1:1對應關系,以及對偏移量和元數據的訪問。但是,由於較新的集成使用了新的Kafka使用者API而不是簡單的API,因此用法上存在顯着差異。
Maven依賴
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4
注意: 不要自行添加 org.apache.kafka artifacts (例如:kafka-clients), 在 spark-streaming-kafka-0-10 已經集成了可使用的kafka版本, 如果自行引入其他kakfa版本可能會引發問題。
但同樣也需要注意到的是: 這一點是在2.4.4版本才添加的, 在2.4.3版本及以前還是需要自己手動引入 kafka.clients的。
創建 Direct Stream
需要注意引入的版本號: 010
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
參數:auto.offset.reset
earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none: topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
默認建議用earliest。設置該參數后 kafka出錯后重啟,找到未消費的offset可以繼續消費。
但是對於 Spark而言,在某些情況下 采取哪一種並沒有太大區別, 這個稍后再說。
對於kafka中可配置的參數, 可以在 KAFKA_CONFIGURATION 中找到.
如果你的 spark batch interval 時間要大於 Kafka heartbeat session timeout(默認是30s),
If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), 需要自行增加 heartbeat.interval.ms 和 session.timeout.ms. 因為 Spark是 每隔一個 batch interval才去拉取數據, 如果間隔太久, kafka就會認為已經斷開連接。 對於 batch interval 大於5分鍾的, 還需要配置另一個參數:group.max.session.timeout.ms.
另外就是 注意到 在 例子中設置: enable.auto.commit false,在稍后會描述原因。
LocationStrategies
在方法參數中, 需要傳入 LocationStrategies。
在新版的kafka Consumer API中, 會將 message 預加載到緩存中,因此, 出於性能的原因, Spark集成kafka 會將 consumers 緩存到 executor中(而不是在每個批次都重新創建 consumers),並且更傾向於 在具有 適當的 consumers 的主機上 安排分區。
在大多數情況下, 我們需要使用 LocationStrategies.PreferConsistent 它將會在 可用的 executors上均勻分配分區。
如果 executors 和 kafka brokers 在同一台主機上, 則LocationStrategies.PreferBrokers 是更好的選擇。因為它會 將 partition 優先分配到存在 kafka broker 的機器上。
因為kafka的分區會與 spark 分區一一對應, 因此, 可能會因為 kafka的數據傾斜, 導致 spark中同樣出現數據傾斜的問題, 因此 LocationStrategies.PreferFixed 允許您指定分區到主機的顯式映射(任何未指定的分區將使用一致的位置)。
consumers 的緩存的默認最大大小為64. 如果你希望處理超過(64 * executors)Kafka分區,則可以通過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。
如果你想禁用 kafka consumer 的緩存, 可以設置 spark.streaming.kafka.consumer.cache.enabled 為 false。
kafka consumer cache 的 緩存 是用 topicpartition 和 group.id 做區分的, 因此對於同時啟動 多個receiver, 需要為每個 direct stream 創建不同的 groupId。
ConsumerStrategies
kafka新的 api中, 提供了大量的不同的方法 去指定 topic,其中一部分 要求 特別大的 post對象實例(原文是: post-object-instantiation 不太理解)配置。 ConsumerStrategies 提供了一種抽象,即使從檢查點重新啟動后,Spark也可以獲得正確配置的消費者。
ConsumerStrategies.Subscribe, 允許你訂閱固定的 topic 集合。
ConsumerStrategies.SubscribePattern 允許你使用正則表達式來指定感興趣的主題。
與0.8版本的集成不同, 通過以上兩種方式 在運行流期間使用Subscribe或SubscribePattern應該響應添加分區, 在這里的意思應該是, 即使topic一開始不存在, 即使是動態添加的依然能夠在 spark 運行期間 拉取數據。
最后 ConsumerStrategies.Assign 允許你指定特定的 分區。
這三種方式 都支持你 指定 對特定分區的起始offset。
如果你具有上述選項無法滿足的需求,可以通過 extend ConsumerStrategy實現自己的方法。
最后需要提醒的是:
即使你指定的topic 和 partition 並不存在, 程序也能夠正常運行, 這得益於 kafka中的一個參數:
allow.auto.create.topics
默認為true。
創建 RDD
你可以通過指定 topic partition 以及 offset的范圍的方式, 來創建RDD
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
注意:在這里不能夠使用 LocationStrategies.PreferBrokers 因為在沒有流的情況下, 缺乏驅動側的 consumer 幫你自動查找獲取 broker的元信息。 如果必須要用的話, 使用 PreferFixed 來自己查找元信息。
獲取偏移量
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
注意: HasOffsetRanges的類型轉換只有在createDirectStream 獲取到的流, 在流處理的第一個方法調用時才會成功,而不能在其之后的方法鏈中調用。需要認識到,RDD分區和Kafka分區之間的映射關系,在任何一個repartition或shuffle操作后(如reduceByKey()或Window())函數后都不再存在。
因此,我往往是通過:
dstream.transform(rdd -> {
(HasOffsetRanges) rdd.rdd()).offsetRanges();
return rdd;
})
在這之后再執行更復雜流處理過程。
存儲偏移量
在失敗的情況下, kafka的交付語義 取決於在什么時候 其 offset被存儲,存儲則意味着 歸屬於當前 offset之前的所有數據 都已經被正確處理, 因此相當於 之前的數據已經被 丟棄, 不會再度進行處理。
而這也是我們不使用 enable.auto.commit 為 true的原因。
在kafka中的自動提交機制是:
enable.auto.commit 的默認值是 true;就是默認采用自動提交的機制。
auto.commit.interval.ms 的默認值是 5000,單位是毫秒。
這樣,默認5秒鍾,一個 Consumer 將會提交它的 Offset 給 Kafka,或者每一次數據從指定的 Topic 取回時,將會提交最后一次的 Offset。
也就是說,當我們從 topic partition中取回數據時,每隔固定時間, 這個offset就會被提交。
在絕大多數情況下, 這並不是我們想要的方式。
spark的 輸出語義是 至少一次,所以 如果 你想要獲取 與 至少一次等效的語義, 你必須在 冪等的輸出操作后存儲 或在一次與輸出操作並行的原子操作中存儲。為了達到上述目的, 你有以下三種方式去處理:
-
checkPoints(檢查點)
如果你打開了Spark的checkpointing選項,偏移量會被保存在checkpoint里面。
這確實是一種很簡單的方式, 然而有一些缺點, 首先為了對於同一數據得到的輸出是 重復的, 所以你的輸出操作必須是冪等的;事務並不是一個好的選擇。
此外,如果你的代碼有了更改,就不能從checkpoint之中恢復。對於計划升級,可以在舊代碼運行的同時部署運行新的代碼來緩解這個問題(因為輸出是冪等的,所以不會造成沖突)。
但是對於意料之外的故障而需要更改代碼的,除非你有其他的方式來獲取開始的偏移量,否則就會丟失數據。
-
kafka自身
其 auto.commit 不必多說, 自然是不合適的, 因此,你可以在你確保輸出操作已經完成后使用commitSync API向Kafka提交偏移量。與checkpoint方式相比,該種方式的好處是Kafka是一個持久化的存儲,而不需要考慮代碼的更新。 然而, kafka是非事務性的, 因此仍然需要 輸出操作 是冪等的。
stream.foreachRDD(rdd -> { OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); // some time later, after outputs have completed ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges); });
這本身就是一種比較好的方式。
-
自己實現
對於支持事務的數據存儲,可以在同一個事務中保存偏移量,這樣即便在失敗的情況下也可以保證兩者的同步。
如果你關心重復的或者跳過的偏移量的范圍,回滾事務可以防止重復或丟失消息影響結果。這等價於僅僅一次的語義。也可以使用這種策略來對那些通常很難保證冪等性的聚合輸出操作起作用。
// The details depend on your data store, but the general idea looks like this // begin from the the offsets committed to the database Map<TopicPartition, Long> fromOffsets = new HashMap<>(); for (resultSet : selectOffsetsFromYourDatabase) fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset")); } JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) ); stream.foreachRDD(rdd -> { OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); Object results = yourCalculation(rdd); // begin your transaction // update results // update offsets where the end of existing offsets matches the beginning of this batch of offsets // assert that offsets were updated correctly // end your transaction });
這部分,目前比較常見的方式是, 通過 zk存儲數據,但不限於 zk, redis, mysql等方式都是可以的。
因為在這里的數據更新頻次實際上並不會太高,一般是每一批次提交一次, 因此即使是存儲在mysql中也是可以接受的。
在這里給出我自己的項目所使用的實現:
import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SparkDataSource { private static final Logger logger = LoggerFactory.getLogger(SparkDataSource.class); private static final String ZK_PATH_PREFIX = "/consumer/spark/project/offset/"; public static JavaInputDStream<ConsumerRecord<Object, Object>> getInputDStreamByKakfa(JavaStreamingContext jssc, @SuppressWarnings("rawtypes") Class valueDeserializerClass, String groupId, String topic ) { Map<String, Object> kafkaConfig = new HashMap<>(); kafkaConfig.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); kafkaConfig.put("key.deserializer", StringDeserializer.class); kafkaConfig.put("value.deserializer", valueDeserializerClass); kafkaConfig.put("group.id", "groupId"); kafkaConfig.put("auto.offset.reset", "lastest"); kafkaConfig.put("enable.auto.commit", false); KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConfig); //可能出現連接超時, topic 不存在等情況,會引起報錯,導致啟動中斷. while (true) { try { List<TopicPartition> topicPartitions = topicPartitions(consumer, topic); if (topicPartitions != null && !topicPartitions.isEmpty()) { break; } Thread.sleep(5000); } catch (Exception e) { logger.warn("獲取topic partition信息失敗", e); } } return KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Arrays.asList(new String[] {topic}), kafkaConfig, getOffset(consumer, topic)) ); } /** * 獲取偏移量, 如果zk中有,則取zk,否則直接去獲取. * @param consumer kafkaConsumer * @param topic topic * @return 最終的topic partition 和 offset */ private static Map<TopicPartition, Long> getOffset(KafkaConsumer<String, Object> consumer, String topic) { String zkPath = ZK_PATH_PREFIX + topic; //創建zk, 需要傳入自身的連接信息。 ZkClient zkClient = new ZkClient("zkServer"); //檢查當前路徑是否存在子節點, 默認是有值的,是我們在保存信息時創建的 zk節點。 int childNumber = zkClient.countChildren(zkPath); Map<TopicPartition, Long> fromOffset = new HashMap<>(); if (childNumber > 0) { //獲取對應topic的最大 offset, 因為如果請求的offset超出最大值是會報錯的. Map<TopicPartition, Long> endOffsets = getEndOffsetByTopic(consumer, topic); for (int i = 0; i < childNumber; i++) { TopicPartition tap = new TopicPartition(topic, i); //存儲kafka對應的各個partition對應 offset 的 路徑. String realPath = zkPath + "/" + i; String offset = zkClient.readData(realPath); Long lastOffset = endOffsets.get(tap); //然而這種方式也不見得完全正確, 依然存在一種可能性,topic已經被刪除,這是重新創建的數據, 且已經灌入一批數據 //所以此時應該選擇從頭開始讀, 或者說從最新處開始讀,要看個人選擇, 同時最好可以加入相關的信息標識 //表明是來自同一批數據. if (lastOffset != null) { if (lastOffset < Long.parseLong(offset)) { //如果記錄的offset過大,則可以選擇最新的offset. fromOffset.put(tap, lastOffset); } else { fromOffset.put(tap, Long.parseLong(offset)); } } else { //如果為null的話, 說明kafka的分區可能已經經過調整, 需要刪除zk對應的節點. zkClient.delete(realPath); } } } else { fromOffset = getBeginningOffsetByTopic(consumer, topic); } return fromOffset; } private static Map<TopicPartition, Long> getEndOffsetByTopic(KafkaConsumer<String, Object> consumer, String topic) { return consumer.endOffsets(topicPartitions(consumer, topic)); } private static Map<TopicPartition, Long> getBeginningOffsetByTopic(KafkaConsumer<String, Object> consumer, String topic) { return consumer.beginningOffsets(topicPartitions(consumer, topic)); } private static List<TopicPartition> topicPartitions(KafkaConsumer<String, Object> consumer, String topic) { List<PartitionInfo> partitions = consumer.partitionsFor(topic); List<TopicPartition> topicPartitons = new ArrayList<>(partitions.size()); partitions.forEach(pInfo -> { topicPartitons.add(new TopicPartition(topic, pInfo.partition())); }); return topicPartitons; } }
存儲 offset倒是沒有什么特別的地方, 主要是在 項目啟動 offset的獲取上。
SSL / TLS
Tips: HTTPS、SSL、TLS三者之間的聯系和區別 通俗來說, TLS就是 SSL標准化后的產物。
新的kafkaConsumer支持 SSL,為了支持這一點, 需要在接入kafka之前 加入一部分配置, 注意,這僅僅適用於spark和kafka 服務器之間的交流,你同樣需要保證Spark節點內部之間的安全(Spark安全)通信。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
程序部署
對於JAVA或Scala應用來說,如果你使用SBT或MAVEN來做項目管理,需要將spark-streaming-kafka-010_2.11包以及它的依賴包添加到你的應用的JAR包中。確保spark-core+2.11包和spark-streaming_2.11包在你的依賴中位provided級別,因為他們在Spark的安裝包中已經提供了。接下來使用spark-submit命令來部署你的應用。