Spark UDF Java 示例
在這篇文章中提到了用Spark做用戶昵稱文本聚類分析,聚類需要選定K個中心點,然后迭代計算其他樣本點到中心點的距離。由於中文文字分詞之后(n-gram)再加上昵稱允許各個特殊字符(數字、字母、各種符號……),如果直接在原來的文本數據上進行聚類,由於文本的“多樣性”,聚類效果並不一定好。因此准確對昵稱先進行一個預分類的過程,這里的分類不是機器學習里面的分類算法(邏輯回歸、線性回歸),而是根據昵稱文本的特征進行分類:給定一個文本昵稱字符串,分類方法逐個地將每個字符轉換成定義好的模式:
即將所有漢字替換成H、所有大寫字母替換成U、小寫字母替換成L、數字替換成N,其他各種符號替換成O,然后系統將每一個字符串表示成能標識其組成的字符串模式。
例如用戶名:“你好abc123”會被表示成HHLLLNNN,而“你好aaa456”也會被標識成HHLLLNNN。
這樣相同的字符串模式的字符將獲得相同的表示。
而這里采用Spark 用戶自定義函數來實現這種轉換。然后再在每個預分類下,進行聚類。
Spark 用戶自定義函數介紹
在Java里面通過實現接口UDF(一共定義了22個吧,根據不同參數個數進行選擇)來定義一個Spark UDF,簡單一點的UDF可以使用Lambda表達式。具體介紹可參考官方文檔。如下的NickFormatterUDF接收一個字符串作為輸入,將該字符串轉換成 由 HLUNWO 組成的字符串模式。
import org.apache.spark.sql.api.java.UDF1;
/**
* @author psj
* @date 2018/11/16
*/
public class NickFormatterUDF implements UDF1<String, String> {
@Override
public String call(String nick) throws Exception {
StringBuilder pattern = new StringBuilder();
for (int i = 0; i < nick.length(); i++) {
char ch = nick.charAt(i);
if (ParseChar.isChinese(ch)) {
pattern.append('H');
} else if (ParseChar.isLowerCase(ch)) {
pattern.append('L');
} else if (ParseChar.isUpperCase(ch)) {
pattern.append('U');
} else if (ParseChar.isNumber(ch)) {
pattern.append('N');
} else if (ParseChar.isWhiteSpace(ch)) {
pattern.append('W');
}else{
pattern.append('O');
}
}//END FOR
return pattern.toString();
}
}
ParseChar.java
就是一個簡單地判斷某個字符是中文字符、還是數字、還是大寫字母、還是小寫字母的工具類。
public class ParseChar {
public static boolean isChinese(char ch) {
//獲取此字符的UniCodeBlock
Character.UnicodeBlock ub = Character.UnicodeBlock.of(ch);
// GENERAL_PUNCTUATION 判斷中文的“號
// CJK_SYMBOLS_AND_PUNCTUATION 判斷中文的。號
// HALFWIDTH_AND_FULLWIDTH_FORMS 判斷中文的,號
if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS || ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS
|| ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A || ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_B
|| ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION || ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS
|| ub == Character.UnicodeBlock.GENERAL_PUNCTUATION) {
// System.out.println(ch + " 是中文");
return true;
}
return false;
}
public static boolean isNumber(char ch) {
return ch >= '0' && ch <= '9';
}
public static boolean isLowerCase(char ch) {
return ch >= 'a' && ch <= 'z';
}
public static boolean isUpperCase(char ch) {
return Character.isUpperCase(ch);
}
public static boolean isWhiteSpace(char ch) {
return Character.isWhitespace(ch);
}
}
寫完了自定義函數,接下來看看在Spark中如何調用自己定義的函數。在這里我碰到了很多奇怪的問題。我們的樣本數據如下:
{"created":1542020126816,"nick":"a357410","uid":123456}
{"created":1542020138522,"nick":"alichao","signature":"┌?┐?┊雨┊?┊蒙┊?┊蒙┊?└?┘","uid":123456}
{"created":1542020127633,"details":"走過了之后才明白,往事是用來回憶的,幸福是用來感受的,傷痛是用來成長的。。。。","nick":"菲兒","signature":"游戲有你更精彩","uid":123456}
可以看出,樣本數據中即有昵稱字符、又有簽名字段、還有created 字段……而我們只針對昵稱字段進行預分類。
首先將樣本數據nick_class.json
上傳到HDFS:
./bin/hdfs dfs -put ~/data_spark/nick_class.json /user/xxx/
然后程序中加載數據:
Dataset<Row> dataset = spark.read().format("json").option("header", "false")
.load("hdfs://localhost:9000/user/xxx/nick_class.json");
dataset.show(10);
對樣本中昵稱為空的字段進行過濾,並只選取昵稱字段應用到Spark UDF上:
Dataset<Row> nickDataset = dataset.filter(col("nick").isNotNull()).select(col("nick"));
nickDataset.show();
nickDataset.printSchema();
先來一個簡單一點的Spark中內置的UDF函數:小寫字母轉換成大寫,哈哈。
UserDefinedFunction mode = udf(
(String nick) -> nick.toUpperCase(), DataTypes.StringType);
Dataset upperNickDataFrame = nickDataset.select(mode.apply(col("nick")));
System.out.println(upperNickDataFrame.count());
upperNickDataFrame.show();
對昵稱字符串進行分類的用戶自定義函數NickFormatterUDF.java
。創建對象注冊到SparkSession中即可,創建臨時視圖就是方便后面可通過SQL形式對nickDataFrame
進行調用。
UDF1<String, String> nickPreClassificationUDF = new NickFormatterUDF();
spark.udf().register("nick_classifier", nickPreClassificationUDF, DataTypes.StringType);
nickDataset.createOrReplaceTempView("nickDataFrame");
通過functions.callUDF
調用UDF:
Dataset nickClassifyDF = nickDataset.select(functions.callUDF("nick_classifier", col("nick")));
System.out.println(nickClassifyDF.count());
nickClassifyDF.show();
Spark作業提交運行
在這篇文章中介紹搭建Spark遠程調試開發環境,本以為能夠在遠程調試環境中運行,但每次執行到 upperNickDataFrame.show();
或者 nickClassifyDF.show();
就拋出異常:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues
一直以為是自定義函數的bug,找了好久沒有找到原因,后來在SPARK-18075發現:原來是Spark提交作業的方式有問題。
在自己的Intellij 開發環境下以debug調試運行Spark應用程序固然方便,但這不符合官方推薦的以打成jar包的方式運行Spark作業這種方式。
It is very convenient to write Spark code in an IDE as part of a larger application framework and test it in development by simply running the main function, instead of packaging it into a jar for every single change and submitting this jar to a cluster. Often you have to run it on a remote cluster even for development, especially when handling large quantities of data.
於是:mvn package 將這種工程打成nick_classifier.jar
包,上傳到服務器上。以命令:./spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class net.hapjin.spark.nick.SparkNickPreClassification nick_classifier.jar
結果還是報同樣的錯誤,或者連接拒絕錯誤。出現這個問題,主要是環境配置不一致的問題:
- 程序代碼里面創建SparkSession時,需要指定Spark Master地址,這個地址是填
spark://ip:port
,還是填spark://master_name:port
,還是填spark://localhost:port
這個要視集群配置而定了。 - 第二個是:
/etc/hosts
里面配置的主機名到ip地址的映射 - 第三個是
conf/spark-env.sh
里面的參數:SPARK_LOCAL_IP
的設置。
記錄一下我在實驗環境下運行的結果:
spark-2.3.1-bin-hadoop2.7、hadoop-2.7.7、按hadoop官網的Standalone Operation方式配置啟動HDFS。
以./spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class net.hapjin.spark.nick.SparkNickPreClassification nick_classifier.jar
提交運行。
源碼如下:
package net.hapjin.spark.nick;
import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;
/**
* @author psj
* @date 2018/11/16
*/
public class SparkNickPreClassification {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("nick_classification")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark.read().format("json").option("header", "false")
.load("hdfs://localhost:9000/user/xxx/nick_class.json");
dataset.show(10);
Dataset<Row> nickDataset = dataset.filter(col("nick").isNotNull()).select(col("nick"), col("uid"));
nickDataset.show();
nickDataset.printSchema();
//https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser
// spark.udf().register("udfUpperCase", (String string) -> string.toUpperCase(), DataTypes.StringType);
// Dataset<Row> df = nickDataset.withColumn("upper", callUDF("udfUpperCase", nickDataset.col("nick")));
// System.out.println(df.count());
// df.show();
//https://issues.apache.org/jira/browse/SPARK-18075
//沒有按標准來運行spark app jar
UserDefinedFunction mode = udf(
(String nick) -> nick.toUpperCase(), DataTypes.StringType);
Dataset upperNickDataFrame = nickDataset.select(mode.apply(col("nick")));
System.out.println(upperNickDataFrame.count());
upperNickDataFrame.show();
UDF1<String, String> nickPreClassificationUDF = new NickFormatterUDF();
spark.udf().register("nick_classifier", nickPreClassificationUDF, DataTypes.StringType);
nickDataset.createOrReplaceTempView("nickDataFrame");
// spark.sql("select uid, nick_classifier(nick) from nickDataFrame").show();
Dataset nickClassifyDF = nickDataset.select(functions.callUDF("nick_classifier", col("nick")), col("nick"), col("uid"));
System.out.println(nickClassifyDF.count());
//https://stackoverflow.com/questions/39953245/how-to-fix-java-lang-classcastexception-cannot-assign-instance-of-scala-collect
nickClassifyDF.show();
}
}
最終運行出來的結果:可以看出已經成功地將昵稱轉換成 自定義的 字符模式。左邊列就是每個昵稱的模式,右邊列是實際的昵稱。比如第一行:UU 代表兩個大寫的英文字符,而左邊的nick是"JX"(意味着將所有 兩個大寫字母 的昵稱 轉換成類別 UU 了)
后面會將 這些自定義的字符模式 歸為幾個類別,然后在每個類別上進行聚類分析。
參考資料:
Spark Java API 計算 Levenshtein 距離