spark性能調優04-算子調優


1、使用MapPartitions代替map

  1.1 為什么要死使用MapPartitions代替map

    普通的map,每條數據都會傳入function中進行計算一次;而是用MapPartitions時,function會一次接受所有partition的數據出入到function中計算一次,性能較高。

    但是如果內存不足時,使用MapPartitions,一次將所有的partition數據傳入,可能會發生OOM異常

  1.2 如何使用

    有map的操作的地方,都可以使用MapPartitions進行替換

        /**
         * 使用mapPartitionsToPair代替mapToPair
         */
        JavaPairRDD<String, Row> sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {

            private static final long serialVersionUID = 1L;

            public Iterable<Tuple2<String, Row>> call(Iterator<Row> rows) throws Exception {
                List<Tuple2<String, Row>> list=new ArrayList<Tuple2<String, Row>>();
                while (rows.hasNext()) {
                    Row row=rows.next();
                    list.add(new Tuple2<String, Row>(row.getString(2), row));
                }
                return list;
            }
        });
        
        /*JavaPairRDD<String, Row> sessionRowPairRdd = dateRangeRdd
                .mapToPair(new PairFunction<Row, String, Row>() {

                    private static final long serialVersionUID = 1L;
                    // 先將數據映射為<sessionId,row>
                    public Tuple2<String, Row> call(Row row) throws Exception {
                        return new Tuple2<String, Row>(row.getString(2), row);
                    }
                });*/    

2、使用coalesce對過濾后的Rdd進行重新分區和壓縮

  2.1 為什么使用coalesce

    默認情況下,經過過濾后的數據的分區數和原分區數是一樣的,這就導致過濾后各個分區中的數據可能差距很大,在之后的操作中造成數據傾斜

    使用coalesce可以使過濾后的Rdd的分區數減少,並讓每個分區中的數據趨於平等

  2.2 如何使用   

       //過濾符合要求的ClickCategoryIdRow    
    filteredSessionRdd
.filter(new Function<Tuple2<String,Row>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, Row> tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce將過濾后的數據重新分區和壓縮,時新的分區中的數據大致相等 .coalesce(100)

3、使用foreachPartition替代foreach

  3.1 為什么使用foreachPartition

    默認使用的foreach,每條數據都會傳入function進行計算;如果操作數據庫,每條數據都會獲取一個數據庫連接並發送sql進行保存,消耗資源比較大,性能低。

    使用foreachPartition,會把所用partition的數據一次出入function,只需要獲取一次數據庫連接,性能高。

  3.2 如何使用

        /**
         * 使用foreachPartition替代foreach
         */
        sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator)
                    throws Exception {
                List<SessionDetail> sessionDetails=new ArrayList<SessionDetail>();
                if (iterator.hasNext()) {
                    Tuple2<String, Tuple2<String, Row>> tuple2=iterator.next();
                    String sessionId=tuple2._1;
                    Row row=tuple2._2._2;
                    SessionDetail sessionDetail=new SessionDetail();
                    sessionDetail.setSessionId(sessionId);
                    sessionDetail.setTaskId((int)taskId);
                    sessionDetail.setUserId((int)row.getLong(1));
                    sessionDetails.add(sessionDetail);
                }
                DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails);
            }
        });
        
       /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Tuple2<String, Row>> tuple2) throws Exception {
                String sessionId=tuple2._1;
                Row row=tuple2._2._2;
                SessionDetail sessionDetail=new SessionDetail();
                sessionDetail.setSessionId(sessionId);
                sessionDetail.setTaskId((int)taskId);
                sessionDetail.setUserId((int)row.getLong(1));
              DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail);
            }
        });*/   

 4、使用repartition進行調整並行度

  4.1 為什么要使用repartition

    spark.default.parallelism設置的並行度只能對沒有Spark SQL(DataFrame)的階段有用,對Spark SQL的並行度是無法設置的,該並行度是通過hdfs文件所在的block塊決定的。

    可以通過repartition調整之后的並行度

  4.2 如何使用 

sqlContext.sql("select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'").javaRDD()
    //使用repartition調整並行度
    .repartition(100)

 5、使用reduceByKey進行本地聚合

  5.1 reduceByKey有哪些優點

    reduceByKey相對於普通的shuffle操作(如groupByKey)的一個最大的優點,會進行map端的本地聚合,從而減少文件的輸出,減少磁盤IO,網絡傳輸,內存占比以及reduce端的聚合操作數據。

  5.2 使用場景

    只有是針對每個不同的key進行相應的操作都可以使用reduceByKey進行處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM