Spark入門實戰系列--7.Spark Streaming(下)--實時流計算Spark Streaming實戰


【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取

1實例演示

1.1 流數據模擬器

1.1.1 流數據說明

在實例演示中模擬實際情況,需要源源不斷地接入流數據,為了在演示過程中更接近真實環境將定義流數據模擬器。該模擬器主要功能:通過Socket方式監聽指定的端口號,當外部程序通過該端口連接並請求數據時,模擬器將定時將指定的文件數據隨機獲取發送給外部程序。

1.1.2 模擬器代碼

import java.io.{PrintWriter}

import java.net.ServerSocket

import scala.io.Source

 

object StreamingSimulation {

  // 定義隨機獲取整數的方法

  def index(length: Int) = {

    import java.util.Random

    val rdm = new Random

    rdm.nextInt(length)

  }

 

  def main(args: Array[String]) {

    // 調用該模擬器需要三個參數,分為為文件路徑、端口號和間隔時間(單位:毫秒)

    if (args.length != 3) {

      System.err.println("Usage: <filename> <port> <millisecond>")

      System.exit(1)

    }

 

    // 獲取指定文件總的行數

    val filename = args(0)

    val lines = Source.fromFile(filename).getLines.toList

    val filerow = lines.length

 

    // 指定監聽某端口,當外部程序請求時建立連接

    val listener = new ServerSocket(args(1).toInt)

    while (true) {

      val socket = listener.accept()

      new Thread() {

        override def run = {

          println("Got client connected from: " + socket.getInetAddress)

          val out = new PrintWriter(socket.getOutputStream(), true)

          while (true) {

            Thread.sleep(args(2).toLong)

            // 當該端口接受請求時,隨機獲取某行數據發送給對方

            val content = lines(index(filerow))

            println(content)

            out.write(content + '\n')

            out.flush()

          }

          socket.close()

        }

      }.start()

    }

  }

}

clip_image002

1.1.3 生成打包文件

【注】可以參見第3課《Spark編程模型(下)--IDEA搭建及實戰》進行打包

clip_image004

在打包配置界面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar ,各個jar包之間用空格分開,

點擊菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下命令復制打包文件到Spark根目錄下

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

cp LearnSpark.jar /app/hadoop/spark-1.1.0/

ll /app/hadoop/spark-1.1.0/

clip_image006

1.2 實例1:讀取文件演示

1.2.1 演示說明

在該實例中Spark Streaming將監控某目錄中的文件,獲取在間隔時間段內變化的數據,然后通過Spark Streaming計算出改時間段內單詞統計數。

1.2.2 演示代碼

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

 

object FileWordCount {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")

 

    // 創建Streaming的上下文,包括Spark的配置和時間間隔,這里時間為間隔20

    val ssc = new StreamingContext(sparkConf, Seconds(20))

 

    // 指定監控的目錄,在這里為/home/hadoop/temp/

    val lines = ssc.textFileStream("/home/hadoop/temp/")

 

    // 對指定文件夾變化的數據進行單詞統計並且打印

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

 

       // 啟動Streaming

    ssc.start()

    ssc.awaitTermination()

  }

}

clip_image008

1.2.3 運行代碼

第一步   創建Streaming監控目錄

創建/home/hadoop/tempSpark Streaming監控的目錄,通過在該目錄中定時添加文件內容,然后由Spark Streaming統計出單詞個數

clip_image010

第二步   使用如下命令啟動Spark集群

$cd /app/hadoop/spark-1.1.0

$sbin/start-all.sh

第三步   IDEA中運行Streaming程序

IDEA中運行該實例,由於該實例沒有輸入參數故不需要配置參數,在運行日志中將定時打印時間戳。如果在監控目錄中加入文件內容,將輸出時間戳的同時將輸出單詞統計個數。

clip_image012

1.2.4 添加文本及內容

clip_image014

clip_image016

1.2.5 查看結果

第一步   查看IDEA中運行情況

IDEA的運行日志窗口中,可以觀察到輸出時間戳的同時將輸出單詞統計個數

clip_image018

第二步   通過webUI監控運行情況

http://hadoop1:4040監控Spark Streaming運行情況,可以觀察到每20秒運行一次作業

clip_image020

並且與其他運行作業相比在監控菜單增加了"Streaming"項目,點擊可以看到監控內容:

clip_image022

1.3 實例2:網絡數據演示

1.3.1 演示說明

在該實例中將由4.1流數據模擬以1秒的頻度發送模擬數據,Spark Streaming通過Socket接收流數據並每20秒運行一次用來處理接收到數據,處理完畢后打印該時間段內數據出現的頻度,即在各處理段時間之間狀態並無關系。

1.3.2 演示代碼

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.storage.StorageLevel

 

object NetworkWordCount {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")

    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(20))

 

    // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

 

    // 對讀入的數據進行分割、計數

    val words = lines.flatMap(_.split(","))

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

 

    wordCounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

clip_image024

1.3.3 運行代碼

第一步   啟動流數據模擬器

啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

clip_image026

模擬器Socket端口號為9999,頻度為1秒,

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

clip_image028

在沒有程序連接時,該程序處於阻塞狀態

第二步   IDEA中運行Streaming程序

IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

clip_image030

1.3.4 查看結果

第一步   觀察模擬器發送情況

IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送測試數據,數據是隨機的在指定的文件中獲取一行數據並發送,時間間隔為1

clip_image032

第二步   在監控頁面觀察執行情況

webUI上監控作業運行情況,可以觀察到每20秒運行一次作業

clip_image034

第三步   IDEA運行情況

IDEA的運行窗口中,可以觀測到的統計結果,通過分析在Spark Streaming每段時間內單詞數為20,正好是20秒內每秒發送總數。

clip_image036

1.4 實例3:銷售數據統計演示

1.4.1 演示說明

在該實例中將由4.1流數據模擬器以1秒的頻度發送模擬數據(銷售數據),Spark Streaming通過Socket接收流數據並每5秒運行一次用來處理接收到數據,處理完畢后打印該時間段內銷售數據總和,需要注意的是各處理段時間之間狀態並無關系。

1.4.2 演示代碼

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.storage.StorageLevel

 

object SaleAmount {

  def main(args: Array[String]) {

    if (args.length != 2) {

      System.err.println("Usage: SaleAmount <hostname> <port> ")

      System.exit(1)

    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

 

    val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")

    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(5))

 

   // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.map(_.split(",")).filter(_.length == 6)

    val wordCounts = words.map(x=>(1, x(5).toDouble)).reduceByKey(_ + _)

 

    wordCounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

clip_image038

1.4.3 運行代碼

第一步   啟動流數據模擬器

啟動4.1打包好的流數據模擬器,在該實例中將定時發送第五課/home/hadoop/upload/class5/saledata目錄下的tbStockDetail.txt數據文件(參見第五課《5.Hive(下)--Hive實戰》中2.1.2數據描述,該文件可以在本系列配套資源目錄/data/class5/saledata中找到),其中表tbStockDetail字段分別為訂單號、行號、貨品、數量、金額,數據內容如下:

clip_image040

模擬器Socket端口號為9999,頻度為1

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt 9999 1000

clip_image042

IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

clip_image044

1.4.4 查看結果

第一步   觀察模擬器發送情況

IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送銷售數據,時間間隔為1

clip_image046

 

第二步   IDEA運行情況

IDEA的運行窗口中,可以觀察到每5秒運行一次作業(兩次運行間隔為5000毫秒),運行完畢后打印該時間段內銷售數據總和。

clip_image048

第三步   在監控頁面觀察執行情況

webUI上監控作業運行情況,可以觀察到每5秒運行一次作業

clip_image050

1.5 實例4Stateful演示

1.5.1 演示說明

該實例為Spark Streaming狀態操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據並每5秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度,相比較前面4.3實例在該實例中各時間段之間狀態是相關的。

1.5.2 演示代碼

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

 

object StatefulWordCount {

  def main(args: Array[String]) {

    if (args.length != 2) {

      System.err.println("Usage: StatefulWordCount <filename> <port> ")

      System.exit(1)

    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

 

    // 定義更新狀態方法,參數values為當前批次單詞頻度,state為以往批次單詞頻度

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {

      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)

    }

 

    val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")

    val sc = new SparkContext(conf)

 

    // 創建StreamingContextSpark Steaming運行時間間隔為5

    val ssc = new StreamingContext(sc, Seconds(5))

    // 定義checkpoint目錄為當前目錄

    ssc.checkpoint(".")

 

    // 獲取從Socket發送過來數據

    val lines = ssc.socketTextStream(args(0), args(1).toInt)

    val words = lines.flatMap(_.split(","))

    val wordCounts = words.map(x => (x, 1))

 

    // 使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數

    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)

    stateDstream.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

clip_image052

1.5.3 運行代碼

第一步   啟動流數據模擬器

啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

clip_image026[1]

模擬器Socket端口號為9999,頻度為1

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

clip_image028[1]

在沒有程序連接時,該程序處於阻塞狀態,IDEA中運行Streaming程序

IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999

clip_image054

1.5.4 查看結果

第一步   IDEA運行情況

IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為1,第二次為6,第N次為5(N-1)+1,即統計單詞的總數為程序運行單詞數總和。

clip_image056

第二步   在監控頁面觀察執行情況

webUI上監控作業運行情況,可以觀察到每5秒運行一次作業

clip_image058

第三步   查看CheckPoint情況

在項目根目錄下可以看到checkpoint文件

clip_image060

1.6 實例5Window演示

1.6.1 演示說明

該實例為Spark Streaming窗口操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據並每10秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度。相比前面的實例,Spark Streaming窗口統計是通過reduceByKeyAndWindow()方法實現的,在該方法中需要指定窗口時間長度和滑動時間間隔。

1.6.2 演示代碼

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

 

object WindowWordCount {

  def main(args: Array[String]) {

    if (args.length != 4) {

      System.err.println("Usage: WindowWorldCount <filename> <port> <windowDuration> <slideDuration>")

      System.exit(1)

    }

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

 

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")

    val sc = new SparkContext(conf)

 

     // 創建StreamingContext

    val ssc = new StreamingContext(sc, Seconds(5))

     // 定義checkpoint目錄為當前目錄

    ssc.checkpoint(".")

 

    // 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

    val words = lines.flatMap(_.split(","))

 

    // windows操作,第一種方式為疊加處理,第二種方式為增量處理

    val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))

//val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))

 

    wordCounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

clip_image062

1.6.3 運行代碼

第一步   啟動流數據模擬器

啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:

clip_image026[2]

模擬器Socket端口號為9999,頻度為1

$cd /app/hadoop/spark-1.1.0

$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000

clip_image028[2]

在沒有程序連接時,該程序處於阻塞狀態,IDEA中運行Streaming程序

IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1、端口號為9999、時間窗口為30秒和滑動時間間隔10

clip_image064

1.6.4 查看結果

第一步   IDEA運行情況

IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為4,第二次為14,第N次為10(N-1)+4,即統計單詞的總數為程序運行單詞數總和。

clip_image066

第二步   在監控頁面觀察執行情況

webUI上監控作業運行情況,可以觀察到每10秒運行一次作業

clip_image068

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM