PySpark MinHash LSH similarity search


First, a look at the official documentation:

MinHash for Jaccard Distance

MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. Jaccard distance of two sets is defined by the cardinality of their intersection and union:

d(A, B) = 1 − |A ∩ B| / |A ∪ B|

MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values:

h(A) = min_{a ∈ A} g(a)

 

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. While both dense and sparse vectors are supported, typically sparse vectors are recommended for efficiency. For example, Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. All non-zero values are treated as binary “1” values.
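The Array[...] syntax in that example comes from the Scala docs; in PySpark the equivalent sparse vector can be built as follows (a small sketch for reference):

from pyspark.ml.linalg import Vectors

# A space of 10 possible elements; the set {2, 3, 5} is encoded by placing
# 1.0 at indices 2, 3 and 5.
v = Vectors.sparse(10, [2, 3, 5], [1.0, 1.0, 1.0])
# A dict form is also accepted: Vectors.sparse(10, {2: 1.0, 3: 1.0, 5: 1.0})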

Note: Empty sets cannot be transformed by MinHash, which means any input vector must have at least 1 non-zero entry.

Refer to the MinHashLSH Python docs for more details on the API.

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# nearest neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/python/ml/min_hash_lsh_example.py" in the Spark repo.

[PySpark] LSH similarity computation

1. Problem Scenario

Suppose we want to find, among a massive number of users, which ones behave similarly:

User A:

id: 1001
name: User A
data: "07:00 breakfast, 09:00 work, 12:00 lunch, 13:00 play Honor of Kings, 18:00 dinner, 22:00 sleep"
mat: "1010001000010001100001101010000"

User B:

id: 1002
name: User B
data: "07:00 morning exercise, 08:00 breakfast, 12:30 lunch, 13:00 study, 19:00 dinner, 21:00 study, 23:00 sleep"
mat: "1110001000010000001011101010000"

User C: ...

mat is a featurized description of the user's data; for example, the first bit could be defined as "gets up early", the second as "morning exercise", the third as "eats breakfast". Given this matrix, how do we find the users with similar behavioral habits?

Looking at the one-hot vectors, A and B actually share a lot, with some differences: for example, A plays Honor of Kings while B prefers to study:

User A: "1010001000010000001001101010000"
User B: "1110001000010000000011101010000"
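Before bringing in LSH, it helps to see what the exact distance looks like. Here is a minimal plain-Python sketch (not part of the original post) that computes the exact Jaccard distance between the two bit strings above using d(A, B) = 1 − |A ∩ B| / |A ∪ B|:

mat_a = "1010001000010000001001101010000"
mat_b = "1110001000010000000011101010000"

# Treat each bit string as the set of indices where the bit is '1'.
set_a = {i for i, bit in enumerate(mat_a) if bit == '1'}
set_b = {i for i, bit in enumerate(mat_b) if bit == '1'}

d = 1 - len(set_a & set_b) / len(set_a | set_b)
print(round(d, 2))  # 0.27 for these two strings

Computing this for every pair of users is quadratic in the number of users, which is exactly the cost LSH lets us avoid.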

This is where LSH comes in.

2. Approach

Q: What is LSH similarity for?
A: The full name is Locality Sensitive Hashing. It can find samples that are as close as possible when feature vectors are similar but not identical. Of course, you still need to define the features first and then apply the LSH method.

Reference: Extracting, transforming and selecting features (the Spark ML feature documentation)

The practical problem at work is making this run over massive data. When implementing it in PySpark, there are two choices: MinHashLSH and BucketedRandomProjectionLSH.

MinHashLSH

MinHash is an LSH family for Jaccard distance, where the input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union:

d(A, B) = 1 − |A ∩ B| / |A ∪ B|

MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values.
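As a toy illustration of h(A) = min_{a ∈ A} g(a), the sketch below (plain Python with made-up hash parameters, not the Spark implementation) builds a small MinHash signature for a set of indices:

import random

PRIME = 2147483647  # large prime used as the hash range

def make_hash(seed):
    # Random affine hash g(a) = (a * coef + bias) mod PRIME
    rng = random.Random(seed)
    coef, bias = rng.randrange(1, PRIME), rng.randrange(0, PRIME)
    return lambda a: (a * coef + bias) % PRIME

hashes = [make_hash(seed) for seed in range(3)]  # 3 hash functions, like numHashTables=3

set_a = {0, 2, 6, 11, 18, 21, 22, 24, 26}        # indices of the '1' bits of user A above
signature = [min(g(a) for a in set_a) for g in hashes]
print(signature)  # similar sets tend to collide on some of these minimum values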

BucketedRandomProjectionLSH (random projection for the Euclidean metric)

Bucketed random projection is an LSH family for Euclidean distance. The Euclidean distance is defined as:

d(x, y) = √( Σᵢ (xᵢ − yᵢ)² )

Its LSH family projects a feature vector x onto a random unit vector v and portions the projected result into hash buckets:

h(x) = ⌊ (x · v) / r ⌋

where r is a user-defined bucket length. The bucket length can be used to control the average size of the hash buckets (and therefore the number of buckets). A larger bucket length (i.e., fewer buckets) increases the probability of features being hashed into the same bucket (increasing both true and false positives).

Bucketed random projection accepts arbitrary vectors as input features, and supports both sparse and dense vectors.
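The code in the next section only fits a BucketedRandomProjectionLSH model without actually joining with it, so here is a hedged standalone sketch of what the Euclidean variant of the join could look like (the column names and the 1.5 threshold are my own, and an active SparkSession named spark is assumed, as in the examples above):

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

df = spark.createDataFrame([
    (0, Vectors.dense([1.0, 1.0, 0.0])),
    (1, Vectors.dense([1.0, 0.0, 1.0])),
    (2, Vectors.dense([0.0, 1.0, 1.0]))], ["id", "vec"])

brp = BucketedRandomProjectionLSH(inputCol="vec", outputCol="hashes",
                                  bucketLength=2.0, numHashTables=3)
model = brp.fit(df)

# Approximate self-join on Euclidean distance, keeping pairs closer than 1.5
# and dropping self-matches and duplicate (A, B)/(B, A) pairs.
model.approxSimilarityJoin(df, df, 1.5, distCol="EuclideanDistance") \
    .filter("datasetA.id < datasetB.id") \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()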

3. Code Implementation

Without further ado, here is the code.

import os
import re
import hashlib

from pyspark import SparkContext, SparkConf
from pyspark import Row
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf, collect_list, collect_set
from pyspark.ml.feature import MinHashLSH, BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT

# Restart the Spark session so we start from a clean one
spark = SparkSession.builder.appName('app_name').getOrCreate()
spark.stop()
spark = SparkSession.builder.appName('app_name').getOrCreate()


class PySpark(object):

    @staticmethod
    def execute(df_input):
        """
        Entry point; meant to be overridden by the user.
        :return: must return a DataFrame object
        """
        # step 1: read the input DataFrame
        df_mid = df_input.select('id', 'name', 'data', 'mat')

        # step 2: preprocess the feature vector
        def mat2vec(mat):
            """
            UDF that turns the feature string into a vector.
            :return: the VectorUDT value required by the similarity computation
            """
            arr = [0.0] * len(mat)
            for i in range(len(mat)):
                if mat[i] != '0':
                    arr[i] = 1.0
            return Vectors.dense(arr)

        udf_mat2vec = udf(mat2vec, VectorUDT())
        df_mid = df_mid.withColumn('vec', udf_mat2vec('mat')).select(
            'id', 'name', 'data', 'mat', 'vec')

        # step 3: fit the similarity models
        ## MinHashLSH (Jaccard distance)
        minlsh = MinHashLSH(inputCol="vec", outputCol="hashes", seed=123, numHashTables=3)
        model_minlsh = minlsh.fit(df_mid)
        ## BucketedRandomProjectionLSH (Euclidean distance)
        brplsh = BucketedRandomProjectionLSH(inputCol="vec", outputCol="hashes", seed=123,
                                             bucketLength=10.0, numHashTables=10)
        model_brplsh = brplsh.fit(df_mid)

        # step 4: approximate self-join (ignore self-matches, max distance 0.8)
        ## model_brplsh is used the same way, with EuclideanDistance as the distCol
        df_ret = model_minlsh.approxSimilarityJoin(df_mid, df_mid, 0.8,
                                                   distCol='JaccardDistance').select(
            col("datasetA.id").alias("id"),
            col("datasetA.name").alias("name"),
            col("datasetA.data").alias("data"),
            col("datasetA.mat").alias("mat"),
            col("JaccardDistance").alias("distance"),
            col("datasetB.id").alias("ref_id"),
            col("datasetB.name").alias("ref_name"),
            col("datasetB.data").alias("ref_data"),
            col("datasetB.mat").alias("ref_mat")
        ).filter("id != ref_id")  # drop self-matches
        return df_ret


df_in = spark.createDataFrame([
    (1001, "A", "xxx", "1010001000010000001001101010000"),
    (1002, "B", "yyy", "1110001000010000000011101010000"),
    (1003, "C", "zzz", "1101100101010111011101110111101")],
    ['id', 'name', 'data', 'mat'])
df_out = PySpark.execute(df_in)
df_out.show()

The result: in MinHashLSH mode, the distance between A and B is 0.27, which is fairly close, while both A and B are about 0.75 away from C, which matches expectations.
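If the goal is only to find the users closest to one given user rather than an all-pairs join, the fitted model also supports approxNearestNeighbors. A hedged sketch reusing model_minlsh and df_mid from the code above (it would have to run inside execute, where those variables are in scope):

# Hypothetical follow-up: take user A's vector as the query key and ask for
# the 3 approximate nearest neighbours among all users (A itself included).
key = df_mid.filter("id = 1001").select("vec").head()["vec"]
model_minlsh.approxNearestNeighbors(df_mid, key, 3).select(
    "id", "name", "distCol").show()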

Alright, time to go lift some weights...

MLlib supports two kinds of vectors: dense and sparse. A dense vector is backed by a single array, while a sparse vector is backed by two parallel arrays: an index array and a value array. For example, the vector (1.0, 0.0, 3.0) can be represented either as a dense vector [1.0, 0.0, 3.0] or as a sparse vector (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
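For reference, both representations can be built directly with pyspark.ml.linalg (a small sketch):

from pyspark.ml.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 3.0])
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])  # size 3, indices [0, 2], values [1.0, 3.0]

print(dense.toArray())   # [1. 0. 3.]
print(sparse.toArray())  # [1. 0. 3.], the same vector with different storage

For long, mostly-zero mat strings, switching mat2vec to the sparse form would represent the same sets with less memory.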

 

