一、Spark-StructuredStreaming checkpointLocation 介紹
Structured Streaming 在 Spark 2.0 版本於 2016 年引入, 是基於 Spark SQL 引擎構建的可擴展且容錯的流處理引擎,對比傳統的 Spark Streaming,由於復用了 Spark SQL 引擎,代碼的寫法和批處理 API (基於 Dataframe 和 Dataset API)一樣,而且這些 API 非常的簡單。
Structured Streaming 還支持使用 event time,通過設置 watermark 來處理延時到達的數據;而 Spark Streaming 只能基於 process time 做計算,顯然是不夠用的。
比如 .withWatermark("timestamp", "10 minutes") 表示用 DataFrame 里面的 timestamp 字段作為 event time,如果 event time 比 process time 落后超過 10 分鍾,那么就不會處理這些數據。
Structured Streaming 默認情況下還是使用 micro batch 模式處理數據,不過從 Spark 2.3 開始提供了一種叫做 Continuous Processing 的模式,可以在至少一次語義下數據端到端只需 1ms 。
不過 Structured Streaming 的 Web UI 並沒有和 Spark Streaming 一樣的監控指標。
Checkpoint目錄的結構:

1、checkpointLocation 在源碼調用鏈
分析源碼查看 StructuredStreaming 啟動流程發現,DataStreamWriter#start 方法啟動一個 StreamingQuery。
同時將 checkpointLocation配置參數傳遞給StreamingQuery管理。
StreamingQuery 接口實現關系如下:

-
StreamingQueryWrapper 僅包裝了一個不可序列化的StreamExecution
-
StreamExecution 管理Spark SQL查詢的執行器
-
MicroBatchExecution 微批處理執行器
-
ContinuousExecution 連續處理(流式)執行器
因此我們僅需要分析 checkpointLocation 在 StreamExecution中調用即可。
備注:StreamExecution 中 protected def checkpointFile(name: String): String 方法為所有與 checkpointLocation 有關邏輯,返回 $checkpointFile/name 路徑
2、MetadataLog(元數據日志接口)
spark 提供了org.apache.spark.sql.execution.streaming.MetadataLog接口用於統一處理元數據日志信息。
checkpointLocation 文件內容均使用 MetadataLog進行維護。
分析接口實現關系如下:

