1、Stateful operations such as updateStateByKey and window operations checkpoint their state automatically, so a checkpoint directory must be set. One copy of the state data is kept on a fault-tolerant file system; if the in-memory data is lost, it can be read back from the file system instead of being recomputed.
streamingContext.checkpoint("hdfs://ip:port/checkpoint");
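The state that gets checkpointed is produced by an update function supplied by the application. As a minimal sketch (the class and method names here are illustrative, not from the original code), this is the kind of per-key running-count logic that updateStateByKey invokes; the logic itself is plain Java:

```java
import java.util.List;
import java.util.Optional;

public class RunningCount {
    // Update logic of the kind passed to updateStateByKey:
    // `batchValues` holds this batch's counts for one key,
    // `state` is the running count recovered from the checkpoint.
    // (Plain java.util.Optional is used here to keep the sketch
    // self-contained; Spark's Java API uses its own Optional type.)
    static Optional<Long> updateCount(List<Long> batchValues, Optional<Long> state) {
        long sum = state.orElse(0L);
        for (long v : batchValues) {
            sum += v;
        }
        return Optional.of(sum);
    }
}
```

In Spark's Java API a function like this is passed to pairDStream.updateStateByKey(...); if the checkpoint directory has not been set first, Spark refuses to start the job.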
2、Driver high availability
一、Java version
The first time the StreamingContext is created and started, it continuously writes the metadata of the real-time computation to the checkpoint directory. If the driver node goes down, the cluster can restart the driver automatically (the job must be submitted in cluster deploy mode, e.g. spark-submit --deploy-mode cluster --supervise ...), after which the computation resumes from the checkpoint with no data loss.
private static void testDriverHA() {
    final String checkpointDir = "hdfs://ip:port/checkpoint";
    JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("AdClickRealTimeStatSpark");
            JavaStreamingContext jssc = new JavaStreamingContext(
                    conf, Durations.seconds(5));
            jssc.checkpoint(checkpointDir);

            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put(Constants.KAFKA_METADATA_BROKER_LIST,
                    ConfigurationManager.getProperty(Constants.KAFKA_METADATA_BROKER_LIST));
            String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
            String[] kafkaTopicsSplited = kafkaTopics.split(",");
            Set<String> topics = new HashSet<String>();
            for (String kafkaTopic : kafkaTopicsSplited) {
                topics.add(kafkaTopic);
            }
            JavaPairInputDStream<String, String> adRealTimeLogDStream = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topics);
            JavaPairDStream<String, String> filteredAdRealTimeLogDStream =
                    filterByBlacklist(adRealTimeLogDStream);
            generateDynamicBlacklist(filteredAdRealTimeLogDStream);
            JavaPairDStream<String, Long> adRealTimeStatDStream = calculateRealTimeStat(
                    filteredAdRealTimeLogDStream);
            calculateProvinceTop3Ad(adRealTimeStatDStream);
            calculateAdClickCountByWindow(adRealTimeLogDStream);
            return jssc;
        }
    };
    JavaStreamingContext context = JavaStreamingContext.getOrCreate(
            checkpointDir, contextFactory);
    context.start();
    context.awaitTermination();
}
二、Scala version
package cn.piesat.spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafka {
  private val brokers = "hadoop01:9092"
  private val topics = "lj01"
  private val checkPointPath = "hdfs://hadoop01:9000/sparkStreaming/kafka6"

  def main(args: Array[String]): Unit = {
    val spark = getSparkSession()
    // All streaming logic is defined inside the getOrCreate factory so that
    // it becomes part of the checkpointed DStream graph.
    val streamingContext = StreamingContext.getOrCreate(checkPointPath, () => {
      val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
      ssc.checkpoint(checkPointPath)
      val kafkaInputStream = getKafkaInputStream(ssc)
      val result = kafkaInputStream.map(_.value())
        .flatMap(_.split(" ").map((_, 1)))
        .reduceByKey(_ + _)
      result.print()
      ssc
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder()
      .appName("kafka_test")
      .master("local[4]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
  }

  def getKafkaInputStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topicArray = topics.split(",").toList
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "lj00",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicArray, kafkaParams)
    )
  }
}
Note: all streaming logic must be defined inside the function passed to StreamingContext.getOrCreate(). On a restart the context is rebuilt from the checkpoint data rather than by re-running code outside that function, so any operations defined outside it would not be recovered!
3、RDD high availability: enabling the WAL (write-ahead log)
In principle, Spark Streaming receives data through a receiver. The received data is split into blocks, blocks are assembled into batches, and for each batch an RDD is created and a job is launched to run the operators defined on it. With the write-ahead log enabled, as soon as the receiver gets data it also writes a copy to a file in the checkpoint directory on a fault-tolerant file system (such as HDFS), as a redundant replica of the data, so that received-but-unprocessed data survives a failure.
SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("AdClickRealTimeStatSpark")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");
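Because the write-ahead log already puts one copy of every received block on HDFS, the receiver no longer needs in-memory replication. A hedged sketch of pairing the WAL with a non-replicated storage level, using the receiver-based Kafka API (spark-streaming-kafka 0.8; the ZooKeeper address, group id, and topic are illustrative, and `jssc` is assumed to be the JavaStreamingContext from the code above):

```java
import java.util.Collections;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// With the WAL enabled, a non-replicated storage level is sufficient,
// because the write-ahead log on HDFS already provides the second copy.
JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
        jssc,
        "hadoop01:2181",                     // ZooKeeper quorum (illustrative)
        "consumer-group",                    // consumer group id (illustrative)
        Collections.singletonMap("topic", 1), // topic -> receiver thread count
        StorageLevel.MEMORY_AND_DISK_SER());
```

Without the WAL, a replicated level such as StorageLevel.MEMORY_AND_DISK_SER_2() would be the safer default, since memory replication would then be the only redundancy for received data.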