1、使用MapPartitions代替map
1.1 為什么要死使用MapPartitions代替map
普通的map,每條數據都會傳入function中進行計算一次;而是用MapPartitions時,function會一次接受所有partition的數據出入到function中計算一次,性能較高。
但是如果內存不足時,使用MapPartitions,一次將所有的partition數據傳入,可能會發生OOM異常
1.2 如何使用
有map的操作的地方,都可以使用MapPartitions進行替換
/** * 使用mapPartitionsToPair代替mapToPair */ JavaPairRDD<String, Row> sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() { private static final long serialVersionUID = 1L; public Iterable<Tuple2<String, Row>> call(Iterator<Row> rows) throws Exception { List<Tuple2<String, Row>> list=new ArrayList<Tuple2<String, Row>>(); while (rows.hasNext()) { Row row=rows.next(); list.add(new Tuple2<String, Row>(row.getString(2), row)); } return list; } }); /*JavaPairRDD<String, Row> sessionRowPairRdd = dateRangeRdd .mapToPair(new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; // 先將數據映射為<sessionId,row> public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String, Row>(row.getString(2), row); } });*/
2、使用coalesce對過濾后的Rdd進行重新分區和壓縮
2.1 為什么使用coalesce
默認情況下,經過過濾后的數據的分區數和原分區數是一樣的,這就導致過濾后各個分區中的數據可能差距很大,在之后的操作中造成數據傾斜
使用coalesce可以使過濾后的Rdd的分區數減少,並讓每個分區中的數據趨於平等
2.2 如何使用
//過濾符合要求的ClickCategoryIdRow
filteredSessionRdd.filter(new Function<Tuple2<String,Row>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, Row> tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce將過濾后的數據重新分區和壓縮,時新的分區中的數據大致相等 .coalesce(100)
3、使用foreachPartition替代foreach
3.1 為什么使用foreachPartition
默認使用的foreach,每條數據都會傳入function進行計算;如果操作數據庫,每條數據都會獲取一個數據庫連接並發送sql進行保存,消耗資源比較大,性能低。
使用foreachPartition,會把所用partition的數據一次出入function,只需要獲取一次數據庫連接,性能高。
3.2 如何使用
/** * 使用foreachPartition替代foreach */ sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() { private static final long serialVersionUID = 1L; public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator) throws Exception { List<SessionDetail> sessionDetails=new ArrayList<SessionDetail>(); if (iterator.hasNext()) { Tuple2<String, Tuple2<String, Row>> tuple2=iterator.next(); String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); sessionDetails.add(sessionDetail); } DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails); } }); /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() { private static final long serialVersionUID = 1L; public void call(Tuple2<String, Tuple2<String, Row>> tuple2) throws Exception { String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail); } });*/
4、使用repartition進行調整並行度
4.1 為什么要使用repartition
spark.default.parallelism設置的並行度只能對沒有Spark SQL(DataFrame)的階段有用,對Spark SQL的並行度是無法設置的,該並行度是通過hdfs文件所在的block塊決定的。
可以通過repartition調整之后的並行度
4.2 如何使用
sqlContext.sql("select * from user_visit_action where date >= '" + startDate + "' and date <= '" + endDate + "'").javaRDD() //使用repartition調整並行度 .repartition(100)
5、使用reduceByKey進行本地聚合
5.1 reduceByKey有哪些優點
reduceByKey相對於普通的shuffle操作(如groupByKey)的一個最大的優點,會進行map端的本地聚合,從而減少文件的輸出,減少磁盤IO,網絡傳輸,內存占比以及reduce端的聚合操作數據。
5.2 使用場景
只有是針對每個不同的key進行相應的操作都可以使用reduceByKey進行處理