- 正式開始:基於spark流處理框架的學習
- 使用Flume+Kafka+SparkStreaming進行實時日志分析:如何實時地(准實時,每分鍾分析一次)收集日志,處理日志,把處理后的記錄存入Hive中。
- Flume會實時監控寫入日志的磁盤,只要有新的日志寫入,Flume就會將日志以消息的形式傳遞給Kafka,然后Spark Streaming實時消費消息傳入Hive。即Spark是一個實時處理的框架。
- Flume是什么呢,它為什么可以監控一個磁盤文件呢?簡而言之,Flume是用來收集、匯聚並且移動大量日志文件的開源框架,所以很適合這種實時收集日志並且傳遞日志的場景。
- Kafka是一個消息系統,Flume收集的日志可以移動到Kafka消息隊列中,然后就可以被多處消費了,而且可以保證不丟失數據。
- 通過這套架構,收集到的日志可以及時被Flume發現傳到Kafka,通過Kafka我們可以把日志用到各個地方,同一份日志可以存入Hdfs中,也可以離線進行分析,還可以實時計算,而且可以保證安全性,基本可以達到實時的要求。
- 初識Spark Streaming
- 一、Spark Streaming概述
- 1)官網:Spark Streaming | Apache Spark ----->Libraries----->Spark Streaming
- 是一個實時的流處理框架:(特點:並易於擴展,容錯性)
- 是基於Spark之上的一個流處理框架。
2)想想之前的Flume框架
- 雖然Flume是一個數據收集的框架,但進來的數據也相當於是一個流式的。
- 只要source對接的源端有數據進來,就可以把數據以一個event作為傳輸單元,將數據收集到目的地。
- 3)到現在為止,我們要有一個認識
- 大數據處理,不論是離線處理/批計算,還是實時處理/流計算,通常情況下,都是比較類似的幾個階段
- 大致情況下,都分成了三階段
- ① 源端數據的輸入:source
- ② 數據的處理:transformation
- ③ 數據的輸出:sink
- 二、Spark Streaming宏觀角度了解
- 1)官網Spark Streaming - Spark 3.2.0 Documentation (apache.org)----->Programming Guides ----->Spark Streaming(DStreams)
- Spark Streaming底層是基於Spark core的

- 快速開發第一個應用程序(Spark Streaming的詞頻統計)
- 一、基於IDEA+Maven構建第一個流處理應用程序(本地開發模型)
1)示例:Spark Streaming的詞頻統計案例,對接網絡數據。
- 感受一下Spark Streaming的編程風格,即套路編程。
- 2)在主pom文件中添加spark streaming的依賴
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\pom.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<!-- <version>${spark.version}</version>-->
<version>3.0.0</version>
</dependency>
- 3)構建log-ss子工程
- 左擊項目----->new----->Module----->Maven----->next
- Artifactld:log-ss
- Module name:log-ss
- 4)修改log-ss的子pom的依賴
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\pom.xml
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
</dependency>
</dependencies>
- 5)在這里,更改了本地Maven的指定路徑
- ① File--->Settings--->Maven
- ② 將Maven路徑改為本地路徑
- ③ Maven home path:D:/maven_3.5.0/apache-maven-3.5.0/apache-maven-3.5.0
- ④ User settings file:C:\Users\jieqiong\.m2\settings.xml
- ⑤ 勾選:Always update snapshots
- 6)新建scala文件夾
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main----->右鍵----->new----->directory----->scala
- 原本在C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main下是一個java文件夾。
- 7)安裝scala插件
- File----->Settings----->Plugins----->搜素scala----->install----->重啟idea
- 8)調整scala文件夾
- 右鍵scala----->mark directory as----->sources root
- 9)新建package
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala
- new----->package----->com.imooc.bigdata.ss
- 10)添加scala支持
- 右擊log-ss----->add framework support----->選擇scala----->選擇scala2.12版本
- 11)新建scala class
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala\com.imooc.bigdata.ss----->new----->scala.class----->NetworkWordCount.object
package com.imooc.bigdata.ss
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
* object NetworkWordCount作用:完成詞頻統計分析
* 數據源:是基於端口、網絡即nc的方式,造數據
*
* ss的編程范式
* 1)main方法
* 2)找入口點:new StreamingContext().var
* 3)添加SparkConf的構造器:new SparkConf().var
* 4)參數1:sparkConf放入new StreamingContext()
* 5)參數2:Seconds(5)放入new StreamingContext()
* 6)生成ssc:new StreamingContext(sparkConf,Seconds(5)).var
* 7)對接網絡數據
* ssc.socketTextStream("spark000",9527).var
* 8)開始業務邏輯處理
* 啟動流作業:ssc.start()
* 輸入數據以逗號分隔開:map是給每個單詞賦值1,,然后兩兩相加。
lines.flatMap(_.split(",")).map((_,1))
.reduceBykey(_+_).var
* 結果打印:
* 終止流作業:ssc.awaitTermination()
* 9)運行報錯,添加val sparkConf = new SparkConf()參數
*/
object NetworkWordCount {
// *****第1步
/*
對於NetworkWordCount這種Spark Streaming編程來講,也是通過main方法
輸入main,回車
*/
def main(args: Array[String]): Unit = {
// *****第2步
/*
和kafka相同,找入口點
官網:https://spark.apache.org/docs/latest/streaming-programming-guide.html
要開發Spark Streaming應用程序,入口點就是:拿到一個streamingContext:new StreamingContext()
看源碼:按ctrl,進入StreamingContext.scala
* 關於StreamingContext.scala的描述
Main entry point for Spark Streaming functionality. It provides methods used to create DStream:
[[org.apache.spark.streaming.dstream.DStream]
那 DStream是什么呢?
* 目前,鼠標放在StreamingContext(),報錯:不能解析構造器,所以這里缺少構造器
Cannot resolve overloaded constructor `StreamingContext`
在scala里是有構造器的,主構造器、副主構造器。
* 以下就是構造器要傳的三個參數
* class StreamingContext private[streaming] (
_sc: SparkContext,
_cp: Checkpoint,
_batchDur: Duration
)
* 這個是副主構造器1:傳的是sparkContext
* batchDuration是時間間隔
* def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
* 這個是副主構造器2:傳的是SparkConf
* def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
*/
// *****第3步
/* 添加SparkConf的構造器
new SparkConf().var
然后選擇sparkConf。不建議加類型
*/
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
// *****第2步
/*
new StreamingContext()
*/
// *****第4步
/*
將第3步中新生成的sparkConf,放入new StreamingContext()括號中。
*/
// *****第5步
/*
* 添加時間間隔Duration(毫秒),可以看一下源碼
* 使用
* object Seconds {
def apply(seconds: Long): Duration = new Duration(seconds * 1000)
}
* 並導入org.apache的包,往Seconds()放5
* 意味着指定間隔5秒為一個批次
*/
// *****第6步
/*
new StreamingContext(sparkConf,Seconds(5)).var
輸入ssc
*/
val ssc = new StreamingContext(sparkConf, Seconds(5))
// TODO... 對接業務數據
// *****第7步:先調用start啟動
/*
Creates an input stream from TCP source hostname:port. Data is received using
a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
*/
val lines = ssc.socketTextStream("spark000", 9527)
// TODO... 業務邏輯處理
// *****第9步:輸入數據以逗號分隔,並打印結果
val result = lines.flatMap(_.split(",")).map((_,1))
.reduceByKey(_+_)
result.print()
// *****第8步:先調用start啟動\終止
ssc.start()
ssc.awaitTermination()
}
}
- 二、本地功能測試
- 1)啟動端口9527
- 即object NetworkWordCount代碼中:val lines = ssc.socketTextStream("spark000", 9527)
[hadoop@spark000 ~]$ nc -lk 9527
- 2)此時運行NetworkWordCount,會報錯。是因為 val sparkConf = new SparkConf()參數為空。
- 報錯原因分析:
- 報錯提示是:SparkContext初始化錯誤。
- 嚴格意義,從代碼的層面上來看,傳遞的是SparkConf()進來的,並沒有構建SparkContext。
- 來看一下構造器的方法:進入StreamingContext的源碼。
- 源碼中,我們的確是傳遞了一個conf和間隔進來的,然后調用副主構造器(通過StreamingContext創建一個新的SparkContext,並將conf傳進去),所以看到的是一個SparkContext異常。
- 對於StreamingContext里面幾個構造器,它們之間是相互調用的。同一個def中,上面的this調用下面的this。
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
- 3)添加完參數后,重新運行:5s輸出一個時間值。
- 4)在虛擬機的連接器中輸入值:a,a,a,b,b,b,c
- 在IDEA中,直接輸出詞頻統計結果:
-------------------------------------------
Time: 1639358875000 ms
-------------------------------------------
(b,3)
(a,3)
(c,1)
- 5)注意:流處理和批處理的開發方式是一樣的,只不過對於接數據、輸出可能不一樣。在core中,是使用connect輸出的。
- 6)單拿出以下代碼,分析是流處理,還是批處理?是看不出來的。
- 這就是所謂的在流處理和批處理里面,對於業務邏輯處理是可以單獨抽象出來的,所以流處理和批處理都可以調用。這樣的話,業務邏輯處理就可以統一了。
lines.flatMap(_.split(",")).map((_,1))
.reduceByKey(_+_)
- 7)在生產里面,將兩個最后加的參數注解掉,然后就可以將代碼提交上去了。
- 8)注意,在本地測試時,一定要先將對接網絡數據的端口啟起來。
- Spark部署
- 一、Spark部署及服務器端測試
- 1)環境部署
[hadoop@spark000 source]$ tar -zxvf spark-3.0.0.tar.gz
[hadoop@spark000 source]$ ls
spark-3.0.0 spark-3.0.0.tar.gz
-
- ④開始編譯Building a Runnable Distribution:
Building Spark - Spark 3.2.0 Documentation (apache.org)
- 已提供編譯好的版本了:在app文件中的spark-3.0.0-bin-2.6.0-cdh5.16.2
- 環境版本:The Maven-based build is the build of reference for Apache Spark. Building Spark using Maven requires Maven 3.6.3 and Java 8. Spark requires Scala 2.12; support for Scala 2.11 was removed in Spark 3.0.0.
- 查看HADOOP_HOME版本:
[hadoop@spark000 ~]$ echo $HADOOP_HOME
/home/hadoop/app/hadoop-2.6.0-cdh5.16.2
-
- ⑤在source目錄下的spark-3.0.0中,執行編譯(時間較長)
./dev/make-distribution.sh \
--name 2.6.0-cdh5.16.2 \
--tgz -Phive -Phive-thriftserver \
-Pyarn
-
- ⑥編譯好了之后spark-3.0.0-bin-2.6.0-cdh5.16.2.tgz,直接tar -zxvf解壓到app目錄。
- ⑦如何提交一個作業?
- 官網的示例:Running Spark on YARN - Spark 3.2.0 Documentation (apache.org)
- ⑧進入spark主目錄中,執行
- 執行過程中去瀏覽器界面看一下spark000
- 在執行界面,查看結果為Pi is roughly 3.14005.....
- 此處報錯:INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCoISECONDS)
- 原因:啟動hadoop中的dfs和yarn的腳本即可。
[hadoop@spark000 spark-3.0.0-bin-2.6.0-cdh5.16.2]$ bin/spark-submit --class org.apache.spark.examples.SparkPi \
> --master yarn \
> examples/jars/spark-examples*.jar \
> 2
-
- ⑨開始打包本地代碼包
- 注意將AppName和Master兩個參數注釋掉,一會兒是通過腳本提交的。
- 到C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\target下找到log-ss-1.0
- 上傳至lib路徑下
[hadoop@spark000 lib]$ pwd
/home/hadoop/lib
[hadoop@spark000 lib]$ ls
log-ss-1.0.jar
-
- ⑩再次注意:啟動hdf、yarn、Zookeeper、單個Master進程
- /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
- [hadoop@hadoop000 sbin]$ ./start-dfs.sh
- /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
- [hadoop@hadoop000 sbin]$ ./start-yarn.sh
- /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin
- zkServer.sh start
- /home/hadoop/app/spark-3.0.0-bin-2.6.0-cdh5.16.2/sbin
- ./start-master.sh
- ⑪腳本啟動
spark-submit \
--name jieqiong-network-wc \
--class com.imooc.bigdata.ss.NetworkWordCount \
--master yarn \
/home/hadoop/lib/log-ss-1.0.jar
-
- ⑫啟動端口:
- [hadoop@spark000 ~]$ nc -lk 9527
- 輸入單詞。會在另一個界面進行詞頻統計。
- 總結:目前來說spark對於我們只是一個客戶端。不需要啟動本地的spark集群
- 二、StreamingContext編程注意事項
- 1)官網:Spark Streaming - Spark 3.2.0 Documentation (apache.org)
- 2)Initializing StreamingContext
- 這樣理解,Context是一個上下文的框,什么東西都可以放到里面。
- 對於SparkStreaming應用程序來說,StreamingContext是一個入口點,所以一定要創建。
- 在StreamingContext或者Streaming core啟動過程中,一定要檢查conf里的兩個參數:spark.master、spark.app.name
- 3)After a context is defined, you have to do the following.(看官網)
- 輸入流式通過ssc接收socket話,就是一個hostname+ip。即input DStreams。
- 4)Points to remember.(看官網)
- 一旦ssc.start()啟動,就不要再進行其他計算了(result.map等),是無效的。