類作用說明:
-
NullMetadataLog 空日志,即不輸出日志直接丟棄
-
HDFSMetadataLog 使用 HDFS 作為元數據日志輸出
-
CommitLog 提交日志
-
OffsetSeqLog 偏移量日志
-
CompactibleFileStreamLog 封裝了支持按大小合並、刪除歷史記錄的 MetadataLog
-
StreamSourceLog 文件類型作為數據源時日志記錄
-
FileStreamSinkLog 文件類型作為數據接收端時日志記錄
-
EsSinkMetadataLog Es作為數據接收端時日志記錄
分析 CompactibleFileStreamLog#compact 合並邏輯簡單描述為:假設有 0,1,2,3,4,5,6,7,8,9,10 個批次以此到達,合並大小為3當前合並結果為 `0,1,2.compact,3,4`下一次合並結果為 `0,1,2.compact,3,4,5.compact` , **說明:5.compact 文件內容 = 2.compact + 3 + 4**last.compact 文件大小會隨着批次運行無限增大...
分析 CompactibleFileStreamLog 刪除過期文件邏輯:CompactibleFileStreamLog#add 方法被調用時,默認會判斷是否支持刪除操作 override def add(batchId: Long, logs: Array[T]): Boolean = { val batchAdded = if (isCompactionBatch(batchId, compactInterval)) { // 是否合並 compact(batchId, logs) } else { super.add(batchId, logs) } if (batchAdded && isDeletingExpiredLog) { // 添加成功且支持刪除過期文件 // 刪除時判斷當前批次是否在 spark.sql.streaming.minBatchesToRetain 配置以外且在文件保留時間內 // 配置項參考 第4節 解決方案配置說明 deleteExpiredLog(batchId) } batchAdded }
3、 分析 checkpointLocation 目錄內容
目前 checkpointLocation 內容主要包含以下幾個目錄
-
offsets
-
commits
-
metadata
-
sources
-
sinks
3.1 offsets 目錄
記錄每個批次中的偏移量。為了保證給定的批次始終包含相同的數據,在處理數據前將其寫入此日志記錄。
此日志中的第 N 條記錄表示當前正在已處理,第 N-1 個條目指示哪些偏移已處理完成。
// StreamExecution 中val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
// 該日志示例內容如下,文件路徑=checkpointLocation/offsets/560504v1{"batchWatermarkMs":0,"batchTimestampMs":1574315160001,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}{"game_dc_real_normal":{"17":279843310,"8":318732102,"11":290676804,"2":292352132,"5":337789356,"14":277147358,"13":334833752,"4":319279439,"16":314038811,"7":361740056,"1":281418138,"10":276872234,"9":244398684,"3":334708621,"12":290208334,"15":267180971,"6":296588360,"0":350011707}}
3.2 commitLog 目錄
記錄已完成的批次,重啟任務檢查完成的批次與 offsets 批次記錄比對,確定接下來運行的批次
StreamExecution 中val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))// 該日志示例內容如下,文件路徑=checkpointLocation/commits/560504v1{"nextBatchWatermarkMs":0}
3.3 metadata 目錄
metadata 與整個查詢關聯的元數據,目前僅保留當前job id
StreamExecution 中val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))// 該日志示例內容如下,文件路徑=checkpointLocation/metadata{"id":"5314beeb-6026-485b-947a-cb088a9c9bac"}
3.4 sources 目錄
sources 目錄為數據源(Source)時各個批次讀取詳情
3.5 sinks 目錄
sinks 目錄為數據接收端(Sink)時批次的寫出詳情
另外如果在任務中存在state計算時,還會存在state目錄: 記錄狀態。當有狀態操作時,如累加聚合、去重、最大最小等場景,這個目錄會被用來記錄這些狀態數據。目錄結構:checkpoint/state/xxx.delta、checkpoint/state/xxx.snapshot。新的.snapshot是老的.snapshot和.delta合並生成的文件。Structured Streaming會根據配置周期性地生成.snapshot文件用於記錄狀態。
二、Spark Structured Streaming 對接 Grafana 監控
Structured Streaming 有個 StreamingQueryListener 用於異步報告指標,這是一個官方示例:
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
StreamingQuery API含義:

轉載請注明 作者:張永清 來源於博客園:https://www.cnblogs.com/laoqing/p/15588436.html
我們監控的話,主要是利用 onQueryProgress 方法來上報數據給監控系統。
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import org.apache.spark.sql.streaming.StreamingQueryListener
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
class SparkStreamingGraphiteMetrics(prefix: String, graphiteHostName: String, graphitePort: Int) extends StreamingQueryListener {
val metrics = new MetricRegistry()
var inputRowsPerSecond = 0D
var processedRowsPerSecond = 0D
var numInputRows = 0D
var triggerExecution = 0L
var batchDuration = 0L
var sourceEndOffset = 0L
var sourceStartOffset = 0L
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
val graphite = new Graphite(new InetSocketAddress(graphiteHostName, graphitePort))
val reporter: GraphiteReporter = GraphiteReporter
.forRegistry(metrics)
.prefixedWith(s"spark_structured_streaming_${prefix}") // 指標名稱前綴,便於在 Grafana 里面使用
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.build(graphite)
reporter.start(30, TimeUnit.SECONDS)
metrics.register(s"inputRowsPerSecond", new Gauge[Double] {
override def getValue: Double = inputRowsPerSecond
})
metrics.register(s"processedRowsPerSecond", new Gauge[Double] {
override def getValue: Double = processedRowsPerSecond
})
metrics.register("numInputRows", new Gauge[Double] {
override def getValue: Double = numInputRows
})
metrics.register("triggerExecution", new Gauge[Long] {
override def getValue: Long = triggerExecution
})
metrics.register("batchDuration", new Gauge[Long] {
override def getValue: Long = batchDuration
})
metrics.register("sourceEndOffset", new Gauge[Long] {
override def getValue: Long = sourceEndOffset
})
metrics.register("sourceStartOffset", new Gauge[Long] {
override def getValue: Long = sourceStartOffset
})
}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
// 對各個指標進行賦值、上報
inputRowsPerSecond = event.progress.inputRowsPerSecond
processedRowsPerSecond = event.progress.processedRowsPerSecond
numInputRows = event.progress.numInputRows
triggerExecution = event.progress.durationMs.getOrDefault("triggerExecution", 0L)
batchDuration = event.progress.batchDuration
event.progress.sources.foreach(source => {
sourceEndOffset = source.endOffset.toLong
sourceStartOffset = source.startOffset.toLong
})
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
println("onQueryTerminated")
}
}
在主程序里面添加監聽:轉載請注明 作者:張永清 來源於博客園:https://www.cnblogs.com/laoqing/p/15588436.html
spark.streams.addListener(xxxxxx)
需要啟動 graphite_exporter,隨便找一台服務器即可,有兩個默認端口:
- 9109 用來上報數據,即 spark -> graphite_exporter
- 9108 是 Prometheus 從 graphite_exporter 拉去數據用的
還需要在 Prometheus 配置文件 prometheus.yml 里面配置讀取數據
scrape_configs: - job_name: 'spark' static_configs: - targets: ['192.168.1.xx:9108']
最后啟動 spark 程序之后,就可以在 Grafana 里面配置圖表了。
配置 Grafana 圖表
比如我設置的 prefix 是 click,那么我們在 Grafana 里面的 Explore 模塊可以選擇 Prometheus 數據源,輸入指標 spark_click_inputRowsPerSecond ,點擊 Query 就可以獲取讀取速率這個指標了,如圖:

三、基於StreamingQueryListener向Kafka提交Offset
我們可以在SparkStreamingGraphiteMetrics的基礎上做向kafka提交offset。如下所示
轉載請注明 作者:張永清 來源於博客園:https://www.cnblogs.com/laoqing/p/15588436.html
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import java.util
import java.util.Properties
class KafkaOffsetCommiter(prefix: String, graphiteHostName: String, graphitePort: Int, kafkaProperties: Properties) extends SparkStreamingGraphiteMetrics(prefix: String, graphiteHostName: String, graphitePort: Int) {
val kafkaConsumer = new KafkaConsumer[String, String](kafkaProperties)
// 提交Offset
override def onQueryProgress(event: QueryProgressEvent): Unit = {
super.onQueryProgress(event)
// 遍歷所有Source
event.progress.sources.foreach(source => {
val objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
.registerModule(DefaultScalaModule)
val endOffset = objectMapper.readValue(source.endOffset, classOf[Map[String, Map[String, Long]]])
// 遍歷Source中的每個Topic
for ((topic, topicEndOffset) <- endOffset) {
val topicPartitionsOffset = new util.HashMap[TopicPartition, OffsetAndMetadata]()
//遍歷Topic中的每個Partition
for ((partition, offset) <- topicEndOffset) {
val topicPartition = new TopicPartition(topic, partition.toInt)
val offsetAndMetadata = new OffsetAndMetadata(offset)
topicPartitionsOffset.put(topicPartition, offsetAndMetadata)
}
kafkaConsumer.commitSync(topicPartitionsOffset)
}
})
}
}
轉載請注明 作者:張永清 來源於博客園:https://www.cnblogs.com/laoqing/p/15588436.html
