用於Kafka 0.10的結構化流集成從Kafka讀取數據並將數據寫入到Kafka。
1. Linking
對於使用SBT/Maven項目定義的Scala/Java應用程序,用以下工件artifact連接你的應用程序:
對於Python應用程序,你需要在部署應用程序時添加上面的庫及其依賴關系。查看Deploying子節點。
2. Reading Data from Kafka 從Kafka讀取數據
2.1 Creating a Kafka Source for Streaming Queries 為流式查詢創建一個Kafka來源
2.2 Creating a Kafka Source for Batch Queries 為批處理查詢創建一個Kafka來源
如果你有一個更適合用於批處理的用例,你可以為定義的偏移量范圍創建一個Dataset/DataFrame。
來源的每一行有以下格式:
Column | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
對於批處理和流式查詢,必須為Kafka來源設置以下選項。
Option | value | meaning |
---|---|---|
assign | json string {"topicA":[0,1],"topicB":[2,4]} | 要使用特定TopicPartition。對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。 |
subscribe | A comma-separated list of topics | 要訂閱的主題列表.對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。 |
subscribePattern | Java regex string | 用於訂閱主題的格式。對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。 |
kafka.bootstrap.servers | A comma-separated list of host:port | Kafka "bootstrap.servers" 配置。 |
以下配置是可選的:
Option | value | default | query type | meaning |
---|---|---|---|---|
startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | "latest"用於流, "earliest" 用於批量 | streaming and batch 流和批量 |
查詢開始時的起始點,可以是最早偏移量的“earliest”,也可以是最近偏移量的“latest”,也可以是指定每個TopicPartition起始偏移量的json字符串。在Json中,-2作為偏移量可以用來指最早的,-1指最新的。注意:對於批量查詢,不允許使用最新(隱式或在json中使用-1)。對於流式查詢,這僅適用於新查詢開始時的情況,並且恢復將始終從查詢停止的地方開始。在查詢期間新發現的分區將從earliest開始。 |
endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch query 批量查詢 |
批量查詢結束時的結束點,可以是最新引用的“latest”,也可以是指定每個TopicPartition結束偏移量的json字符串。在json中,-1作為偏移量可以用於引用最新的,-2(最早的)作為偏移量是不允許的。 |
failOnDataLoss | true or false | true | streaming query 流式查詢 |
當可能丟失數據時是否讓查詢失敗(例如,主題被刪除或偏移超出范圍)。這可能是一個虛驚。當它不像你期望的那樣工作時,你可以禁用它。如果由於丟失數據而無法從提供的偏移量中讀取任何數據,批量查詢將始終失敗。 |
kafkaConsumer.pollTimeoutMs | long | 512 | streaming and batch | 在executors中輪詢來自Kafka的數據的超時時間(以毫秒為單位) |
fetchOffset.numRetries | int | 3 | streaming and batch | 在放棄提取Kafka偏移量之前重試的次數。 |
fetchOffset.retryIntervalMs | long | 10 | streaming and batch | 重試去提取Kafka偏移量之前等待的毫秒數。 |
maxOffsetsPerTrigger | long | none | streaming and batch | 每次觸發間隔處理的最大偏移量的速率限制。指定的偏移量總數將按不同卷的topicPartition成比例地分割。 |
3. Writing Data to Kafka 將數據寫入到Kafka
這里,我們描述了向Apache Kafka寫入流式查詢和批量查詢的支持。注意Apache Kafka只支持至少一次寫入語義。因此,在向Kafka寫入流式查詢或批量查詢時,可能會復制一些記錄;例如,如果Kafka需要重試未被Broker確認的消息,即使該Broker接收並編寫了消息記錄,也會發生這種情況。由於這些Kafka寫入語義,結構化流不能阻止這種復制發生。然而,如果編寫查詢是成功的,那么你可以假定查詢輸出至少寫入一次。當讀取寫入的數據刪除重復項的可能解決方法可能是引入主要(唯一)key,當讀取時這key可以用於執行重復數據刪除。
寫入到Kafka的Dataframe應該在模式上有以下列:
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
topic (*optional) | string |
* 如果“topic”配置選項不指定,則topic列是必須的。
value列是唯一要求的選項。如果key列沒有指定,那么將會自動添加值為null的key列(查看Kafka語義中關於如何處理空值key)。如果topic列存在,那么在將給予的行寫入到Kafka時,它的值用作topic,除非設置好“topic”配置選項。例如,“topic”配置選項覆蓋topic列。
對於批量和流式查詢,必須為Kafka sink設置以下選項:
Option | value | meaning |
---|---|---|
kafka.bootstrap.servers | A comma-separated list of host:port | Kafka "bootstrap.servers"配置。 |
以下選項是可選的:
Option | value | default | query type | meaning |
---|---|---|---|---|
topic | string | none | streaming and batch | 設置所有行將在Kafka中寫入的topic。該選項將覆蓋數據中可能存在的任何topic列。 |
3.1 Creating a Kafka Sink for Streaming Queries 為流式查詢創建Kafka Sink
3.2 Writing the output of Batch Queries to Kafka 將批量查詢的輸出寫入到Kafka中
4. Kafka Specific Configurations Kafka特定的配置
Kafka自己的配置可以通過帶有Kafka.prefix的DataStreamReader.option進行設置。例如,stream.option("kafka.bootstrap.servers","host":"port")。有關可能的Kafka參數,參閱Kafka消費者配置文檔以獲取與讀取數據相關的參數,以及Kafka生產者配置文件以獲取與寫入數據相關的參數。
注意以下Kafka參數不能設置,Kafka source或者sink將會拋出錯誤。
- group.id: Kafka source將會自動為每次查詢創建唯一的分組id。
- auto.offset.reset: 將source選項startingOffsets設置為指定從哪里開始。結構化流管理內部消費的偏移量,而不是依賴Kafka消費者來完成。這將確保在topic/partitons動態訂閱時不會遺漏任何數據。注意,只有在啟動新的流式查詢時才會應用
startingOffsets,並且恢復操作始終會從查詢停止的位置啟動。
. - key.deserializer:Keys總是被反序列化為ByteArrayDeserializer的字節數組。使用DataFrame操作顯式反序列化keys。
- value.deserializer:Values總是被反序列化為ByteArrayDeserializer的字節數組。使用DataFrame操作顯式反序列化values。
- key.serializer: keys總是使用ByteArraySerializer或StringSerializer進行序列化。使用DataFrame操作將keys顯示序列化為字符串或字節數組。
- value.serializer: values總是使用ByteArraySerializer或StringSerializer進行序列化使用DataFrame操作將values顯示序列化為字符串或字節數組。
- enable.auto.commit: Kafka source不提交任何offset。
- interceptor.classes: Kafka source總是以字節數組的形式讀取key和value。使用ConsumerInterceptor是不安全的,因為它可能會打斷查詢。
5. Deploying 部署
與任何Spark應用程序一樣,spark-submit用於啟動你的應用程序。spark-sql-kafka-0-10_2.11及其依賴關系可以直接添加到使用--packages的spark-submit中,例如,
更多關於提交帶有外部依賴項的應用程序的詳細信息參閱Application Submission Guide。