Spark Streaming官方文檔學習--上


官方文檔地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html

Spark Streaming是spark api的擴展
能實現可擴展,高吞吐,可容錯,的流式處理
從外接數據源接受數據流,處理數據流使用的是復雜的高度抽象的算法函數map reduce join window等
輸出的數據可以存儲到文件系統和數據庫甚至是直接展示在命令行
也可以應用ml 和graph processing在這些數據流上

spark streaming本質還是spark只是實現了所謂的微批量
 spark streaming中連續數據流用DStream表示,DStream可以從輸入數據創建,也可以從其他的DStream轉化來
本質上DStream是一組RDD組成的序列

一個迅速上手的例子:
   
   
   
           
  1. # coding: utf-8
  2. # In[ ]:
  3. from pyspark import SparkContext
  4. from pyspark.streaming import StreamingContext
  5. # In[ ]:
  6. #創建兩個工作線程,將這兩個線程喂給StreamingContext,時間間隔是1秒
  7. #這里有個錯誤Cannot run multiple SparkContexts at once
  8. #參考:http://stackoverflow.com/questions/28259756/how-to-create-multiple-sparkcontexts-in-a-console
  9. #先要嘗試關閉sc才能創建多個SparkContext
  10. try:
  11. sc.stop()
  12. except:
  13. pass
  14. sc = SparkContext("local[2]", "NetworkWordCount")
  15. ssc = StreamingContext(sc, 1)
  16. #sc.stop()
  17. # In[ ]:
  18. #創建一個DStream,從本機的這個端口取數據
  19. lines = ssc.socketTextStream("localhost", 9999)
  20. # In[ ]:
  21. #lines中的數據記錄是一行行的文本,下面將文本切割成字
  22. words = lines.flatMap(lambda line: line.split(" "))
  23. #這里的flatMap是一對多DStream操作,生成一個新的DStream,就是變成了字流了
  24. # In[ ]:
  25. #下面數一數每一批次的字的個數
  26. # Count each word in each batch
  27. pairs = words.map(lambda word: (word, 1))
  28. wordCounts = pairs.reduceByKey(lambda x, y: x + y)
  29. # In[ ]:
  30. # 打印DStream中的每個RDD的前十個元素
  31. wordCounts.pprint()
  32. # In[ ]:
  33. ssc.start() # 開始計算
  34. ssc.awaitTermination() #等待計算停止
  35. # In[ ]:
  36. #將這個文件的py腳本提交計算: ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
  37. #在命令行輸入nc -lk 9999 然后模擬輸入字符串文本,那么在pyspark命令行會打印出每秒鍾輸入的數據的統計結果


基本概念

    要想寫自己的streaming程序,首先要添加maven或者sbt的依賴
    
  
  
  
          
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.11</artifactId>
  4. <version>2.0.0</version>
  5. </dependency>

對於外部輸入流的依賴現在不在核心api中了,需要單獨添加依賴。

初始化StreamingContext
可以從SparkContext對象中創建
   
   
   
           
  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. sc = SparkContext(master, appName)
  4. ssc = StreamingContext(sc, 1)
appName是程序名字可以在UI中顯示
master是Spark,Mesos,YARN cluster URL    或者是 聲明的Local[*]字符串使得運行在本地模式
當運行在集群上的時候,不要寫死在代碼里面,而是要從spark-submit啟動,傳遞進去。
對於本地測試或者單元測試,可以傳遞local[*]

在context被定義之后,要做下面的事情:
1. 通過創建DStream去定義輸入資源
2. 通過對DStream的轉換和輸出操作定義流的計算
3. 使用streamingContext.start()開始接收數據並處理數據
4. 使用streamingContext.awaitTermination()等待處理過程停止(手動或者因為錯誤)
5. 可以使用streamingContext.stop()手動停止處理過程

要點:
1. 一旦一個context被啟動,不能再添加任何新的流進去了
2. 一旦context被停止,就不能重啟了
3. 在JVM中只能有一個StreamingContext被激活
4. 在streamingContext上使用stop()也會停止SparkContext(),要想單獨停止前者,設置stop()的可選的參數 stopSparkContext()參數為false
5. 一個sparkContext可以被重復使用,去創建多個StreamingContext,只要前一個StreamingContext被單獨停止,下一個就可以接着創建。

Discretized Streams(DStreams)

是Spark Streaming的基本的抽象,代表了一個連續的數據流
可以是從數據源接受的輸入數據流,也可以是轉換輸入數據流得到的數據流
一個DStream代表一串RDD,RDD是不可分割的基本數據單元抽象。
每一個在DStream中的RDD包含特定時間間隔的數據
 
如果時間是1秒的話,從0-1秒的很多RDD,與從1-2的RDD等,組成了DStream

任何對DStream的操作,都會被翻譯成對底層的RDD的操作,例如,將Lines轉換成words的操作
 
