海量文件查重SimHash和Minhash


SimHash

  事實上,傳統比較兩個文本相似性的方法,大多是將文本分詞之后,轉化為特征向量距離的度量,比如常見的歐氏距離、海明距離或者余弦角度等等。兩兩比較固然能很好地適應,但這種方法的一個最大的缺點就是,無法將其擴展到海量數據。例如,試想像Google那種收錄了數以幾十億互聯網信息的大型搜索引擎,每天都會通過爬蟲的方式為自己的索引庫新增的數百萬網頁,如果待收錄每一條數據都去和網頁庫里面的每條記錄算一下余弦角度,其計算量是相當恐怖的。 

  我們考慮采用為每一個web文檔通過hash的方式生成一個指紋(fingerprint)。傳統的加密式hash,比如md5,其設計的目的是為了讓整個分布盡可能地均勻,輸入內容哪怕只有輕微變化,hash就會發生很大地變化。我們理想當中的哈希函數,需要對幾乎相同的輸入內容,產生相同或者相近的hashcode,換句話說,hashcode的相似程度要能直接反映輸入內容的相似程度。很明顯,前面所說的md5等傳統hash無法滿足我們的需求。 

  simhash是locality sensitive hash(局部敏感哈希)的一種,最早由Moses Charikar在《similarity estimation techniques from rounding algorithms》一文中提出。Google就是基於此算法實現網頁文件查重的。

  海明距離的定義,為兩個二進制串中不同位的數量。上述三個文本的simhash結果,其兩兩之間的海明距離為(p1,p2)=4,(p1,p3)=16以及(p2,p3)=12。事實上,這正好符合文本之間的相似度,p1和p2間的相似度要遠大於與p3的。 

  如何實現這種hash算法呢?以上述三個文本為例,整個過程可以分為以下六步: 

1、選擇simhash的位數,請綜合考慮存儲成本以及數據集的大小,比如說32位 
2、將simhash的各位初始化為0 
3、提取原始文本中的特征,一般采用各種分詞的方式。比如對於"the cat sat on the mat",采用兩兩分詞的方式得到如下結果:{"th", "he", "e ", " c", "ca", "at", "t ", " s", "sa", " o", "on", "n ", " t", " m", "ma"} 
4、使用傳統的32位hash函數計算各個word的hashcode,比如:"th".hash = -502157718 
,"he".hash = -369049682,…… 
5、對各word的hashcode的每一位,如果該位為1,則simhash相應位的值加它的權重(通常是出現的頻率);否則減它的權重 
6、對最后得到的32位的simhash,如果該位大於1,則設為1;否則設為0 

整個過程可以描述為:

  按照Charikar在論文中闡述的,64位simhash,海明距離在3以內的文本都可以認為是近重復文本。當然,具體數值需要結合具體業務以及經驗值來確定。 

  利用鴿舍原理在快速查找出位數不同的數目小於等於3的算法的描述如下:

  1)先復制原表T為Tt份:T1,T2,….Tt

  2)每個Ti都關聯一個pi和一個πi,其中pi是一個整數,πi是一個置換函數,負責把pi個bit位換到高位上。

  3)應用置換函數πi到相應的Ti表上,然后對Ti進行排序

  4)然后對每一個Ti和要匹配的指紋F、海明距離k做如下運算:

    a)      然后使用F’的高pi位檢索,找出Ti中高pi位相同的集合

    b)     在檢索出的集合中比較f-pi位,找出海明距離小於等於k的指紋

  5)最后合並所有Ti中檢索出的結果

 

代碼:

#!/usr/bin/python
#-*- coding:utf-8 -*-

from __future__ import division,unicode_literals

import sys

import re
import hashlib
import collections
import datetime

reload(sys)

sys.setdefaultencoding('utf-8')

import codecs


import itertools 


lib_newsfp_file = sys.argv[1] #讀入庫中存儲的所有新聞
result_file = sys.argv[2]

test_news_fp = {}
lib_news_fp = {}

bucket = collections.defaultdict(set)

offsets = []

def cacu_frequent(list1):
	frequent = {}
	for i in list1:
		if i not in frequent:
			frequent[i] = 0
		frequent[i] += 1
	return frequent

def load_lib_newsfp_file(): 
	global lib_news_fp
	
	fin = codecs.open(lib_newsfp_file,'r','utf-8')
	for line in fin:
		lines = line.strip()
		if len(lines) == 0:
			continue
		Arr = lines.split('\t')

		if len(Arr) < 3:
			continue
		lib_news_fp[Arr[0]] = Arr[3]

def get_near_dups(check_value):
	ans = set()
	
	for key in get_keys(int(check_value)):
		dups = bucket[key]
		for dup in dups:
			total_value,url = dup.split(',',1)
			if isSimilar(int(check_value),int(total_value)) == True:
				ans.add(url)
				break  #與一條重復 退出查找
		if ans:
			break

	return list(ans)

def ini_Index():
	global bucket 
	
	getoffsets()
	print offsets
	objs = [(str(url),str(values)) for url,values in lib_news_fp.items()]
	
	for i,q in enumerate(objs):
		addindex(*q)

def addindex(url,value):
	global bucket
	for key in get_keys(int(value)):
		v = '%d,%s' % (int(value),url)
		bucket[key].add(v)

def deleteindex(url,value):
	global bucket
	for key in get_keys(int(value)):
		v = '%d,%s' %(int(value),url)
		if v in bucket[key]:
			bucket[key].remove(v)

def getoffsets(f = 64 , k = 4):
	global offsets

	offsets = [f // (k + 1) * i for i in range(k + 1)]

def get_keys(value, f = 64):
	for i, offset in enumerate(offsets):
		if i == (len(offsets) - 1):
			m = 2 ** (f - offset) - 1
		else:
			m = 2 ** (offsets[i + 1] - offset) - 1
		c = value >> offset & m
		yield '%x:%x' % (c , i)

def bucket_size():
	return len(bucket)

def isSimilar(value1,value2,n = 4,f = 64):
	ans = 0
	x = (value1 ^ value2) &((1 << f) - 1)
	while x and (ans <= n):
		ans += 1
		x &= x - 1
	if ans <= n:
		return True
	return False


def load_test_file():
	
	global test_news_fp
	
	for line in sys.stdin:
		
		features = []
		
		result = line.strip().split('\t')
		
		url = result[0]
		content = result[2].split()
		title = result[1].split()
		features.extend(content)
		features.extend(title)
		total_features = cacu_frequent(features)
		
		test_news_fp[url] = build_by_features(total_features)

def load_test_newsfp_file():

	global test_news_fp

	for line in sys.stdin:
		lines = line.strip()
		if len(lines) == 0:
			continue
		Arr = lines.split('\t')

		if len(Arr) < 3:
			continue
		test_news_fp[Arr[0]] = Arr[3]

def build_by_features(features,f=64,hashfunc=None):
	v = [0]*f
	masks = [1 << i for i in range(f+f)]
	if hashfunc is None:
		def _hashfunc(x):
			return int(hashlib.md5(x).hexdigest(),16)
		hashfunc = _hashfunc
	if isinstance(features,dict):
		total_features = features.items()
	else:
		total_features = features

	for fea in total_features:
		if isinstance(fea,basestring):
			h = hashfunc(fea.encode('utf-8'))
			w = 1
		else:
			h = hashfunc(fea[0].encode('utf-8'))
			w = fea[1]
		for i in range(f):
			v[i] += w if h & masks[i+32] else -w
	ans = 0
	
	for i in range(f):
		if v[i] >= 0:
			ans |= masks[i]
	return ans
sum = 0
def process():
	global test_news_fp
        global sum

	fout = codecs.open(result_file,'w','utf-8')
	
	load_lib_newsfp_file()	
#	load_test_file()
	ini_Index()
	check_features = test_news_fp.items()
	lib_features = lib_news_fp.items()
	i = 0
	for check_fp in check_features:
#		print i
		ans = []
		ans = get_near_dups(check_fp[1])
		if ans:
			for url in ans:
				output_str = str(check_fp[0])+'\t'+str(url)
				fout.write(output_str+'\n')
				#break
				#print check_fp[0],'is duplicate'
			sum = sum + 1	#del test_news_fp[check_fp[0]]
			print i

		i += 1
	fout.close()

if __name__ == '__main__':
#        process()	
	begin = datetime.datetime.now()
	load_test_newsfp_file()
	
#	load_test_file()
#	getoffsets()
#	print offsets
#	load_lib_newsfp_file()

	process()

	end = datetime.datetime.now()

	print '耗時:',end - begin,' 重復新聞數:',sum,' 准確率: ', sum/2589

  

 

MinHash  

1.概述


    跟SimHash一樣,MinHash也是LSH的一種,可以用來快速估算兩個集合的相似度。MinHash由Andrei Broder提出,最初用於在搜索引擎中檢測重復網頁。它也可以應用於大規模聚類問題。
 
2.Jaccard index
 
    在介紹MinHash之前,我們先介紹下Jaccard index。
 

    Jaccard index是用來計算相似性,也就是距離的一種度量標准。假如有集合A、B,那么, 也就是說,集合A,B的Jaccard系數等於A,B中共同擁有的元素數與A,B總共擁有的元素數的比例。很顯然,Jaccard系數值區間為[0,1]。

 
3.MinHash
 
    先定義幾個符號術語:
    h(x):  把x映射成一個整數的哈希函數。   
    hmin(S):集合S中的元素經過h(x)哈希后,具有最小哈希值的元素。

    那么對集合A、B,hmin(A) = hmin(B)成立的條件是A ∪ B 中具有最小哈希值的元素也在 ∩ B中。這里

有一個假設,h(x)是一個良好的哈希函數,它具有很好的均勻性,能夠把不同元素映射成不同的整數。

 

 所以有,Pr[hmin(A) = hmin(B)] = J(A,B),即集合A和B的相似度為集合A、B經過hash后最小哈希值相

等的概率。

        有了上面的結論,我們便可以根據MinHash來計算兩個集合的相似度了。一般有兩種方法:
        
        第一種:使用多個hash函數
 
        為了計算集合A、B具有最小哈希值的概率,我們可以選擇一定數量的hash函數,比如K個。然后用這K個hash函數分別對集合A、B求哈希值,對
每個集合都得到K個最小值。比如Min(A)k={a1,a2,...,ak},Min(B)k={b1,b2,...,bk}。
        那么,集合A、B的相似度為|Min(A)k ∩ Min(B)k| / |Min(A)k  ∪  Min(B)k|,及Min(A)k和 Min(B)k中相同元素個數與總的元素個數的比例。
 
       第二種:使用單個hash函數
 
       第一種方法有一個很明顯的缺陷,那就是計算復雜度高。使用單個hash函數是怎么解決這個問題的呢?請看:
       前面我們定義過 h min(S)為集合S中具有最小哈希值的一個元素,那么我們也可以定義h mink(S)為集合S中具有最小哈希值的K個元素。這樣一來,
我們就只需要對每個集合求一次哈希,然后取最小的K個元素。計算兩個集合A、B的相似度,就是集合A中最小的K個元素與集合B中最小的K個元素
的交集個數與並集個數的比例。
        
      看完上面的,你應該大概清楚MinHash是怎么回事了。但是,MinHash的好處到底在哪里呢?計算兩篇文檔的相似度,就直接統計相同的詞數和總的
次數,然后就Jaccard index不就可以了嗎?對,如果僅僅對兩篇文檔計算相似度而言,MinHash沒有什么優勢,反而把問題復雜化了。但是如果有海量的文檔需要求相似度,比如在推薦系統
中計算物品的相似度,如果兩兩計算相似度,計算量過於龐大。下面我們看看MinHash是怎么解決問題的。
 
      比如 元素集合{a,b,c,d,e},其中s1={a,d},s2={c},s3={b,d,e},s4={a,c,d}   那么這四個集合的矩陣表示為:  

      

    如果要對某一個集合做MinHash,則可以從上面矩陣的任意一個行排列中選取一個,然后MinHash值是排列中第一個1的行號。
    例如,對上述矩陣,我們選取排列  beadc,那么對應的矩陣為
           
    那么,  h(S1) = a,同樣可以得到h(S2) = c, h(S3) = b, h(S4) = a。
        如果只對其中一個行排列做MinHash,不用說,計算相似度當然是不可靠的。因此,我們要選擇多個行排列來計算MinHash,最后根據Jaccard index公式 來計算相似度。但是求排列本身的復雜度比較高,特別是針對很大的矩陣來說。因此,我們可以設計一個隨機哈希函數去模擬排列,能夠把行號0~n隨機映射到0~n上。比如H(0)=100,H(1)=3...。當然,沖突是不可避免的,沖突后可以二次散列。並且如果選取的隨機哈希函數夠均勻,並且當n較大時,沖突發生的概率還是比較低的。關於隨機排列算法可以參考這篇文章:隨機排列生成算法的一些隨想
 
    說到這里,只是討論了用MinHash對海量文檔求相似度的具體過程,但是它到底是怎么減少復雜度的呢?
    比如有n個文檔,每個文檔的維度為m,我們可以選取其中k個排列求MinHash,由於每個對每個排列而言,MinHash把一篇文檔映射成一個整數,所以對k個排列計算MinHash就得到k個整數。那么所求的MinHash矩陣為n*k維,而原矩陣為n*m維。n>>m時,計算量就降了下來。

 

簡單粗暴的方法

    統計分詞后的文本中出現的各個詞,直接計算兩個文本的jaccard距離。(相同的詞出現的越多,文本重復的概率越大。)

  1 #!/usr/bin/python
  2 #-* coding:utf-8 -*-
  3 
  4   4 import sys
  5   5 import re
  6   6 import hashlib
  7   7 import collections
  8   8 import datetime
  9   9 import codecs
 10  10 
 11  11 reload(sys)
 12  12 sys.setdefaultencoding('utf-8')
 13  13 
 14  14 import threading
 15  15 from Queue import Queue
 16  16 queue = Queue()
 17  17 thread_flag_list = [0,0,0,0,0]
 18  18 
 19  19 res_file = sys.argv[1]
 20  20 
 21  21 news_list = []
 22  22 def load():
 23  23         global news_list
 24  24         for line in sys.stdin:
 25  25                 line = line.strip()
 26  26                 if len(line) == 0:
 27  27                         continue
 28  28                 Arr = line.split('\t')
 29  29 
 30  30                 if len(Arr) < 3:
 31  31                         continue
 32  32 
 33  33                 url = Arr[0]
 34  34                 title = Arr[1]
 35  35                 content = Arr[2]
 36  36 
 37  37                 term_list = content.split(' ')
 38  38                 term_set = set(term_list)
 39  39                 news_list.append([url,term_set])
 40  40 
 41  41 
 42  42 def calculate(news_f,news_s):
 43  43         set1 = news_f[1]
 44  44         set2 = news_s[1]
 45  45 
 46  46         set_join = set1 & set2
 47  47         set_union = set1 | set2
 48  48 
 49  49         simi_value = float(len(set_join))/float(len(set_union))
 50  50         return simi_value
 51  51 
 52  52 def run_thread(start_id,thread_id):
 53  53         global queue
 54  54         global thread_flag_list
 55  55         news_first = news_list[start_id]
 56  56         for i in range(start_id+1,len(news_list)):
 57  57                 news_second = news_list[i]
 58  58                 simi_value = calculate(news_first,news_second)
 59  59                 if simi_value > 0.8:
 60  60                         url1 = news_first[0]
 61  61                         url2 = news_second[0]
 62  62                         output_str = url1+'\t'+url2+'\t'+str(simi_value)
 63  63                         queue.put(output_str)
 64  64         thread_flag_list[thread_id] = 0#標記線程結束    
 65  65 
 66  66 def process():
 67  67         global queue
 68  68         global thread_flag_list
 69  69         fout = codecs.open(res_file,'w','utf-8')
 70  70         id_max = len(news_list)
 71  71         id_now = 0
 72  72         while True:
 73  73                 run_flag = False
 74  74                 thread_list = []
 75  75                 for i in range(0,len(thread_flag_list)):
 76  76                         if thread_flag_list[i] == 0:
 77  77                                 if id_now == id_max:
 78                                         continue
 79  79                                 thread_flag_list[i] = 1
 80  80                                 print 'now run is:',id_now
 81  81 
 82  82                                 thread = threading.Thread(target=run_thread,args=(id_now,i))
 83  83                                 thread_list.append(thread)
 84  84 
 85  85                                 id_now = id_now + 1
 86  86                         else:
 87  87                                 run_flag = True
 88  88 
 89  89                 for thread in thread_list:
 90  90                         thread.setDaemon(True)
 91  91                         thread.start()
 92  92 
 93  93                 while not queue.empty():
 94  94                         elem = queue.get()
 95  95                         print elem
 96  96                         fout.write(elem+'\n')
 97  97 
 98  98                 if run_flag != True and id_now == id_max:
 99  99                         break
100 100 
101 101         fout.close()
102 102 
103 103 if __name__ == '__main__':
104 104         load()
105 105         print 'load done'
106 106         process()
107 107 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM