對API的解釋:
1.1 transform
l map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集
l filter(func) : 對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD
l flatMap(func):和map差不多,但是flatMap生成的是多個結果
l mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition
l mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index
l sample(withReplacement,faction,seed):抽樣
l union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合
l distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element
l groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoop中reduce函數接受的key-valuelist
l reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數
l sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型
1.2 action
l reduce(func):說白了就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的
l collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組
l count():返回的是dataset中的element的個數
l first():返回的是dataset中的第一個元素
l take(n):返回前n個elements
l takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed
l saveAsTextFile(path):把dataset寫到一個text file中,或者hdfs,或者hdfs支持的文件系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中
l saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop文件系統
l countByKey():返回的是key對應的個數的一個map,作用於一個RDD
l foreach(func):對dataset中的每個元素都使用func
以下是案例:
/*數據情況
a 1
b 2
c 3
d 4
e 5*/
主函數:
public class SparkCoreTest{
public static void main( String[] args )
{
if(args.length<1){
System.out.println("請輸入參數!");
}
String filepath=args[0];
JavaSparkContext sc =JavaSparkContextFactory.getJavaSparkContext("sparkCoreTest");
JavaRDD<String> rdd=sc.textFile(filepath);
--transform
//testSparkCoreApiMap(logData);
//testSparkCoreApiFilter(rdd);
//testSparkCoreApiFlatMap(rdd);
//testSparkCoreApiUnion(rdd);
// testSparkCoreApiDistinct(rdd);
// testSparkCoreApiMaptoPair(rdd);
//testSparkCoreApiGroupByKey(rdd);
//testSparkCoreApiReduceByKey(rdd);
--action
testSparkCoreApiReduce(rdd);
}
/**
* Map主要是對數據進行處理,不進行數據集的增減
*
* 本案例實現,打印所有數據
*
* @param rdd
*/
private static void testSparkCoreApiMap(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
public String call(String s){
return s;
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*
*
*
* filter主要是過濾數據的功能
* 本案例實現:過濾含有a的那行數據
*
*
*/
private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
public Boolean call(String s){
return (s.split(" "))[0].equals("a");
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*
*
*
* flatMap 用戶行轉列
* 本案例實現:打印所有的字符
*
*
*/
private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
List list = words.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/**
* testSparkCoreApiUnion
* 合並兩個RDD
* @param rdd
*/
private static void testSparkCoreApiUnion(JavaRDD<String> rdd){
JavaRDD<String> unionRdd=rdd.union(rdd);
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/**
* testSparkCoreApiDistinct Test
* 對RDD去重
* @param rdd
*/
private static void testSparkCoreApiDistinct(JavaRDD<String> rdd){
JavaRDD<String> unionRdd=rdd.union(rdd).distinct();
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/**
* testSparkCoreApiMaptoPair Test
* 把RDD映射為鍵值對類型的數據
* @param rdd
*/
private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], st[1]);
}
});
pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/**
* testSparkCoreApiGroupByKey Test
* 對鍵值對類型的數據進行按鍵值合並
* @param rdd
*/
private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd).groupByKey();
pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
Iterable<Integer> iter = t._2();
for (Integer integer : iter) {
System.out.println(integer);
}
}
});
}
/**
* testSparkCoreApiReduceByKey
* 對鍵值對進行按鍵相同的對值進行操作
* @param rdd
*/
private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});
JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
).sortByKey() ;
pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/**
* testSparkCoreApiReduce
* 對RDD進行遞歸調用
* @param rdd
*/
private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
//由於原數據是String,需要轉為Integer才能進行reduce遞歸
JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
@Override
public Integer call(String v1) throws Exception {
// TODO Auto-generated method stub
return Integer.valueOf(v1.split(" ")[1]);
}
});
Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println(a);
}
}