Structured Streaming Programming Guide


Introduction

  Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

  Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.

  However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
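  The processing mode is chosen on the sink through a trigger, not by rewriting the query. The snippet below is a minimal, hypothetical sketch (it is not part of this guide's word-count example) that uses the built-in "rate" test source and the console sink; the application name, variable names and option values are illustrative only.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

SparkSession spark = SparkSession
  .builder()
  .appName("TriggerModeSketch")
  .getOrCreate();

// The "rate" source generates rows with a timestamp and a long value, which is handy for testing.
Dataset<Row> rates = spark
  .readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Omitting .trigger(...) gives the default micro-batch mode, where each batch starts as soon
// as the previous one finishes; Trigger.ProcessingTime("2 seconds") would fix the micro-batch
// interval instead. Trigger.Continuous (Spark 2.3+) switches the query to continuous
// processing; only a subset of sources, sinks and operations support that mode.
rates
  .writeStream()
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();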

  In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query - a streaming word count.


Quick Example

  Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.


import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.


// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. And then start the streaming computation using start().


// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.
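As a side note (this is not part of the original example), the StreamingQuery handle can also be used to inspect or stop the query programmatically instead of only blocking on awaitTermination(); the calls below are a small illustrative sketch.

// Inspect the active query through its handle
System.out.println(query.isActive());      // true while the query is running
System.out.println(query.status());        // current status, e.g. whether it is waiting for new data
System.out.println(query.lastProgress());  // metrics of the most recent trigger (null before the first one)
// query.stop();                            // stops the query; newer Spark versions may declare a checked TimeoutException here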

To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using


$ nc -lk 9999

Then, in a different terminal, you can start the example by using


$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

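For illustration only, assuming the line "apache spark" is typed into the netcat terminal, the console output in the terminal running the query will look roughly like this (the exact batches and counts depend on what you type):

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+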

Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as a standard batch-like query, as if it were on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.


Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.


 A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.


The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.
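To make the choice concrete, here is a small hypothetical variation of the quick example above (it is not part of the original code): the mode is selected with outputMode(...) on the sink. Because wordCounts is a running aggregation whose rows keep changing, "complete" and "update" are valid for it, while "append" is not.

// Write only the counts that changed in the last trigger, instead of the whole result table
StreamingQuery updateQuery = wordCounts.writeStream()
  .outputMode("update")
  .format("console")
  .start();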

"Output"是用來定義寫入外部存儲器的內容。輸出可以被定義為不同模型:

“Complete 模型”----整個更新后的結果表將會被寫入到外部存儲器。取決於存儲連接器來決定如何處理整個表的寫入。

“Append 模型”   ----只有最后一個觸發器中附加的新行將被寫入外部存儲。這僅僅適用於預期結果表中現有行不發生更改的查詢。

“Update 模型”    ----只有最后一個觸發器中在結果表中更新的行被寫入外部存儲(從spark2.1.1才可以使用)。請注意,這與Complete模式不同,因為該模式只輸出自上次觸發器以來已經改變的行。如果查詢不包含聚合,那么等同於Append模式。

注意,每種模型都適用於某些類型的查詢。這將在后面詳細討論。

To illustrate the use of this model, let’s understand the model in the context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be for a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.


Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.


Handling Event-time and Late Data

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.
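As a brief preview of what the Window Operations section covers, the sketch below shows an event-time windowed count with a watermark. It assumes a hypothetical streaming Dataset<Row> named events with a "timestamp" column and a "word" column; these names are for illustration and do not come from the quick example above.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// Count words over 10-minute event-time windows sliding every 5 minutes, and let the engine
// drop old state for data arriving more than 10 minutes later than the latest event time seen.
Dataset<Row> windowedCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("word"))
  .count();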
