The reduceByKey function API:
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V]
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V]
This function combines all values V that share the same key K using the supplied function.
The parameters are as follows:
- func: the function used to combine two values of the same key; define it according to your needs;
- partitioner: the partitioner used to distribute the output;
- numPartitions: the number of partitions; when only a partition count is given, the default partitioner HashPartitioner is used.
Return value: as the signatures show, the result is again a pair RDD of key-value pairs.
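The signatures above come from the Java pair-RDD API (JFunction2, JavaPairRDD); the same overloads are available on Scala RDDs of key-value pairs, plus a variant that takes only the function (the one used in the sessions below). A minimal sketch of choosing between them in spark-shell; the sample data and the partition count 4 are assumptions for illustration:

import org.apache.spark.HashPartitioner

// Assumes an existing SparkContext `sc`, as in the spark-shell sessions below.
// The sample data and the partition count 4 are made up for illustration.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Overload 1: only the reduce function; the default partitioner (HashPartitioner) is used.
val r1 = pairs.reduceByKey(_ + _)

// Overload 2: reduce function plus an explicit number of partitions.
val r2 = pairs.reduceByKey(_ + _, 4)

// Overload 3: an explicit Partitioner plus the reduce function.
val r3 = pairs.reduceByKey(new HashPartitioner(4), _ + _)

r1.collect.foreach(println)   // (a,4) and (b,2), in no particular order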
Usage example:
linux:/$ spark-shell
...
17/10/28 20:33:54 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
17/10/28 20:33:55 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.
17/10/28 20:33:57 WARN SessionState: load mapred-default.xml, HIVE_CONF_DIR env not found!
17/10/28 20:33:58 WARN SessionState: load mapred-default.xml, HIVE_CONF_DIR env not found!
SQL context available as sqlContext.

scala> val x = sc.parallelize(List(
     |   ("a", "b", 1),
     |   ("a", "b", 1),
     |   ("c", "b", 1),
     |   ("a", "d", 1))
     | )
x: org.apache.spark.rdd.RDD[(String, String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val byKey = x.map({case (id,uri,count) => (id,uri)->count})
byKey: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[1] at map at <console>:23

scala> val reducedByKey = byKey.reduceByKey(_ + _)
reducedByKey: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[2] at reduceByKey at <console>:25

scala> reducedByKey.collect.foreach(println)
((c,b),1)
((a,d),1)
((a,b),2)
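The same steps can also be packaged as a standalone application rather than typed into spark-shell. A minimal sketch, assuming a local[*] master and an illustrative object name (neither appears in the original session):

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyExample {
  def main(args: Array[String]): Unit = {
    // local[*] is only for local testing; set the master and app name as appropriate.
    val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val x = sc.parallelize(List(
      ("a", "b", 1),
      ("a", "b", 1),
      ("c", "b", 1),
      ("a", "d", 1)))

    // The composite (id, uri) tuple becomes the key and the count becomes the value,
    // so ("a","b") accumulates to 2 while the other combinations stay at 1.
    val byKey = x.map { case (id, uri, count) => (id, uri) -> count }
    val reducedByKey = byKey.reduceByKey(_ + _)

    reducedByKey.collect.foreach(println)   // ((a,b),2), ((c,b),1), ((a,d),1), order may vary

    sc.stop()
  }
}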
Using reduceByKey to implement a group by:
Suppose there is a table my_table. Group by key and compute, for each key, the sum of ((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp))*((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp)) as well as the row count:
create table if not exists my_table(
  key string,
  l_scrsrp int,
  l_ncrsrp int,
  scrsrp int,
  ncrsrp int
);

insert into my_table(key,l_scrsrp,l_ncrsrp,scrsrp,ncrsrp) values("key1",1,0,2,0);
insert into my_table(key,l_scrsrp,l_ncrsrp,scrsrp,ncrsrp) values("key1",1,0,2,0);
insert into my_table(key,l_scrsrp,l_ncrsrp,scrsrp,ncrsrp) values("key2",1,0,2,0);
insert into my_table(key,l_scrsrp,l_ncrsrp,scrsrp,ncrsrp) values("key3",1,0,3,0);
insert into my_table(key,l_scrsrp,l_ncrsrp,scrsrp,ncrsrp) values("key2",1,0,3,0);

0: jdbc:hive2://xx.xx.xx.xx:xxxx/> select key,sum(((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp))*((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp))),count(0) myvalue
0: jdbc:hive2://xx.xx.xx.xx:xxxx/> from my_table
0: jdbc:hive2://xx.xx.xx.xx:xxxx/> group by key;
+-------+------+----------+--+
| key   | _c1  | myvalue  |
+-------+------+----------+--+
| key1  | 2    | 2        |
| key2  | 5    | 2        |
| key3  | 4    | 1        |
+-------+------+----------+--+
The equivalent implementation using reduceByKey is as follows:
scala> val y = sc.parallelize(List(
     |   ("key1",1,0,2,0),
     |   ("key1",1,0,2,0),
     |   ("key2",1,0,2,0),
     |   ("key3",1,0,3,0),
     |   ("key2",1,0,3,0)
     | ))
y: org.apache.spark.rdd.RDD[(String, Int, Int, Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val byKey = y.map({case (key,scrsrp,ncrsrp,l_scrsrp,l_ncrsrp) => (key)->((((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp))*((l_scrsrp-l_ncrsrp)-(scrsrp-ncrsrp))),(1))})
byKey: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[1] at map at <console>:23

scala> byKey.foreach(println)
(key3,(4,1))
(key1,(1,1))
(key1,(1,1))
(key2,(1,1))
(key2,(4,1))

scala> val reducedByKey = byKey.reduceByKey((x1, x2) => (x1._1 + x2._1, x1._2 + x2._2))
reducedByKey: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[3] at reduceByKey at <console>:25

scala> reducedByKey.collect.foreach(println)
(key1,(2,2))
(key2,(5,2))
(key3,(4,1))
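Since each key now carries a (sum, count) pair, a per-key average can also be derived with mapValues if needed. This goes beyond the original SQL query; the sketch below simply reuses reducedByKey from the session above:

// Derive the per-key mean of the squared difference from the (sum, count) pairs.
// Not part of the original query; shown only to illustrate why the count is carried along.
val avgByKey = reducedByKey.mapValues { case (sum, count) => sum.toDouble / count }
avgByKey.collect.foreach(println)   // (key1,1.0), (key2,2.5), (key3,4.0), order may vary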
Counting the occurrences of each character with reduceByKey:
scala> val x = sc.parallelize(List("a", "b", "a", "a", "b", "b", "b", "b"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> val s = x.map((_, 1))
s: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:23

scala> val result = s.reduceByKey((pre, after) => pre + after)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:25

scala> println(result.collect().toBuffer)
ArrayBuffer((a,3), (b,5))

scala> result.foreach(println)
(a,3)
(b,5)
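For comparison, the same counts can be produced with groupByKey followed by a per-key sum, reusing s from the session above. reduceByKey is usually the better choice because it combines values inside each partition before the shuffle, whereas groupByKey sends every individual element across the network:

// Equivalent result via groupByKey + mapValues, shown only for comparison.
val viaGroupByKey = s.groupByKey().mapValues(_.sum)
viaGroupByKey.collect().foreach(println)   // (a,3), (b,5), order may vary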