大數據Spark實時處理--實時流處理1(Spark Streaming API)


  • 正式開始:基於spark流處理框架的學習
  • 使用Flume+Kafka+SparkStreaming進行實時日志分析:如何實時地(准實時,每分鍾分析一次)收集日志,處理日志,把處理后的記錄存入Hive中。
  • Flume會實時監控寫入日志的磁盤,只要有新的日志寫入,Flume就會將日志以消息的形式傳遞給Kafka,然后Spark Streaming實時消費消息傳入Hive。即Spark是一個實時處理的框架。
  • Flume是什么呢,它為什么可以監控一個磁盤文件呢?簡而言之,Flume是用來收集、匯聚並且移動大量日志文件的開源框架,所以很適合這種實時收集日志並且傳遞日志的場景。
  • Kafka是一個消息系統,Flume收集的日志可以移動到Kafka消息隊列中,然后就可以被多處消費了,而且可以保證不丟失數據。
  • 通過這套架構,收集到的日志可以及時被Flume發現傳到Kafka,通過Kafka我們可以把日志用到各個地方,同一份日志可以存入Hdfs中,也可以離線進行分析,還可以實時計算,而且可以保證安全性,基本可以達到實時的要求。

 

  • 初識Spark Streaming
  • 一、Spark Streaming概述
  • 1)官網:Spark Streaming | Apache Spark ----->Libraries----->Spark Streaming
    • 是一個實時的流處理框架:(特點:並易於擴展,容錯性)
    • 是基於Spark之上的一個流處理框架。
  • 2)想想之前的Flume框架
    • 雖然Flume是一個數據收集的框架,但進來的數據也相當於是一個流式的。
    • 只要source對接的源端有數據進來,就可以把數據以一個event作為傳輸單元,將數據收集到目的地。
  • 3)到現在為止,我們要有一個認識
    • 大數據處理,不論是離線處理/批計算,還是實時處理/流計算,通常情況下,都是比較類似的幾個階段
    • 大致情況下,都分成了三階段
    • ① 源端數據的輸入:source
    • ② 數據的處理:transformation
    • ③ 數據的輸出:sink
  • 二、Spark Streaming宏觀角度了解
  • 1)官網Spark Streaming - Spark 3.2.0 Documentation (apache.org)----->Programming Guides ----->Spark Streaming(DStreams)
    • Spark Streaming底層是基於Spark core的

 

 

  • 快速開發第一個應用程序(Spark Streaming的詞頻統計)
  • 一、基於IDEA+Maven構建第一個流處理應用程序(本地開發模型)
  • 1)示例:Spark Streaming的詞頻統計案例,對接網絡數據。
    • 感受一下Spark Streaming的編程風格,即套路編程。
  • 2)在主pom文件中添加spark streaming的依賴
    • 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\pom.xml
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
<!--            <version>${spark.version}</version>-->
                <version>3.0.0</version>
            </dependency>
  • 3)構建log-ss子工程
    • 左擊項目----->new----->Module----->Maven----->next
    • Artifactld:log-ss
    • Module name:log-ss
  • 4)修改log-ss的子pom的依賴
    • 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\pom.xml
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
        </dependency>
    </dependencies>
  • 5)在這里,更改了本地Maven的指定路徑
    • ① File--->Settings--->Maven
    • ② 將Maven路徑改為本地路徑
    • ③ Maven home path:D:/maven_3.5.0/apache-maven-3.5.0/apache-maven-3.5.0
    • ④ User settings file:C:\Users\jieqiong\.m2\settings.xml
    • ⑤ 勾選:Always update snapshots
  • 6)新建scala文件夾
    • 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main----->右鍵----->new----->directory----->scala
  • 原本在C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main下是一個java文件夾。
  • 7)安裝scala插件
    • File----->Settings----->Plugins----->搜素scala----->install----->重啟idea
  • 8)調整scala文件夾
    • 右鍵scala----->mark directory as----->sources root
  • 9)新建package
    • 路徑:C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala
    • new----->package----->com.imooc.bigdata.ss
  • 10)添加scala支持
    • 右擊log-ss----->add framework support----->選擇scala----->選擇scala2.12版本
  • 11)新建scala class
    • C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\src\main\scala\com.imooc.bigdata.ss----->new----->scala.class----->NetworkWordCount.object
package com.imooc.bigdata.ss

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
 * object NetworkWordCount作用:完成詞頻統計分析
 * 數據源:是基於端口、網絡即nc的方式,造數據
 *
 * ss的編程范式
 * 1)main方法
 * 2)找入口點:new StreamingContext().var
 * 3)添加SparkConf的構造器:new SparkConf().var
 * 4)參數1:sparkConf放入new StreamingContext()
 * 5)參數2:Seconds(5)放入new StreamingContext()
 * 6)生成ssc:new StreamingContext(sparkConf,Seconds(5)).var
 * 7)對接網絡數據
 *   ssc.socketTextStream("spark000",9527).var
 * 8)開始業務邏輯處理
 *   啟動流作業:ssc.start()
 *   輸入數據以逗號分隔開:map是給每個單詞賦值1,,然后兩兩相加。
     lines.flatMap(_.split(",")).map((_,1))
     .reduceBykey(_+_).var
 *   結果打印:
 *   終止流作業:ssc.awaitTermination()
 * 9)運行報錯,添加val sparkConf = new SparkConf()參數
 */

object NetworkWordCount {
  // *****第1步
  /*
     對於NetworkWordCount這種Spark Streaming編程來講,也是通過main方法
     輸入main,回車
   */
  def main(args: Array[String]): Unit = {
    // *****第2步
    /*
       和kafka相同,找入口點
       官網:https://spark.apache.org/docs/latest/streaming-programming-guide.html
       要開發Spark Streaming應用程序,入口點就是:拿到一個streamingContext:new StreamingContext()
       看源碼:按ctrl,進入StreamingContext.scala

     * 關於StreamingContext.scala的描述
       Main entry point for Spark Streaming functionality. It provides methods used to create DStream:
       [[org.apache.spark.streaming.dstream.DStream]
       那 DStream是什么呢?


     * 目前,鼠標放在StreamingContext(),報錯:不能解析構造器,所以這里缺少構造器
       Cannot resolve overloaded constructor `StreamingContext`
       在scala里是有構造器的,主構造器、副主構造器。

     * 以下就是構造器要傳的三個參數
     * class StreamingContext private[streaming] (
       _sc: SparkContext,
       _cp: Checkpoint,
       _batchDur: Duration
       )

     * 這個是副主構造器1:傳的是sparkContext
     * batchDuration是時間間隔
     * def this(sparkContext: SparkContext, batchDuration: Duration) = {
       this(sparkContext, null, batchDuration)
       }

     * 這個是副主構造器2:傳的是SparkConf
     * def this(conf: SparkConf, batchDuration: Duration) = {
       this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
       }
     */

    // *****第3步
    /* 添加SparkConf的構造器
       new SparkConf().var
       然后選擇sparkConf。不建議加類型
     */
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")

    // *****第2步
    /*
      new StreamingContext()
     */

    // *****第4步
    /*
       將第3步中新生成的sparkConf,放入new StreamingContext()括號中。
     */

    // *****第5步
    /*
     * 添加時間間隔Duration(毫秒),可以看一下源碼
     * 使用
     * object Seconds {
       def apply(seconds: Long): Duration = new Duration(seconds * 1000)
       }

     * 並導入org.apache的包,往Seconds()放5
     * 意味着指定間隔5秒為一個批次
     */

    // *****第6步
    /*
       new StreamingContext(sparkConf,Seconds(5)).var
       輸入ssc
     */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // TODO... 對接業務數據
    // *****第7步:先調用start啟動
    /*
       Creates an input stream from TCP source hostname:port. Data is received using
       a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
     */
    val lines = ssc.socketTextStream("spark000", 9527)

    // TODO... 業務邏輯處理
    // *****第9步:輸入數據以逗號分隔,並打印結果
    val result = lines.flatMap(_.split(",")).map((_,1))
      .reduceByKey(_+_)
    result.print()
    // *****第8步:先調用start啟動\終止
    ssc.start()
    ssc.awaitTermination()
  }
}

 

  • 二、本地功能測試
  • 1)啟動端口9527
  • 即object NetworkWordCount代碼中:val lines = ssc.socketTextStream("spark000", 9527)
[hadoop@spark000 ~]$ nc -lk 9527
  • 2)此時運行NetworkWordCount,會報錯。是因為 val sparkConf = new SparkConf()參數為空。
  • 報錯原因分析:
  • 報錯提示是:SparkContext初始化錯誤。
  • 嚴格意義,從代碼的層面上來看,傳遞的是SparkConf()進來的,並沒有構建SparkContext。
  • 來看一下構造器的方法:進入StreamingContext的源碼。
  • 源碼中,我們的確是傳遞了一個conf和間隔進來的,然后調用副主構造器(通過StreamingContext創建一個新的SparkContext,並將conf傳進去),所以看到的是一個SparkContext異常。
  • 對於StreamingContext里面幾個構造器,它們之間是相互調用的。同一個def中,上面的this調用下面的this。
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
  • 3)添加完參數后,重新運行:5s輸出一個時間值。
  • 4)在虛擬機的連接器中輸入值:a,a,a,b,b,b,c
  • 在IDEA中,直接輸出詞頻統計結果:
-------------------------------------------
Time: 1639358875000 ms
-------------------------------------------
(b,3)
(a,3)
(c,1)
  • 5)注意:流處理和批處理的開發方式是一樣的,只不過對於接數據、輸出可能不一樣。在core中,是使用connect輸出的。
  • 6)單拿出以下代碼,分析是流處理,還是批處理?是看不出來的。
  • 這就是所謂的在流處理和批處理里面,對於業務邏輯處理是可以單獨抽象出來的,所以流處理和批處理都可以調用。這樣的話,業務邏輯處理就可以統一了。
lines.flatMap(_.split(",")).map((_,1))
      .reduceByKey(_+_)
  • 7)在生產里面,將兩個最后加的參數注解掉,然后就可以將代碼提交上去了。
  • 8)注意,在本地測試時,一定要先將對接網絡數據的端口啟起來。

 

 

  • Spark部署
  • 一、Spark部署及服務器端測試
  • 1)環境部署
[hadoop@spark000 source]$ tar -zxvf spark-3.0.0.tar.gz
[hadoop@spark000 source]$ ls
spark-3.0.0  spark-3.0.0.tar.gz
    • ④開始編譯Building a Runnable Distribution:
      Building Spark - Spark 3.2.0 Documentation (apache.org)
    • 已提供編譯好的版本了:在app文件中的spark-3.0.0-bin-2.6.0-cdh5.16.2
    • 環境版本:The Maven-based build is the build of reference for Apache Spark. Building Spark using Maven requires Maven 3.6.3 and Java 8. Spark requires Scala 2.12; support for Scala 2.11 was removed in Spark 3.0.0.
    • 查看HADOOP_HOME版本:
[hadoop@spark000 ~]$ echo $HADOOP_HOME
/home/hadoop/app/hadoop-2.6.0-cdh5.16.2
    • ⑤在source目錄下的spark-3.0.0中,執行編譯(時間較長)
./dev/make-distribution.sh \
--name 2.6.0-cdh5.16.2 \
--tgz -Phive -Phive-thriftserver \
-Pyarn
    • ⑥編譯好了之后spark-3.0.0-bin-2.6.0-cdh5.16.2.tgz,直接tar -zxvf解壓到app目錄。
    • ⑦如何提交一個作業?
    • 官網的示例:Running Spark on YARN - Spark 3.2.0 Documentation (apache.org)
    • ⑧進入spark主目錄中,執行
    • 執行過程中去瀏覽器界面看一下spark000
    • 在執行界面,查看結果為Pi is roughly 3.14005.....
    • 此處報錯:INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCoISECONDS)
    • 原因:啟動hadoop中的dfs和yarn的腳本即可。
[hadoop@spark000 spark-3.0.0-bin-2.6.0-cdh5.16.2]$ bin/spark-submit --class org.apache.spark.examples.SparkPi \
> --master yarn \
> examples/jars/spark-examples*.jar \
> 2
    • ⑨開始打包本地代碼包
    • 注意將AppName和Master兩個參數注釋掉,一會兒是通過腳本提交的。
    • 到C:\Users\jieqiong\IdeaProjects\spark-log4j\log-ss\target下找到log-ss-1.0
    • 上傳至lib路徑下
[hadoop@spark000 lib]$ pwd
/home/hadoop/lib
[hadoop@spark000 lib]$ ls
log-ss-1.0.jar
    • ⑩再次注意:啟動hdf、yarn、Zookeeper、單個Master進程
    • /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
    • [hadoop@hadoop000 sbin]$ ./start-dfs.sh
    • /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/sbin
    • [hadoop@hadoop000 sbin]$ ./start-yarn.sh
    • /home/hadoop/app/zookeeper-3.4.5-cdh5.16.2/bin
    • zkServer.sh start
    • /home/hadoop/app/spark-3.0.0-bin-2.6.0-cdh5.16.2/sbin
    • ./start-master.sh
    • ⑪腳本啟動
spark-submit \
--name jieqiong-network-wc \
--class com.imooc.bigdata.ss.NetworkWordCount \
--master yarn \
/home/hadoop/lib/log-ss-1.0.jar
    • ⑫啟動端口:
    • [hadoop@spark000 ~]$ nc -lk 9527
    • 輸入單詞。會在另一個界面進行詞頻統計。
    • 總結:目前來說spark對於我們只是一個客戶端。不需要啟動本地的spark集群
  • 二、StreamingContext編程注意事項
  • 1)官網:Spark Streaming - Spark 3.2.0 Documentation (apache.org)
  • 2)Initializing StreamingContext
  • 這樣理解,Context是一個上下文的框,什么東西都可以放到里面。
  • 對於SparkStreaming應用程序來說,StreamingContext是一個入口點,所以一定要創建。
  • 在StreamingContext或者Streaming core啟動過程中,一定要檢查conf里的兩個參數:spark.master、spark.app.name
  • 3)After a context is defined, you have to do the following.(看官網)
    • 輸入流式通過ssc接收socket話,就是一個hostname+ip。即input DStreams。
  • 4)Points to remember.(看官網)
  • 一旦ssc.start()啟動,就不要再進行其他計算了(result.map等),是無效的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM