【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取
1、實例演示
1.1 流數據模擬器
1.1.1 流數據說明
在實例演示中模擬實際情況,需要源源不斷地接入流數據,為了在演示過程中更接近真實環境將定義流數據模擬器。該模擬器主要功能:通過Socket方式監聽指定的端口號,當外部程序通過該端口連接並請求數據時,模擬器將定時將指定的文件數據隨機獲取發送給外部程序。
1.1.2 模擬器代碼
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object StreamingSimulation {
// 定義隨機獲取整數的方法
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
// 調用該模擬器需要三個參數,分為為文件路徑、端口號和間隔時間(單位:毫秒)
if (args.length != 3) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit(1)
}
// 獲取指定文件總的行數
val filename = args(0)
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
// 指定監聽某端口,當外部程序請求時建立連接
val listener = new ServerSocket(args(1).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong)
// 當該端口接受請求時,隨機獲取某行數據發送給對方
val content = lines(index(filerow))
println(content)
out.write(content + '\n')
out.flush()
}
socket.close()
}
}.start()
}
}
}
1.1.3 生成打包文件
【注】可以參見第3課《Spark編程模型(下)--IDEA搭建及實戰》進行打包
在打包配置界面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar ,各個jar包之間用空格分開,
點擊菜單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下命令復制打包文件到Spark根目錄下
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/
ll /app/hadoop/spark-1.1.0/
1.2 實例1:讀取文件演示
1.2.1 演示說明
在該實例中Spark Streaming將監控某目錄中的文件,獲取在間隔時間段內變化的數據,然后通過Spark Streaming計算出改時間段內單詞統計數。
1.2.2 演示代碼
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object FileWordCount {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("FileWordCount").setMaster("local[2]")
// 創建Streaming的上下文,包括Spark的配置和時間間隔,這里時間為間隔20秒
val ssc = new StreamingContext(sparkConf, Seconds(20))
// 指定監控的目錄,在這里為/home/hadoop/temp/
val lines = ssc.textFileStream("/home/hadoop/temp/")
// 對指定文件夾變化的數據進行單詞統計並且打印
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
// 啟動Streaming
ssc.start()
ssc.awaitTermination()
}
}
1.2.3 運行代碼
第一步 創建Streaming監控目錄
創建/home/hadoop/temp為Spark Streaming監控的目錄,通過在該目錄中定時添加文件內容,然后由Spark Streaming統計出單詞個數
第二步 使用如下命令啟動Spark集群
$cd /app/hadoop/spark-1.1.0
$sbin/start-all.sh
第三步 在IDEA中運行Streaming程序
在IDEA中運行該實例,由於該實例沒有輸入參數故不需要配置參數,在運行日志中將定時打印時間戳。如果在監控目錄中加入文件內容,將輸出時間戳的同時將輸出單詞統計個數。
1.2.4 添加文本及內容
1.2.5 查看結果
第一步 查看IDEA中運行情況
在IDEA的運行日志窗口中,可以觀察到輸出時間戳的同時將輸出單詞統計個數
第二步 通過webUI監控運行情況
在http://hadoop1:4040監控Spark Streaming運行情況,可以觀察到每20秒運行一次作業
並且與其他運行作業相比在監控菜單增加了"Streaming"項目,點擊可以看到監控內容:
1.3 實例2:網絡數據演示
1.3.1 演示說明
在該實例中將由4.1流數據模擬以1秒的頻度發送模擬數據,Spark Streaming通過Socket接收流數據並每20秒運行一次用來處理接收到數據,處理完畢后打印該時間段內數據出現的頻度,即在各處理段時間之間狀態並無關系。
1.3.2 演示代碼
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(20))
// 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
// 對讀入的數據進行分割、計數
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.3.3 運行代碼
第一步 啟動流數據模擬器
啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:
模擬器Socket端口號為9999,頻度為1秒,
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000
在沒有程序連接時,該程序處於阻塞狀態
第二步 在IDEA中運行Streaming程序
在IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999
1.3.4 查看結果
第一步 觀察模擬器發送情況
IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送測試數據,數據是隨機的在指定的文件中獲取一行數據並發送,時間間隔為1秒
第二步 在監控頁面觀察執行情況
在webUI上監控作業運行情況,可以觀察到每20秒運行一次作業
第三步 IDEA運行情況
在IDEA的運行窗口中,可以觀測到的統計結果,通過分析在Spark Streaming每段時間內單詞數為20,正好是20秒內每秒發送總數。
1.4 實例3:銷售數據統計演示
1.4.1 演示說明
在該實例中將由4.1流數據模擬器以1秒的頻度發送模擬數據(銷售數據),Spark Streaming通過Socket接收流數據並每5秒運行一次用來處理接收到數據,處理完畢后打印該時間段內銷售數據總和,需要注意的是各處理段時間之間狀態並無關系。
1.4.2 演示代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
object SaleAmount {
def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("Usage: SaleAmount <hostname> <port> ")
System.exit(1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("SaleAmount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
// 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(",")).filter(_.length == 6)
val wordCounts = words.map(x=>(1, x(5).toDouble)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.4.3 運行代碼
第一步 啟動流數據模擬器
啟動4.1打包好的流數據模擬器,在該實例中將定時發送第五課/home/hadoop/upload/class5/saledata目錄下的tbStockDetail.txt數據文件(參見第五課《5.Hive(下)--Hive實戰》中2.1.2數據描述,該文件可以在本系列配套資源目錄/data/class5/saledata中找到),其中表tbStockDetail字段分別為訂單號、行號、貨品、數量、金額,數據內容如下:
模擬器Socket端口號為9999,頻度為1秒
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class5/saledata/tbStockDetail.txt 9999 1000
在IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999
1.4.4 查看結果
第一步 觀察模擬器發送情況
IDEA中的Spark Streaming程序運行與模擬器建立連接,當模擬器檢測到外部連接時開始發送銷售數據,時間間隔為1秒
第二步 IDEA運行情況
在IDEA的運行窗口中,可以觀察到每5秒運行一次作業(兩次運行間隔為5000毫秒),運行完畢后打印該時間段內銷售數據總和。
第三步 在監控頁面觀察執行情況
在webUI上監控作業運行情況,可以觀察到每5秒運行一次作業
1.5 實例4:Stateful演示
1.5.1 演示說明
該實例為Spark Streaming狀態操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據並每5秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度,相比較前面4.3實例在該實例中各時間段之間狀態是相關的。
1.5.2 演示代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
object StatefulWordCount {
def main(args: Array[String]) {
if (args.length != 2) {
System.err.println("Usage: StatefulWordCount <filename> <port> ")
System.exit(1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 定義更新狀態方法,參數values為當前批次單詞頻度,state為以往批次單詞頻度
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
val conf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
// 創建StreamingContext,Spark Steaming運行時間間隔為5秒
val ssc = new StreamingContext(sc, Seconds(5))
// 定義checkpoint目錄為當前目錄
ssc.checkpoint(".")
// 獲取從Socket發送過來數據
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1))
// 使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數
val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
1.5.3 運行代碼
第一步 啟動流數據模擬器
啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:
模擬器Socket端口號為9999,頻度為1秒
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000
在沒有程序連接時,該程序處於阻塞狀態,在IDEA中運行Streaming程序
在IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1和端口號為9999
1.5.4 查看結果
第一步 IDEA運行情況
在IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為1,第二次為6,第N次為5(N-1)+1,即統計單詞的總數為程序運行單詞數總和。
第二步 在監控頁面觀察執行情況
在webUI上監控作業運行情況,可以觀察到每5秒運行一次作業
第三步 查看CheckPoint情況
在項目根目錄下可以看到checkpoint文件
1.6 實例5:Window演示
1.6.1 演示說明
該實例為Spark Streaming窗口操作,模擬數據由4.1流數據模擬以1秒的頻度發送,Spark Streaming通過Socket接收流數據並每10秒運行一次用來處理接收到數據,處理完畢后打印程序啟動后單詞出現的頻度。相比前面的實例,Spark Streaming窗口統計是通過reduceByKeyAndWindow()方法實現的,在該方法中需要指定窗口時間長度和滑動時間間隔。
1.6.2 演示代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
object WindowWordCount {
def main(args: Array[String]) {
if (args.length != 4) {
System.err.println("Usage: WindowWorldCount <filename> <port> <windowDuration> <slideDuration>")
System.exit(1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
// 創建StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
// 定義checkpoint目錄為當前目錄
ssc.checkpoint(".")
// 通過Socket獲取數據,該處需要提供Socket的主機名和端口號,數據保存在內存和硬盤中
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(","))
// windows操作,第一種方式為疊加處理,第二種方式為增量處理
val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
//val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
1.6.3 運行代碼
第一步 啟動流數據模擬器
啟動4.1打包好的流數據模擬器,在該實例中將定時發送/home/hadoop/upload/class7目錄下的people.txt數據文件(該文件可以在本系列配套資源目錄/data/class7中找到),其中people.txt數據內容如下:
模擬器Socket端口號為9999,頻度為1秒
$cd /app/hadoop/spark-1.1.0
$java -cp LearnSpark.jar class7.StreamingSimulation /home/hadoop/upload/class7/people.txt 9999 1000
在沒有程序連接時,該程序處於阻塞狀態,在IDEA中運行Streaming程序
在IDEA中運行該實例,該實例需要配置連接Socket主機名和端口號,在這里配置參數機器名為hadoop1、端口號為9999、時間窗口為30秒和滑動時間間隔10秒
1.6.4 查看結果
第一步 IDEA運行情況
在IDEA的運行窗口中,可以觀察到第一次運行統計單詞總數為4,第二次為14,第N次為10(N-1)+4,即統計單詞的總數為程序運行單詞數總和。
第二步 在監控頁面觀察執行情況
在webUI上監控作業運行情況,可以觀察到每10秒運行一次作業