Spark: Structured Streaming


Part V. Streaming

These notes cover mainly Structured Streaming as of Spark 2.2, with some remarks on later versions.


Stream Processing Fundamentals

1. Concepts

Stream processing continuously incorporates new data to compute new results, whereas batch processing computes once over a fixed amount of input. Structured Streaming combines both and adds interactive queries.

Real-time computation is typically used for notifications/alerting, real-time reporting, incremental ETL, real-time decision making, updating data to serve in real time (GA), online ML, and so on.

2. Stream Processing Design Points

Record-at-a-Time Versus Declarative APIs

Newer streaming systems provide declarative APIs: the application defines what to compute rather than how to compute it, as in DStreams or Kafka Streams.

Event Time Versus Processing Time

Event time is when the event actually occurred; processing time is when the data is received and processed. Records ordered by event time may arrive out of order and with a delay.

Continuous Versus Micro-Batch Execution

In continuous execution, each node keeps receiving records from upstream nodes, performs one step of the computation, and passes the result to downstream nodes. This gives low latency, but the maximum throughput is lower (under heavy load a node computes slowly and holds up downstream nodes), and individual nodes cannot be stopped on their own.

Micro-batch execution accumulates small batches of data and processes each batch in parallel. It achieves high throughput and needs fewer nodes, but at the cost of latency.

3. Spark's Streaming APIs

Spark offers DStreams (pure micro-batch, declarative API, but no event-time support) and Structured Streaming (which builds on the former and adds event time and continuous processing).

The strength of DStreams lies in their "high-level API interface and simple exactly-once semantics". However, they are based on Java/Python objects and functions, which are harder to optimize than DataFrames; they do not support event time; and they run only in micro-batch mode.

Structured Streaming is built on Spark's structured APIs. It supports all of Spark's language APIs, event time, and more kinds of optimization, and continuous processing is under development (Spark 2.3). Operations look almost identical to DataFrame operations, are automatically converted into incremental computation, and results can also be exported as tables for use with Spark SQL.


Structured Streaming Basics

1. Introduction and Concepts

Structured Streaming is a stream-processing framework built on Spark SQL that uses the structured APIs and the Catalyst engine. It ensures end-to-end, exactly-once processing as well as fault tolerance through checkpointing and write-ahead logs.

You only choose whether to run in batch or streaming mode; the rest of the code stays the same.

To integrate with the rest of Spark, Structured Streaming supports the idea of a continuous application, an end-to-end application that reacts to data in real time by combining a variety of tools: streaming jobs, batch jobs, joins between streaming and offline data, and interactive ad-hoc queries.

As of Spark 2.3, Continuous Processing has been added, reaching roughly 1 ms latencies with at-least-once guarantees; exactly-once guarantees require latencies of around 100 ms.
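
A minimal sketch of turning on Continuous Processing (Spark 2.3+), assuming a streaming DataFrame df whose query uses only operations supported in continuous mode (e.g. selections and filters):

import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second")) // the argument is the checkpoint interval, not a batch interval
  .start()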

Core concepts

  • Transformations and actions: some queries are restricted (they cannot yet be incrementalized), and the only action is starting the stream.

  • Input sources: Kafka, distributed file systems, and a socket source for testing.

  • Output sinks: Kafka, almost any file format, a foreach sink for arbitrary computation, a console sink for testing, and a memory sink for debugging.

  • Output modes (each fits different kinds of queries; see "Input and Output" below): append, update (since 2.1.1; only updated rows are written out, and if the query has no aggregation, i.e. nothing previously output is affected, it behaves like append), and complete.

  • Triggers: when to produce output, e.g. at a fixed interval or after a fixed amount of data (subject to some latency).

  • Event-time processing: event-time data and watermarks (how long to wait for late data and when to output results).

  • Fault tolerance: end-to-end exactly-once.

2. Transformations and Actions

Streaming DataFrames support essentially all of the static structured APIs (schema inference must be enabled separately by setting spark.sql.streaming.schemaInference to true).
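
For file-based sources, schema inference is off by default; a minimal sketch of enabling it:

// let streaming file sources infer their schema when the query starts
spark.conf.set("spark.sql.streaming.schemaInference", "true")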

import org.apache.spark.sql.streaming.Trigger

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .queryName("activity_counts") // the table name (like a temp view), used for the queries below
  .outputMode("complete")
  .format("console") // debug專用
  .trigger(Trigger.ProcessingTime("2 seconds")) // optional
  .start() // for batch output, use save instead

query.awaitTermination()//keep the driver alive while the query runs

//with the code above running, the stream is active; below we query the in-memory output table
for( i <- 1 to 10 ) {
    Thread.sleep(1000)
    spark.sql("SELECT * FROM activity_counts").show()
}

spark.streams.active lists the currently active streams.
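
A minimal sketch of inspecting and stopping active streams (assuming the activity_counts query started above):

// list every active streaming query on this SparkSession
spark.streams.active.foreach(q => println(s"${q.name}: ${q.id}"))

// stop a specific query by name
spark.streams.active.filter(_.name == "activity_counts").foreach(_.stop())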

Execution: each time the query runs, Spark checks whether new data has arrived and, if so, combines it with the previously kept state to compute the new result. Note that only intermediate state is kept for old data, and its size depends on how much of the old data later computation actually needs. For example, when computing a maximum, only the largest value seen so far has to be stored.

More details on transformations

The restrictions keep shrinking. Current ones: a stream cannot be sorted unless it has been aggregated, and multiple levels of aggregation are not possible without using stateful processing.

//all selects and filters are supported
val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))
  .where("stairs")
  .where("gt is not null")
  .select("gt", "model", "arrival_time", "creation_time")
  .writeStream
  .queryName("simple_transform")
  .format("memory")
  .outputMode("append")
  .start()

//most aggregations are supported
val deviceModelStats = streaming.cube("gt", "model").avg()
  .drop("avg(Arrival_time)")
  .drop("avg(Creation_Time)")
  .drop("avg(Index)")
  .writeStream.queryName("device_counts").format("memory").outputMode("complete")
  .start()
//the limitation is multiple "chained" aggregations (aggregations on streaming aggregations); a workaround is to write to a sink and aggregate again

//inner joins are supported; outer joins require watermarks
val historicalAgg = static.groupBy("gt", "model").avg()
val deviceModelStats = streaming
  .drop("Arrival_Time", "Creation_Time", "Index")
  .cube("gt", "model").avg()
  .join(historicalAgg, Seq("gt", "model")) // inner join
  .writeStream
  .queryName("device_counts")
  .format("memory")
  .outputMode("complete")
  .start()

3. Input and Output

Sources and Sinks

  1. Files: save as CSV, Parquet, etc.

Note that files must be added to the input directory atomically; otherwise Spark will start operating on a file before it has been fully written. On file systems such as local files or HDFS, it is best to write the file in a separate directory and move it into the input directory once it is complete.

Partition discovery: the same parent directory cannot mix partition column names, e.g. /data/year=2016/ and /data/date=2016-04-17/.
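
A minimal sketch of a file source (paths and schema are hypothetical). Streaming file sources need an explicit schema, and maxFilesPerTrigger caps how many new files are read per trigger:

import org.apache.spark.sql.types.{StructType, StringType, TimestampType}

val fileSchema = new StructType()
  .add("user", StringType)
  .add("event_time", TimestampType)

val fileStream = spark.readStream
  .schema(fileSchema)               // required for streaming file sources
  .option("maxFilesPerTrigger", 1)  // read at most one new file per trigger
  .csv("/data/input/")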

  2. Kafka:
val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")//連接哪個服務器
  .option("subscribe", "topic1,topic2")//也可("subscribePattern", "topic.*")
  .load()

Use the native structured APIs or UDFs to parse the messages. JSON or Avro is commonly used to read from and write to Kafka.

ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")//if "topic" is not selected here, add .option("topic", "topic1") below instead
  .writeStream.format("kafka")
  .option("checkpointLocation", "/to/HDFS-compatible/dir")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

The Kafka source schema is: key: binary; value: binary; topic: string; partition: int; offset: long; timestamp: long.

There are three ways to choose what to read:

assign: specify topics, and optionally specific partitions of them, as a JSON string such as {"topicA":[0,1],"topicB":[2,4]}.

subscribe or subscribePattern: read multiple topics via a list of topics or a pattern (regular expression).

Other options:

startingOffsets and endingOffsets: earliest, latest, or a JSON string such as {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} (-1 means latest, -2 means earliest). This applies only when a new streaming query starts; a restarted query always resumes from where it stopped. Newly discovered partitions start at earliest, and endingOffsets mark the end of the query's range.

failOnDataLoss: whether to fail the query when data may have been lost (e.g. topics deleted or offsets out of range), which can be a false alarm; defaults to true.

maxOffsetsPerTrigger: a rate limit on the maximum number of offsets processed per trigger interval. The specified total is split proportionally across topicPartitions of different volumes.

There are also settings for Kafka consumer timeouts, fetch retries, and intervals.
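
A minimal sketch combining these options (topic names, partitions, and offsets are made up for illustration):

val ds = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"topicA":[0,1],"topicB":[2,4]}""")                              // fixed partitions
  .option("startingOffsets", """{"topicA":{"0":23,"1":-1},"topicB":{"2":-2,"4":-2}}""") // -1 latest, -2 earliest
  .option("maxOffsetsPerTrigger", 1000)   // rate limit per trigger
  .option("failOnDataLoss", "false")      // do not fail on possible data loss
  .load()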

  3. Foreach sink (analogous to foreachPartitions)

You must implement the ForeachWriter interface.

//whenever output rows are produced by a trigger, the three methods below are invoked (on each executor)
//the writer must be Serializable (as with a UDF or a Dataset map)
datasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // do all setup here, e.g. open a database connection, begin transactions
    //version is a monotonically increasing ID, on a per-trigger basis
    // return whether to process this group of rows (e.g. after a crash, if the storage system already contains this version and partitionId, return false to skip it)
  }
  def process(record: String) = {
    // handle each record
  }
  def close(errorOrNull: Throwable): Unit = {
    //invoked whenever open was called
    //if process failed, decide what to do with the partially written data
    //close the connection
  }
}).start()

錯誤示范:如果在open之外初始化,則會在driver進行。

  4. Sources and sinks for testing (not usable in production, since they provide no end-to-end fault tolerance)
//Socket source; on Unix run nc -lk 9999 and then type into the console
val socketDF = spark.readStream.format("socket")
  .option("host", "localhost").option("port", 9999).load()

//Console sink
activityCounts.writeStream.format("console").outputMode("complete").start()

//Memory sink: keep the results in an in-memory table for later code to query
activityCounts.writeStream.format("memory").queryName("my_device_table")
  .outputMode("complete").start()

If you really need to output to a table in production, use the Parquet file sink instead.
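
A minimal sketch of a Parquet file sink, assuming a streaming DataFrame events without aggregations (the file sink supports only append mode) and hypothetical paths:

events.writeStream
  .format("parquet")
  .option("path", "/data/events/")                    // output directory
  .option("checkpointLocation", "/data/checkpoints/") // required by the file sink
  .outputMode("append")
  .start()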

Output Modes

Append: new rows are appended to the Result Table and only those new rows are output. Suitable for queries that involve only select, where, map, flatMap, filter, join, and similar operations.

Complete: the entire Result Table is output; suitable when the query contains aggregations.

Update: the Result Table is updated and only the updated rows are output.

Operations must match the mode. For example, append supports only operations that never change previously emitted data; it does not support aggregation unless a watermark is set, and aggregated results appear only after the window has closed and the late threshold has passed. Complete and update additionally support aggregation without a watermark. See the official docs for the full matrix.

Sinks must match the mode as well: Kafka supports all modes with at-least-once semantics, while the file sink supports only append, with exactly-once semantics. See the official docs for the rest.

Triggers

By default, a new trigger fires as soon as the previous one completes. Other options are a fixed interval, one-time, and Continuous with a fixed checkpoint interval (experimental).

//Processing-time trigger: if Spark has not finished the computation within 100s, it waits for the next trigger point rather than firing again immediately after finishing
activityCounts.writeStream
.trigger(Trigger.ProcessingTime("100 seconds"))//也可以用Duration in Scala or TimeUnit in Java
.format("console")
.outputMode("complete")
.start()

//Once trigger: very useful in testing, and often used for low-frequency jobs (e.g. appending new data to a summary table)
activityCounts.writeStream
.trigger(Trigger.Once())
.format("console")
.outputMode("complete")
.start()

4. Streaming Dataset API

Besides DataFrames, the Dataset API can also be used:

case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String,
  count: BigInt)
val dataSchema = spark.read
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
  .schema
val flightsDF = spark.readStream.schema(dataSchema)
  .parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
def originIsDestination(flight_row: Flight): Boolean = {
  return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
flights.filter(flight_row => originIsDestination(flight_row))
  .groupByKey(x => x.DEST_COUNTRY_NAME).count()
  .writeStream.queryName("device_counts").format("memory").outputMode("complete")
  .start()

Event-Time and Stateful Processing

Event time: the time a record is received (processing time) differs from the time the event actually occurred (event time). The network only tries to deliver records completely (and sometimes not even that); neither ordering nor deduplication is guaranteed.

Stateful processing (what the previous sections did): intermediate information (state) is updated over time, via micro-batches or record-at-a-time processing. While doing this, Spark keeps the state in a state store (currently in memory) and achieves fault tolerance by writing it to the checkpoint directory.

Arbitrary stateful processing: you define what information to store, how to update it, and when to remove it (either explicitly or via a time-out).

1. Event-Time Basics

// an event-time-based window; the input Dataset contains timestamp and User columns
word
  .withWatermark("timestamp", "30 minutes")//suppose a window's result is x; some of its data may still arrive after the window's last timestamp. Spark waits 30 minutes for it, after which x is no longer updated.
  .dropDuplicates("User", "timestamp")
  .groupBy(window(col("timestamp"), "10 minutes"),col("User"))// add another argument after "10 minutes" to get sliding windows, specifying how often to compute
  .count() 
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

spark.sql("SELECT * FROM events_per_window")

Spark's watermark = max event time seen by the engine - late threshold, comparable to Flink's BoundedOutOfOrdernessTimestampExtractor. When a windowed computation is triggered, Spark drops the intermediate state of windows whose end time is below the current watermark; late data belonging to those windows "may" be ignored (the later it is, the more likely), and only after this cleanup is the watermark advanced. So even if no new data arrives in the next batch, Spark already uses the advanced watermark, and in the batch after that the watermark stays unchanged.

The above describes update mode. In append mode, a result appears only when a trigger finds that a window's end time has fallen below the updated watermark. Also, because the watermark is max event time seen by the engine minus the late threshold, if no later data arrives in the next batch the watermark does not advance, and appended output is delayed accordingly.
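
A minimal sketch of the append-mode variant, assuming the same input Dataset word with timestamp and User columns as above; each window's count is emitted only once the watermark passes the window's end time:

import org.apache.spark.sql.functions.{window, col}

word
  .withWatermark("timestamp", "30 minutes")
  .groupBy(window(col("timestamp"), "10 minutes"), col("User"))
  .count()
  .writeStream
  .queryName("events_per_window_append")
  .format("memory")
  .outputMode("append")
  .start()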

Conditions for watermarking to clean aggregation state (as of Spark 2.1.1, subject to change in the future):

  • Complete mode is not supported.
  • The groupBy must include the event-time column itself or window(col(timestamp)), and the column passed to withWatermark must be that same timestamp column.
  • withWatermark must be called before the groupBy.

2. Joins

Prerequisites (optional for inner joins, since rows that can never match may simply be dropped, but mandatory for outer joins):

  • Both inputs must define a watermark.
  • The join condition must be one of:
    • Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR)
    • Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow)

Supported:

  • stream-static: inner, left outer
  • stream-stream: inner, outer (watermarks and time constraints must be set)

Limitations:

  • Join queries support only append mode.
  • No aggregations before the join.
  • In update mode, no mapGroupsWithState or flatMapGroupsWithState before the join.
// ad impressions and clicks
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
    // a click can occur within a time range of 0 seconds to 1 hour after the corresponding impression
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
    joinType = ".." // "inner", "leftOuter", "rightOuter"
)

In an outer join, if one of the two streams receives no new data, no trigger fires and the join output is delayed.

3. Arbitrary Stateful Processing

Use cases:

  • Count events per key, using one state per key.
  • Emit an alert when a count reaches some threshold.
  • Maintain user sessions of indeterminate length and save them for later analysis.

Time-Outs

A setting configured for each group/key. It can be based on processing time (GroupStateTimeout.ProcessingTimeTimeout) or event time (GroupStateTimeout.EventTimeTimeout); the former uses the system clock and is affected by time-zone changes, the latter uses the timestamps carried in the data.

When a time-out is set, check for it before processing the values, using the state.hasTimedOut flag or by checking whether the value iterator is empty. You need to have some state set (i.e. the state must be defined, not removed) in order to set a time-out. There is no strict upper bound on when the time-out actually fires; this happens when no new data arrives to trigger the query after the time-out has elapsed.

Below is the official example of a stateful operation using mapGroupsWithState. SessionInfo and SessionUpdate are case classes: the former stores the state, the latter is the result that mapGroupsWithState returns.

val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {

  case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>

    // check for a time-out before processing the values
    if (state.hasTimedOut) {
      val finalUpdate =
        SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = true)
      state.remove()
      finalUpdate
    } else {
      // no time-out: create the state if it does not exist yet, otherwise update it
      val timestamps = events.map(_.timestamp.getTime).toSeq
      val updatedSession = if (state.exists) {
        val oldSession = state.get
        SessionInfo(
          oldSession.numEvents + timestamps.size,
          oldSession.startTimestampMs,
          math.max(oldSession.endTimestampMs, timestamps.max))
      } else {
        SessionInfo(timestamps.size, timestamps.min, timestamps.max)
      }
      state.update(updatedSession)

      // set a time-out so that the session expires if no data is received for 10 seconds
      state.setTimeoutDuration("10 seconds")
      SessionUpdate(sessionId, state.get.durationMs, state.get.numEvents, expired = false)
    }
}

mapGroupsWithState: one row per key (or group)

flatMapGroupsWithState: multiple outputs

Output modes

mapGroupsWithState supports only update mode; flatMapGroupsWithState supports append (results only show up after the time-out, i.e. once the watermark has passed) and update.

mapGroupsWithState example:

The state is updated based on the user's activity. Concretely, if an incoming record's activity is unchanged, check whether the record is earlier or later than what has been seen so far, in order to derive the exact duration of the user's current activity (GroupStateTimeout.NoTimeout is used below: assuming no records are lost in transit, they will arrive sooner or later). If the activity has changed, update it (and reset the timestamps). If there is no activity, nothing changes.

//format of the input rows
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
//format of the per-user state (stores the current state)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

//how to update the state from a single row
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (state.activity == input.activity) {

    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }
  state
}

//how to update the state across a group of rows
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L)
    )
  // we simply specify an old date that we can compare against and
  // immediately update based on the values in our data

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
  .as[InputRow]
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()

flatMapGroupsWithState example

Count-based updates: a result is produced when some metric reaches a particular count. In the code below, DeviceState tracks, while the count is still below 500, the device name, its values, and the count. Once the count reaches 500, the average is computed and emitted as an output row; until then nothing is emitted.

//format of the input rows
case class InputRow(device: String, timestamp: java.sql.Timestamp, x: Double)
//format of the device state (stores the current state)
case class DeviceState(device: String, var values: Array[Double], var count: Int)
//format of the output rows
case class OutputRow(device: String, previousAverage: Double)

//how to update the state from a single row
def updateWithEvent(state:DeviceState, input:InputRow):DeviceState = {
  state.count += 1
  // maintain an array of the x-axis values
  state.values = state.values ++ Array(input.x)
  state
}

//how to update the state across a group of rows
def updateAcrossEvents(device:String, inputs: Iterator[InputRow],
  oldState: GroupState[DeviceState]):Iterator[OutputRow] = {//flatMapGroupsWithState requires an Iterator of output rows to be returned
  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get
      else DeviceState(device, Array(), 0)
    val newState = updateWithEvent(state, input)
    if (newState.count >= 500) {
      oldState.update(DeviceState(device, Array(), 0))
      Iterator(OutputRow(device,
        newState.values.sum / newState.values.length.toDouble))
    }
    else {
      oldState.update(newState)
      Iterator()//below 500: emit nothing
    }
  }
}

withEventTime
  .selectExpr("Device as device",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "x")
  .as[InputRow]
  .groupByKey(_.device)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.NoTimeout)(updateAcrossEvents)//NoTimeout here means that if a device never reaches 500, no result is ever emitted for it
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .outputMode("append")
  .start()

The following example uses a time-out:

case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
  activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
  var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
  var xAvg:Double)

def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  // handle malformed dates
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  state.timestamp = input.timestamp
  state.values = state.values ++ Array(input.x)
  if (!state.activities.contains(input.activity)) {
    state.activities = state.activities ++ Array(input.activity)
  }
  state
}

def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get else UserSession(
    uid,
    new java.sql.Timestamp(6284160000000L),
    Array(),
    Array())
    val newState = updateWithEvent(state, input)

    if (oldState.hasTimedOut) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else if (state.values.length > 1000) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
      Iterator()
    }
  }
}

withEventTime.where("x is not null")
  .selectExpr("user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x", "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(_.uid)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .start()

Unsupported Operations

  • a chain of aggregations on a streaming DF
  • Limit and take first N rows
  • Distinct operations
  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
  • Few types of outer joins on streaming Datasets are not supported.
  • groupBy().count() is supported, but a bare count() on a stream is not.
  • writeStream.foreach(...) is supported, but a bare foreach() is not, and it requires implementing the ForeachWriter interface; see the official docs.
  • format("console") is supported, but show() is not.

Starting Streaming Queries

  • Details of the output sink: Data format, location, etc.
  • Output mode: specify the output mode (append, update, or complete).
  • Query name: Optionally, specify a unique name of the query for identification.
  • Trigger interval: Optionally, specify the trigger interval. If not set, data is processed as soon as it arrives; when processing finishes, Spark checks for new data and processes it right away. If a trigger point occurs while a batch is still being processed, the next batch starts as soon as the current one finishes.
  • Checkpoint location

Structured Streaming in Production

1. Fault Tolerance and Checkpointing

Configure the application to use checkpointing and write-ahead logs, and configure the query to write to a checkpoint location (e.g. on HDFS). After a failure, simply restart the application and make sure it points at the same checkpoint location.

The checkpoint location stores all of the information about what your stream has processed thus far and what intermediate state it may be storing.

//configure on writeStream before running the application
.option("checkpointLocation", "/some/location/")

2. Updating Your Application

Adding a new column or changing a UDF does not require a new checkpoint directory, but adding a new aggregation key or fundamentally changing the query does.

Some infrastructure changes only require restarting the stream, e.g. spark.sql.shuffle.partitions; others require restarting the whole application, e.g. Spark application configurations.

3. Metrics and Monitoring

query.status, query.recentProgress (pay attention to the input rate vs. processing rate and the batch duration), and the Spark UI.
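
A minimal sketch of polling these metrics, assuming query is a running StreamingQuery (such as the one started earlier):

// current status: whether data is available and whether a trigger is active
println(query.status)

// recent progress reports: compare the input rate with the processing rate per batch
query.recentProgress.foreach { p =>
  println(s"batch=${p.batchId} input/s=${p.inputRowsPerSecond} processed/s=${p.processedRowsPerSecond}")
}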

4. Alerting

Feed the metrics to a monitoring system such as the open source Coda Hale Metrics library or Prometheus, or simply log them and use a log aggregation system like Splunk.

5. Advanced Monitoring with the Streaming Listener (see other resources for details)

Streaming listeners allow you to receive asynchronous updates from the streaming query in order to automatically output this information to other systems and implement robust monitoring and alerting mechanisms.
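
A minimal sketch of a listener that simply prints the events (a real deployment might forward them to Kafka or a metrics system instead):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Query progress: ${event.progress.json}") // the full progress report as JSON
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
})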

// Test code: observe how the watermark advances (assumes an active SparkSession `spark` and SparkContext `sc`)
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType}
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.ProcessingTime
import spark.implicits._

val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)

val schema = StructType(StructField("value", StringType) ::
                        StructField("timestamp", TimestampType) ::
                        Nil)

val eventStream = spark
  .readStream
  .option("sep", ";")
  .option("header", "false")
  .schema(schema)
  .csv(dir.toString)

// Watermarked aggregation
val eventsCount = eventStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .count

def writeFile(path: Path, data: String) {
  val file = fs.create(path)
  file.writeUTF(data)
  file.close()
}

// Debug query
val query = eventsCount.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", "false")
  .trigger(ProcessingTime("5 seconds"))
  .start()

writeFile(new Path(dir, "file1"), """
  |A;2017-08-09 10:00:00
  |B;2017-08-09 10:10:00
  |C;2017-08-09 10:20:00""".stripMargin)

query.processAllAvailable()
val lp1 = query.lastProgress

// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }


writeFile(new Path(dir, "file2"), """
  |Z;2017-08-09 20:00:00
  |X;2017-08-09 12:00:00
  |Y;2017-08-09 12:50:00""".stripMargin)

query.processAllAvailable()
val lp2 = query.lastProgress

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }

writeFile(new Path(dir, "file3"), "")

query.processAllAvailable()
val lp3 = query.lastProgress

// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }

query.stop()
fs.delete(dir, true)

DStreams

Coding steps: define the input sources -> define the transformations -> start and stop.

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999) // or ssc.textFileStream(...), etc., to create streams

val res = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
res.print() // at least one output operation is required before starting

ssc.start()
ssc.awaitTermination()

There is only one active context, and once stopped it cannot be started again. Only one StreamingContext can run in a JVM at a time. The StreamingContext and the SparkContext can be stopped separately, e.g. after stopping one StreamingContext you can start a new one.

In setMaster, local[n] must be larger than the number of receivers, since each receiver occupies one thread.

With textFileStream, all files must have the same format and must be added to the directory atomically, i.e. move a fully written file into the folder, or rename it into place. A file is processed only once; later modifications have no effect.

