TF-IDF
TF-IDF(Term frequency-inverse document frequency ) 是文本挖掘中一種廣泛使用的特征向量化方法。TF-IDF反映了語料中單詞對文檔的重要程度。假設單詞用t表示,文檔用d表示,語料用D表示,那么文檔頻度DF(t, D)是包含單詞t的文檔數。如果我們只是使用詞頻度量重要性,就會很容易過分強調重負次數多但攜帶信息少的單詞,例如:”a”, “the”以及”of”。如果某個單詞在整個語料庫中高頻出現,意味着它沒有攜帶專門針對某特殊文檔的信息。逆文檔頻度(IDF)是單詞攜帶信息量的數值度量。

其中 |D|是語料中的文檔總數。由於使用了log計算,如果單詞在所有文檔中出現,那么IDF就等於0。注意這里做了平滑處理(+1操作),防止單詞沒有在語料中出現時IDF計算中除0。TF-IDF度量是TF和IDF的簡單相乘:

事實上詞頻和文檔頻度的定義有多重變體。在MLlib中,為了靈活性我們將TF和IDF分開處理。
MLlib中詞頻統計的實現使用了hashing trick(散列技巧),也就是使用哈希函數將原始特征映射到一個數字索引。然后基於這個
索引來計算詞頻。這個方法避免了全局的單詞到索引的映射,全局映射對於大量語料有非常昂貴的計算/存儲開銷;但是該方法也帶
來了潛在哈希沖突的問題,不同原始特征可能會被映射到相同的索引。為了減少沖突率,我們可以提升目標特征的維度,例如,哈
希表中桶的數量。默認特征維度是220 = 1048576。
from pyspark import SparkContext from pyspark.mllib.feature import HashingTF sc = SparkContext() # Load documents (one per line). documents = sc.textFile("...").map(lambda line: line.split(" ")) hashingTF = HashingTF() tf = hashingTF.transform(documents) from pyspark.mllib.feature import IDF # ... continue from the previous example tf.cache()idf = IDF().fit(tf) tfidf = idf.transform(tf) # ... continue from the previous example tf.cache()idf = IDF(minDocFreq=2).fit(tf) tfidf = idf.transform(tf)
Word2Vec:
Word2Vec 計算單詞的向量表示。這種表示的主要優點是相似的詞在向量空間中離得近,這使得向新模式的泛化更容易並且模型估計更魯棒。向量表示在諸如命名實體識別、歧義消除、句子解析、打標簽以及機器翻譯等自然語言處理程序中比較有用。
Model:
MLlib中的Word2Vec實現,使用的是skip-gram模型。skip-gram的目標函數是學習擅長預測同一個句子中詞的上下文的詞向量表示。用數學語言表達就是,給定一個訓練單詞序列:w1, w2, …, wT, skip-gram模型的目標是最大化平均log似然函數(log-likelihood):

其中k是訓練窗口的大小,也就是給定一個詞,需要分別查看前后k個詞。
在skip-gram模型中,每個詞w跟兩個向量uw和vw關聯:uw是w的詞向量表示,是vw上下文。給定單詞wj,正確預測單詞wi的概率取決於softmax模型:

使用softmax的skip-gram模型開銷很大,因為log p(wi|wj)的計算量跟V成比例,而V很可能在百萬量級。為了加速Word2Vec的訓練,我們引入了層次softmax,該方法將計算log p(wi|wj)時間復雜度降低到了O(log(V))。
from pyspark import SparkContext from pyspark.mllib.feature import Word2Vec sc = SparkContext(appName='Word2Vec') inp = sc.textFile("text8_lines").map(lambda row: row.split(" ")) word2vec = Word2Vec() model = word2vec.fit(inp) synonyms = model.findSynonyms('china', 40) for word, cosine_distance in synonyms: print("{}: {}".format(word, cosine_distance))
標准化(StandardScaler)
標准化是指:對於訓練集中的樣本,基於列統計信息將數據除以方差或(且)者將數據減去其均值(結果是方差等於1,數據在0附近)。這是很常用的預處理步驟。
例如,當所有的特征具有值為1的方差且/或值為0的均值時,SVM的徑向基函數(RBF)核或者L1和L2正則化線性模型通常有更好的效果。
標准化可以提升模型優化階段的收斂速度,還可以避免方差很大的特征對模型訓練產生過大的影響。
模型擬合[Model Fitting]:
StandardScaler的構造函數具有下列參數:
withMean 默認值False. 在尺度變換(除方差)之前使用均值做居中處理(減去均值)。這會導致密集型輸出,所以在稀疏數據上無效。
withStd 默認值True. 將數據縮放(尺度變換)到單位標准差。
StandardScaler.fit()方法以RDD[Vector]為輸入,計算匯總統計信息,然后返回一個模型,該模型可以根據StandardScaler配置將輸入數據轉換為標准差為1,均值為0的特征。
模型中還實現了VectorTransformer,這個類可以對Vector和RDD[Vector]做轉化。
注意:如果某特征的方差是0,那么標准化之后返回默認的0.0作為特征值。
from pyspark.mllib.util import MLUtils from pyspark.mllib.linalg import Vectors from pyspark.mllib.feature import StandardScaler data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") label = data.map(lambda x: x.label) features = data.map(lambda x: x.features) scaler1 = StandardScaler().fit(features) scaler2 = StandardScaler(withMean=True, withStd=True).fit(features) # scaler3 is an identical model to scaler2, and will produce identical transformations scaler3 = StandardScalerModel(scaler2.std, scaler2.mean) # data1 will be unit variance. data1 = label.zip(scaler1.transform(features)) # Without converting the features into dense vectors, transformation with zero mean will raise # exception on sparse vector. # data2 will be unit variance and zero mean. data2 = label.zip(scaler1.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
歸一化(Normalizer):
歸一化是指將每個獨立樣本做尺度變換從而是該樣本具有單位Lp范數。這是文本分類和聚類中的常用操作。例如,兩個做了L2歸一化的TF-IDF向量的點積是這兩個向量的cosine(余弦)相似度。
Normalizer 的構造函數有以下參數:
在Lp空間的p范數, 默認p=2。
Normlizer實現了VectorTransformer ,這個類可以對Vector和RDD[Vector]做歸一化。
注意:如果輸入的范數是0,會返回原來的輸入向量。
from pyspark.mllib.util import MLUtils from pyspark.mllib.linalg import Vectors from pyspark.mllib.feature import Normalizer data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") labels = data.map(lambda x: x.label) features = data.map(lambda x: x.features) normalizer1 = Normalizer() normalizer2 = Normalizer(p=float("inf")) # Each sample in data1 will be normalized using $L^2$ norm. data1 = labels.zip(normalizer1.transform(features)) # Each sample in data2 will be normalized using $L^\infty$ norm. data2 = labels.zip(normalizer2.transform(features))
特征選擇:
卡方選擇(ChiSqSelector):
Feature selection特征選擇是指為建模過程選擇最相關的特征。特征選擇降低了向量空間的大小,從而降低了后續向量操作的時間復雜度。選擇的特征的數量可以通過驗證集來調節。
ChiSqSelector是指使用卡方(Chi-Squared)做特征選擇。該方法操作的是有標簽的類別型數據。ChiSqSelector基於卡方檢驗來排 序數據,然后選出卡方值較大(也就是跟標簽最相關)的特征(topk)。
模型擬合
ChiSqSelector 的構造函數有如下特征:
numTopFeatures 保留的卡方較大的特征的數量。
ChiSqSelector.fit() 方法以具有類別特征的RDD[LabeledPoint]為輸入,計算匯總統計信息,然后返回
ChiSqSelectorModel,這個類將輸入數據轉化到降維的特征空間。
模型實現了 VectorTransformer,這個類可以在Vector和RDD[Vector]上做卡方特征選擇。
注意:也可以手工構造一個ChiSqSelectorModel,需要提供升序排列的特征索引。
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.feature.ChiSqSelector; import org.apache.spark.mllib.feature.ChiSqSelectorModel; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.util.MLUtils; SparkConf sparkConf = new SparkConf().setAppName("JavaChiSqSelector"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(sc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD().cache(); // Discretize data in 16 equal bins since ChiSqSelector requires categorical features // Even though features are doubles, the ChiSqSelector treats each unique value as a category JavaRDD<LabeledPoint> discretizedData = points.map( new Function<LabeledPoint, LabeledPoint>() { @Override public LabeledPoint call(LabeledPoint lp) { final double[] discretizedFeatures = new double[lp.features().size()]; for (int i = 0; i < lp.features().size(); ++i) { discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16); } return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures)); } }); // Create ChiSqSelector that will select top 50 of 692 features ChiSqSelector selector = new ChiSqSelector(50); // Create ChiSqSelector model (selecting features) final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd()); // Filter the top 50 features from each feature vector JavaRDD<LabeledPoint> filteredData = discretizedData.map( new Function<LabeledPoint, LabeledPoint>() { @Override public LabeledPoint call(LabeledPoint lp) { return new LabeledPoint(lp.label(), transformer.transform(lp.features())); } }); sc.stop();
ElementwiseProduct:
ElementwiseProduct對輸入向量的每個元素乘以一個權重向量的每個元素,對輸入向量每個元素逐個進行放縮。這個稱為對輸入向量v 和變換向量scalingVec 使用Hadamard product(阿達瑪積)進行變換,最終產生一個新的向量。用向量 w 表示 scalingVec ,則Hadamard product可以表示為
Vect(v_1, … , v_N)\o Vect(w_1, … , w_N) = Vect(v_1 w_1, … , v_N w_N)
Hamard 乘積需要配置一個權向量 scalingVec
1) scalingVec: 變換向量
ElementwiseProduct實現 VectorTransformer 方法,就可以對向量乘以權向量,得到新的向量,或者對RDD[Vector] 乘以權向量得到RDD[Vector]
import java.util.Arrays; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.feature.ElementwiseProduct; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; // Create some vector data; also works for sparse vectors JavaRDD<Vector> data = sc.parallelize(Arrays.asList( Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0))); Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0); ElementwiseProduct transformer = new ElementwiseProduct(transformingVector); // Batch transform and per-row transform give the same results: JavaRDD<Vector> transformedData = transformer.transform(data); JavaRDD<Vector> transformedData2 = data.map( new Function<Vector, Vector>() { @Override public Vector call(Vector v) { return transformer.transform(v); } });
PCA:
PCA可以將特征向量投影到低維空間,實現對特征向量的降維。
import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.feature.PCA val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) val pca = new PCA(training.first().features.size/2).fit(data.map(_.features)) val training_pca = training.map(p => p.copy(features = pca.transform(p.features))) val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) val numIterations = 100 val model = LinearRegressionWithSGD.train(training, numIterations) val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations) val valuesAndPreds = test.map { point => val score = model.predict(point.features) (score, point.label)} val valuesAndPreds_pca = test_pca.map { point => val score = model_pca.predict(point.features) (score, point.label)} val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean() val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean() println("Mean Squared Error = " + MSE)println("PCA Mean Squared Error = " + MSE_pca)