Spark實現TF-IDF——文本相似度計算


        在Spark1.2之后,Spark自帶實現TF-IDF接口,只要直接調用就可以,但實際上,Spark自帶的詞典大小設置較於古板,如果設置小了,則導致無法計算,如果設置大了,Driver端回收數據的時候,容易發生OOM,所以更多時候都是自己根據實際情況手動實現TF-IDF。不過,在本篇文章中,兩種方式都會介紹。

數據准備:

        val df = ss.sql("select * from bigdatas.news_seg") //如果hive表的數據沒有切詞,則先對數據進行切詞操作(hive里面每一行是用空格將各個詞連接的字符串,或者說是一篇文章,結尾使用##@@##標識),得到一個數組類型數據
        val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg")

一、Spark自帶TF-IDF

1、Spark自帶TF實現

        首先需要實例化HashingTF,這個類用於根據給傳入的各篇已經分好詞的文章,對里面的每個詞進行hashing計算,每個hashing值對應詞表的一個位置,以及對每個詞在每篇文章中的一個統計;

        這個類有一個方法setBinaty()可以設置其統計時的計算方式:多項式分布計算和伯努利分布計算:

  • setBinary(false):多項式分布計算,一個詞在一篇文章中出現多少次,計算多少次;
  • setBinary(true):伯努利分布計算,一個詞在一篇文章中,不管多少次,只要出現了,就為1,否則為0

        還有一個重要方法setNumFeatures(),用於設置詞表的大小,默認是2^18。

        實例化HashingTF之后,使用transform就可以計算詞頻(TF)。

TF代碼實現:

// 多項式分布計算
        val hashingTF = new HashingTF() .setBinary(false) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) // 伯努利分布計算
        val hashingTF_BN = new HashingTF() .setBinary(true) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) /** * hashingTF.transform(df_seg):轉換之后會在原來基礎上增加一列,就是setOutputCol("feature_tf")設置的列 * 新增列的數據結構為:(詞表大小,[該行數據的每個詞對應詞表的hashCode],[該行數據的每個詞在該行數據出現的次數,即多項式統計詞頻]) */ val df_tf = hashingTF.transform(df_seg).select("feature_tf")

最后列“feature_tf”的數據結構為(詞典大小, [hashingCode], [term freq])。

2、Spark自帶實現TF-IDF

對word進行idf加權(完成tf計算的基礎上)

        實現原理跟上一步的TF類似,但多出一步,這一步是用於掃描一次上一步計算出來的tf數據。

代碼如下:

        val idf = new IDF() .setInputCol("feature_tf") .setOutputCol("feature_tfidf") .setMinDocFreq(2) //fit():內部對df_tf進行遍歷,統計doc Freq,這個操作是在tf完成后才做的
        val idfModel = idf.fit(df_tf) val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf")

二、自己實現TF-IDF

手動實現,有計算的先后問題,必須先算DocFreq(DF),再算TermFreq(TF)。

1、doc Freq 文檔頻率計算 -> 同時可以得到所有文章的單詞集合(詞典)

        df的數據是每一行代表一篇文章,那么在計算某個詞出現的文章次數,那么轉化為某個詞的統計。即將一篇文章切好詞之后,放在一個set集合里面,表示這個set集合的每個詞出現1次;那么將所有文章的詞都切好,放在set集合里面,每篇文章擁有一個set集合,然后再根據詞groupBy,Count,就可以得到每個詞的DocFreq。

        val setUDF = udf((str:String)=>str.split(" ").distinct) // 1.1、對每篇文章的詞進行去重操作,即set集合
        val df_set = df.withColumn("words_set",setUDF(df("sentence"))) // 1.2、行轉列,groupby、count,順帶可以求出詞典大小,以及每個詞對應在詞典的位置index
        val docFreq_map = df_set.select(explode(df_set("words_set")).as("word")) .groupBy("word") .count() .rdd .map(x=>(x(0).toString,x(1).toString)) .collectAsMap() //collect有一個重要特性,就是會將數據回收到Driver端,方便分發

上面計算出來的數據格式為:Map(word->wordCount),或理解為:Map(word->DocFreq)

        除了計算出DF外,還要順便計算詞典大小,因為詞典大小代表了向量的大小,以及每個詞對應詞典的位置。

        val dictSize = docFreq_map.size // 對單詞進行編碼,得到索引,類型為int,每個詞對應於[0,dictSize-1]區間的一個位置
        val wordEncode = docFreq_map.keys.zipWithIndex.toMap

2、term Freq 詞頻計算 -> 同時計算tf-idf

        詞頻計算其實就是做wordCount,這里重點還需要順帶計算TF-IDF。

        val docCount = df_seg.count() val mapUDF = udf { str: String =>
    //每一行處理就是處理一篇文章 // wordCOunt
            val tfMap = str.split("##@##")(0).split(" ") .map((_,1L)) .groupBy(_._1) .mapValues(_.length) // tfMap{term->termCount} // docFreq_map{term->termDocCount} // wordEncode{term->index} // 處理后的tfIDFMap數據結果為:[(index:int,tf_idf:Double)] //必須要處理成這種形式
            val tfIDFMap = tfMap.map{x=> val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1)) (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v) } // 做成向量:第一個參數為向量大小(詞典大小);第二個參數用於給指定的index賦值為tf-idf
 Vectors.sparse(dictSize,tfIDFMap.toSeq) } val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence")))

三、完整Demo代碼

package com.cjs import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.ml.feature.HashingTF import org.apache.spark.ml.feature.IDF import org.apache.spark.ml.linalg.Vectors import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ object TFIDFTransform { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) val conf = new SparkConf() .set("spark.some.config.option","some-value") val ss = SparkSession .builder() .config(conf) .enableHiveSupport() .appName("test_tf-idf") //.master("local[2]") //單機版配置,集群情況下,可以去掉
 .getOrCreate() val df = ss.sql("select * from bigdatas.news_seg") //如果hive表的數據沒有切詞,則先對數據進行切詞操作(hive里面每一行是用空格將各個詞連接的字符串,或者說是一篇文章,結尾使用##@@##標識),得到一個數組類型數據
        val df_seg = df.selectExpr("split(split(sentence,'##@@##')[0],' ') as seg") val docCount = df_seg.count() //一、spark自帶tf-idf實現 //詞典默認是2^20,先給一個詞的一個hashCode,對應於詞典的一個位置 //詞典空間過大,Driver進行數據回收時,容易出現OOM // 1、spark自帶TF實現
        /** setBinary: false:多項式分布 -> 一個詞在一篇文章中出現多少次,計算多少次 true:伯努利分布 -> 一個詞在一篇文章中,不管多少次,只要出現了,就為1,否則為0 **/
        /** * setInputCol("seg") :輸入參數(DF)的列名 * setOutputCol("feature_tf"):輸出結果(結果)的列名 * setNumFeatures(1<<18):設置詞表大小,默認是1<<18 */
// 多項式分布計算
        val hashingTF = new HashingTF() .setBinary(false) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) // 伯努利分布計算
        val hashingTF_BN = new HashingTF() .setBinary(true) .setInputCol("seg") .setOutputCol("feature_tf") .setNumFeatures(1<<18) /** * hashingTF.transform(df_seg):轉換之后會在原來基礎上增加一列,就是setOutputCol("feature_tf")設置的列 * 新增列的數據結構為:(詞表大小,[該行數據的每個詞對應詞表的hashCode],[該行數據的每個詞在該行數據出現的次數,即多項式統計詞頻]) */ val df_tf = hashingTF.transform(df_seg).select("feature_tf") // 2、spark自帶IDF實現, 對word進行idf加權(完成tf計算的基礎上)
        val idf = new IDF() .setInputCol("feature_tf") .setOutputCol("feature_tfidf") .setMinDocFreq(2) //fit():內部對df_tf進行遍歷,統計doc Freq,這個操作是在tf完成后才做的
        val idfModel = idf.fit(df_tf) val df_tfidf = idfModel.transform(df_tf).select("feature_tfidf") //二、自己實現tf-idf,有順序的實現 //1、doc Freq 文檔頻率計算 -> 同時可以得到所有文章的單詞集合(詞典)
        val setUDF = udf((str:String)=>str.split(" ").distinct) // 1.1、對每篇文章的詞進行去重操作,即set集合
        val df_set = df.withColumn("words_set",setUDF(df("sentence"))) // 1.2、行轉列,groupby、count,順帶可以求出詞典大小,以及每個詞對應在詞典的位置index
        val docFreq_map = df_set.select(explode(df_set("words_set")).as("word")) .groupBy("word") .count() .rdd // .map(x=>(x(0).toString,x(1).toString))
            .map(x=>(x(0).toString,math.log10(docCount.toDouble/(x(1).toString.toDouble+1))))   //順帶計算idf
            .collectAsMap() //collect有一個重要特性,就是會將數據回收到Driver端,方便分發
        val dictSize = docFreq_map.size // 對單詞進行編碼,得到索引,類型為int,每個詞對應於[0,dictSize-1]區間的一個位置
        val wordEncode = docFreq_map.keys.zipWithIndex.toMap //2、term Freq 詞頻計算 // 返回數據結構:
        val mapUDF = udf { str: String =>
    //每一行處理就是處理一篇文章 // wordCOunt
            val tfMap = str.split("##@##")(0).split(" ") .map((_,1L)) .groupBy(_._1) .mapValues(_.length) // tfMap{term->termCount} // docFreq_map{term->termDocCount} // wordEncode{term->index} // 處理后的tfIDFMap數據結果為:[(index:int,tf_idf:Double)] //必須要處理成這種形式
            val tfIDFMap = tfMap.map{x=>
// val idf_v = math.log10(docCount.toDouble/(docFreq_map.getOrElse(x._1,"0.0").toDouble+1)) // (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)
                val idf_v = docFreq_map.getOrElse(x._1,0.0) (wordEncode.getOrElse(x._1,0),x._2.toDouble * idf_v)  //已經在第1步算出idf情況下使用
 } // 做成向量:第一個參數為向量大小(詞典大小);第二個參數用於給指定的index賦值為tf-idf
 Vectors.sparse(dictSize,tfIDFMap.toSeq) } val dfTF = df.withColumn("tf_idf",mapUDF(df("sentence"))) } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM