覺得有用的話,點個贊啊~~~ O(∩_∩)O~~
業務背景
通過實時抽取華為ESIGHT系統的wifi數據,與校園的學生數據、課程數據、地理位置數據等進行關聯,進行校園大數據的流數據處理與分析。
技術選型
- Kafka調用ESIGHT的resutful API,接入無線數據;
- Sparkstreaming將流數據與Hive中的其他校園數據關聯分析
- 使用ES For Hadoop將分析結果導出到ES集群中
Kafka Producer
技術常規,使用kafka接入ESIGHT數據,只需要注意
- 默認的分區方法是否產生數據偏移
- 如果偏移需要自定義kafka.producer.Partitioner
SparkStreaming 接收Kafka數據流
用spark streaming流式處理kafka中的數據,第一步當然是先把數據接收過來,轉換為spark streaming中的數據結構Dstream。
接收數據的方式有兩種:
基於Receiver接收數據
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
需要注意的問題有:
- 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度。
- 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
- 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER
直連方式讀取kafka數據
這種新的不基於Receiver的直接方式,是在Spark 1.3之后引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
這種方式有如下優點:
-
簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
-
高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
-
一次且僅一次(extract-once)的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
Direct連接示例
import org.apache.spark.streaming.kafka.*;
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);
但Direct連接方式為了能夠進行異常恢復,需要考慮如何維護KafkaOffset的問題。通常由兩種方式維護
- 使用Spark的checkpoint機制,根據需要定期checkpoint並恢復。由於項目使用SparkSQL從Hive中拉取數據,可能由於SparkSQLContext的恢復處理不當,在恢復的時候會失敗;
- 通過SparkStreaming的API在Zookeeper中維護Kafka的Offset
使用Zookeeper維護KafkaOffset示例
import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.HasOffsetRanges; import org.apache.spark.streaming.kafka.KafkaCluster; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.OffsetRange; import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig; import kafka.common.TopicAndPartition; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; import scala.Predef; import scala.Tuple2; import scala.collection.JavaConversions; import lombok.extern.slf4j.*; @Slf4j public class KafkaOffsetExample { private static KafkaCluster kafkaCluster = null; private static HashMap<String, String> kafkaParam = new HashMap<String, String>(); private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null; private static scala.collection.immutable.Set<String> immutableTopics = null; /** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */ private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc, Map<TopicAndPartition, Long> consumerOffsetsLong) { KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset"); JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(), consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(MessageAndMetadata<String, String> v1) throws Exception { return v1.message(); } }); return message; } private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) { Set<String> topicSet = new HashSet<String>(); topicSet.add(topic); scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet); immutableTopics = mutableTopics.toSet(); scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster .getPartitions(immutableTopics).right().get(); // kafka direct stream 初始化時使用的offset數據 Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>(); if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) { KafkaOffsetExample.log.warn("沒有保存offset, 各個partition offset 默認為0"); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { consumerOffsetsLong.put(topicAndPartition, 0L); } } else { KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset"); scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster .getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get(); Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp); Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet); KafkaOffsetExample.log.warn("put data in consumerOffsetsLong"); for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { Long offset = (Long) consumerOffsets.get(topicAndPartition); consumerOffsetsLong.put(topicAndPartition, offset); } } return consumerOffsetsLong; } private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message, AtomicReference<OffsetRange[]> offsetRanges) { JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { private static final long serialVersionUID = 1L; public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); for (int i = 0; i < offsets.length; i++) KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}", offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(), offsets[i].untilOffset()); return rdd; } }); KafkaOffsetExample.log.warn("foreachRDD"); // output javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { private static final long serialVersionUID = 1L; public void call(JavaRDD<String> rdd) throws Exception { if (rdd.isEmpty()) { KafkaOffsetExample.log.warn("Empty RDD"); return; } for (OffsetRange o : offsetRanges.get()) { // 封裝topic.partition 與 offset對應關系 java Map TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>(); topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); KafkaOffsetExample.log.warn( "Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset()); // 轉換java map to scala immutable.map scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions .mapAsScalaMap(topicAndPartitionObjectMap); scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap .toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) { return v1; } }); // 更新offset到kafkaCluster kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); } } }); return javaDStream; } private static void initKafkaParams() { kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST); kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT); kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET); kafkaParam.put("group.id", WIFIConfig.GROUP_ID); } private static KafkaCluster initKafkaCluster() { KafkaOffsetExample.log.warn("transform java Map to scala immutable.map"); // transform java Map to scala immutable.map scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam); scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap .toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> apply(Tuple2<String, String> arg0) { return arg0; } }); // init KafkaCluster KafkaOffsetExample.log.warn("Init KafkaCluster"); return new KafkaCluster(scalaKafkaParam); } public static void run() { initKafkaParams(); kafkaCluster = initKafkaCluster(); SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer"); JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); // 得到rdd各個分區對應的offset, 並保存在offsetRanges中 KafkaOffsetExample.log.warn("initConsumer Offset"); Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC); kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong); final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>(); JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges); javaDStream.print(); jssc.start(); try { jssc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml"; WIFIConfig.init(testPath); KafkaOffsetExample.log.warn(WIFIConfig.toStr()); KafkaOffsetExample.run(); } }
SparkStreaming 數據處理
根據需要,將流式數據與Hive中的靜態數據關聯,結果通過Elasticsearch For Hadoop導出到ES集群中。
如果靜態數據需要定時更新,可以在創建數據流后,在foreachRDD邏輯中,根據實際情況定期更新靜態數據。
調優
由於個人經驗較少,處理的數據量不大,以下內容大多是紙上談兵,僅供參考。
合理的批處理時間(batchDuration)
- 幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。
- 如果這個值設置的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。
- 一般對於batchDuration的設置不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。
-
在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的數據,這樣就是目前情況的最優值。
合理的Kafka拉取量(maxRatePerPartition重要)
spark.streaming.kafka.maxRatePerPartition參數配置指定了每秒每一個topic的每一個分區獲取的最大消息數。
對於Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的。這個參數默認是沒有上限的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time。
緩存反復使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數。
設置合理的GC
長期使用Java的小伙伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與內存的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些了解的小伙伴應該知道,在Java虛擬機中,將內存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對內存碎片進行整理,通常采用標記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:
設置年老代為並發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
設置合理的CPU資源數
CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一台host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。
但是增加executor的時候需要考慮好內存消耗,因為一台機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。
設置合理的parallelism
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般默認設置為Kafka中Partition的數量。
使用高性能的算子
這里參考了美團技術團隊的博文,並沒有做過具體的性能測試,其建議如下:
-
使用reduceByKey/aggregateByKey替代groupByKey
-
使用mapPartitions替代普通map
-
使用foreachPartitions替代foreach
-
使用filter之后進行coalesce操作
-
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
-
使用Kryo優化序列化性能 這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:
-
在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸。
-
將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
-
使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。
官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))