Structured Streaming系列——輸入與輸出


一、輸入數據源

1. 文件輸入數據源(FIie)

file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:

 import spark.implicits._
    val userSchema = new StructType()
.add("name", "string").add("age", "integer") val lines = spark.readStream .option("sep", ";") .schema(userSchema) .csv("file:///data/*") val query = lines.writeStream .outputMode("append") .format("console") .start() query.awaitTermination()

在對應的目錄下新建文件時,就可以在控制台看到對應的數據了。

還有一些其他可以控制的參數:

maxFilesPerTrigger  每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。
latestFirst  是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。
fileNameOnly  是否只監聽固定名稱的文件

 

2.網絡輸入數據源(socket)

一般都是基於這個socket來做測試。首先開啟一個socket服務器(nc -lk 9999),然后streaming這邊連接進行處理。

  spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

3. 輸入數據源(kafka)

// Subscribe to 1 topic
val df= spark                                                                                                                
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to multiple topics
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribe","topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

// Subscribe to a pattern
val df= spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","host1:port1,host2:port2")
.option("subscribePattern","topic.*")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.as[(String,String)]

以批的形式查詢

關於Kafka的offset,structured streaming默認提供了幾種方式:

//設置每個分區的起始和結束值
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

//配置起始和結束的offset值(默認)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema信息

讀取后的數據的Schema是固定的,包含的列如下:

Column Type 說明
key binary 信息key
value binary 信息的value(我們自己的數據)
topic string 主題
partition int 分區
offset long 偏移值
timestamp long 時間戳
timestampType int 類型

source相關的配置

無論是流的形式,還是批的形式,都需要一些必要的參數:

  • kafka.bootstrap.servers kafka的服務器配置,host:post形式,用逗號進行分割,如host1:9000,host2:9000
  • assign,以json的形式指定topic信息
  • subscribe,通過逗號分隔,指定topic信息
  • subscribePattern,通過java的正則指定多個topic
    assign、subscribe、subscribePattern同時之中能使用一個。

其他比較重要的參數有:

    • startingOffsets, offset開始的值,如果是earliest,則從最早的數據開始讀;如果是latest,則從最新的數據開始讀。默認流是latest,批是earliest
    • endingOffsets,最大的offset,只在批處理的時候設置,如果是latest則為最新的數據
    • failOnDataLoss,在流處理時,當數據丟失時(比如topic被刪除了,offset在指定的范圍之外),查詢是否報錯,默認為true。這個功能可以當做是一種告警機制,如果對丟失數據不感興趣,可以設置為false。在批處理時,這個值總是為true。
    • kafkaConsumer.pollTimeoutMs,excutor連接kafka的超時時間,默認是512ms
    • fetchOffset.numRetries,獲取kafka的offset信息時,嘗試的次數;默認是3次
    • fetchOffset.retryIntervalMs,嘗試重新讀取kafka offset信息時等待的時間,默認是10ms
    • maxOffsetsPerTrigger,trigger暫時不會用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

二、輸出數據源

目前Structed Streaming有四種方式:

1.File sink。寫入到文件中。

2.Foreach sink。對輸出的記錄進行任意計算。比如保存到mysql中。目前spark不支持直接寫入外部數據庫,只提供了Foreach接收器自己來實現,而且官網也沒有示例代碼。

3.Console sink。輸出到控制台,僅用於測試。

4.Memory sink。以表的形式輸出到內存,spark可以讀取內存中的表,僅用於測試。

5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低於2.2.1,那就只能使用第二個方法foreach sink來實現。

在配置完輸入,並針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:

  • format : 配置輸出的格式
  • output mode:輸出的格式
  • query name:查詢的名稱,類似tempview的名字
  • trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
  • checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。

1. output Mode

只有三種類型

  • complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
  • append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
  • update,把此次新增的數據輸出,並更新整個dataframe。有點類似之前的streaming的state處理。

2. 輸出的類型

2.1)file:保存成csv或者parquet

DF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

2.2)console:直接輸出到控制台。一般做測試的時候用這個比較方便(測試用)

DF.writeStream
  .format("console")
  .start()

2.3)memory:可以保存在內容,供后面的代碼使用(測試用)

DF.writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()  

2.4) kafka: 輸出到kafka, 在spark 2.2.1以前用自定義實現寫入。在spark2.2.1后提供了方法。

spark 2.2.1之前寫入kafka的方法

自定義一個類KafkaSink繼承ForeachWriter

import java.util.Properties
 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}
 
class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
 
  val results = new scala.collection.mutable.HashMap
  var producer: KafkaProducer[String, String] = _
 
  override def open(partitionId: Long, version: Long): Boolean = {
    producer = new KafkaProducer(kafkaProperties)
    return true
  }
 
  override def process(value: Row): Unit = {
    val word = value.getAs[String]("word")
    val count = value.getAs[String]("count")
    producer.send(new ProducerRecord(topic, word, count))
  }
 
  override def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}

spark 2.2.1以后寫入kafka的方法

// spark 2.2.1以后
wordcount.writeStream .format(
"kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "wordcount") .start()

2.5)foreach:參數是一個foreach的方法,用戶可以實現這個方法實現寫入mysql自定義的功能。

import java.sql._
 
import org.apache.spark.sql.{ForeachWriter, Row}
 
class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{
 
  var statement: Statement = _
  var resultSet: ResultSet = _
  var connection: Connection = _
  // 初始化信息
  override def open(partitionId: Long, version: Long): Boolean = {
    
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, userName, password)
    statement = connection.createStatement()
    return true
  }
   // 執行操作
  override def process(value: Row): Unit = {
 
    val word= value.getAs[String]("word")
    val count = value.getAs[Integer]("count")
 
 
    val insertSql = "insert into webCount(word,count)" +
      "values('" + word + "'," + count + ")"
 
    statement.execute(insertSql)
  }
  // 結束操作
  override def close(errorOrNull: Throwable): Unit = {
      connection.close()
  }
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
object KafkaStructedStreaming {
 
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()
 
    val df = sparkSession
        .readStream
        .format("socket")
        .option("host", "hadoop102")
        .option("port", "9999")
        .load()
 
    import sparkSession.implicits._
    val lines = df.selectExpr("CAST(value as STRING)").as[String]
    val weblog = lines.as[String].flatMap(_.split(" "))
 
    val wordCount = weblog.groupBy("value").count().toDF("word", "count")
 
    val url ="jdbc:mysql://hadoop102:3306/test"
    val username="root"
    val password="000000"
 
    val writer = new JDBCSink(url, username, password)
 
    val query = wordCount.writeStream
        .foreach(writer)
        .outputMode("update")
        .trigger(ProcessingTime("10 seconds"))
        .start()
    query.awaitTermination()
}

 

參考原文鏈接:https://blog.csdn.net/a790439710/article/details/103027602

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM