- 正式開始:基於spark流處理框架的學習
- 使用Flume+Kafka+SparkStreaming進行實時日志分析:如何實時地(准實時,每分鍾分析一次)收集日志,處理日志,把處理后的記錄存入Hive中。
- Flume會實時監控寫入日志的磁盤,只要有新的日志寫入,Flume就會將日志以消息的形式傳遞給Kafka,然后Spark Streaming實時消費消息傳入Hive。即Spark是一個實時處理的框架。
- Flume是什么呢,它為什么可以監控一個磁盤文件呢?簡而言之,Flume是用來收集、匯聚並且移動大量日志文件的開源框架,所以很適合這種實時收集日志並且傳遞日志的場景。
- Kafka是一個消息系統,Flume收集的日志可以移動到Kafka消息隊列中,然后就可以被多處消費了,而且可以保證不丟失數據。
- 通過這套架構,收集到的日志可以及時被Flume發現傳到Kafka,通過Kafka我們可以把日志用到各個地方,同一份日志可以存入Hdfs中,也可以離線進行分析,還可以實時計算,而且可以保證安全性,基本可以達到實時的要求。
- 初識Spark Streaming
- 一、Spark Streaming概述
- 1)官網:Spark Streaming | Apache Spark ----->Libraries----->Spark Streaming
- 是一個實時的流處理框架:(特點:並易於擴展,容錯性)
- 是基於Spark之上的一個流處理框架。
2)想想之前的Flume框架
- 雖然Flume是一個數據收集的框架,但進來的數據也相當於是一個流式的。
- 只要source對接的源端有數據進來,就可以把數據以一個event作為傳輸單元,將數據收集到目的地。
- 3)到現在為止,我們要有一個認識
- 大數據處理,不論是離線處理/批計算,還是實時處理/流計算,通常情況下,都是比較類似的幾個階段
- 大致情況下,都分成了三階段
- ① 源端數據的輸入:source
- ② 數據的處理:transformation
- ③ 數據的輸出:sink
- 二、Spark Streaming宏觀角度了解
- 1)官網Spark Streaming - Spark 3.2.0 Documentation (apache.org)----->Programming Guides ----->Spark Streaming(DStreams)
- Spark Streaming底層是基於Spark core的
- 快速開發第一個應用程序(Spark Streaming的詞頻統計)
- 一、基於IDEA+Maven構建第一個流處理應用程序(本地開發模型) 1)示例:Spark Streaming的詞頻統計案例,對接網絡數據。
- 感受一下Spark Streaming的編程風格,即套路編程。
- 2)在主pom文件中添加spark streaming的依賴
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\pom.xml
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <!-- <version>${spark.version}</version>--> <version>3.0.0</version> </dependency>
- 3)構建log-ss子工程
- 左擊項目----->new----->Module----->Maven----->next
- Artifactld:log-ss
- Module name:log-ss
- 4)修改log-ss的子pom的依賴
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\pom.xml
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> </dependency> </dependencies>
- 5)在這里,更改了本地Maven的指定路徑
- ① File--->Settings--->Maven
- ② 將Maven路徑改為本地路徑
- ③ Maven home path:D:/maven_3.5.0/apache-maven-3.5.0/apache-maven-3.5.0
- ④ User settings file:C:\Users\jieqiong\.m2\settings.xml
- ⑤ 勾選:Always update snapshots
- 6)新建scala文件夾
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main----->右鍵----->new----->directory----->scala
- 原本在C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main下是一個java文件夾。
- 7)安裝scala插件
- File----->Settings----->Plugins----->搜素scala----->install----->重啟idea
- 8)調整scala文件夾
- 右鍵scala----->mark directory as----->sources root
- 9)新建package
- 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala
- new----->package----->com.imooc.bigdata.ss
- 10)添加scala支持
- 右擊log-ss----->add framework support----->選擇scala----->選擇scala2.12版本
- 11)新建scala class
- C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala\com.imooc.bigdata.ss----->new----->scala.class----->NetworkWordCount.object
package com.imooc.bigdata.ss import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /* * object NetworkWordCount作用:完成詞頻統計分析 * 數據源:是基於端口、網絡即nc的方式,造數據 * * ss的編程范式 * 1)main方法 * 2)找入口點:new StreamingContext().var * 3)添加SparkConf的構造器:new SparkConf().var * 4)參數1:sparkConf放入new StreamingContext() * 5)參數2:Seconds(5)放入new StreamingContext() * 6)生成ssc:new StreamingContext(sparkConf,Seconds(5)).var * 7)對接網絡數據 * ssc.socketTextStream("spark000",9527).var * 8)開始業務邏輯處理 * 啟動流作業:ssc.start() * 輸入數據以逗號分隔開:map是給每個單詞賦值1,,然后兩兩相加。 lines.flatMap(_.split(",")).map((_,1)) .reduceBykey(_+_).var * 結果打印: * 終止流作業:ssc.awaitTermination() * 9)運行報錯,添加val sparkConf = new SparkConf()參數 */ object NetworkWordCount { // *****第1步 /* 對於NetworkWordCount這種Spark Streaming編程來講,也是通過main方法 輸入main,回車 */ def main(args: Array[String]): Unit = { // *****第2步 /* 和kafka相同,找入口點 官網:https://spark.apache.org/docs/latest/streaming-programming-guide.html 要開發Spark Streaming應用程序,入口點就是:拿到一個streamingContext:new StreamingContext() 看源碼:按ctrl,進入StreamingContext.scala * 關於StreamingContext.scala的描述 Main entry point for Spark Streaming functionality. It provides methods used to create DStream: [[org.apache.spark.streaming.dstream.DStream] 那 DStream是什么呢? * 目前,鼠標放在StreamingContext(),報錯:不能解析構造器,所以這里缺少構造器 Cannot resolve overloaded constructor `StreamingContext` 在scala里是有構造器的,主構造器、副主構造器。 * 以下就是構造器要傳的三個參數 * class StreamingContext private[streaming] ( _sc: SparkContext, _cp: Checkpoint, _batchDur: Duration ) * 這個是副主構造器1:傳的是sparkContext * batchDuration是時間間隔 * def this(sparkContext: SparkContext, batchDuration: Duration) = { this(sparkContext, null, batchDuration) } * 這個是副主構造器2:傳的是SparkConf * def this(conf: SparkConf, batchDuration: Duration) = { this(StreamingContext.createNewSparkContext(conf), null, batchDuration) } */ // *****第3步 /* 添加SparkConf的構造器 new SparkConf().var 然后選擇sparkConf。不建議加類型 */ val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName) .setMaster("local[2]") // *****第2步 /* new StreamingContext() */ // *****第4步 /* 將第3步中新生成的sparkConf,放入new StreamingContext()括號中。 */ // *****第5步 /* * 添加時間間隔Duration(毫秒),可以看一下源碼 * 使用 * object Seconds { def apply(seconds: Long): Duration = new Duration(seconds * 1000) } * 並導入org.apache的包,往Seconds()放5 * 意味着指定間隔5秒為一個批次 */ // *****第6步 /* new StreamingContext(sparkConf,Seconds(5)).var 輸入ssc */ val ssc = new StreamingContext(sparkConf, Seconds(5)) // TODO... 對接業務數據 // *****第7步:先調用start啟動 /* Creates an input stream from TCP source hostname:port. Data is received using a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited */ val lines = ssc.socketTextStream("spark000", 9527) // TODO... 業務邏輯處理 // *****第9步:輸入數據以逗號分隔,並打印結果 val result = lines.flatMap(_.split(",")).map((_,1)) .reduceByKey(_+_) result.print() // *****第8步:先調用start啟動\終止 ssc.start() ssc.awaitTermination() } }
- 二、本地功能測試
- 1)啟動端口9527
- 即object NetworkWordCount代碼中:val lines = ssc.socketTextStream("spark000", 9527)
[hadoop@spark000 ~]$ nc -lk 9527
- 2)此時運行NetworkWordCount,會報錯。是因為 val sparkConf = new SparkConf()參數為空。
- 報錯原因分析:
- 報錯提示是:SparkContext初始化錯誤。
- 嚴格意義,從代碼的層面上來看,傳遞的是SparkConf()進來的,並沒有構建SparkContext。
- 來看一下構造器的方法:進入StreamingContext的源碼。
- 源碼中,我們的確是傳遞了一個conf和間隔進來的,然后調用副主構造器(通過StreamingContext創建一個新的SparkContext,並將conf傳進去),所以看到的是一個SparkContext異常。
- 對於StreamingContext里面幾個構造器,它們之間是相互調用的。同一個def中,上面的this調用下面的this。
def this(conf: SparkConf, batchDuration: Duration) = { this(StreamingContext.createNewSparkContext(conf), null, batchDuration) }
- 3)添加完參數后,重新運行:5s輸出一個時間值。
- 4)在虛擬機的連接器中輸入值:a,a,a,b,b,b,c
- 在IDEA中,直接輸出詞頻統計結果:
------------------------------------------- Time: 1639358875000 ms ------------------------------------------- (b,3) (a,3) (c,1)
- 5)注意:流處理和批處理的開發方式是一樣的,只不過對於接數據、輸出可能不一樣。在core中,是使用connect輸出的。
- 6)單拿出以下代碼,分析是流處理,還是批處理?是看不出來的。
- 這就是所謂的在流處理和批處理里面,對於業務邏輯處理是可以單獨抽象出來的,所以流處理和批處理都可以調用。這樣的話,業務邏輯處理就可以統一了。
lines.flatMap(_.split(",")).map((_,1))
.reduceByKey(_+_)
- 7)在生產里面,將兩個最后加的參數注解掉,然后就可以將代碼提交上去了。
- 8)注意,在本地測試時,一定要先將對接網絡數據的端口啟起來。
- 三、官網案例解讀
- 1)在這門課,使用的是scala語言(哎)
- 2)官網:Spark Streaming - Spark 3.2.0 Documentation (apache.org)
- 就是使用官網對第二部分的代碼,解讀。
- Spark部署
- 一、Spark部署及服務器端測試
- 1)環境部署
- ①官網下載地址(未找到3.0.0):Downloads | Apache Spark
- ②git下載地址(源碼是需要編譯的):Release v3.0.0 · apache/spark · GitHub
- ③將3.0.0壓縮包放入source目錄下,並解壓。解壓后的文件里都是源碼,不能直接使用。需要先編譯。
[hadoop@spark000 source]$ tar -zxvf spark-3.0.0.tar.gz
[hadoop@spark000 source]$ ls
spark-3.0.0 spark-3.0.0.tar.gz
-
- ④開始編譯Building a Runnable Distribution:
Building Spark - Spark 3.2.0 Documentation (apache.org) - 已提供編譯好的版本了:在app文件中的spark-3.0.0-bin-2.6.0-cdh5.16.2
- 環境版本:The Maven-based build is the build of reference for Apache Spark. Building Spark using Maven requires Maven 3.6.3 and Java 8. Spark requires Scala 2.12; support for Scala 2.11 was removed in Spark 3.0.0.
- 查看HADOOP_HOME版本:
- ④開始編譯Building a Runnable Distribution:
[hadoop@spark000 ~]$ echo $HADOOP_HOME
/home/hadoop/app/hadoop-2.6.0-cdh5.16.2
-
- ⑤在source目錄下的spark-3.0.0中,執行編譯(時間較長)
./dev/make-distribution.sh \ --name 2.6.0-cdh5.16.2 \ --tgz -Phive -Phive-thriftserver \ -Pyarn
-
- ⑥編譯好了之后spark-3.0.0-bin-2.6.0-cdh5.16.2.tgz,直接tar -zxvf解壓到app目錄。
- ⑦如何提交一個作業?
- 官網的示例:Running Spark on YARN - Spark 3.2.0 Documentation (apache.org)
- ⑧進入spark主目錄中,執行
- 執行過程中去瀏覽器界面看一下spark000
- 在執行界面,查看結果為Pi is roughly 3.14005.....
- 此處報錯:INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCoISECONDS)
- 原因:啟動hadoop中的dfs和yarn的腳本即可。
[hadoop@spark000 spark-3.0.0-bin-2.6.0-cdh5.16.2]$ bin/spark-submit --class org.apache.spark.examples.SparkPi \ > --master yarn \ > examples/jars/spark-examples*.jar \ > 2
-
- ⑨開始打包本地代碼包
- 注意將AppName和Master兩個參數注釋掉,一會兒是通過腳本提交的。
- 到C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\target下找到log-ss-1.0
- 上傳至lib路徑下
[hadoop@spark000 lib]$ pwd /home/hadoop/lib [hadoop@spark000 lib]$ ls log-ss-1.0.jar
-
- ⑩再次注意:啟動hdf、yarn、Zookeeper、單個Master進程
- /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
- [hadoop@hadoop000 sbin]$ ./start-dfs.sh
- /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
- [hadoop@hadoop000 sbin]$ ./start-yarn.sh
- /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin
- zkServer.sh start
- /home/hadoop/app/spark-3.0.0-bin-2.6.0-cdh5.16.2/sbin
- ./start-master.sh
- ⑪腳本啟動
spark-submit \ --name jieqiong-network-wc \ --class com.imooc.bigdata.ss.NetworkWordCount \ --master yarn \ /home/hadoop/lib/log-ss-1.0.jar
-
- ⑫啟動端口:
- [hadoop@spark000 ~]$ nc -lk 9527
- 輸入單詞。會在另一個界面進行詞頻統計。
- 總結:目前來說spark對於我們只是一個客戶端。不需要啟動本地的spark集群
- 二、StreamingContext編程注意事項
- 1)官網:Spark Streaming - Spark 3.2.0 Documentation (apache.org)
- 2)Initializing StreamingContext
- 這樣理解,Context是一個上下文的框,什么東西都可以放到里面。
- 對於SparkStreaming應用程序來說,StreamingContext是一個入口點,所以一定要創建。
- 在StreamingContext或者Streaming core啟動過程中,一定要檢查conf里的兩個參數:spark.master、spark.app.name
- 3)After a context is defined, you have to do the following.(看官網)
- 輸入流式通過ssc接收socket話,就是一個hostname+ip。即input DStreams。
- 4)Points to remember.(看官網)
- 一旦ssc.start()啟動,就不要再進行其他計算了(result.map等),是無效的。