一、前述
SparkSql中自定義函數包括UDF和UDAF
UDF:一進一出 UDAF:多進一出 (聯想Sum函數)
二、UDF函數
UDF:用戶自定義函數,user defined function
* 根據UDF函數參數的個數來決定是實現哪一個UDF UDF1,UDF2。。。。UDF1xxx
* UDF1 傳一個參數 UDF2傳兩個參數。。。。。
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();
sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length()+t2;
}
} ,DataTypes.IntegerType );
sqlContext.sql("select name ,StrLen(name,10) as length from user").show();
三、UDAF函數
UDAF:用戶自定義聚合函數,user defined aggreagatefunction
package com.spark.sparksql.udf_udaf; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; /** * UDAF 用戶自定義聚合函數 * @author root * */ public class UDAF { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("udaf"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> parallelize = sc.parallelize( Arrays.asList("zhangsan","lisi","wangwu","zhangsan","zhangsan","lisi")); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { /** * */ private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); DataFrame df = sqlContext.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 注冊一個UDAF函數,實現統計相同值得個數 * 注意:這里可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的 */ sqlContext.udf().register("StringCount",new UserDefinedAggregateFunction() { /** * */ private static final long serialVersionUID = 1L; /** * 初始化一個內部的自己定義的值,在Aggregate之前每組數據的初始化結果 */ @Override public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0); } /** * 更新 可以認為一個一個地將組內的字段值傳遞進來 實現拼接的邏輯 * buffer.getInt(0)獲取的是上一次聚合后的值 * 相當於map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合 * 大聚和發生在reduce端. * 這里即是:在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算 */ @Override public void update(MutableAggregationBuffer buffer, Row arg1) { buffer.update(0, buffer.getInt(0)+1); } /** * 合並 update操作,可能是針對一個分組內的部分數據,在某個節點上發生的 但是可能一個分組內的數據,會分布在多個節點上處理 * 此時就要用merge操作,將各個節點上分布式拼接好的串,合並起來 * buffer1.getInt(0) : 大聚合的時候 上一次聚合后的值 * buffer2.getInt(0) : 這次計算傳入進來的update的結果 * 這里即是:最后在分布式節點完成后需要進行全局級別的Merge操作 * 也可以是一個節點里面的多個executor合並 reduce端大聚合 */ @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)); } /** * 在進行聚合操作的時候所要處理的數據的結果的類型 */ @Override public StructType bufferSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bffer111", DataTypes.IntegerType, true))); } /** * 最后返回一個和DataType的類型要一致的類型,返回UDAF最后的計算結果 */ @Override public Object evaluate(Row row) { return row.getInt(0); } /** * 指定UDAF函數計算后返回的結果類型 */ @Override public DataType dataType() { return DataTypes.IntegerType; } /** * 指定輸入字段的字段及類型 */ @Override public StructType inputSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("nameeee", DataTypes.StringType, true))); } /** * 確保一致性 一般用true,用以標記針對給定的一組輸入,UDAF是否總是生成相同的結果。 */ @Override public boolean deterministic() { return true; } }); sqlContext.sql("select name ,StringCount(name) as strCount from user group by name").show(); sc.stop(); } }

傳入到UDAF中的數據必須在分組字段里面,相當於是一組數據進來。
