Spark 用戶自定義函數 Java 示例


Spark UDF Java 示例

這篇文章中提到了用Spark做用戶昵稱文本聚類分析,聚類需要選定K個中心點,然后迭代計算其他樣本點到中心點的距離。由於中文文字分詞之后(n-gram)再加上昵稱允許各個特殊字符(數字、字母、各種符號……),如果直接在原來的文本數據上進行聚類,由於文本的“多樣性”,聚類效果並不一定好。因此准確對昵稱先進行一個預分類的過程,這里的分類不是機器學習里面的分類算法(邏輯回歸、線性回歸),而是根據昵稱文本的特征進行分類:給定一個文本昵稱字符串,分類方法逐個地將每個字符轉換成定義好的模式:

即將所有漢字替換成H、所有大寫字母替換成U、小寫字母替換成L、數字替換成N,其他各種符號替換成O,然后系統將每一個字符串表示成能標識其組成的字符串模式。
例如用戶名:“你好abc123”會被表示成HHLLLNNN,而“你好aaa456”也會被標識成HHLLLNNN。
這樣相同的字符串模式的字符將獲得相同的表示。

而這里采用Spark 用戶自定義函數來實現這種轉換。然后再在每個預分類下,進行聚類。

Spark 用戶自定義函數介紹

在Java里面通過實現接口UDF(一共定義了22個吧,根據不同參數個數進行選擇)來定義一個Spark UDF,簡單一點的UDF可以使用Lambda表達式。具體介紹可參考官方文檔。如下的NickFormatterUDF接收一個字符串作為輸入,將該字符串轉換成 由 HLUNWO 組成的字符串模式。

import org.apache.spark.sql.api.java.UDF1;

/**
 * @author psj
 * @date 2018/11/16
 */
public class NickFormatterUDF implements UDF1<String, String> {

    @Override
    public String call(String nick) throws Exception {
        StringBuilder pattern = new StringBuilder();
        for (int i = 0; i < nick.length(); i++) {
            char ch = nick.charAt(i);
            if (ParseChar.isChinese(ch)) {
                pattern.append('H');
            } else if (ParseChar.isLowerCase(ch)) {
                pattern.append('L');
            } else if (ParseChar.isUpperCase(ch)) {
                pattern.append('U');
            } else if (ParseChar.isNumber(ch)) {
                pattern.append('N');
            } else if (ParseChar.isWhiteSpace(ch)) {
                pattern.append('W');
            }else{
                pattern.append('O');
            }
        }//END FOR
        return pattern.toString();
    }
}

ParseChar.java就是一個簡單地判斷某個字符是中文字符、還是數字、還是大寫字母、還是小寫字母的工具類。

public class ParseChar {

    public static boolean isChinese(char ch) {
        //獲取此字符的UniCodeBlock
        Character.UnicodeBlock ub = Character.UnicodeBlock.of(ch);
        //  GENERAL_PUNCTUATION 判斷中文的“號
        //  CJK_SYMBOLS_AND_PUNCTUATION 判斷中文的。號
        //  HALFWIDTH_AND_FULLWIDTH_FORMS 判斷中文的,號
        if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS || ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS
                || ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A || ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_B
                || ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION || ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS
                || ub == Character.UnicodeBlock.GENERAL_PUNCTUATION) {
//            System.out.println(ch + " 是中文");
            return true;
        }
        return false;
    }

    public static boolean isNumber(char ch) {
        return ch >= '0' && ch <= '9';
    }

    public static boolean isLowerCase(char ch) {
        return ch >= 'a' && ch <= 'z';
    }

    public static boolean isUpperCase(char ch) {
        return Character.isUpperCase(ch);
    }

    public static boolean isWhiteSpace(char ch) {
        return Character.isWhitespace(ch);
    }
}

寫完了自定義函數,接下來看看在Spark中如何調用自己定義的函數。在這里我碰到了很多奇怪的問題。我們的樣本數據如下:

{"created":1542020126816,"nick":"a357410","uid":123456}

{"created":1542020138522,"nick":"alichao","signature":"┌?┐?┊雨┊?┊蒙┊?┊蒙┊?└?┘","uid":123456}

{"created":1542020127633,"details":"走過了之后才明白,往事是用來回憶的,幸福是用來感受的,傷痛是用來成長的。。。。","nick":"菲兒","signature":"游戲有你更精彩","uid":123456}

可以看出,樣本數據中即有昵稱字符、又有簽名字段、還有created 字段……而我們只針對昵稱字段進行預分類。

首先將樣本數據nick_class.json上傳到HDFS:

./bin/hdfs dfs -put ~/data_spark/nick_class.json /user/xxx/

然后程序中加載數據:

        Dataset<Row> dataset = spark.read().format("json").option("header", "false")
                .load("hdfs://localhost:9000/user/xxx/nick_class.json");
        dataset.show(10);

對樣本中昵稱為空的字段進行過濾,並只選取昵稱字段應用到Spark UDF上:

        Dataset<Row> nickDataset = dataset.filter(col("nick").isNotNull()).select(col("nick"));
        nickDataset.show();
        nickDataset.printSchema();

先來一個簡單一點的Spark中內置的UDF函數:小寫字母轉換成大寫,哈哈。

        UserDefinedFunction mode = udf(
                (String nick) -> nick.toUpperCase(), DataTypes.StringType);
        Dataset upperNickDataFrame = nickDataset.select(mode.apply(col("nick")));
        System.out.println(upperNickDataFrame.count());
        upperNickDataFrame.show();

對昵稱字符串進行分類的用戶自定義函數NickFormatterUDF.java。創建對象注冊到SparkSession中即可,創建臨時視圖就是方便后面可通過SQL形式對nickDataFrame進行調用。

        UDF1<String, String> nickPreClassificationUDF = new NickFormatterUDF();
        spark.udf().register("nick_classifier", nickPreClassificationUDF, DataTypes.StringType);
        nickDataset.createOrReplaceTempView("nickDataFrame");

通過functions.callUDF調用UDF:

        Dataset nickClassifyDF = nickDataset.select(functions.callUDF("nick_classifier", col("nick")));
        System.out.println(nickClassifyDF.count());
        nickClassifyDF.show();

Spark作業提交運行

這篇文章中介紹搭建Spark遠程調試開發環境,本以為能夠在遠程調試環境中運行,但每次執行到 upperNickDataFrame.show();或者 nickClassifyDF.show();就拋出異常:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues

一直以為是自定義函數的bug,找了好久沒有找到原因,后來在SPARK-18075發現:原來是Spark提交作業的方式有問題。

在自己的Intellij 開發環境下以debug調試運行Spark應用程序固然方便,但這不符合官方推薦的以打成jar包的方式運行Spark作業這種方式。

It is very convenient to write Spark code in an IDE as part of a larger application framework and test it in development by simply running the main function, instead of packaging it into a jar for every single change and submitting this jar to a cluster. Often you have to run it on a remote cluster even for development, especially when handling large quantities of data.

於是:mvn package 將這種工程打成nick_classifier.jar包,上傳到服務器上。以命令:./spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class net.hapjin.spark.nick.SparkNickPreClassification nick_classifier.jar

結果還是報同樣的錯誤,或者連接拒絕錯誤。出現這個問題,主要是環境配置不一致的問題:

  • 程序代碼里面創建SparkSession時,需要指定Spark Master地址,這個地址是填 spark://ip:port,還是填spark://master_name:port,還是填spark://localhost:port這個要視集群配置而定了。
  • 第二個是:/etc/hosts里面配置的主機名到ip地址的映射
  • 第三個是conf/spark-env.sh里面的參數:SPARK_LOCAL_IP的設置。

記錄一下我在實驗環境下運行的結果:

spark-2.3.1-bin-hadoop2.7、hadoop-2.7.7、按hadoop官網的Standalone Operation方式配置啟動HDFS。

./spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class net.hapjin.spark.nick.SparkNickPreClassification nick_classifier.jar提交運行。

源碼如下:

package net.hapjin.spark.nick;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.udf;

/**
 * @author psj
 * @date 2018/11/16
 */
public class SparkNickPreClassification {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("nick_classification")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> dataset = spark.read().format("json").option("header", "false")
                .load("hdfs://localhost:9000/user/xxx/nick_class.json");
        dataset.show(10);

        Dataset<Row> nickDataset = dataset.filter(col("nick").isNotNull()).select(col("nick"), col("uid"));
        nickDataset.show();
        nickDataset.printSchema();

        //https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser
//        spark.udf().register("udfUpperCase", (String string) -> string.toUpperCase(), DataTypes.StringType);
//        Dataset<Row> df = nickDataset.withColumn("upper", callUDF("udfUpperCase", nickDataset.col("nick")));
//        System.out.println(df.count());
//        df.show();

        //https://issues.apache.org/jira/browse/SPARK-18075
        //沒有按標准來運行spark app jar
        UserDefinedFunction mode = udf(
                (String nick) -> nick.toUpperCase(), DataTypes.StringType);
        Dataset upperNickDataFrame = nickDataset.select(mode.apply(col("nick")));
        System.out.println(upperNickDataFrame.count());
        upperNickDataFrame.show();

        UDF1<String, String> nickPreClassificationUDF = new NickFormatterUDF();
        spark.udf().register("nick_classifier", nickPreClassificationUDF, DataTypes.StringType);
        nickDataset.createOrReplaceTempView("nickDataFrame");

//        spark.sql("select uid, nick_classifier(nick) from nickDataFrame").show();
        Dataset nickClassifyDF = nickDataset.select(functions.callUDF("nick_classifier", col("nick")), col("nick"), col("uid"));
        System.out.println(nickClassifyDF.count());
        //https://stackoverflow.com/questions/39953245/how-to-fix-java-lang-classcastexception-cannot-assign-instance-of-scala-collect
        nickClassifyDF.show();
    }
}

最終運行出來的結果:可以看出已經成功地將昵稱轉換成 自定義的 字符模式。左邊列就是每個昵稱的模式,右邊列是實際的昵稱。比如第一行:UU 代表兩個大寫的英文字符,而左邊的nick是"JX"(意味着將所有 兩個大寫字母 的昵稱 轉換成類別 UU 了)

后面會將 這些自定義的字符模式 歸為幾個類別,然后在每個類別上進行聚類分析。

參考資料:

[基於層次聚類的虛假用戶檢測 ]

Spark Java API 計算 Levenshtein 距離

Spark Java API 之 CountVectorizer

spark JAVA 開發環境搭建及遠程調試


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM