1.1.1.讀取文本數據
spark應用可以監聽某一個目錄,而web服務在這個目錄上實時產生日志文件,這樣對於spark應用來說,日志文件就是實時數據
Structured Streaming支持的文件類型有text,csv,json,parquet
●準備工作
在people.json文件輸入如下數據:
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
注意:文件必須是被移動到目錄中的,且文件名不能有特殊字符
●需求
接下來使用Structured Streaming統計年齡小於25歲的人群的愛好排行榜
●代碼演示:
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* {"name":"json","age":23,"hobby":"running"}
* {"name":"charles","age":32,"hobby":"basketball"}
* {"name":"tom","age":28,"hobby":"football"}
* {"name":"lili","age":24,"hobby":"running"}
* {"name":"bob","age":20,"hobby":"swimming"}
* 統計年齡小於25歲的人群的愛好排行榜
*/
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.創建SparkSession,因為StructuredStreaming的數據模型也是DataFrame/DataSet
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
val Schema: StructType = new StructType()
.add("name","string")
.add("age","integer")
.add("hobby","string")
//2.接收數據
import spark.implicits._
// Schema must be specified when creating a streaming source DataFrame.
val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")
//3.處理數據
val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
//4.輸出結果
result.writeStream
.format("console")
.outputMode("complete")
.trigger(Trigger.ProcessingTime(0))
.start()
.awaitTermination()
}
}
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
 * {"name":"json","age":23,"hobby":"running"}
 * {"name":"charles","age":32,"hobby":"basketball"}
 * {"name":"tom","age":28,"hobby":"football"}
 * {"name":"lili","age":24,"hobby":"running"}
 * {"name":"bob","age":20,"hobby":"swimming"}
 * 統計年齡小於25歲的人群的愛好排行榜
 */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.創建SparkSession,因為StructuredStreaming的數據模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //2.接收數據
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")
    //3.處理數據
    val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    //4.輸出結果
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
代碼截圖:
