spark記錄(15)SparkStreaming算子操作


1 foreachRDD

  • output operation算子,必須對抽取出來的RDD執行action類算子,代碼才能執行。
  • 代碼:見上個隨筆例子

2 transform

  • transformation類算子
  • 可以通過transform算子,對Dstream做RDD到RDD的任意操作。
  • 代碼:
/**
 * 過濾黑名單
 * transform操作
 * DStream可以通過transform做RDD到RDD的任意操作。
 * @author root
 *
 */
public class TransformOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]").setAppName("transform");
        JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
        
        //黑名單
        List<String> list = Arrays.asList("zhangsan");
        final Broadcast<List<String>> bcBlackList = jsc.sparkContext().broadcast(list);
        
        //接受socket數據源
        JavaReceiverInputDStream<String> nameList = jsc.socketTextStream("node5", 9999);
        JavaPairDStream<String, String> pairNameList = 
                nameList.mapToPair(new PairFunction<String, String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<String, String>(s.split(" ")[1], s);
            }
        });
        /**
         * transform 可以拿到DStream中的RDD,做RDD到RDD之間的轉換,不需要Action算子觸發,需要返回RDD類型。
         * 注意:transform call方法內,拿到RDD 算子外的代碼 在Driver端執行,也可以做到動態改變廣播變量。
         */
        JavaDStream<String> transFormResult =
                pairNameList.transform(new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> nameRDD)
                    throws Exception {
                
                 JavaPairRDD<String, String> filter = nameRDD.filter(new Function<Tuple2<String,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, String> tuple) throws Exception {
                        return !bcBlackList.value().contains(tuple._1);
                    }
                });
                
                JavaRDD<String> map = filter.map(new Function<Tuple2<String,String>, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<String, String> tuple) throws Exception {
                        return tuple._2;
                    }
                });
                //返回過濾好的結果
                return map;
            }
        });
        
        transFormResult.print();
        
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}

 

3 updateStateByKey

  • transformation算子
  • updateStateByKey作用:

    1)   為SparkStreaming中每一個Key維護一份state狀態,state類型可以是任意類型的,可以是一個自定義的對象,更新函數也可以是自定義的。

    2)   通過更新函數對該key的狀態不斷更新,對於每個新的batch而言,SparkStreaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新。

  • 使用到updateStateByKey要開啟checkpoint機制和功能。
  • 多久會將內存中的數據寫入到磁盤一份?

    如果batchInterval設置的時間小於10秒,那么10秒寫入磁盤一份。如果batchInterval設置的時間大於10秒,那么就會batchInterval時間間隔寫入磁盤一份。

  • 代碼
public class UpdateStateByKeyDemo {

public static void main(String[] args) {

/*

* 第一步:配置SparkConf:

* 1,至少2條線程:因為Spark Streaming應用程序在運行的時候,至少有一條

* 線程用於不斷的循環接收數據,並且至少有一條線程用於處理接受的數據(否則的話無法

* 有線程用於處理數據,隨着時間的推移,內存和磁盤都會不堪重負);

* 2,對於集群而言,每個Executor一般肯定不止一個Thread,那對於處理Spark Streaming的

* 應用程序而言,每個Executor一般分配多少Core比較合適?根據我們過去的經驗,5個左右的

* Core是最佳的(一個段子分配為奇數個Core表現最佳,例如3個、5個、7個Core等);

*/

SparkConf conf = new SparkConf().setMaster("local[2]").

setAppName("UpdateStateByKeyDemo");

/*

* 第二步:創建SparkStreamingContext:

* 1,這個是SparkStreaming應用程序所有功能的起始點和程序調度的核心

* SparkStreamingContext的構建可以基於SparkConf參數,也可基於持久化的SparkStreamingContext的內容

* 來恢復過來(典型的場景是Driver崩潰后重新啟動,由於Spark Streaming具有連續7*24小時不間斷運行的特征,

* 所有需要在Driver重新啟動后繼續上衣系的狀態,此時的狀態恢復需要基於曾經的Checkpoint);

* 2,在一個Spark Streaming應用程序中可以創建若干個SparkStreamingContext對象,使用下一個SparkStreamingContext

* 之前需要把前面正在運行的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大的啟發SparkStreaming框架也只是

* Spark Core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業務邏輯處理代碼;

*/

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

//報錯解決辦法做checkpoint,開啟checkpoint機制,把checkpoint中的數據放在這里設置的目錄中,

//生產環境下一般放在HDFS中

jsc.checkpoint("/usr/local/tmp/checkpoint");

/*

* 第三步:創建Spark Streaming輸入數據來源input Stream:

* 1,數據輸入來源可以基於File、HDFS、Flume、Kafka、Socket等

* 2, 在這里我們指定數據來源於網絡Socket端口,Spark Streaming連接上該端口並在運行的時候一直監聽該端口

* 的數據(當然該端口服務首先必須存在),並且在后續會根據業務需要不斷的有數據產生(當然對於Spark Streaming

* 應用程序的運行而言,有無數據其處理流程都是一樣的); 

* 3,如果經常在每間隔5秒鍾沒有數據的話不斷的啟動空的Job其實是會造成調度資源的浪費,因為並沒有數據需要發生計算,所以

* 實例的企業級生成環境的代碼在具體提交Job前會判斷是否有數據,如果沒有的話就不再提交Job;

*/

JavaReceiverInputDStream lines = jsc.socketTextStream("hadoop100", 9999);

/*

* 第四步:接下來就像對於RDD編程一樣基於DStream進行編程!!!原因是DStream是RDD產生的模板(或者說類),在Spark Streaming具體

* 發生計算前,其實質是把每個Batch的DStream的操作翻譯成為對RDD的操作!!!

*對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

    * 第4.1步:講每一行的字符串拆分成單個的單詞

    */

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //如果是Scala,由於SAM轉換,所以可以寫成val words = lines.flatMap { line => line.split(" ")}

@Override

public Iterable<String> call(String line) throws Exception {

return Arrays.asList(line.split(" "));

}

});

