經過一段時間的研究和學習,大致了解了DHT網絡的一些信息,大部分還是參會別人的相關代碼,一方面主要對DHT爬蟲原理感興趣,最主要的是為了學習python,大部分是別人的東西原理還是引用別人的吧
DHT網絡爬蟲的實現 | 學步園 http://www.xuebuyuan.com/1287052.html
DHT協議原理以及一些重點分析:
要做DHT的爬蟲,首先得透徹理解DHT,這樣才能知道在什么地方究竟該應用什么算法去解決問題。關於DHT協議的細節以及重要的參考文章,請參考文末1
DHT協議作為BT協議的一個輔助,是非常好玩的。它主要是為了在BT正式下載時得到種子或者BT資源。傳統的網絡,需要一台中央服務器存放種子或者BT資源,不僅浪費服務器資源,還容易出現單點的各種問題,而DHT網絡則是為了去中心化,也就是說任意時刻,這個網絡總有節點是亮的,你可以去詢問問這些亮的節點,從而將自己加入DHT網絡。
要實現DHT協議的網絡爬蟲,主要分3步,第一步是得到資源信息(infohash,160bit,20字節,可以編碼為40字節的十六進制字符串),第二步是確認這些infohash是有效的,第三步是通過有效的infohash下載到BT的種子文件,從而得到對這個資源的完整描述。
其中第一步是其他節點用DHT協議中的get_peers方法向爬蟲發送請求得到的,第二步是其他節點用DHT協議中的announce_peer向爬蟲發送請求得到的,第三步可以有幾種方式得到,比如可以去一些保存種子的網站根據infohash直接下載到,或者通過announce_peer的節點來下載到,具體如何實現,可以取決於你自己的爬蟲。
DHT協議中的主要幾個操作:
主要負責通過UDP與外部節點交互,封裝4種基本操作的請求以及相應。
ping:檢查一個節點是否“存活”
在一個爬蟲里主要有兩個地方用到ping,第一是初始路由表時,第二是驗證節點是否存活時
find_node:向一個節點發送查找節點的請求
在一個爬蟲中主要也是兩個地方用到find_node,第一是初始路由表時,第二是驗證桶是否存活時
get_peers:向一個節點發送查找資源的請求
在爬蟲中有節點向自己請求時不僅像個正常節點一樣做出回應,還需要以此資源的info_hash為機會盡可能多的去認識更多的節點。如圖,get_peers實際上最后一步是announce_peer,但是因為爬蟲不能announce_peer,所以實際上get_peers退化成了find_node操作。
announce_peer:向一個節點發送自己已經開始下載某個資源的通知
爬蟲中不能用announce_peer,因為這就相當於通報虛假資源,對方很容易從上下文中判斷你是否通報了虛假資源從而把你禁掉
DHT協議中有幾個重點的需要澄清的地方:
1. node與infohash同樣使用160bit的表示方式,160bit意味着整個節點空間有2^160 = 730750818665451459101842416358141509827966271488,是48位10進制,也就是說有百億億億億億個節點空間,這么大的節點空間,是足夠存放你的主機節點以及任意的資源信息的。
2. 每個節點有張路由表。每張路由表由一堆K桶組成,所謂K桶,就是桶中最多只能放K個節點,默認是8個。而桶的保存則是類似一顆前綴樹的方式。相當於一張8桶的路由表中最多有160-4個K桶。
3. 根據DHT協議的規定,每個infohash都是有位置的,因此,兩個infohash之間就有距離一說,而兩個infohash的距離就可以用異或來表示,即infohash1 xor infohash2,也就是說,高位一樣的話,他們的距離就近,反之則遠,這樣可以快速的計算兩個節點的距離。計算這個距離有什么用呢,在DHT網絡中,如果一個資源的infohash與一個節點的infohash越近則該節點越有可能擁有該資源的信息,為什么呢?可以想象,因為人人都用同樣的距離算法去遞歸的詢問離資源接近的節點,並且只要該節點做出了回應,那么就會得到一個announce信息,也就是說跟資源infohash接近的節點就有更大的概率拿到該資源的infohash
4. 根據上述算法,DHT中的查詢是跳躍式查詢,可以迅速的跨越的的節點桶而接近目標節點桶。之所以在遠處能夠大幅度跳躍,而在近處只能小幅度跳躍,原因是每個節點的路由表中離自身越接近的節點保存得越多,如下圖
5. 在一個DHT網絡中當爬蟲並不容易,不像普通爬蟲一樣,看到資源就可以主動爬下來,相反,因為得到資源的方式(get_peers, announce_peer)都是被動的,所以爬蟲的方式就有些變化了,爬蟲所要做的事就是像個正常節點一樣去響應其他節點的查詢,並且得到其他節點的回應,把其中的數據收集下來就算是完成工作了。而爬蟲唯一能做的,是盡可能的去多認識其他節點,這樣,才能有更多其他節點來向你詢問。
6. 有人說,那么我把DHT爬蟲的K桶中的容量K增大是不是就能增加得到資源的機會,其實不然,之前也分析過了,DHT爬蟲最重要的信息來源全是被動的,因為你不能增大別人的K,所以距離遠的節點保存你自身的概率就越小,當然距離遠的節點去請求你的概率相對也比較小。
一些主要的組件(實際實現更加復雜一些,有其他的模塊,這里僅列舉主要幾個):
DHT crawler:
這個就是DHT爬蟲的主邏輯,為了簡化多線程問題,跟server用了生產者消費者模型,負責消費,並且復用server的端口。
主要任務就是負責初始化,包括路由表的初始化,以及初始的請求。另外負責處理所有進來的消息事件,由於生產者消費者模型的使用,里面的操作都基本上是單線程的,簡化了不少問題,而且相信也比上鎖要提升速度(當然了,加鎖這步按理是放到了queue這里了,不過對於這種生產者源源不斷生產的類型,可以用ring-buffer大幅提升性能)。
DHT server:
這里是DHT爬蟲的服務器端,DHT網絡中的節點不單是client,也是server,所以要有server擔當生產者的角色,最初也是每個消費者對應一個生產者,但實際上發現可以利用IO多路復用來達到消息事件的目的,這樣一來大大簡化了系統中線程的數量,如果client可以的話,也應該用同樣的方式來組織,這樣系統的速度應該會快很多。(尚未驗證)
DHT route table:
主要負責路由表的操作。
路由表有如下操作:
init:剛創建路由表時的操作。分兩種情況:
1. 如果之前已經初始化過,並且將上次路由表的數據保存下來,則只需要讀入保存數據。
2. 如果之前沒有初始化過,則首先應當初始化。
首先,應當有一個接入點,也就是說,你要想加進這個網絡,必須認識這個網絡中某個節點i並將i加入路由表,接下來對i用find_node詢問自己的hash_info,這里巧妙的地方就在於,理論上通過一定數量的詢問就會找到離自己距離很近的節點(也就是經過一定步驟就會收斂)。find_node目的在於盡可能早的讓自己有數據,並且讓網絡上別的節點知道自己,如果別人不認識你,就不會發送消息過來,意味着你也不能獲取到想要的信息。
search:比較重要的方法,主要使用它來定位當前infohash所在的桶的位置。會被其他各種代理方法調用到。
findNodes:找到路由表中與傳入的infohash最近的k個節點
getPeer:找到待查資源是否有peer(即是否有人在下載,也就是是否有人announce過)
announcePeer:通知該資源正在被下載
DHT bucket:
acitiveNode:邏輯比較多,分如下幾點。
1. 查找所要添加的節點對應路由表的桶是否已經滿,如果未滿,添加節點
2. 如果已經滿,檢查該桶中是否包含爬蟲節點自己,如果不包含,拋棄待添加節點
3. 如果該桶中包含本節點,則平均分裂該桶
其他的諸如locateNode,
replaceNode, updateNode,
removeNode,就不一一說明了
DHT torrent parser:
主要從bt種子文件中解析出以下幾個重要的信息:name,size,file list(sub file name, sub file size),比較簡單,用bencode方向解碼就行了
Utils:
distance:計算兩個資源之間的距離。在kad中用a xor b表示
為了增加難度,選用了不太熟悉的語言python,結果步步為營,但是也感慨python的簡潔強大。在實現中,也碰到很多有意思的問題。比如如何保存一張路由表中的所有桶,之前想出來幾個辦法,甚至為了節省資源,打算用bit數組+dict直接保存,但是因為估計最終的幾個操作不是很方便直觀容易出錯而放棄,選用的結構就是前綴樹,操作起來果然是沒有障礙;
在超時問題上,比如桶超時和節點超時,一直在思考一個高效但是比較優雅的做法,可以用一個同步調用然后等待它的超時,但是顯然很低效,尤其我沒有用更多線程的情況,一旦阻塞了就等於該端口所有事件都被阻塞了。所以必須用異步操作,但是異步操作很難去控制它的精確事件,當然,我可以在每個事件來的時候檢查一遍是否超時,但是顯然也是浪費和低效。那么,剩下的只有采用跟tomcat類似的方式了,增加一個線程來監控,當然,這個監控線程最好是全局的,能監控所有crawler中所有事務的超時。另外,超時如果控制不當,容易導致內存沒有回收以至於內存泄露,也值得注意。超時線程是否會與其他線程互相影響也應當仔細檢查。
最初超時的控制沒處理好,出現了ping storm,運行一定時間后大多數桶已經滿了,如果按照協議中的方式去跑的話會發現大量的事件都是在ping以確認這個節點是否ok以至於大量的cpu用於處理ping和ping響應。深入理解后發現,檢查節點狀態是不需要的,因為節點狀態只是為了提供給詢問的人一些好的節點,既然如此,可以將每次過來的節點替換當前桶中最老的節點,如此一來,我們將總是保存着最新的節點。
搜索算法也是比較讓我困惑的地方,簡而言之,搜索的目的並不是真正去找資源,而是去認識那些能夠保存你的節點。為什么說是能夠保存你,因為離你越遠,桶的數量越少,這樣一來,要想進他們的桶中去相對來說就比較困難,所以搜索的目標按理應該是附近的節點最好,但是不能排除遠方節點也可能保存你的情況,這種情況會發生在遠方節點初始化時或者遠方節點的桶中節點超時的時候,但總而言之,概率要小些。所以搜索算法也不應該不做判斷就胡亂搜索,但是也不應該將搜索的距離嚴格限制在附近,所以這是一個權衡問題,暫時沒有想到好的方式,覺得暫時讓距離遠的以一定概率發生,而距離近的必然發生
還有一點,就是搜索速度問題,因為DHT網絡的這種結構,決定了一個節點所認識的其他節點必然是有限的附近節點,於是每個節點在一定時間段內能拿到的資源數必然是有限的,所以應當分配多個節點同時去抓取,而抓取資源的數量很大程度上就跟分配節點的多少有關了。
最后一個值得優化的地方是findnodes方法,之前的方式是把一個桶中所有數據拿出來排序,然后取其中前K個返回回去,但是實際上我們做了很多額外的工作,這是經典的topN問題,使用排序明顯是浪費時間的,因為這個操作非常頻繁,所以即便所有保存的節點加起來很少((160 - 4) * 8),也會一定程度上增加時間。而采用的算法是在一篇論文《可擴展的DHT網絡爬蟲設計和優化》中找到的,基本公式是IDi = IDj xor 2 ^(160 - i),這樣,已知IDi和i就能知道IDj,若已知IDi和IDj就能知道i,通過這種方式,可以快速的查找該桶A附近的其他桶(顯然是離桶A層次最近的桶中的節點距離A次近),比起全部遍歷再查找效率要高不少。
dht協議http://www.bittorrent.org/beps/bep_0005.html 及其翻譯http://gobismoon.blog.163.com/blog/static/5244280220100893055533/
爬蟲源碼參考別人的,非原創,只為學習
1 #encoding: utf-8 2 3 from hashlib import sha1 4 from random import randint 5 from struct import unpack, pack 6 from socket import inet_aton, inet_ntoa 7 from bisect import bisect_left 8 from threading import Timer 9 from time import sleep 10 11 from bencode import bencode, bdecode 12 13 BOOTSTRAP_NODES = [ 14 ("router.bittorrent.com", 6881), 15 ("dht.transmissionbt.com", 6881), 16 ("router.utorrent.com", 6881) 17 ] 18 TID_LENGTH = 4 19 KRPC_TIMEOUT = 10 20 REBORN_TIME = 5 * 60 21 K = 8 22 23 def entropy(bytes): 24 s = "" 25 for i in range(bytes): 26 s += chr(randint(0, 255)) 27 return s 28 29 # """把爬蟲"偽裝"成正常node, 一個正常的node有ip, port, node ID三個屬性, 因為是基於UDP協議, 30 # 所以向對方發送信息時, 即使沒"明確"說明自己的ip和port時, 對方自然會知道你的ip和port, 31 # 反之亦然. 那么我們自身node就只需要生成一個node ID就行, 協議里說到node ID用sha1算法生成, 32 # sha1算法生成的值是長度是20 byte, 也就是20 * 8 = 160 bit, 正好如DHT協議里說的那范圍: 0 至 2的160次方, 33 # 也就是總共能生成1461501637330902918203684832716283019655932542976個獨一無二的node. 34 # ok, 由於sha1總是生成20 byte的值, 所以哪怕你寫SHA1(20)或SHA1(19)或SHA1("I am a 2B")都可以, 35 # 只要保證大大降低與別人重復幾率就行. 注意, node ID非十六進制, 36 # 也就是說非FF5C85FE1FDB933503999F9EB2EF59E4B0F51ECA這個樣子, 即非hash.hexdigest(). """ 37 def random_id(): 38 hash = sha1() 39 hash.update( entropy(20) ) 40 return hash.digest() 41 42 def decode_nodes(nodes): 43 n = [] 44 length = len(nodes) 45 if (length % 26) != 0: 46 return n 47 for i in range(0, length, 26): 48 nid = nodes[i:i+20] 49 ip = inet_ntoa(nodes[i+20:i+24]) 50 port = unpack("!H", nodes[i+24:i+26])[0] 51 n.append( (nid, ip, port) ) 52 return n 53 54 def encode_nodes(nodes): 55 strings = [] 56 for node in nodes: 57 s = "%s%s%s" % (node.nid, inet_aton(node.ip), pack("!H", node.port)) 58 strings.append(s) 59 60 return "".join(strings) 61 62 def intify(hstr): 63 #"""這是一個小工具, 把一個node ID轉換為數字. 后面會頻繁用到.""" 64 return long(hstr.encode('hex'), 16) #先轉換成16進制, 再變成數字 65 66 def timer(t, f): 67 Timer(t, f).start() 68 69 70 class BucketFull(Exception): 71 pass 72 73 74 class KRPC(object): 75 def __init__(self): 76 self.types = { 77 "r": self.response_received, 78 "q": self.query_received 79 } 80 self.actions = { 81 "ping": self.ping_received, 82 "find_node": self.find_node_received, 83 "get_peers": self.get_peers_received, 84 "announce_peer": self.announce_peer_received, 85 } 86 87 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 88 self.socket.bind(("0.0.0.0", self.port)) 89 90 def response_received(self, msg, address): 91 self.find_node_handler(msg) 92 93 def query_received(self, msg, address): 94 try: 95 self.actions[msg["q"]](msg, address) 96 except KeyError: 97 pass 98 99 def send_krpc(self, msg, address): 100 try: 101 self.socket.sendto(bencode(msg), address) 102 except: 103 pass 104 105 106 class Client(KRPC): 107 def __init__(self, table): 108 self.table = table 109 110 timer(KRPC_TIMEOUT, self.timeout) 111 timer(REBORN_TIME, self.reborn) 112 KRPC.__init__(self) 113 114 def find_node(self, address, nid=None): 115 nid = self.get_neighbor(nid) if nid else self.table.nid 116 tid = entropy(TID_LENGTH) 117 118 msg = { 119 "t": tid, 120 "y": "q", 121 "q": "find_node", 122 "a": {"id": nid, "target": random_id()} 123 } 124 self.send_krpc(msg, address) 125 126 def find_node_handler(self, msg): 127 try: 128 nodes = decode_nodes(msg["r"]["nodes"]) 129 for node in nodes: 130 (nid, ip, port) = node 131 if len(nid) != 20: continue 132 if nid == self.table.nid: continue 133 self.find_node( (ip, port), nid ) 134 except KeyError: 135 pass 136 137 def joinDHT(self): 138 for address in BOOTSTRAP_NODES: 139 self.find_node(address) 140 141 def timeout(self): 142 if len( self.table.buckets ) < 2: 143 self.joinDHT() 144 timer(KRPC_TIMEOUT, self.timeout) 145 146 def reborn(self): 147 self.table.nid = random_id() 148 self.table.buckets = [ KBucket(0, 2**160) ] 149 timer(REBORN_TIME, self.reborn) 150 151 def start(self): 152 self.joinDHT() 153 154 while True: 155 try: 156 (data, address) = self.socket.recvfrom(65536) 157 msg = bdecode(data) 158 self.types[msg["y"]](msg, address) 159 except Exception: 160 pass 161 162 def get_neighbor(self, target): 163 return target[:10]+random_id()[10:] 164 165 166 class Server(Client): 167 def __init__(self, master, table, port): 168 self.table = table 169 self.master = master 170 self.port = port 171 Client.__init__(self, table) 172 173 def ping_received(self, msg, address): 174 try: 175 nid = msg["a"]["id"] 176 msg = { 177 "t": msg["t"], 178 "y": "r", 179 "r": {"id": self.get_neighbor(nid)} 180 } 181 self.send_krpc(msg, address) 182 self.find_node(address, nid) 183 except KeyError: 184 pass 185 186 def find_node_received(self, msg, address): 187 try: 188 target = msg["a"]["target"] 189 neighbors = self.table.get_neighbors(target) 190 191 nid = msg["a"]["id"] 192 msg = { 193 "t": msg["t"], 194 "y": "r", 195 "r": { 196 "id": self.get_neighbor(target), 197 "nodes": encode_nodes(neighbors) 198 } 199 } 200 self.table.append(KNode(nid, *address)) 201 self.send_krpc(msg, address) 202 self.find_node(address, nid) 203 except KeyError: 204 pass 205 206 def get_peers_received(self, msg, address): 207 try: 208 infohash = msg["a"]["info_hash"] 209 210 neighbors = self.table.get_neighbors(infohash) 211 212 nid = msg["a"]["id"] 213 msg = { 214 "t": msg["t"], 215 "y": "r", 216 "r": { 217 "id": self.get_neighbor(infohash), 218 "nodes": encode_nodes(neighbors) 219 } 220 } 221 self.table.append(KNode(nid, *address)) 222 self.send_krpc(msg, address) 223 self.master.log(infohash) 224 self.find_node(address, nid) 225 except KeyError: 226 pass 227 228 def announce_peer_received(self, msg, address): 229 try: 230 infohash = msg["a"]["info_hash"] 231 nid = msg["a"]["id"] 232 233 msg = { 234 "t": msg["t"], 235 "y": "r", 236 "r": {"id": self.get_neighbor(infohash)} 237 } 238 239 self.table.append(KNode(nid, *address)) 240 self.send_krpc(msg, address) 241 self.master.log(infohash) 242 self.find_node(address, nid) 243 except KeyError: 244 pass 245 # 該類只實例化一次. 246 class KTable(object): 247 # 這里的nid就是通過node_id()函數生成的自身node ID. 協議里說道, 每個路由表至少有一個bucket, 248 # 還規定第一個bucket的min=0, max=2^160次方, 所以這里就給予了一個buckets屬性來存儲bucket, 這個是列表. 249 def __init__(self, nid): 250 self.nid = nid 251 self.buckets = [ KBucket(0, 2**160) ] 252 253 def append(self, node): 254 index = self.bucket_index(node.nid) 255 try: 256 bucket = self.buckets[index] 257 bucket.append(node) 258 except IndexError: 259 return 260 except BucketFull: 261 if not bucket.in_range(self.nid): 262 return 263 self.split_bucket(index) 264 self.append(node) 265 266 267 # 返回與目標node ID或infohash的最近K個node. 268 269 # 定位出與目標node ID或infohash所在的bucket, 如果該bucuck有K個節點, 返回. 270 # 如果不夠到K個節點的話, 把該bucket前面的bucket和該bucket后面的bucket加起來, 只返回前K個節點. 271 # 還是不到K個話, 再重復這個動作. 要注意不要超出最小和最大索引范圍. 272 # 總之, 不管你用什么算法, 想盡辦法找出最近的K個節點. 273 def get_neighbors(self, target): 274 nodes = [] 275 if len(self.buckets) == 0: return nodes 276 if len(target) != 20 : return nodes 277 278 index = self.bucket_index(target) 279 try: 280 nodes = self.buckets[index].nodes 281 min = index - 1 282 max = index + 1 283 284 while len(nodes) < K and ((min >= 0) or (max < len(self.buckets))): 285 if min >= 0: 286 nodes.extend(self.buckets[min].nodes) 287 288 if max < len(self.buckets): 289 nodes.extend(self.buckets[max].nodes) 290 291 min -= 1 292 max += 1 293 294 num = intify(target) 295 nodes.sort(lambda a, b, num=num: cmp(num^intify(a.nid), num^intify(b.nid))) 296 return nodes[:K] #K是個常量, K=8 297 except IndexError: 298 return nodes 299 300 def bucket_index(self, target): 301 return bisect_left(self.buckets, intify(target)) 302 303 304 # 拆表 305 306 # index是待拆分的bucket(old bucket)的所在索引值. 307 # 假設這個old bucket的min:0, max:16. 拆分該old bucket的話, 分界點是8, 然后把old bucket的max改為8, min還是0. 308 # 創建一個新的bucket, new bucket的min=8, max=16. 309 # 然后根據的old bucket中的各個node的nid, 看看是屬於哪個bucket的范圍里, 就裝到對應的bucket里. 310 # 各回各家,各找各媽. 311 # new bucket的所在索引值就在old bucket后面, 即index+1, 把新的bucket插入到路由表里. 312 def split_bucket(self, index): 313 old = self.buckets[index] 314 point = old.max - (old.max - old.min)/2 315 new = KBucket(point, old.max) 316 old.max = point 317 self.buckets.insert(index + 1, new) 318 for node in old.nodes[:]: 319 if new.in_range(node.nid): 320 new.append(node) 321 old.remove(node) 322 323 def __iter__(self): 324 for bucket in self.buckets: 325 yield bucket 326 327 328 class KBucket(object): 329 __slots__ = ("min", "max", "nodes") 330 331 # min和max就是該bucket負責的范圍, 比如該bucket的min:0, max:16的話, 332 # 那么存儲的node的intify(nid)值均為: 0到15, 那16就不負責, 這16將會是該bucket后面的bucket的min值. 333 # nodes屬性就是個列表, 存儲node. last_accessed代表最后訪問時間, 因為協議里說到, 334 # 當該bucket負責的node有請求, 回應操作; 刪除node; 添加node; 更新node; 等這些操作時, 335 # 那么就要更新該bucket, 所以設置個last_accessed屬性, 該屬性標志着這個bucket的"新鮮程度". 用linux話來說, touch一下. 336 # 這個用來便於后面說的定時刷新路由表. 337 338 def __init__(self, min, max): 339 self.min = min 340 self.max = max 341 self.nodes = [] 342 343 344 # 添加node, 參數node是KNode實例. 345 346 # 如果新插入的node的nid屬性長度不等於20, 終止. 347 # 如果滿了, 拋出bucket已滿的錯誤, 終止. 通知上層代碼進行拆表. 348 # 如果未滿, 先看看新插入的node是否已存在, 如果存在, 就替換掉, 不存在, 就添加, 349 # 添加/替換時, 更新該bucket的"新鮮程度". 350 def append(self, node): 351 if node in self: 352 self.remove(node) 353 self.nodes.append(node) 354 else: 355 if len(self) < K: 356 self.nodes.append(node) 357 else: 358 raise BucketFull 359 360 def remove(self, node): 361 self.nodes.remove(node) 362 363 def in_range(self, target): 364 return self.min <= intify(target) < self.max 365 366 def __len__(self): 367 return len(self.nodes) 368 369 def __contains__(self, node): 370 return node in self.nodes 371 372 def __iter__(self): 373 for node in self.nodes: 374 yield node 375 376 def __lt__(self, target): 377 return self.max <= target 378 379 380 class KNode(object): 381 # """ 382 # nid就是node ID的簡寫, 就不取id這么模糊的變量名了. __init__方法相當於別的OOP語言中的構造方法, 383 # 在python嚴格來說不是構造方法, 它是初始化, 不過, 功能差不多就行. 384 # """ 385 __slots__ = ("nid", "ip", "port") 386 387 def __init__(self, nid, ip, port): 388 self.nid = nid 389 self.ip = ip 390 self.port = port 391 392 def __eq__(self, other): 393 return self.nid == other.nid 394 395 396 397 #using example 398 class Master(object): 399 def __init__(self, f): 400 self.f = f 401 402 def log(self, infohash): 403 self.f.write(infohash.encode("hex")+"\n") 404 self.f.flush() 405 try: 406 f = open("infohash.log", "a") 407 m = Master(f) 408 s = Server(Master(f), KTable(random_id()), 8001) 409 s.start() 410 except KeyboardInterrupt: 411 s.socket.close() 412 f.close()
種子從迅雷下,初期為學習從http://torrage.com/sync/下的infohash,去重用了別人寫的Bloom Filter算法,數據庫用Mysql,建表語句如下,其中uinthash是根據infohash的頭四個字節和最后四個字節組成的一個int整數,先這樣設計,看后期查詢的時候用得到不,總覺得用infohash來查很慢
1 CREATE TABLE `torrentinfo` ( 2 `id` int(11) NOT NULL AUTO_INCREMENT, 3 `infohash` char(40) NOT NULL DEFAULT '', 4 `filename` varchar(128) DEFAULT NULL, 5 `filelength` bigint(11) DEFAULT NULL, 6 `recvtime` datetime DEFAULT NULL, 7 `filecontent` text, 8 `uinthash` int(11) unsigned NOT NULL DEFAULT '0', 9 PRIMARY KEY (`id`), 10 KEY `uinthash_index` (`uinthash`) 11 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Thunder.py
1 # _*_ coding: utf-8 _*_ 2 import socket 3 import os,glob 4 import time as time_p 5 import requests 6 from bencode import bdecode, BTL 7 from torrent import * 8 import threading, signal 9 import MySQLdb 10 from BloomFilter import * 11 12 class Thunder(object): 13 def __init__(self): 14 self.connstr={'host':'127.0.0.1','user':'root','passwd':'123456','port':3306,'charset':"UTF8"} 15 def download(self, infohash): 16 try: 17 tc = self._download(infohash) 18 if(tc==-1): 19 return 20 tc = bdecode(tc) 21 info = torrentInfo(tc) 22 # print info['name'] 23 # print info['length'] 24 # print info['files'] 25 uint=int(infohash[:4]+infohash[-4:],16) 26 time_now=time_p.strftime('%Y-%m-%d %H:%M:%S',time_p.localtime(time_p.time())) 27 sql="insert into torrentinfo(infohash,filename,filelength,recvtime,filecontent,uinthash) values('%s','%s','%d','%s','%s','%d')"%(infohash,MySQLdb.escape_string(info['name']),info['length'],time_now,MySQLdb.escape_string(info['files']),uint) 28 self.executeSQL(sql) 29 except Exception,e: 30 print e 31 pass 32 33 def openConnection(self): 34 try: 35 self.conn=MySQLdb.connect(**self.connstr) 36 self.cur=self.conn.cursor() 37 self.conn.select_db('dht') 38 except MySQLdb.Error,e: 39 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 40 41 42 def executeSQL(self,sql): 43 try: 44 self.cur.execute(sql) 45 self.conn.commit() 46 except MySQLdb.Error,e: 47 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 48 def closeConnection(self): 49 try: 50 self.cur.close() 51 self.conn.close() 52 except MySQLdb.Error,e: 53 print 'mysql error %d:%s'%(e.args[0],e.args[1]) 54 55 def _download(self, infohash): 56 infohash = infohash.upper() 57 start = infohash[0:2] 58 end = infohash[-2:] 59 url = "http://bt.box.n0808.com/%s/%s/%s.torrent" % (start, end, infohash) 60 headers = { 61 "Referer": "http://bt.box.n0808.com" 62 } 63 try: 64 r = requests.get(url, headers=headers, timeout=10) 65 if r.status_code == 200: 66 # f=open("d:\\"+infohash+'.torrent','wb') 67 # f.write(r.content) 68 # f.close() 69 return r.content 70 except (socket.timeout, requests.exceptions.Timeout), e: 71 pass 72 return -1 73 74 class torrentBean(object): 75 """docstring for torrentBean""" 76 __slots__=('infohash','filename','recvtime','filecontent','uinthash') 77 78 def __init__(self, infohash,filename,recvtime,filecontent,uinthash): 79 super(torrentBean, self).__init__() 80 self.infohash = infohash 81 self.filename = filename 82 self.recvtime = recvtime 83 self.filecontent = filecontent 84 self.uinthash = uinthash 85 86 87 bf = BloomFilter(0.001, 1000000) 88 a=Thunder() 89 a.openConnection() 90 # info_hash="a02d2735e6e1daa6f7d58f21bd7340a7b7c4b7a5" 91 # info_hash='cf3a6a4f07da0b90beddae838462ca0012bef285' 92 # a.download('cf3a6a4f07da0b90beddae838462ca0012bef285') 93 94 95 files=glob.glob('./*.txt') 96 for fl in files: 97 print os.path.basename(fl) 98 f=open(fl,'r') 99 for line in f: 100 infohash=line.strip('\n') 101 if not bf.is_element_exist(infohash): 102 bf.insert_element(infohash) 103 a.download(infohash) 104 a.closeConnection()
torrent種子文件經過bencode解析,獲取key為info對應value值,種子大致的格式如下,有亂碼,不影響觀看
{ 'files': [{ 'path': ['PGD660.avi'], 'length': 1367405512, 'filehash': 'J\xef\xfe\xb3K\xd4g\x8d\x07m\x03\xbb\xb3\xadt\xa1\xa0\xf0\xec\xab', 'ed2k': '/\xfb\xe55#n\xbd1\xb6\x1c\x0f\xf3\xe4\x9dP\xfb', 'path.utf-8': ['PGD660.avi'] }, { 'path': ['PGD660B.jpg'], 'length': 135899, 'filehash': '*$O\x17w\xe9E\x95>O\x1f\xfb\x0e\x9b\x16\x15B\\Q\x9d', 'ed2k': 'T/L*\xbb\x8e.\xe2d\xddu\nR\x07\xca\x19', 'path.utf-8': ['PGD660B.jpg'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'], 'length': 472, 'filehash': '&\xa92\xb7\xdd8\xeel3\xcc-S\x07\xb5e\xd35\xc0\xb7r', 'ed2k': '\x13\xd2 a\x0cA\xb4\xf2X\x12\xea\xd4\xe8\xac`\x92', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba@\xe6\x9c\x80\xe6\x96\xb0\xe5\x9c\xb0\xe5\x9d\x80.mht'] }, { 'path': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'], 'length': 363, 'filehash': '\x96nA*\xe2\xb6Y+[\xe3\xaf\xd4\x14A\x94\xf5@\xcd\xc1\x91', 'ed2k': '8V\xa6X\xd9\x82l\xdbNO8\xe8D\xe9E\xed', 'path.utf-8': ['yoy123@\xe8\x8d\x89\xe6\xa6\xb4\xe7\xa4\xbe\xe5\x8c\xba\xe5\xae\xa3\xe4\xbc\xa0.txt'] }, { 'path': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'], 'length': 475, 'filehash': '\xec\xde\xeb-6\x86\x1avB\xdd\xd8q\x8b\x8f\xc06\xf0XX\x0e', 'ed2k': '\xa7\x8dU\xfd\xfc=\x12\x15>yE\x8f&A\xc2u', 'path.utf-8': ['\xe2\x98\x85\xe5\xb0\x91\xe5\xa6\x87 \xe8\xae\xba\xe5\x9d\x9b \xe9\x99\x90\xe9\x87\x8f\xe5\xbc\x80\xe6\x94\xbe\xe4\xb8\xad\xe3\x80\x82\xe3\x80\x82.mht'] }, { 'path': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe6\x9f\x8f\xe6\x8b\x89\xe5\x9c\x96\xe7\xa7\x98\xe5\xaf\x86\xe8\x8a\xb1\xe5\x9c\x92.mht'] }, { 'path': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'], 'length': 478, 'filehash': "\xe4\xb5'Td\x0b=P\xc0\x9aG\xa2\xd7\xfapg\xc6.\x8e\xa7", 'ed2k': '\xdd\x8d\xbb\x0b\x04\xcb\x03O\xb1\x18"\x03\xb1\x1d\xba\x08', 'path.utf-8': ['\xe7\xbe\x8e\xe5\xa5\xb3\xe4\xb8\x8a\xe9\x96\x80\xe6\x8f\xb4\xe4\xba\xa4\xe6\x9c\x8d\xe5\x8b\x99.mht'] }], 'publisher': 'yoy123', 'piece length': 524288, 'name': 'PGD660 \xe6\x83\xb3\xe8\xa9\xa6\xe8\x91\x97\xe5\x85\xa8\xe5\x8a\x9b\xe6\x93\x8d\xe6\x93\x8d\xe7\x9c\x8b\xe9\x80\x99\xe5\x80\x8b\xe6\xb7\xab\xe8\x95\xa9\xe7\xbe\x8e\xe5\xa5\xb3\xe5\x97\x8e \xe5\xb0\x8f\xe5\xb7\x9d\xe3\x81\x82\xe3\x81\x95\xe7\xbe\x8e', 'publisher.utf-8': 'yoy123', }
解析代碼torrent.py
1 # _*_ coding: utf-8 _*_ 2 from time import time 3 4 def torrentInfo(torrentContent): 5 metadata = torrentContent["info"] 6 print metadata 7 info = { 8 "name": getName(metadata), 9 "length": calcLength(metadata), 10 "timestamp": getCreateDate(torrentContent), 11 "files": extraFiles(metadata) 12 } 13 return info 14 15 def calcLength(metadata): 16 length = 0 17 try: 18 length = metadata["length"] 19 except KeyError: 20 try: 21 for file in metadata["files"]: 22 length += file["length"] 23 except KeyError: 24 pass 25 return length 26 27 def extraFiles(metadata): 28 files = [] 29 try: 30 for file in metadata["files"]: 31 path = file["path.utf-8"] 32 size=file['length'] 33 if len(path) > 1: 34 main = path[0] 35 for f in path[1:2]: 36 files.append("%s/%s %d bytes" % (main, f,size)) 37 else: 38 files.append("%s %d bytes" % (path[0],size) ) 39 if files: 40 return '\r\n'.join(files) 41 else: 42 return getName(metadata) 43 except KeyError: 44 return getName(metadata) 45 46 def getName(metadata): 47 try: 48 name = metadata["name.utf-8"] 49 if name.strip()=="": 50 raise KeyError 51 except KeyError: 52 try: 53 name = metadata["name"] 54 if name.strip()=="": 55 raise KeyError 56 except KeyError: 57 name = getMaxFile(metadata) 58 59 return name 60 def getMaxFile(metadata): 61 try: 62 maxFile = metadata["files"][0] 63 for file in metadata["files"]: 64 if file["length"] > maxFile["length"]: 65 maxFile = file 66 name = maxFile["path"][0] 67 return name 68 except KeyError: 69 return "" 70 71 def getCreateDate(torrentContent): 72 try: 73 timestamp = torrentContent["creation date"] 74 except KeyError: 75 timestamp = int( time() ) 76 return timestamp
最后還有別人寫的BloomFilter代碼
1 #encoding: utf-8 2 ''' 3 Created on 2012-11-7 4 5 @author: palydawn 6 ''' 7 import cmath 8 from BitVector import BitVector 9 10 class BloomFilter(object): 11 def __init__(self, error_rate, elementNum): 12 #計算所需要的bit數 13 self.bit_num = -1 * elementNum * cmath.log(error_rate) / (cmath.log(2.0) * cmath.log(2.0)) 14 15 #四字節對齊 16 self.bit_num = self.align_4byte(self.bit_num.real) 17 18 #分配內存 19 self.bit_array = BitVector(size=self.bit_num) 20 21 #計算hash函數個數 22 self.hash_num = cmath.log(2) * self.bit_num / elementNum 23 24 self.hash_num = self.hash_num.real 25 26 #向上取整 27 self.hash_num = int(self.hash_num) + 1 28 29 #產生hash函數種子 30 self.hash_seeds = self.generate_hashseeds(self.hash_num) 31 32 def insert_element(self, element): 33 for seed in self.hash_seeds: 34 hash_val = self.hash_element(element, seed) 35 #取絕對值 36 hash_val = abs(hash_val) 37 #取模,防越界 38 hash_val = hash_val % self.bit_num 39 #設置相應的比特位 40 self.bit_array[hash_val] = 1 41 42 #檢查元素是否存在,存在返回true,否則返回false 43 def is_element_exist(self, element): 44 for seed in self.hash_seeds: 45 hash_val = self.hash_element(element, seed) 46 #取絕對值 47 hash_val = abs(hash_val) 48 #取模,防越界 49 hash_val = hash_val % self.bit_num 50 51 #查看值 52 if self.bit_array[hash_val] == 0: 53 return False 54 return True 55 56 #內存對齊 57 def align_4byte(self, bit_num): 58 num = int(bit_num / 32) 59 num = 32 * (num + 1) 60 return num 61 62 #產生hash函數種子,hash_num個素數 63 def generate_hashseeds(self, hash_num): 64 count = 0 65 #連續兩個種子的最小差值 66 gap = 50 67 #初始化hash種子為0 68 hash_seeds = [] 69 for index in xrange(hash_num): 70 hash_seeds.append(0) 71 for index in xrange(10, 10000): 72 max_num = int(cmath.sqrt(1.0 * index).real) 73 flag = 1 74 for num in xrange(2, max_num): 75 if index % num == 0: 76 flag = 0 77 break 78 79 if flag == 1: 80 #連續兩個hash種子的差值要大才行 81 if count > 0 and (index - hash_seeds[count - 1]) < gap: 82 continue 83 hash_seeds[count] = index 84 count = count + 1 85 86 if count == hash_num: 87 break 88 return hash_seeds 89 90 def hash_element(self, element, seed): 91 hash_val = 1 92 for ch in str(element): 93 chval = ord(ch) 94 hash_val = hash_val * seed + chval 95 return hash_val 96 97 98 def SaveBitToFile(self,f): 99 self.bit_array.write_bits_to_fileobject(f) 100 pass
表內容見下圖