對DStream的操作,隱藏了很多細節,給開發者提供高度抽象的API

輸入DStreams和Receivers
每一個輸出DStream(除了file strem)都關聯一個Receiver對象,這個對象從數據源接受數據存儲在spark的內存中等待處理。
Spark Streaming支持兩種類型的內建數據源
    1. Basic Sources:直接在StreamingContext API中可用的Sources,比如file systems和socket connections
    2. Advances Sources:從外部工具類中調過來的例如Kafka,Flume,Kinesis等,需要鏈接一些外部依賴。

關鍵點:
    1. 注意在本地運行SparkStreaming的時候,不要使用local或者local[1]作為主機的URL,因為這些都是意味着開一個線程,因為如果只是輸入一個數據源,那么這個單一的線程會用來運行receiver,那么沒有線程去處理接收到的數據了。所以本地運行的時候,參數local[n]中的n最好大於運行中的receiver。
    2. 相應的在集群上運行的時候,分配的核心數要比接收者的數目多,否則的話系統能接收數據,但是不能處理。

Basic Sources 基礎數據源

在基礎例子中已經看到過ssc.socketTextStream(),下面看file streams
   
   
   
           
  1. streamingContext.textFileStream(dataDirectory)
可以創建一個DStream
spark會監控這個路徑,處理在那個路徑中的任何文件
注意:
    1. 路徑中的文件要有相同的數據格式
    2. 文件必須通過自動專業或者重命名進入到這個路徑的
    3. 一旦進入,這些文件不能被改變,所以如果文件被連續附加,那么新的數據不能被讀取的
針對簡單的文本文件,有個簡單的方法streamingContext.textFileStream(dataDirectory)
因為file stream不需要運行receiver所以不需要分配核心或者線程去處理
Python API不支持fileStream只是支持textFileStream

可以使用RDD的queue去創建DStream,使用streamingContext.queueStream(queueofRDDs)

Advanced Souces 高級數據源

As of Spark 2.0.0, Kafka, Kinesis and Flume are available in the Python API.
因為這些高級的數據源的支持比較復雜,需要依賴單獨的包,現在被轉移出了核心的API,所以不能再shell中使用,也就不能在shell中測試這些數據源。如果非要的話,需要下載對應的maven jar包,和對應的依賴,然后添加到classpath

Custom Sources自定義數據源

現在python還不支持,但是要想從自定義的數據源創建DStream,就要自己實現用戶定義的receiver,這可以接受自定義的數據,並且發送到spark中

Receiver Reliability 接收者的可靠性

按照可靠性可以把數據分為兩種,有的數據源例如Kafka和Flume運行傳送被回復的數據、
如果系統正確接受到這些要被確認的數據,可以保證不會因為某種失敗而導致數據丟失。這導致兩種類型的接收者。
    1. 可靠的接收者:當數據被接受並存儲到spark之后,必須回復確認消息給可靠數據源。
    2. 不可靠的接收者:不用回復確認,針對沒有確認機制的數據源,或者有確認機制但是不需要執行復雜確認機制的數據源。


Transformations on DStreams DStream的轉換
map(fun) 這個函數將輸入的DStream的每一個元素傳遞給func得到一個新的DStream
flatMap(func) 同上,只是每個輸入可以map到多個輸出項
filter(func) 選擇func返回結果為true的DStream中的記錄組成新的DStream
reparitition(numPartitions) 通過改變划分去改變DStream的並行水平
union(otherStream) 合並
count() 返回一個新的DStream,是原始的DStream中的每個RDD的元素的數目
reduce(func) 使用函數func聚合原始數據匯總的每個RDD得到一個新的單一元素RDD組成的DStream
countByValue() 調用類型K的DStream時候返回一個新的DStream有(K,long)對,其中long是k在每個RDD中出現的頻率
reduceByKey(func,[numTasks]) 將(k,v)中的v按照k使用func進行聚合
join(otherStream,[numTasks]) (k,v)(k,w)得到(k,(v,w))
cogroup(otherStream,[numTasks]) (k,v)(k,w)得到(k,Seq[V],Seq[W])
tansform(func) 作用在每個RDD上得到新的RDD組成的DStream
updateStateByKey(func) 每個鍵都是通過將給定的函數作用在其值上得到的新的DStream

下面是對某些轉換的詳細的討論
UpdateStateByKey Operation
允許當使用新的信息連續更新的時候,維護任意的狀態
    1. 定義狀態,這個狀態可以是任意的數據類型
    2. 定義狀態的更新函數
不管有沒有數據,spark都會更新狀態,如果更新函數返回為none那么鍵值對就會被消除
假設想要維持一個運行時數目,那么運行時數目就是一個狀態,是個整數,下面是一個更新函數
   
   
   
           
  1. def updateFunction(newValues, runningCount):
  2. if runningCount is None:
  3. runningCount = 0
  4. return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
假設使用前面的paris DStream包含(word,1)對
    
    
    
            
  1. runningCounts = pairs.updateStateByKey(updateFunction)


轉換操作:
tranform操作允許任意的RDD-to-RDD函數應用到DStream,下面是一個例子:
    
    
    
            
  1. spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
  2. # join data stream with spam information to do data cleaning
  3. cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))


窗口操作:
   允許我們應用transformation到一個滑動窗口的數據上
 上面的例子說明了每個窗口操作要聲明下面的兩個參數
    windows length:窗口的長度,上面的例子是3
    sliding interval:窗口被執行的時間間隔,例子中的書2
上面的兩個參數都應該是元素DStream批間隔(上面的間隔是1)的整數倍

下面是一個窗口操作的例子,假設我們想生成過去的30秒的數據的wordcounts,每10秒鍾一次
    
    
    
            
  1. # Reduce last 30 seconds of data, every 10 seconds
  2. windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
下面是一個 常見的窗口操作的描述,所有的操作都傳遞兩個參數,窗口的長度和時間間隔
window(長度,間隔) 原來的DStream按照新的指定窗口進行切分返回新的DStream
countByWindow(長度,間隔) 返回滑動窗口的元素個數
reduceByWindow(func, windowLength, slideInterval)
讀原來的DStream數據進行聚合得到新的DStream
reduceByKeyAndWindow(func,長度,間隔,[numtasks]) (k,v)中的k被函數合並得到新的DStream
reduceByKeyAndWindow(func,invFunc,長度,間隔,[numtasks]) 比上面的更高效,對窗口內的數據增量聚合和逐步移去得到聚合后新的DStream
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 根據窗口計算每個元素的頻次

Join Operations
下面是簡單的流的join
    
    
    
            
  1. stream1 = ...
  2. stream2 = ...
  3. joinedStream = stream1.join(stream2)
You can also do leftOuterJoin, rightOuterJoin, fullOuterJoin
下面是基於窗口的流的Join
    
    
    
            
  1. windowedStream1 = stream1.window(20)
  2. windowedStream2 = stream2.window(60)
  3. joinedStream = windowedStream1.join(windowedStream2)
sream-dataset的join
流和數據集的join操作是使用lambda表達式實現的
    
    
    
            
  1. dataset = ... # some RDD
  2. windowedStream = stream.window(20)
  3. joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))


DStream的輸出操作

print()
前十個元素打印出來
saveAsTextFiles(prefix, [suffix])
將DStream中的內容以文本方式保存成文件,每次批處理間隔內產生的文件按照prefix-TIME_IN_MS[.suffix]命名
saveAsObjectFiles(prefix, [suffix])
將DStream中的內容按對象序列化並且以SequenceFile格式保存,每次批處理間隔文件按照上面的命名
saveAsHadoopFiles(prefix, [suffix])
將DStream中的內容按對象序列化並且以hadoop格式保存,每次批處理間隔文件按照上面的命名
foreachRDD(func)
對每個RDD應用這個函數,將RDD保存在外部文件中

Design Patterns for using foreachRDD
foreachRDD的設計模式
dstream.foreachRDD非常強大,但是容易出錯
將數據寫到外部系統需要創建一個連接對象,使用這個對象例如Tcp Connection發送數據到遠程的系統
開發者可能會錯誤的連接到Spark Driver,然后試圖在worker中使用將數據保存到RDD中
例如:
     
     
     
             
  1. def sendRecord(rdd):
  2. connection = createNewConnection() # executed at the driver
  3. rdd.foreach(lambda record: connection.send(record))
  4. connection.close()
  5. dstream.foreachRDD(sendRecord)
這是錯誤的,因為這要求連接對象序列化並且從driver發送到worker
這樣的連接對象很少能跨機器轉讓
正確的做法是在worker中創建連接對象
     
     
     
             
  1. def sendRecord(record):
  2. connection = createNewConnection()
  3. connection.send(record)
  4. connection.close()
  5. dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
通常的,創建一個對象需要時間和資源的管理費用,因此,為每個記錄創建和摧毀連接對象可能會帶來不必要的管理費用,這可能會顯著降低系統的吞吐量,一個更好的解決方案是使用rdd.foreachPartition去創造唯一連接對象,並且用這個對象發送所有的RDD。
     
     
     
             
  1. def sendPartition(iter):
  2. connection = createNewConnection()
  3. for record in iter:
  4. connection.send(record)
  5. connection.close()
  6. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
最終的優化是,跨RDD或者批次,重用連接對象
程序員可以維護一個連接對象的靜態的池。
     
     
     
             
  1. def sendPartition(iter):
  2. # ConnectionPool is a static, lazily initialized pool of connections
  3. connection = ConnectionPool.getConnection()
  4. for record in iter:
  5. connection.send(record)
  6. # return to the pool for future reuse
  7. ConnectionPool.returnConnection(connection)
  8. dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM