一、輸入數據源
1. 文件輸入數據源(FIie)
file數據源提供了很多種內置的格式,如csv、parquet、orc、json等等,就以csv為例:
import spark.implicits._ val userSchema = new StructType()
.add("name", "string").add("age", "integer") val lines = spark.readStream .option("sep", ";") .schema(userSchema) .csv("file:///data/*") val query = lines.writeStream .outputMode("append") .format("console") .start() query.awaitTermination()
在對應的目錄下新建文件時,就可以在控制台看到對應的數據了。
還有一些其他可以控制的參數:
maxFilesPerTrigger | 每個batch最多的文件數,默認是沒有限制。比如我設置了這個值為1,那么同時增加了5個文件,這5個文件會每個文件作為一波數據,更新streaming dataframe。 |
latestFirst | 是否優先處理最新的文件,默認是false。如果設置為true,那么最近被更新的會優先處理。這種場景一般是在監聽日志文件的時候使用。 |
fileNameOnly | 是否只監聽固定名稱的文件 |
2.網絡輸入數據源(socket)
一般都是基於這個socket來做測試。首先開啟一個socket服務器(nc -lk 9999
)
,然后streaming這邊連接進行處理。
spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
3. 輸入數據源(kafka)
// Subscribe to 1 topic val df= spark .readStream .format("kafka") .option("kafka.bootstrap.servers","host1:port1,host2:port2") .option("subscribe","topic1") .load() df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") .as[(String,String)] // Subscribe to multiple topics val df= spark .readStream .format("kafka") .option("kafka.bootstrap.servers","host1:port1,host2:port2") .option("subscribe","topic1,topic2") .load() df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") .as[(String,String)] // Subscribe to a pattern val df= spark .readStream .format("kafka") .option("kafka.bootstrap.servers","host1:port1,host2:port2") .option("subscribePattern","topic.*") .load() df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)") .as[(String,String)]
以批的形式查詢
關於Kafka的offset,structured streaming默認提供了幾種方式:
//設置每個分區的起始和結束值 val df = spark .read .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1,topic2") .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] //配置起始和結束的offset值(默認) val df = spark .read .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribePattern", "topic.*") .option("startingOffsets", "earliest") .option("endingOffsets", "latest") .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
Schema信息
讀取后的數據的Schema是固定的,包含的列如下:
Column | Type | 說明 |
key | binary | 信息key |
value | binary | 信息的value(我們自己的數據) |
topic | string | 主題 |
partition | int | 分區 |
offset | long | 偏移值 |
timestamp | long | 時間戳 |
timestampType | int | 類型 |
source相關的配置
無論是流的形式,還是批的形式,都需要一些必要的參數:
- kafka.bootstrap.servers kafka的服務器配置,host:post形式,用逗號進行分割,如host1:9000,host2:9000
- assign,以json的形式指定topic信息
- subscribe,通過逗號分隔,指定topic信息
- subscribePattern,通過java的正則指定多個topic
assign、subscribe、subscribePattern同時之中能使用一個。
其他比較重要的參數有:
- startingOffsets, offset開始的值,如果是earliest,則從最早的數據開始讀;如果是latest,則從最新的數據開始讀。默認流是latest,批是earliest
- endingOffsets,最大的offset,只在批處理的時候設置,如果是latest則為最新的數據
- failOnDataLoss,在流處理時,當數據丟失時(比如topic被刪除了,offset在指定的范圍之外),查詢是否報錯,默認為true。這個功能可以當做是一種告警機制,如果對丟失數據不感興趣,可以設置為false。在批處理時,這個值總是為true。
- kafkaConsumer.pollTimeoutMs,excutor連接kafka的超時時間,默認是512ms
- fetchOffset.numRetries,獲取kafka的offset信息時,嘗試的次數;默認是3次
- fetchOffset.retryIntervalMs,嘗試重新讀取kafka offset信息時等待的時間,默認是10ms
- maxOffsetsPerTrigger,trigger暫時不會用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
二、輸出數據源
目前Structed Streaming有四種方式:
1.File sink。寫入到文件中。
2.Foreach sink。對輸出的記錄進行任意計算。比如保存到mysql中。目前spark不支持直接寫入外部數據庫,只提供了Foreach接收器自己來實現,而且官網也沒有示例代碼。
3.Console sink。輸出到控制台,僅用於測試。
4.Memory sink。以表的形式輸出到內存,spark可以讀取內存中的表,僅用於測試。
5.Kafka sink。spark2.2.1更新了kafka sink,所以可以直接使用,如果你的版本低於2.2.1,那就只能使用第二個方法foreach sink來實現。
在配置完輸入,並針對DataFrame或者DataSet做了一些操作后,想要把結果保存起來。就可以使用DataSet.writeStream()方法,配置輸出需要配置下面的內容:
- format : 配置輸出的格式
- output mode:輸出的格式
- query name:查詢的名稱,類似tempview的名字
- trigger interval:觸發的間隔時間,如果前一個batch處理超時了,那么不會立即執行下一個batch,而是等下一個trigger時間在執行。
- checkpoint location:為保證數據的可靠性,可以設置檢查點保存輸出的結果。
1. output Mode
只有三種類型
- complete,把所有的DataFrame的內容輸出,這種模式只能在做agg聚合操作的時候使用,比如ds.group.count,之后可以使用它
- append,普通的dataframe在做完map或者filter之后可以使用。這種模式會把新的batch的數據輸出出來,
- update,把此次新增的數據輸出,並更新整個dataframe。有點類似之前的streaming的state處理。
2. 輸出的類型
2.1)file:保存成csv或者parquet
DF.writeStream .format("parquet") .option("checkpointLocation", "path/to/checkpoint/dir") .option("path", "path/to/destination/dir") .start()
2.2)console:直接輸出到控制台。一般做測試的時候用這個比較方便(測試用)
DF.writeStream .format("console") .start()
2.3)memory:可以保存在內容,供后面的代碼使用(測試用)
DF.writeStream .queryName("aggregates") .outputMode("complete") .format("memory") .start() spark.sql("select * from aggregates").show()
2.4) kafka: 輸出到kafka, 在spark 2.2.1以前用自定義實現寫入。在spark2.2.1后提供了方法。
spark 2.2.1之前寫入kafka的方法
自定義一個類KafkaSink繼承ForeachWriter
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.spark.sql.{ForeachWriter, Row} class KafkaSink(topic: String, servers: String) extends ForeachWriter[Row]{ val kafkaProperties = new Properties() kafkaProperties.put("bootstrap.servers", servers) kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer") kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer") val results = new scala.collection.mutable.HashMap var producer: KafkaProducer[String, String] = _ override def open(partitionId: Long, version: Long): Boolean = { producer = new KafkaProducer(kafkaProperties) return true } override def process(value: Row): Unit = { val word = value.getAs[String]("word") val count = value.getAs[String]("count") producer.send(new ProducerRecord(topic, word, count)) } override def close(errorOrNull: Throwable): Unit = { producer.close() } }
spark 2.2.1以后寫入kafka的方法
// spark 2.2.1以后
wordcount.writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "wordcount") .start()
2.5)foreach:參數是一個foreach的方法,用戶可以實現這個方法實現寫入mysql自定義的功能。
import java.sql._ import org.apache.spark.sql.{ForeachWriter, Row} class JDBCSink(url: String, userName: String, password: String) extends ForeachWriter[Row]{ var statement: Statement = _ var resultSet: ResultSet = _ var connection: Connection = _ // 初始化信息 override def open(partitionId: Long, version: Long): Boolean = { Class.forName("com.mysql.jdbc.Driver") connection = DriverManager.getConnection(url, userName, password) statement = connection.createStatement() return true } // 執行操作 override def process(value: Row): Unit = { val word= value.getAs[String]("word") val count = value.getAs[Integer]("count") val insertSql = "insert into webCount(word,count)" + "values('" + word + "'," + count + ")" statement.execute(insertSql) } // 結束操作 override def close(errorOrNull: Throwable): Unit = { connection.close() } }
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} object KafkaStructedStreaming { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder().master("local[2]").appName("streaming").getOrCreate() val df = sparkSession .readStream .format("socket") .option("host", "hadoop102") .option("port", "9999") .load() import sparkSession.implicits._ val lines = df.selectExpr("CAST(value as STRING)").as[String] val weblog = lines.as[String].flatMap(_.split(" ")) val wordCount = weblog.groupBy("value").count().toDF("word", "count") val url ="jdbc:mysql://hadoop102:3306/test" val username="root" val password="000000" val writer = new JDBCSink(url, username, password) val query = wordCount.writeStream .foreach(writer) .outputMode("update") .trigger(ProcessingTime("10 seconds")) .start() query.awaitTermination() }
參考原文鏈接:https://blog.csdn.net/a790439710/article/details/103027602