/*

      * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

      * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word, 1)

      */

JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

@Override

public Tuple2<String, Integer> call(String word) throws Exception {

return new Tuple2<String, Integer>(word, 1);

}

});

/*

      * 第四步:對初始的DStream進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算

      *第4.3步:在這里是通過updateStateByKey來以Batch Interval為單位來對歷史狀態進行更新,

      * 這是功能上的一個非常大的改進,否則的話需要完成同樣的目的,就可能需要把數據保存在Redis、

      * Tagyon或者HDFS或者HBase或者數據庫中來不斷的完成同樣一個key的State更新,如果你對性能有極為苛刻的要求,

      * 且數據量特別大的話,可以考慮把數據放在分布式的Redis或者Tachyon內存文件系統中;

      * 當然從Spark1.6.x開始可以嘗試使用mapWithState,Spark2.X后mapWithState應該非常穩定了。

 */

JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)

@Override

public Optional<Integer> call(List<Integer> values, Optional<Integer> state)

throws Exception {

Integer updatedValue = 0 ;

if(state.isPresent()){

updatedValue = state.get();

}

for(Integer value: values){

updatedValue += value;

}

return Optional.of(updatedValue);

}

});

/*

*此處的print並不會直接出發Job的執行,因為現在的一切都是在Spark Streaming框架的控制之下的,對於Spark Streaming

*而言具體是否觸發真正的Job運行是基於設置的Duration時間間隔的

*諸位一定要注意的是Spark Streaming應用程序要想執行具體的Job,對Dtream就必須有output Stream操作,

*output Stream有很多類型的函數觸發,類print、saveAsTextFile、saveAsHadoopFiles等,最為重要的一個

*方法是foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis、DB、DashBoard等上面,foreachRDD

*主要就是用用來完成這些功能的,而且可以隨意的自定義具體數據到底放在哪里!!!

*/

wordsCount.print();

/*

* Spark Streaming執行引擎也就是Driver開始運行,Driver啟動的時候是位於一條新的線程中的,當然其內部有消息循環體,用於

* 接受應用程序本身或者Executor中的消息;

*/

jsc.start();

jsc.awaitTermination();

jsc.close();

}

 4 窗口操作

  • 窗口操作理解圖:

假設每隔5s 1個batch,上圖中窗口長度為15s,窗口滑動間隔10s。

  • 窗口長度和滑動間隔必須是batchInterval的整數倍。如果不是整數倍會檢測報錯。
  • 優化后的window窗口操作示意圖:

  • 優化后的window操作要保存狀態所以要設置checkpoint路徑,沒有優化的window操作可以不設置checkpoint路徑。
  • 代碼:
/**
 * 基於滑動窗口的熱點搜索詞實時統計
 * @author root
 *
 */
public class WindowOperator {
    
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("WindowHotWord"); 
        
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        /**
         * 設置日志級別為WARN
         *
         */
        jssc.sparkContext().setLogLevel("WARN");
        /**
         * 注意:
         *  沒有優化的窗口函數可以不設置checkpoint目錄
         *  優化的窗口函數必須設置checkpoint目錄         
         */
//           jssc.checkpoint("hdfs://node1:9000/spark/checkpoint");
           jssc.checkpoint("./checkpoint");
        JavaReceiverInputDStream<String> searchLogsDStream = jssc.socketTextStream("node5", 9999);
        JavaDStream<String> window = searchLogsDStream.window(Durations.seconds(15), Durations.seconds(5));
        //word    1
        JavaDStream<String> searchWordsDStream = searchLogsDStream.flatMap(new FlatMapFunction<String, String>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });
        
        // 將搜索詞映射為(searchWord, 1)的tuple格式
        JavaPairDStream<String, Integer> searchWordPairDStream = searchWordsDStream.mapToPair(
                
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String searchWord)
                            throws Exception {
                        return new Tuple2<String, Integer>(searchWord, 1);
                    }
                    
                });
        /**
         * 每隔10秒,計算最近60秒內的數據,那么這個窗口大小就是60秒,里面有12個rdd,在沒有計算之前,這些rdd是不會進行計算的。
         * 那么在計算的時候會將這12個rdd聚合起來,然后一起執行reduceByKeyAndWindow操作 ,
         * reduceByKeyAndWindow是針對窗口操作的而不是針對DStream操作的。
         */
//            JavaPairDStream<String, Integer> searchWordCountsDStream = 
//                
//                searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
//
//                    private static final long serialVersionUID = 1L;
//
//                    @Override
//                    public Integer call(Integer v1, Integer v2) throws Exception {
//                        return v1 + v2;
//                    }
//        }, Durations.seconds(15), Durations.seconds(5)); 
        
        
        /**
         * window窗口操作優化:
         */
         JavaPairDStream<String, Integer> searchWordCountsDStream = 
        
         searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
            
        },new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 - v2;
            }
            
        }, Durations.seconds(15), Durations.seconds(5));    

          searchWordCountsDStream.print();
        
        jssc.start();     
        jssc.awaitTermination();
        jssc.close();
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM