SparkStreaming+Kafka 處理實時WIFI數據


 

 


覺得有用的話,點個贊啊~~~ O(∩_∩)O~~


業務背景

通過實時抽取華為ESIGHT系統的wifi數據,與校園的學生數據、課程數據、地理位置數據等進行關聯,進行校園大數據的流數據處理與分析。

技術選型

  • Kafka調用ESIGHT的resutful API,接入無線數據;
  • Sparkstreaming將流數據與Hive中的其他校園數據關聯分析
  • 使用ES For Hadoop將分析結果導出到ES集群中

Kafka Producer

技術常規,使用kafka接入ESIGHT數據,只需要注意

  • 默認的分區方法是否產生數據偏移
  • 如果偏移需要自定義kafka.producer.Partitioner

SparkStreaming 接收Kafka數據流

用spark streaming流式處理kafka中的數據,第一步當然是先把數據接收過來,轉換為spark streaming中的數據結構Dstream。
接收數據的方式有兩種:

基於Receiver接收數據

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。

需要注意的問題有:

  • 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度。
  • 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
  • 如果我們啟用了Write Ahead Logs復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

直連方式讀取kafka數據

這種新的不基於Receiver的直接方式,是在Spark 1.3之后引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。

這種方式有如下優點:

  • 簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。

  • 高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。

  • 一次且僅一次(extract-once)的事務機制: 基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。 基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。

Direct連接示例

import org.apache.spark.streaming.kafka.*;

 JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class], [map of Kafka parameters], [set of topics to consume]);

但Direct連接方式為了能夠進行異常恢復,需要考慮如何維護KafkaOffset的問題。通常由兩種方式維護

  • 使用Spark的checkpoint機制,根據需要定期checkpoint並恢復。由於項目使用SparkSQL從Hive中拉取數據,可能由於SparkSQLContext的恢復處理不當,在恢復的時候會失敗;
  • 通過SparkStreaming的API在Zookeeper中維護Kafka的Offset

使用Zookeeper維護KafkaOffset示例


import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConversions;
import lombok.extern.slf4j.*;

@Slf4j
public class KafkaOffsetExample {
	private static KafkaCluster kafkaCluster = null;
	private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
	private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
	private static scala.collection.immutable.Set<String> immutableTopics = null;

	/** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
	private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
			Map<TopicAndPartition, Long> consumerOffsetsLong) {
		KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
		JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
				StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
				consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
					private static final long serialVersionUID = 1L;

					@Override
					public String call(MessageAndMetadata<String, String> v1) throws Exception {
						return v1.message();
					}
				});
		return message;
	}

	private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
		Set<String> topicSet = new HashSet<String>();
		topicSet.add(topic);
		scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
		immutableTopics = mutableTopics.toSet();
		scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
				.getPartitions(immutableTopics).right().get();
		
		// kafka direct stream 初始化時使用的offset數據
		Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
		if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
			KafkaOffsetExample.log.warn("沒有保存offset, 各個partition offset 默認為0");
			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
				consumerOffsetsLong.put(topicAndPartition, 0L);
			}
		}
		else {
			KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
			scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
					.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();

			Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);

			KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
				Long offset = (Long) consumerOffsets.get(topicAndPartition);
				consumerOffsetsLong.put(topicAndPartition, offset);
			}
		}
		return consumerOffsetsLong;
	}
	
	private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message, 
			AtomicReference<OffsetRange[]> offsetRanges) {
		JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;
			public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
				OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
				offsetRanges.set(offsets);
				for (int i = 0; i < offsets.length; i++)
					KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
							offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
							offsets[i].untilOffset());
				return rdd;
			}
		});
		KafkaOffsetExample.log.warn("foreachRDD");
		// output
		javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;

			public void call(JavaRDD<String> rdd) throws Exception {
				if (rdd.isEmpty()) {
					KafkaOffsetExample.log.warn("Empty RDD");
					return;
				}
				for (OffsetRange o : offsetRanges.get()) {
					// 封裝topic.partition 與 offset對應關系 java Map
					TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
					Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
					topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());

					KafkaOffsetExample.log.warn(
							"Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());

					// 轉換java map to scala immutable.map
					scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
							.mapAsScalaMap(topicAndPartitionObjectMap);
					scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
							.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
								private static final long serialVersionUID = 1L;

								@Override
								public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
									return v1;
								}
							});
					// 更新offset到kafkaCluster
					kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
							scalatopicAndPartitionObjectMap);
				}
			}
		});
		return javaDStream;
	}
	
	private static void initKafkaParams() {
		kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
		kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
		kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
		kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
	}
	
	private static KafkaCluster initKafkaCluster() {
		KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
		// transform java Map to scala immutable.map
		scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
		scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
				.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
						return arg0;
					}
				});

		// init KafkaCluster
		KafkaOffsetExample.log.warn("Init KafkaCluster");
		return new KafkaCluster(scalaKafkaParam);
	}
	
	public static void run() {
		initKafkaParams();
		kafkaCluster = initKafkaCluster();

		SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
		JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
		
		// 得到rdd各個分區對應的offset, 並保存在offsetRanges中
		KafkaOffsetExample.log.warn("initConsumer Offset");
		Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
		kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
		
		JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
		JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
		
		javaDStream.print();
		
		jssc.start();
		try {
			jssc.awaitTermination();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws Exception {
		String testPath = "E:\\javaCodes\\svn\\SmartCampus\\Trunk\\smartcampus.etl.wifi\\src\\main\\resources\\WifiConfig.yaml";
		WIFIConfig.init(testPath);
		KafkaOffsetExample.log.warn(WIFIConfig.toStr());
		
		KafkaOffsetExample.run();
	}
}

  



SparkStreaming 數據處理

根據需要,將流式數據與Hive中的靜態數據關聯,結果通過Elasticsearch For Hadoop導出到ES集群中。

如果靜態數據需要定時更新,可以在創建數據流后,在foreachRDD邏輯中,根據實際情況定期更新靜態數據。

調優

由於個人經驗較少,處理的數據量不大,以下內容大多是紙上談兵,僅供參考。

合理的批處理時間(batchDuration)

  • 幾乎所有的Spark Streaming調優文檔都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個參數便是批處理時間的設定。
  • 如果這個值設置的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那么就會造成數據不斷堆積,最終導致Spark Streaming發生阻塞。
  • 一般對於batchDuration的設置不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。
  • 在平時的應用中,根據不同的應用場景和硬件配置,我設在1~10s之間,我們可以根據SparkStreaming的可視化監控界面,觀察Total Delay來進行batchDuration的調整,直達SparkStreaming剛剛能及時處理完上一個批處理的數據,這樣就是目前情況的最優值。

 

合理的Kafka拉取量(maxRatePerPartition重要)

spark.streaming.kafka.maxRatePerPartition參數配置指定了每秒每一個topic的每一個分區獲取的最大消息數。

對於Spark Streaming消費kafka中數據的應用場景,這個配置是非常關鍵的。這個參數默認是沒有上限的,即kafka當中有多少數據它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理數據的速度,同時這個參數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的數據能夠順利的處理完畢,做到盡可能高的吞吐量,而這個參數的調整可以參考可視化監控界面中的Input Rate和Processing Time。

 

緩存反復使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反復的使用,最好利用cache(),將該數據流緩存起來,防止過度的調度資源造成的網絡開銷。可以參考觀察Scheduling Delay參數。

設置合理的GC

長期使用Java的小伙伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與內存的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些了解的小伙伴應該知道,在Java虛擬機中,將內存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對內存碎片進行整理,通常采用標記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:

設置年老代為並發收集。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

設置合理的CPU資源數

CPU的core數量,每個executor可以占用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor占用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓么個executor占用更少的core,同時worker下面增加更多的executor,或者一台host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。

但是增加executor的時候需要考慮好內存消耗,因為一台機器的內存分配給越多的executor,每個executor的內存就越小,以致出現過多的數據spill over甚至out of memory的情況。

設置合理的parallelism

partition和parallelism,partition指的就是數據分片的數量,每一次task只能處理一個partition的數據,這個值太小了會導致每片數據量太大,導致內存壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action類型操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,默認返回數據的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的參數沒有影響)。所以說,這兩個概念密切相關,都是涉及到數據分片的,作用方式其實是統一的。通過spark.default.parallelism可以設置默認的分片數量,而很多RDD的操作都可以指定一個partition參數來顯式控制具體的分片數量。 在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般默認設置為Kafka中Partition的數量。

使用高性能的算子

這里參考了美團技術團隊的博文,並沒有做過具體的性能測試,其建議如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey

  • 使用mapPartitions替代普通map

  • 使用foreachPartitions替代foreach

  • 使用filter之后進行coalesce操作

  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作

  • 使用Kryo優化序列化性能 這個優化原則我本身也沒有經過測試,但是好多優化文檔有提到,這里也記錄下來。 在Spark中,主要有三個地方涉及到了序列化:

  • 在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸。

  • 將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。

  • 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。

對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。
官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之所以默認沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。

以下是使用Kryo的代碼示例,我們只要設置序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):

// 創建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM