Spark Locality Sensitive Hashing (LSH)局部哈希敏感


1、概念

LSH是一類重要的散列技術,通常用於聚類,近似最近鄰搜索和大型數據集的異常檢測。
LSH的一般思想是使用一個函數族(“ LSH族”)將數據點散列(hash)到存儲桶中,以便彼此靠近的數據點很有可能位於同一存儲桶中,而彼此相距很遠的情況很可能在不同的存儲桶中。 
在度量空間(M,d)中,M是集合,d是M上的距離函數,LSH族是滿足以下屬性的函數h族:

 

 

該LSH系列稱為(r1,r2,p1,p2)敏感。

在Spark中,不同的LSH系列在單獨的類(例如MinHash)中實現,並且類似近似相似聯接和近似最近鄰居的模型在每個類中提供了用於特征轉換的API。

在LSH中,我們將假陽性定義為一對距離較遠輸入特征(d(p,q)≥r2)散列到同一存儲桶中,我們將假陰性定義為散布到不同存儲桶中的一對鄰近特征(d(p,q)≤r1)。

2、LSH操作

我們描述了LSH可以用於的主要操作類型。擬合的LSH模型具有用於每個操作的方法。

2.1、Feature Transformation 特征轉換

特征轉換是將哈希值添加為新列的基本功能。這對於降低數據維度很有用。用戶可以通過設置inputCol和outputCol來指定​​輸入和輸出列的名稱。

LSH還支持多個LSH哈希表。用戶可以通過設置numHashTables來指定哈希表的數量。這也用於近似相似連接和近似最近鄰中的OR-amplification。散列表的數量增加將提高准確性,但也會增加通信成本和運行時間。

outputCol的類型為Seq [Vector],其中數組的維數等於numHashTables,向量的維數當前設置為1。在將來的版本中,我們將實現AND-amplification,以便用戶可以指定這些向量的維數。

2.2、Approximate Similarity Join 近似相似聯接

近似相似聯接采用兩個數據集,並近似返回數據集中距離小於用戶定義閾值的行對。近似相似聯接既支持聯接兩個不同的數據集,也支持自聯接。自連接會產生一些重復的對。

近似相似性聯接接受已轉換和未轉換的數據集作為輸入。如果使用未轉換的數據集,它將被自動轉換。在這種情況下,哈希簽名將創建為outputCol。

在合並的數據集中,可以在數據集A和數據集B中查詢原始數據集。距離列將添加到輸出數據集中,以顯示返回的每對行之間的真實距離。

2.3、Approximate Nearest Neighbor Searc 近似最近鄰搜索

近似最近鄰居搜索采用(特征向量的)數據集和鍵(單個特征向量),並近似返回數據集中最接近向量的指定行數。

近似最近鄰搜索將已轉換和未轉換的數據集都接受為輸入。如果使用未轉換的數據集,它將被自動轉換。在這種情況下,哈希簽名將創建為outputCol。

距離列將添加到輸出數據集中,以顯示每個輸出行和搜索到的鍵之間的真實距離。

注意:如果哈希存儲桶中沒有足夠的候選者,則近似最近鄰居搜索將返回少於k行的結果。

 

3、LSH算法

3.1 Bucketed Random Projection for Euclidean Distance 歐式距離分桶隨機投影

分桶隨機投影是歐氏距離的LSH族。歐幾里得距離的定義如下:

 

 它的LSH系列將特征向量x投影到隨機單位向量v上,並將投影的結果分配到哈希桶中:


其中r是用戶定義的存儲區長度。存儲桶長度可用於控制哈希存儲桶的平均大小(以及存儲桶的數量)。較大的存儲桶長度(即,較少的存儲桶)增加了將特征散列到同一存儲桶的可能性(增加了真假性確定數)。 分桶隨機投影接受任意矢量作為輸入特征,並支持稀疏矢量和密集矢量。

3.2、BucketedRandomProjectionLSH code

package com.home.spark.ml

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

/**
  * @Description: BucketedRandomProjectionLSH局部敏感哈希
  * 為歐幾里得距離度量實現局部敏感哈希函數
  *
  * 輸入是密集或稀疏向量,每個向量代表歐幾里得距離空間中的一個點。 輸出將是可配置尺寸的向量。 相同維中的哈希值由相同的哈希函數計算。
  **/
object Ex_BucketedRandomProjectionLSH {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val dfA = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 1.0)),
      (1, Vectors.dense(1.0, -1.0)),
      (2, Vectors.dense(-1.0, -1.0)),
      (3, Vectors.dense(-1.0, 1.0))
    )).toDF("id", "features")

    val dfB = spark.createDataFrame(Seq(
      (4, Vectors.dense(1.0, 1.0)),
      (5, Vectors.dense(-1.0, 0.0)),
      (6, Vectors.dense(0.0, 1.0)),
      (7, Vectors.dense(0.0, -1.0))
    )).toDF("id", "features")

    val key = Vectors.dense(1.0, 0.0)

    val brp = new BucketedRandomProjectionLSH().setInputCol("features").setOutputCol("hashes")
      //增大參數降低假陰性率,但以增加計算復雜性為代價
      .setNumHashTables(3)
      //每個哈希存儲桶的長度(較大的存儲桶可降低假陰性)
      .setBucketLength(2.0)

    val model = brp.fit(dfA)

    // Feature Transformation
    println("The hashed dataset where hashed values are stored in the column 'hashes':")
    model.transform(dfA).show(false)

    // Compute the locality sensitive hashes for the input rows, then perform approximate similarity join.
    // We could avoid computing hashes by passing in the already-transformed dataset,
    // e.g. `model.approxSimilarityJoin(transformedA, transformedB, 2.5)`
    println("Approximately joining dfA and dfB on Euclidean distance smaller than 2.5:")
    model.approxSimilarityJoin(dfA, dfB, 2.5, "EuclideanDistance")
      .select(col("datasetA.id").alias("idA"),
        col("datasetB.id").alias("idB"),
        col("EuclideanDistance")).show()

    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest neighbor search.
    // We could avoid computing hashes by passing in the already-transformed dataset,
    // e.g. `model.approxNearestNeighbors(transformedA, key, 2)`
    println("Approximately searching dfA for 2 nearest neighbors of the key:")
    model.approxNearestNeighbors(dfA, key, 2).show(false)

    spark.stop()
  }
}
+---+-----------+-----------------------+
|id |features   |hashes                 |
+---+-----------+-----------------------+
|0  |[1.0,1.0]  |[[0.0], [0.0], [-1.0]] |
|1  |[1.0,-1.0] |[[-1.0], [-1.0], [0.0]]|
|2  |[-1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|
|3  |[-1.0,1.0] |[[0.0], [0.0], [-1.0]] |
+---+-----------+-----------------------+

Approximately joining dfA and dfB on Euclidean distance smaller than 2.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  0|  6|              1.0|
|  0|  5| 2.23606797749979|
|  1|  5| 2.23606797749979|
|  1|  7|              1.0|
|  3|  5|              1.0|
|  3|  6|              1.0|
|  2|  7|              1.0|
|  2|  5|              1.0|
|  3|  4|              2.0|
|  0|  4|              0.0|
+---+---+-----------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+-----------------------+-------+
|id |features  |hashes                 |distCol|
+---+----------+-----------------------+-------+
|0  |[1.0,1.0] |[[0.0], [0.0], [-1.0]] |1.0    |
|1  |[1.0,-1.0]|[[-1.0], [-1.0], [0.0]]|1.0    |
+---+----------+-----------------------+-------+

3.3、MinHash for Jaccard Distance

傑卡德距離(Jaccard Distance) 是用來衡量兩個集合差異性的一種指標,它是傑卡德相似系數的補集,被定義為1減去Jaccard相似系數。而傑卡德相似系數(Jaccard similarity coefficient),也稱傑卡德指數(Jaccard Index),是用來衡量兩個集合相似度的一種指標。

 

 

 

MinHash是Jaccard距離的LSH系列,其中輸入特征是自然數集。兩組的Jaccard距離由其交集和並集的基數定義:

 

 

MinHash將隨機哈希函數g應用於集合中的每個元素,並采用所有哈希值中的最小值:

 

 

MinHash的輸入集表示為二進制向量,其中向量索引表示元素本身,向量中的非零值表示該元素在集合中的存在。雖然同時支持密集和稀疏向量,但通常建議使用稀疏向量以提高效率。

例如,Vectors.sparse(10,Array [(2,1.0),(3,1.0),(5,1.0)])表示空間中有10個元素。該集合包含elem 2,elem 3和elem5。所有非零值都被視為二進制“ 1”值。 注意:MinHash不能轉換空集,這意味着任何輸入向量必須至少具有1個非零條目。

3.4、code MinHashLSH

package com.home.spark.ml

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

/**
  * @Description: LSH class for Jaccard distance.
  **/
object Ex_MinHash {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val dfA = spark.createDataFrame(Seq(
      (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
      (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
      (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
    )).toDF("id", "features")

    val dfB = spark.createDataFrame(Seq(
      (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
      (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
      (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
    )).toDF("id", "features")

    dfA.show(false)

    dfB.show(false)

    val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

    val mh = new MinHashLSH()
      .setNumHashTables(5)
      .setInputCol("features")
      .setOutputCol("hashes")

    val model = mh.fit(dfA)

    // Feature Transformation
    println("The hashed dataset where hashed values are stored in the column 'hashes':")
    model.transform(dfA).show(false)

    // Compute the locality sensitive hashes for the input rows, then perform approximate similarity join.
    // We could avoid computing hashes by passing in the already-transformed dataset,
    // e.g. `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
    println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
    model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
      .select(col("datasetA.id").alias("idA"),
        col("datasetB.id").alias("idB"),
        col("JaccardDistance")).show(false)

    // Compute the locality sensitive hashes for the input rows, then perform approximate nearest neighbor search.
    // We could avoid computing hashes by passing in the already-transformed dataset,
    // e.g. `model.approxNearestNeighbors(transformedA, key, 2)`
    // It may return less than 2 rows when not enough approximate near-neighbor candidates are found.
    println("Approximately searching dfA for 2 nearest neighbors of the key:")
    model.approxNearestNeighbors(dfA, key, 2).show(false)


    spark.stop()
  }
}

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-------------------------+---------------------------------------------------------------------------------+
|id |features                 |hashes                                                                           |
+---+-------------------------+---------------------------------------------------------------------------------+
|0  |(6,[0,1,2],[1.0,1.0,1.0])|[[2.25592966E8], [6.5902527E7], [2.82845246E8], [4.95314097E8], [7.01119548E8]]  |
|1  |(6,[2,3,4],[1.0,1.0,1.0])|[[2.25592966E8], [4.98143035E8], [4.76528358E8], [1.247220523E9], [1.64558731E8]]|
|2  |(6,[0,2,4],[1.0,1.0,1.0])|[[2.25592966E8], [6.5902527E7], [2.82845246E8], [4.95314097E8], [6.65063373E8]]  |
+---+-------------------------+---------------------------------------------------------------------------------+

Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:
+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|1  |4  |0.5            |
|0  |5  |0.5            |
|1  |5  |0.5            |
|2  |5  |0.5            |
+---+---+---------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+-------------------------+---------------------------------------------------------------------------------+-------+
|id |features                 |hashes                                                                           |distCol|
+---+-------------------------+---------------------------------------------------------------------------------+-------+
|1  |(6,[2,3,4],[1.0,1.0,1.0])|[[2.25592966E8], [4.98143035E8], [4.76528358E8], [1.247220523E9], [1.64558731E8]]|0.75   |
+---+-------------------------+---------------------------------------------------------------------------------+-------+

4、LSH 是一項有大量應用方向的多功能技術

    近似重復的檢測: LSH 通常用於對大量文檔,網頁和其他文件進行去重處理。
    全基因組的相關研究:生物學家經常使用 LSH 在基因組數據庫中鑒定相似的基因表達。
    大規模的圖片搜索: Google 使用 LSH 和 PageRank 來構建他們的圖片搜索技術VisualRank。
    音頻/視頻指紋識別:在多媒體技術中,LSH 被廣泛用於 A/V 數據的指紋識別。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM