覺得有用的話,點個贊啊~~~ O(∩_∩)O~~
業務背景
通過實時抽取華為ESIGHT系統的wifi數據,與校園的學生數據、課程數據、地理位置數據等進行關聯,進行校園大數據的流數據處理與分析。
技術選型
- Kafka調用ESIGHT的resutful API,接入無線數據;
- Sparkstreaming將流數據與Hive中的其他校園數據關聯分析
- 使用ES For Hadoop將分析結果導出到ES集群中
Kafka Producer
技術常規,使用kafka接入ESIGHT數據,只需要注意
- 默認的分區方法是否產生數據偏移
- 如果偏移需要自定義kafka.producer.Partitioner
SparkStreaming 接收Kafka數據流
用spark streaming流式處理kafka中的數據,第一步當然是先把數據接收過來,轉換為spark streaming中的數據結構Dstream。
接收數據的方式有兩種:
基於Receiver接收數據
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
需要注意的問題有:
- 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度。
- 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
- 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER
直連方式讀取kafka數據
這種新的不基於Receiver的直接方式,是在Spark 1.3之后引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
這種方式有如下優點:
-
簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
-
高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
-
一次且僅一次(extract-once)的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
Direct連接示例
import org.apache.spark.streaming.kafka.*;
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);
但Direct連接方式為了能夠進行異常恢復,需要考慮如何維護KafkaOffset的問題。通常由兩種方式維護
- 使用Spark的checkpoint機制,根據需要定期checkpoint並恢復。由於項目使用SparkSQL從Hive中拉取數據,可能由於SparkSQLContext的恢復處理不當,在恢復的時候會失敗;
- 通過SparkStreaming的API在Zookeeper中維護Kafka的Offset
使用Zookeeper維護KafkaOffset示例
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;
import lombok.extern.slf4j.*;
@Slf4j
public class KafkaOffsetExample {
private static KafkaCluster kafkaCluster = null;
private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
private static scala.collection.immutable.Set<String> immutableTopics = null;
/** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
Map<TopicAndPartition, Long> consumerOffsetsLong) {
KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(MessageAndMetadata<String, String> v1) throws Exception {
return v1.message();
}
});
return message;
}
private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
Set<String> topicSet = new HashSet<String>();
topicSet.add(topic);
scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
immutableTopics = mutableTopics.toSet();
scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
.getPartitions(immutableTopics).right().get();
// kafka direct stream 初始化時使用的offset數據
Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
KafkaOffsetExample.log.warn("沒有保存offset, 各個partition offset 默認為0");
Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
consumerOffsetsLong.put(topicAndPartition, 0L);
}
}
else {
KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();
Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
Long offset = (Long) consumerOffsets.get(topicAndPartition);
consumerOffsetsLong.put(topicAndPartition, offset);
}
}
return consumerOffsetsLong;
}
private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message,
AtomicReference<OffsetRange[]> offsetRanges) {
JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
for (int i = 0; i < offsets.length; i++)
KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
offsets[i].untilOffset());
return rdd;
}
});
KafkaOffsetExample.log.warn("foreachRDD");
// output
javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
private static final long serialVersionUID = 1L;
public void call(JavaRDD<String> rdd) throws Exception {
if (rdd.isEmpty()) {
KafkaOffsetExample.log.warn("Empty RDD");
return;
}
for (OffsetRange o : offsetRanges.get()) {
// 封裝topic.partition 與 offset對應關系 java Map
TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());
KafkaOffsetExample.log.warn(
"Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());
// 轉換java map to scala immutable.map
scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
.mapAsScalaMap(topicAndPartitionObjectMap);
scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
return v1;
}
});
// 更新offset到kafkaCluster
kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
scalatopicAndPartitionObjectMap);
}
}
});
return javaDStream;
}
private static void initKafkaParams() {
kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
}
private static KafkaCluster initKafkaCluster() {
KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
// transform java Map to scala immutable.map
scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
return arg0;
}
});
// init KafkaCluster
KafkaOffsetExample.log.warn("Init KafkaCluster");
return new KafkaCluster(scalaKafkaParam);
}
public static void run() {
initKafkaParams();
kafkaCluster = initKafkaCluster();
SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
// 得到rdd各個分區對應的offset, 並保存在offsetRanges中
KafkaOffsetExample.log.warn("initConsumer Offset");
Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
javaDStream.print();
jssc.start();
try {
jssc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml";
WIFIConfig.init(testPath);
KafkaOffsetExample.log.warn(WIFIConfig.toStr());
KafkaOffsetExample.run();
}
}
SparkStreaming 數據處理
根據需要,將流式數據與Hive中的靜態數據關聯,結果通過Elasticsearch For Hadoop導出到ES集群中。
如果靜態數據需要定時更新,可以在創建數據流后,在foreachRDD邏輯中,根據實際情況定期更新靜態數據。
調優
由於個人經驗較少,處理的數據量不大,以下內容大多是紙上談兵,僅供參考。
合理的批處理時間(batchDuration)
- 幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。
- 如果這個值設置的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。
- 一般對於batchDuration的設置不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。
-
在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的數據,這樣就是目前情況的最優值。

合理的Kafka拉取量(maxRatePerPartition重要)
spark.streaming.kafka.maxRatePerPartition參數配置指定了每秒每一個topic的每一個分區獲取的最大消息數。
對於Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的。這個參數默認是沒有上限的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time。
緩存反復使用的Dstream(RDD)
Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數。

設置合理的GC
長期使用Java的小伙伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與內存的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些了解的小伙伴應該知道,在Java虛擬機中,將內存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對內存碎片進行整理,通常采用標記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:
設置年老代為並發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
設置合理的CPU資源數
CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一台host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。
但是增加executor的時候需要考慮好內存消耗,因為一台機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。
設置合理的parallelism
partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般默認設置為Kafka中Partition的數量。
使用高性能的算子
這里參考了美團技術團隊的博文,並沒有做過具體的性能測試,其建議如下:
-
使用reduceByKey/aggregateByKey替代groupByKey
-
使用mapPartitions替代普通map
-
使用foreachPartitions替代foreach
-
使用filter之后進行coalesce操作
-
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
-
使用Kryo優化序列化性能 這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:
-
在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸。
-
將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
-
使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。
官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
