1 foreachRDD
- output operation算子,必須對抽取出來的RDD執行action類算子,代碼才能執行。
- 代碼:見上個隨筆例子
2 transform
- transformation類算子
- 可以通過transform算子,對Dstream做RDD到RDD的任意操作。
- 代碼:
/** * 過濾黑名單 * transform操作 * DStream可以通過transform做RDD到RDD的任意操作。 * @author root * */ public class TransformOperator { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local[2]").setAppName("transform"); JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); //黑名單 List<String> list = Arrays.asList("zhangsan"); final Broadcast<List<String>> bcBlackList = jsc.sparkContext().broadcast(list); //接受socket數據源 JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999); JavaPairDStream<String, String> pairNameList = nameList.mapToPair(new PairFunction<String, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<String, String>(s.split(" ")[1], s); } }); /** * transform 可以拿到DStream中的RDD,做RDD到RDD之間的轉換,不需要Action算子觸發,需要返回RDD類型。 * 注意:transform call方法內,拿到RDD 算子外的代碼 在Driver端執行,也可以做到動態改變廣播變量。 */ JavaDStream<String> transFormResult = pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() { private static final long serialVersionUID = 1L; @Override public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD) throws Exception { JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<String, String> tuple) throws Exception { return !bcBlackList.value().contains(tuple._1); } }); JavaRDD<String> map = filter.map(new Function<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L; @Override public String call(Tuple2<String, String> tuple) throws Exception { return tuple._2; } }); //返回過濾好的結果 return map; } }); transFormResult.print(); jsc.start(); jsc.awaitTermination(); jsc.stop(); } }
3 updateStateByKey
- transformation算子
- updateStateByKey作用:
1) 為SparkStreaming中每一個Key維護一份state狀態,state類型可以是任意類型的,可以是一個自定義的對象,更新函數也可以是自定義的。
2) 通過更新函數對該key的狀態不斷更新,對於每個新的batch而言,SparkStreaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新。
- 使用到updateStateByKey要開啟checkpoint機制和功能。
- 多久會將內存中的數據寫入到磁盤一份?
如果batchInterval設置的時間小於10秒,那么10秒寫入磁盤一份。如果batchInterval設置的時間大於10秒,那么就會batchInterval時間間隔寫入磁盤一份。
- 代碼
public class UpdateStateByKeyDemo { public static void main(String[] args) { /* * 第一步:配置SparkConf: * 1,至少2條線程:因為Spark Streaming應用程序在運行的時候,至少有一條 * 線程用於不斷的循環接收數據,並且至少有一條線程用於處理接受的數據(否則的話無法 * 有線程用於處理數據,隨着時間的推移,內存和磁盤都會不堪重負); * 2,對於集群而言,每個Executor一般肯定不止一個Thread,那對於處理Spark Streaming的 * 應用程序而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的 * Core是最佳的(一個段子分配為奇數個Core表現最佳,例如3個、5個、7個Core等); */ SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("UpdateStateByKeyDemo"); /* * 第二步:創建SparkStreamingContext: * 1,這個是SparkStreaming應用程序所有功能的起始點和程序調度的核心 * SparkStreamingContext的構建可以基於SparkConf參數,也可基於持久化的SparkStreamingContext的內容 * 來恢復過來(典型的場景是Driver崩潰后重新啟動,由於Spark Streaming具有連續7*24小時不間斷運行的特征, * 所有需要在Driver重新啟動后繼續上衣系的狀態,此時的狀態恢復需要基於曾經的Checkpoint); * 2,在一個Spark Streaming應用程序中可以創建若干個SparkStreamingContext對象,使用下一個SparkStreamingContext * 之前需要把前面正在運行的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大的啟發SparkStreaming框架也只是 * Spark Core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業務邏輯處理代碼; */ JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); //報錯解決辦法做checkpoint,開啟checkpoint機制,把checkpoint中的數據放在這里設置的目錄中, //生產環境下一般放在HDFS中 jsc.checkpoint("/usr/local/tmp/checkpoint"); /* * 第三步:創建Spark Streaming輸入數據來源input Stream: * 1,數據輸入來源可以基於File、HDFS、Flume、Kafka、Socket等 * 2, 在這里我們指定數據來源於網絡Socket端口,Spark Streaming連接上該端口並在運行的時候一直監聽該端口 * 的數據(當然該端口服務首先必須存在),並且在后續會根據業務需要不斷的有數據產生(當然對於Spark Streaming * 應用程序的運行而言,有無數據其處理流程都是一樣的); * 3,如果經常在每間隔5秒鍾沒有數據的話不斷的啟動空的Job其實是會造成調度資源的浪費,因為並沒有數據需要發生計算,所以 * 實例的企業級生成環境的代碼在具體提交Job前會判斷是否有數據,如果沒有的話就不再提交Job; */ JavaReceiverInputDStream lines = jsc.socketTextStream("hadoop100", 9999); /* * 第四步:接下來就像對於RDD編程一樣基於DStream進行編程!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體 * 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!! *對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.1步:講每一行的字符串拆分成單個的單詞 */ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //如果是Scala,由於SAM轉換,所以可以寫成val words = lines.flatMap { line => line.split(" ")} @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); /* * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word, 1) */ JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); /* * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 *第4.3步:在這里是通過updateStateByKey來以Batch Interval為單位來對歷史狀態進行更新, * 這是功能上的一個非常大的改進,否則的話需要完成同樣的目的,就可能需要把數據保存在Redis、 * Tagyon或者HDFS或者HBase或者數據庫中來不斷的完成同樣一個key的State更新,如果你對性能有極為苛刻的要求, * 且數據量特別大的話,可以考慮把數據放在分布式的Redis或者Tachyon內存文件系統中; * 當然從Spark1.6.x開始可以嘗試使用mapWithState,Spark2.X后mapWithState應該非常穩定了。 */ JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce) @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception { Integer updatedValue = 0 ; if(state.isPresent()){ updatedValue = state.get(); } for(Integer value: values){ updatedValue += value; } return Optional.of(updatedValue); } }); /* *此處的print並不會直接出發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的,對於Spark Streaming *而言具體是否觸發真正的Job運行是基於設置的Duration時間間隔的 *諸位一定要注意的是Spark Streaming應用程序要想執行具體的Job,對Dtream就必須有output Stream操作, *output Stream有很多類型的函數觸發,類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個 *方法是foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis、DB、DashBoard等上面,foreachRDD *主要就是用用來完成這些功能的,而且可以隨意的自定義具體數據到底放在哪里!!! */ wordsCount.print(); /* * Spark Streaming執行引擎也就是Driver開始運行,Driver啟動的時候是位於一條新的線程中的,當然其內部有消息循環體,用於 * 接受應用程序本身或者Executor中的消息; */ jsc.start(); jsc.awaitTermination(); jsc.close(); }
4 窗口操作
- 窗口操作理解圖:
假設每隔5s 1個batch,上圖中窗口長度為15s,窗口滑動間隔10s。
- 窗口長度和滑動間隔必須是batchInterval的整數倍。如果不是整數倍會檢測報錯。
- 優化后的window窗口操作示意圖:
- 優化后的window操作要保存狀態所以要設置checkpoint路徑,沒有優化的window操作可以不設置checkpoint路徑。
- 代碼:
/** * 基於滑動窗口的熱點搜索詞實時統計 * @author root * */ public class WindowOperator { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("WindowHotWord"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 設置日志級別為WARN * */ jssc.sparkContext().setLogLevel("WARN"); /** * 注意: * 沒有優化的窗口函數可以不設置checkpoint目錄 * 優化的窗口函數必須設置checkpoint目錄 */ // jssc.checkpoint("hdfs://node1:9000/spark/checkpoint"); jssc.checkpoint("./checkpoint"); JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node5", 9999); JavaDStream<String> window = searchLogsDStream.window(Durations.seconds(15), Durations.seconds(5)); //word 1 JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() { /** * */ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String t) throws Exception { return Arrays.asList(t.split(" ")); } }); // 將搜索詞映射為(searchWord, 1)的tuple格式 JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String searchWord) throws Exception { return new Tuple2<String, Integer>(searchWord, 1); } }); /** * 每隔10秒,計算最近60秒內的數據,那么這個窗口大小就是60秒,里面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。 * 那么在計算的時候會將這12個rdd聚合起來,然后一起執行reduceByKeyAndWindow操作 , * reduceByKeyAndWindow是針對窗口操作的而不是針對DStream操作的。 */ // JavaPairDStream<String, Integer> searchWordCountsDStream = // // searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { // // private static final long serialVersionUID = 1L; // // @Override // public Integer call(Integer v1, Integer v2) throws Exception { // return v1 + v2; // } // }, Durations.seconds(15), Durations.seconds(5)); /** * window窗口操作優化: */ JavaPairDStream<String, Integer> searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } },new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 - v2; } }, Durations.seconds(15), Durations.seconds(5)); searchWordCountsDStream.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }