Learning Python by Doing -- Implementing a BT Client (3): Peer Protocol Design


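After a TCP connection to a peer is established, the first step is to send a handshake message.

The handshake message has the following format:

one byte with the value 19 (0x13) + the string 'BitTorrent protocol' + 8 reserved bytes, 0 by default (draft BEPs define uses for the reserved bytes)

+ the SHA-1 hash of the info section of the torrent file, 20 bytes + our own 20-byte peer id (most peer entries obtained from the tracker carry no peer id; the local peer id can be used for this field)

If the two sides cannot agree on the handshake, the TCP connection is closed.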
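The handshake layout described above can be sketched with the standard `struct` module. This is a minimal illustration in Python 3 `bytes` (the listing later in this post is Python 2); the function names `build_handshake` and `parse_handshake` are hypothetical and not part of the original client.

```python
import struct

PSTR = b"BitTorrent protocol"
HANDSHAKE_LEN = 1 + len(PSTR) + 8 + 20 + 20  # 68 bytes in total
HANDSHAKE_FMT = ">B19s8s20s20s"              # length byte, name, reserved, info_hash, peer id


def build_handshake(info_hash, peer_id, reserved=b"\x00" * 8):
    """Return the 68-byte handshake for a 20-byte info_hash and peer id."""
    if len(info_hash) != 20 or len(peer_id) != 20 or len(reserved) != 8:
        raise ValueError("info_hash and peer_id must be 20 bytes, reserved 8 bytes")
    return struct.pack(HANDSHAKE_FMT, len(PSTR), PSTR, reserved, info_hash, peer_id)


def parse_handshake(data, expected_info_hash):
    """Return the remote peer id, or None if the handshake does not match."""
    if len(data) != HANDSHAKE_LEN:
        return None
    pstrlen, pstr, _reserved, info_hash, peer_id = struct.unpack(HANDSHAKE_FMT, data)
    if pstrlen != len(PSTR) or pstr != PSTR or info_hash != expected_info_hash:
        return None
    return peer_id
```

A caller would close the TCP connection when `parse_handshake` returns None, which corresponds to the "cannot agree" case above.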

 

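BEP-3, the BT standard, defines 9 peer messages (types 0 to 8). The message format is msg_len (4 bytes) + msg_type (1 byte) + payload, where msg_len counts the type byte plus the payload:

0 - choke  -- tells the peer that the local end is choking it; the peer cannot get piece data. The payload is empty.

1 - unchoke  -- tells the peer that it is no longer choked; the peer may start sending requests for piece data. The payload is empty.

2 - interested  -- tells the peer that the local end is interested in its pieces. The payload is empty.

3 - not interested  -- tells the peer that the local end is no longer interested in its pieces. The payload is empty.

4 - have  -- announces to the peer a piece that the local end has. The payload is a 4-byte piece index.

5 - bitfield  -- announces to the peer the pieces that the local end has, as a bitmap with one bit per piece index. A bit set to 1 means the local end has that piece.

                     This message is sent only as the first message after a successful handshake.

6 - request  -- requests piece data. The payload is index, begin and length, 4 bytes each. length is typically 0x8000 and must not exceed 0x20000; the code below requests 0x4000-byte blocks.

7 - piece  -- carries piece data. The payload is index, begin and data.

8 - cancel  -- tells the peer that the local end has cancelled a piece request. The payload is index, begin and length.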
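To make the message list concrete, here is a small sketch of how a message is framed and how the bitfield maps piece indexes to bits. It is written in Python 3 with `struct`; the helper names are hypothetical, and only a request encoder is shown because the other messages follow the same pattern.

```python
import struct

CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED = 0, 1, 2, 3
HAVE, BITFIELD, REQUEST, PIECE, CANCEL = 4, 5, 6, 7, 8


def encode_message(msg_type, payload=b""):
    """msg_len (type byte + payload), then the type byte, then the payload."""
    return struct.pack(">IB", 1 + len(payload), msg_type) + payload


def encode_request(index, begin, length):
    """A request carries three 4-byte big-endian integers."""
    return encode_message(REQUEST, struct.pack(">III", index, begin, length))


def pieces_to_bitfield(pieces, pieces_num):
    """Pack piece indexes into a bitfield; piece 0 is the high bit of byte 0."""
    bits = bytearray((pieces_num + 7) // 8)
    for index in pieces:
        bits[index // 8] |= 0x80 >> (index % 8)
    return bytes(bits)


def bitfield_to_pieces(bitfield):
    """Return the piece indexes whose bit is set, in ascending order."""
    return [i * 8 + j
            for i, byte in enumerate(bytearray(bitfield))
            for j in range(8)
            if byte & (0x80 >> j)]
```

For example, `encode_message(UNCHOKE)` yields the five bytes 00 00 00 01 01, and a torrent with 10 pieces of which 0 and 9 are present packs into the two bytes 80 40.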

 

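The implementation uses Python's non-blocking socket interface. To keep processing from being held up by a blocking socket, each peer is handled by several threads.

Each peer has three threads: a request timeout timer, a socket send thread and a socket receive thread. The select function is used to check whether the socket is readable or writable.

Socket reads and writes are protected by an RLock. The lock is not held while select blocks, so other threads are not blocked.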
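The locking rule above (wait in select without the lock, take the lock only for the short socket call) might look like the following. This is a simplified Python 3 sketch, not the receive loop from the listing; `recv_when_ready` is a hypothetical name.

```python
import select
import socket
import threading


def recv_when_ready(sock, lock, timeout=1.0, bufsize=0x8000):
    """Wait for readability without the lock, then read while holding it.

    Returns the received bytes, b"" if the peer closed the connection,
    or None if nothing arrived within the timeout.
    """
    # No lock here: select() may block for the whole timeout.
    readable, _, in_error = select.select([sock], [], [sock], timeout)
    if in_error:
        raise socket.error("socket reported an exceptional condition")
    if not readable:
        return None
    # The socket call itself returns quickly, so guarding it is cheap.
    with lock:
        return sock.recv(bufsize)
```

Because the lock is released during the wait, a timer thread or the send thread can still acquire it while the receive thread is idle.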

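Outgoing data is first appended to a queue; setting a threading.Event then triggers the socket send thread to transmit it, so the thread that produces the data never blocks on the socket.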

Because Python has no API for terminating a thread, the socket send thread and the socket receive thread rely on a special variable being assigned to make them exit.
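The two points above (queue plus Event for sending, and a flag variable for ending a thread) can be combined in one small worker. This is a hedged Python 3 sketch: the class `SendWorker` and the `send_func` parameter are assumptions for illustration, and the listing below uses a string queue set to None instead of a separate flag.

```python
import threading


class SendWorker(object):
    """Hypothetical helper: buffers outgoing bytes and sends them on a worker thread."""

    def __init__(self, send_func):
        self._send = send_func          # for example sock.sendall (an assumption)
        self._queue = bytearray()
        self._lock = threading.Lock()
        self._event = threading.Event()
        self._running = True            # the "special variable" that ends the thread
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def post(self, data):
        """Called from any thread; never touches the socket."""
        with self._lock:
            self._queue += data
        self._event.set()

    def stop(self):
        """Ask the worker to exit after flushing, then wait for it."""
        self._running = False
        self._event.set()               # wake the worker so it sees the flag
        self._thread.join()

    def _run(self):
        while True:
            self._event.wait()
            with self._lock:
                data = bytes(self._queue)
                del self._queue[:]
                self._event.clear()
                done = not self._running
            if data:
                self._send(data)        # the slow call happens outside the lock
            if done:
                break
```

Usage would be `worker = SendWorker(sock.sendall)`, then `worker.post(msg)` from any thread and `worker.stop()` on disconnect.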

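The download process is driven by synchronous calls, and timer polling is avoided wherever possible, which lowers CPU usage and speeds up the download.

However, synchronous calls between threads cost performance because of the locks involved, and they easily introduce deadlocks. The execution paths of the threads have to be designed carefully to avoid deadlock.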
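One common way to reduce the deadlock risk mentioned above is to change internal state while holding the lock but to call into other components only after releasing it. The sketch below illustrates that idea under stated assumptions: the class `Peer` and the `on_canceled` callback are hypothetical, and this is one possible design rather than the approach the listing necessarily follows everywhere.

```python
import threading


class Peer(object):
    """Hypothetical sketch: gather work under the lock, notify after releasing it."""

    def __init__(self, on_canceled):
        self._lock = threading.RLock()
        self._pending = []               # (piece_index, offset, length) tuples
        self._on_canceled = on_canceled  # callback owned by another component

    def add_request(self, piece):
        with self._lock:
            self._pending.append(piece)

    def drop_all(self):
        # Step 1: mutate our own state while holding our own lock.
        with self._lock:
            canceled = sorted(set(piece[0] for piece in self._pending))
            self._pending = []
        # Step 2: call out only after the lock is released, so the callback
        # may take its own locks (or call back into us) without deadlocking.
        if canceled:
            self._on_canceled(canceled)
        return canceled
```

With this shape, two objects that notify each other never wait for each other's lock while holding their own.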

Features defined in draft BEPs are not implemented yet, nor are upload flow control for peers and peer quality ranking.

PeerConnect
'''
Created on 2012-10-3

@author: ddt
'''
# Python 2 source: protocol data is handled as str byte strings.
from socket import socket, AF_INET, SOCK_STREAM, error, timeout
import threading
import log_info
import select

class PeerConnect(object):
    '''
    TODO: upload flow control
    TODO: peer quality management
    '''
    def __init__(self, ip, port, task_info):
        '''
        Constructor
        '''
        self.__ip = ip
        self.__port = port
        
        self.__info_hash = task_info.get_info_hash()
        self.__local_id = task_info.get_local_id()
        self.__task_info = task_info
        
        # Handshake: chr(19) + protocol name + 8 reserved bytes + info_hash + peer id
        leading_string = chr(19)+'BitTorrent protocol'
        reserved_string = chr(0)*8
        self.__handshake_info = leading_string + reserved_string 
        self.__handshake_info += self.__info_hash + self.__local_id
    
        self.__request_piece_len = 0x4000   # block size of each request
        self.__receive_data_len = 0x8000    # recv() buffer size
        
        self.__tcp_con = None
        self.__tcp_connect_timeout = 60
        
        self.__tcp_handshake_timeout = 60
        
        self.__keepalive_timer = None
        self.__sck_send_thread = None
        
        self.__retry_timer = None
        self.__retry_intvl = 2 # backoff base: retry after 2**retry_times seconds
        self.__retry_times = 0
        self.__max_retrys = 10
        
        self.__local_choked = True
        self.__peer_choked = True
        self.__local_interested = False
        self.__peer_interested = False
        
        self.__peer_have_pieces = []

        self.__local_requesting = False
        self.__local_requesting_pieces = []
        self.__local_max_requesting = 10
        self.__local_requesting_timer = None
        self.__local_requesting_timeout_intvl = 30
            
        self.__receiving_cache = ''
    
        self.__peer_pending_request = []
        self.__local_pending_request = []
        
        self.__local_pending_request_less = 10
        self.__peer_have_pieces_pending = []
        
        self.__local_sending_queue = None
        self.__rlock_sck_send = threading.RLock() 
        self.__local_sending_event = threading.Event()
        self.__min_sck_send_msg = 0x1000
        
        self.__rlock_common = threading.RLock()
        
        self.__peer_started = False
        
        self.__peer_choked_timer = None
        self.__peer_choked_timeout_intvl = 180
        
        self.__dispatch_timer = None
        self.__dispatch_timeout = 5

    def start(self):
        with self.__rlock_common:
            if not self.__peer_started:
                self.__retry_times = 0
                self.__startup_thread = threading.Thread(target=PeerConnect.__connect,args=(self,))
                self.__peer_started = True
                self.__startup_thread.start()
    
    def stop(self):
        with self.__rlock_common:
            if self.__peer_started:
                # With the retry count at its maximum, __retry_connect will not reconnect.
                self.__retry_times = self.__max_retrys
                self.__disconnect()
                self.__peer_started = False

    def dispatch_pieces(self,pieces, piece_len):
        
        with self.__rlock_common:
        
            self.__write_log(str(pieces))
            self.__write_log(str(self.__peer_have_pieces))
            
            if len(pieces) == 0:
                return False
            
            # Refuse the whole batch if the peer lacks any of the pieces.
            for piece_index in pieces:
                if piece_index not in self.__peer_have_pieces:
                    return False 
                 
            # Split each piece into request-sized blocks.
            for piece_index in pieces:       
                for offset in range(0, piece_len, self.__request_piece_len):
                    length = self.__request_piece_len
                    if offset+length > piece_len:
                        length = piece_len - offset
                    piece = (piece_index, offset, length)
                    if piece not in self.__local_pending_request:
                        self.__local_pending_request.append(piece)
            
            if self.__dispatch_timer != None:
                self.__dispatch_timer.cancel()
            
            self.__check_local_request()
            
            if self.__peer_choked:
                if self.__peer_choked_timer == None or not self.__peer_choked_timer.is_alive():
                    self.__peer_choked_timer = threading.Timer(self.__peer_choked_timeout_intvl, PeerConnect.__peer_choked_timeout, [self,])
                    self.__peer_choked_timer.start()
        
        return True
    
    def cancel_pieces(self, pieces):
        with self.__rlock_common:
            # Rebuild the list instead of removing items while iterating over it.
            self.__local_pending_request = [piece for piece in self.__local_pending_request
                                            if piece[0] not in pieces]
                    
            if self.__local_requesting:
                # Iterate over a copy because the list is modified in the loop.
                for piece in self.__local_requesting_pieces[:]:
                    if piece[0] in pieces:
                        # piece is (piece_index, offset, length)
                        self.__send_cancel(*piece)
                        self.__local_requesting_pieces.remove(piece)
                if len(self.__local_requesting_pieces) == 0:
                    self.__local_requesting = False
                    if self.__local_requesting_timer != None:
                        self.__local_requesting_timer.cancel()       
                self.__check_local_request()

        
    def set_choke_state(self, choked):
        with self.__rlock_common:
            if self.__local_choked != choked:
                if choked:
                    self.__send_choked()
                else:
                    self.__send_unchoked()
                    self.__check_peer_request()
        
    def get_peer_have_pieces(self):
        with self.__rlock_common:
            return self.__peer_have_pieces
    
    def notify_local_have_pieces(self, pieces):
        with self.__rlock_common:
            self.__send_have(pieces)
         
    def is_dead_peer(self):
        with self.__rlock_common:
            return  self.__retry_times > self.__max_retrys
    
    def get_local_pending_pieces(self):
        with self.__rlock_common:
            pieces_index = []
            for piece in self.__local_pending_request:
                if piece[0] not in pieces_index:
                    pieces_index.append(piece[0])
            if self.__local_requesting:
                for piece in self.__local_requesting_pieces: 
                    if piece[0] not in pieces_index:
                        pieces_index.append(piece[0])

        return pieces_index
    
    def get_peer_addr(self):
        return (self.__ip, self.__port)
        
    def __connect(self):

        self.__tcp_con = socket(AF_INET, SOCK_STREAM)
        self.__tcp_con.settimeout(self.__tcp_connect_timeout)

        try:
            self.__tcp_con.connect((self.__ip,self.__port))
        except error , e:
            self.__write_log('peer connect error: %s, retry' %e)
            self.__retry_connect()
            return
        
        self.__tcp_con.settimeout(None)
        
        self.__start_send_proc()

        if not self.__handshake():
            self.__retry_connect()
            return
        
        # The bitfield has to be the first message after the handshake.
        self.__send_bitfield()
        self.__send_unchoked()
        self.__start_keepalive_timer()      

        self.__recv_loop()

    def __disconnect(self):
        self.__write_log('__disconnect:begin')
        
        if self.__retry_timer != None:
            self.__retry_timer.cancel()
        
        if self.__keepalive_timer != None:
            self.__keepalive_timer.cancel()
            
        # Setting the queue to None tells the send thread to exit.
        if self.__local_sending_queue != None:
            self.__local_sending_queue = None
            self.__local_sending_event.set()
            
        if self.__peer_choked_timer != None:
            self.__peer_choked_timer.cancel()
            
        if self.__dispatch_timer != None:
            self.__dispatch_timer.cancel()    

        
        # Put in-flight requests back at the head of the pending list.
        if self.__local_requesting:
            self.__local_requesting = False
            self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
            self.__local_requesting_pieces = []
            if self.__local_requesting_timer != None:
                self.__local_requesting_timer.cancel()
        
        # The socket does not exist yet if stop() runs before __connect().
        if self.__tcp_con != None:
            self.__tcp_con.close()
            self.__write_log('__disconnect: self.__tcp_con.closed')
        
        self.__receiving_cache = ''
        
        self.__local_choked = True
        self.__peer_choked = True
        self.__local_interested = False
        self.__peer_interested = False
        self.__local_requesting_pieces = []
        self.__peer_pending_request = []
        self.__peer_have_pieces = []
        self.__peer_have_pieces_pending = []
    
    def __start_keepalive_timer(self):
        if self.__keepalive_timer != None:
            self.__keepalive_timer.cancel()
        self.__keepalive_timer = threading.Timer(120,PeerConnect.__send_keepalive_timeout,[self,])
        self.__keepalive_timer.start()
        
    def __send_keepalive_timeout(self):
        
        with self.__rlock_common:
            self.__send_keepalive()
            self.__start_keepalive_timer()
        
    def __recv_loop(self):
        self.__tcp_con.setblocking(False)
        while True:
            # select() runs without the lock so other threads are not blocked.
            ready_r, ready_w, in_err = select.select([self.__tcp_con,], [], [self.__tcp_con,], 600)
            
            with self.__rlock_common:
                if self.__tcp_con in in_err:
                    self.__write_log('__recv_loop: socket in error select result:%s' %str(in_err))
                    self.__retry_connect()
                    break
                
                if self.__tcp_con not in ready_r:
                    self.__write_log('__recv_loop: unexpected select result!')
                    continue 
                
                try:
                    received_data = self.__tcp_con.recv(self.__receive_data_len)
                        
                except error, e:
                    self.__write_log("receive data failed, error:%s, retry" %e)
                    self.__retry_connect()
                    break
            
                # An empty read means the peer closed the connection.
                if len(received_data) == 0:
                    self.__write_log("have received null data")
                    self.__retry_connect()
                    break
            
                self.__reveived_data(received_data)
    
    def __start_send_proc(self):
        with self.__rlock_sck_send:
            self.__local_sending_queue = ''
        self.__sck_send_thread = threading.Thread(target=PeerConnect.__proc_sending, args=(self,))
        self.__sck_send_thread.start()
        
    def __proc_sending(self):
        # __disconnect sets the queue to None to end this thread.
        while self.__local_sending_queue != None:
                        
            ready_r, ready_w, in_err = select.select([], [self.__tcp_con,], [self.__tcp_con,])
            
            # Sleep until __sck_send queues data or __disconnect wakes us.
            self.__local_sending_event.wait()

            with self.__rlock_common:
                
                if self.__tcp_con  in in_err:
                    self.__tcp_con.close()
                    break
                
                if self.__tcp_con not in ready_w:
                    self.__write_log('__proc_sending: unexpected select result!')
                    continue
                
                if self.__local_sending_queue == None:
                    break
                
                try:
                    # send() may accept only part of the queue; keep the rest.
                    sent_len = self.__tcp_con.send(self.__local_sending_queue)
                    self.__local_sending_queue = self.__local_sending_queue[sent_len:]
                    
                except error,e:
                    self.__tcp_con.close()
                    self.__write_log('__proc_sending failed! error:%s' %str(e))
                    break
                    
                if len(self.__local_sending_queue) == 0:
                    self.__local_sending_event.clear()
    
    def __check_peer_request(self):
        if self.__peer_interested and not self.__local_choked:
            while len(self.__peer_pending_request) > 0:
                piece = self.__peer_pending_request.pop(0)
                piece_index, offset, length = piece
                if self.__local_have(piece_index):
                    data = self.__read_piecedata(piece_index,offset, length)
                    self.__send_piece(piece_index, offset, data)
                else:
                    self.__write_log('peer request piece:%d not have.' %piece_index)

    def __check_local_request(self): 
        with self.__rlock_common:
            self.__check_interested() 
            
            # Limit the number of requests in flight.
            if self.__local_requesting and len(self.__local_requesting_pieces) >= self.__local_max_requesting:
                return
            
            if len(self.__local_pending_request) != 0:
                if not self.__local_interested:
                    self.__send_interested()
            else:
                if len(self.__peer_have_pieces) != 0:
                    if self.__dispatch_timer != None:
                        self.__dispatch_timer.cancel()
                        
                    self.__dispatch_timer = threading.Timer(self.__dispatch_timeout,PeerConnect.__check_local_request ,[self,])
                    self.__dispatch_timer.start()
                    self.__local_interested = False
                    self.__notify_pieces_completed()
                return
            
            if self.__peer_choked:
                return
        
            adding_piece = self.__local_pending_request.pop(0)
            if adding_piece[0] not in self.__peer_have_pieces:
                # The peer lacks this piece: drop its remaining blocks as well.
                # Rebuild the list rather than removing items while iterating.
                self.__local_pending_request = [piece for piece in self.__local_pending_request
                                                if piece[0] != adding_piece[0]]
                self.__notify_pieces_canceled([adding_piece[0],])
                self.__check_local_request()
            else:
                self.__local_requesting = True
                self.__local_requesting_pieces.append(adding_piece)    
                self.__send_request(adding_piece)
                self.__check_local_request()
                
                if self.__local_requesting_timer == None or not self.__local_requesting_timer.is_alive():
                    self.__local_requesting_timer = threading.Timer(self.__local_requesting_timeout_intvl, PeerConnect.__local_requesting_timeout, [self,])
                    self.__local_requesting_timer.start()
    
    def __local_requesting_timeout(self):
        with self.__rlock_common:
            if self.__local_requesting:
                # Requeue the unanswered requests and try again.
                self.__local_requesting = False
                self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
                self.__local_requesting_pieces = []
                self.__local_interested = False
                self.__check_local_request()

    def __peer_choked_timeout(self):
        with self.__rlock_common:
            if self.__peer_choked:
                # Still choked after the timeout: hand the pieces back to the task.
                pending_pieces = []
                for piece in self.__local_pending_request:
                    if piece[0] not in pending_pieces:
                        pending_pieces.append(piece[0])
                if len(pending_pieces) != 0:
                    self.__notify_pieces_canceled(pending_pieces)
                    self.__local_pending_request = []
    
    def __check_interested(self):
        if not self.__local_requesting:
            if len(self.__local_pending_request) != 0 and not self.__local_interested:
                self.__send_interested()
                
        if not self.__local_requesting and len(self.__local_pending_request) == 0:
            if self.__local_interested:
                self.__send_notintrested()
                
    def __retry_connect(self):
        
        self.__write_log('__retry_connect')
        pending_pieces = []
        peer_dead = False
         
        with self.__rlock_common:
            self.__disconnect()
            
            for piece in self.__local_pending_request:
                if piece[0] not in pending_pieces:
                    pending_pieces.append(piece[0])
            # Clear the list while the lock is still held.
            self.__local_pending_request = []
                    
            self.__retry_times += 1
            if self.__retry_times > self.__max_retrys:
                peer_dead = True

            else:
                # Exponential backoff: wait retry_intvl**retry_times seconds.
                self.__retry_timer = threading.Timer(self.__retry_intvl**self.__retry_times, PeerConnect.__connect, [self,])
                self.__retry_timer.start()
                
        if peer_dead == True:        
            self.__notify_peer_dead()
            
        if len(pending_pieces) != 0:
            self.__notify_pieces_canceled(pending_pieces)

    def __reveived_data(self, data):
        self.__receiving_cache += data
            
        # Each message is a 4-byte length followed by that many bytes.
        while len(self.__receiving_cache) >= 4:
            msg_len = _str_ntohl(self.__receiving_cache[0:4])
            
            if (len(self.__receiving_cache)-4) >= msg_len:
                self.__proc_msg(self.__receiving_cache[4:(4+msg_len)])
                # Drop the consumed message from the cache (not from the new chunk).
                self.__receiving_cache = self.__receiving_cache[4+msg_len:]
            else:
                break
                    
                
    def __proc_msg(self, msg):
        with self.__rlock_common:
            if len(msg) == 0:
                # A zero-length message is a keepalive.
                self.__received_keepalive()
            else: 
                msg_type = msg[0]
                if msg_type == chr(0):
                    self.__received_choked()
                elif msg_type == chr(1):
                    self.__received_unchoked()
                elif msg_type == chr(2):
                    self.__received_interested()
                elif msg_type == chr(3):
                    self.__received_notinterested()
                elif msg_type == chr(4):
                    self.__received_have(msg[1:])                                
                elif msg_type == chr(5):
                    self.__received_bitfield(msg[1:])
                elif msg_type == chr(6):
                    self.__received_request(msg[1:])             
                elif msg_type == chr(7):
                    self.__received_piece(msg[1:])
                elif msg_type == chr(8):
                    self.__received_cancel(msg[1:]) 
                else:
                    self.__write_log('received unknown msg :%s' %list(msg))
    
    def  __handshake(self):
        self.__write_log('send handshake: %s' %list(self.__handshake_info))
        
        try:
            self.__tcp_con.settimeout(self.__tcp_handshake_timeout)
            self.__tcp_con.sendall(self.__handshake_info)
            
            # recv() may return fewer bytes than requested, so read until
            # the whole 68-byte handshake has arrived.
            rsp = ''
            while len(rsp) < 68:
                data = self.__tcp_con.recv(68 - len(rsp))
                if len(data) == 0:
                    return False
                rsp += data
            
            self.__tcp_con.settimeout(None)
            self.__write_log('recived handshake rsp: %s' %list(rsp))
            
            # Layout: 1 + 19 + 8 reserved, info_hash at [28:48], peer id at [48:68].
            if rsp[28:48] != self.__info_hash:
                self.__write_log('handshake failed, info_hash mismatch')
                return False
            self.__peer_id = rsp[48:68]
            self.__write_log('peer_id:%s' %self.__peer_id)
            
        except (error,timeout), e:
            self.__write_log('handshake failed, error:%s' %e)
            return False
        return True
    
    def __received_keepalive(self):
        self.__write_log('received keepalive')
        #self.__send_keepalive()
            
    def __received_choked(self):

        self.__peer_choked = True
        
        # Requests in flight will not be answered; requeue them.
        if self.__local_requesting:
            self.__local_requesting = False
            self.__local_pending_request =  self.__local_requesting_pieces + self.__local_pending_request
            self.__local_requesting_pieces = []
            
        self.__notify_peer_choked()
        
        # Hand all pending pieces back to the task.
        if len(self.__local_pending_request) != 0:
            pending_pieces = []
            for piece in self.__local_pending_request:
                if piece[0] not in pending_pieces:
                    pending_pieces.append(piece[0])
            self.__notify_pieces_canceled(pending_pieces)
            self.__local_pending_request = []
            self.__local_interested = False
            
        self.__write_log('received choked')
        
    def __received_unchoked(self):
        self.__peer_choked = False
        
        self.__notify_peer_unchoked()
        
        #if len(self.__local_pending_request) < self.__local_pending_request_less:
        self.__check_local_request()
        
        if self.__peer_choked_timer != None:
            self.__peer_choked_timer.cancel()
            
        self.__write_log('received unchoked')
        
    
    def __received_interested(self):
        self.__peer_interested = True
        #self.__send_unchoked()
        self.__write_log('received interested')
        
    def __received_notinterested(self):
        self.__peer_interested = False
        self.__peer_pending_request = [] 
        self.__write_log('received notinterested')
        
    def __received_have(self, data):
        '''
        TODO:Notify peer have pieces changed
        '''
        piece_index = _str_ntohl(data[0:4])
        if piece_index not in self.__peer_have_pieces:
            self.__peer_have_pieces.append(piece_index)
            self.__peer_have_pieces_pending.append(piece_index)
            if len(self.__local_pending_request) < self.__local_pending_request_less:
                self.__notify_pieces_have(self.__peer_have_pieces_pending)
                self.__peer_have_pieces_pending = []
            
            
        self.__write_log('received have piece:%d' %piece_index)

    def __received_bitfield(self, data):
        '''
        Piece 0 is the high bit of the first byte.
        '''
        bitfield_len = len(data)
        for i in range(0,bitfield_len):
            byte = data[i]
            for j in range(0,8):
                byte_mask = 1<<(7-j)
                piece_index = i*8+j
                have = ord(byte)&byte_mask
                if have != 0:
                    if piece_index not in self.__peer_have_pieces:
                        self.__peer_have_pieces.append(piece_index)
                        
        self.__notify_pieces_have(self.__peer_have_pieces)

        self.__write_log('received bitfield ,peer have pieces:%s' %self.__peer_have_pieces)

    def __received_request(self, data):
        piece_index = _str_ntohl(data[0:4])
        offset      = _str_ntohl(data[4:8])
        data_len    = _str_ntohl(data[8:12])
        if self.__peer_interested:
            self.__peer_pending_request.append((piece_index, offset, data_len))
        else:
            self.__write_log("received request piece:%d , but peer not interested" %piece_index)
        
        self.__check_peer_request()
        self.__write_log("received request piece:%d " %piece_index)
    
    def __received_piece(self, data):
        piece_index = _str_ntohl(data[0:4])
        offset      = _str_ntohl(data[4:8])
        piece = (piece_index, offset, len(data)-8)
        
        if self.__local_requesting:
            if piece in self.__local_requesting_pieces:
                self.__write_piecedata(piece_index, offset, data[8:])
                self.__local_requesting_pieces.remove(piece)
                
            if self.__local_requesting_timer != None:
                self.__local_requesting_timer.cancel()
                
            if len(self.__local_requesting_pieces) == 0:  
                self.__local_requesting = False
                
            self.__check_local_request()
            self.__write_log("received  piece: %s" %str((piece_index, offset, len(data)-8)))
        else:
            self.__write_log("received unexpected piece: %s" %str((piece_index, offset, len(data)-8)))

    def __received_cancel(self, data):
        piece_index = _str_ntohl(data[0:4])
        offset      = _str_ntohl(data[4:8])
        data_len    = _str_ntohl(data[8:12])
        request = (piece_index, offset, data_len)
        if request in self.__peer_pending_request:
            self.__peer_pending_request.remove(request)
        self.__check_peer_request()
        self.__write_log("received cancel: %s" %str((piece_index,offset,data_len)))
            
    def __send_keepalive(self):
        # A keepalive is a length prefix of 0 with no type byte.
        msg_len = 0
        msg = _htonl_str(msg_len)
        self.__write_log('send keepalive: %s' %list(msg))
        self.__sck_send(msg)
        
    def __send_choked(self):
        self.__local_choked = True
        msg_type = chr(0)
        msg_len = 1
        msg = _htonl_str(msg_len) + msg_type
        self.__write_log('send choked: %s' %list(msg))
        self.__sck_send(msg)
        
    def __send_unchoked(self):
        self.__local_choked = False
        msg_type = chr(1)
        msg_len = 1
        msg = _htonl_str(msg_len) + msg_type
        self.__write_log('send unchoked: %s' %list(msg))
        self.__sck_send(msg)    
        
    def __send_interested(self):
        self.__local_interested = True
        msg_type = chr(2)
        msg_len = 1
        msg = _htonl_str(msg_len) + msg_type
        self.__write_log('send intrested: %s' %list(msg))
        self.__sck_send(msg)
        
    def __send_notintrested(self):
        self.__local_interested = False
        msg_type = chr(3)
        msg_len = 1
        msg = _htonl_str(msg_len) + msg_type
        self.__write_log('send notintrested: %s' %list(msg))
        self.__sck_send(msg)
        
    def __send_have(self,pieces):
        # One have message per piece, concatenated into a single send.
        msg = ''
        msg_type = chr(4)
        msg_len = 5
        for piece_index in pieces:
            msg += _htonl_str(msg_len) + msg_type +  _htonl_str(piece_index)
        self.__write_log('send have: %s' %str(list(msg)))
        self.__sck_send(msg)
        
    def __send_bitfield(self):
        bitfield = self.__get_local_bitfield()
        msg_type = chr(5)
        msg_len = 1 + len(bitfield)
        msg = _htonl_str(msg_len) + msg_type + bitfield
        self.__sck_send(msg)
        self.__write_log('send bitfield: %s' %list(msg))
        
    def __send_request(self, piece):
        msg = ''
        msg_type = chr(6)
        msg_len = 13
        (piece_index, begin, length) = piece
        msg += _htonl_str(msg_len) + msg_type + _htonl_str(piece_index) + _htonl_str(begin) + _htonl_str(length)
        self.__write_log('send request: %s' %list(msg))
        self.__sck_send(msg)
    
    def __send_piece(self, piece_index, offset, data):
        msg = ''
        msg_type = chr(7)
        data_len = len(data)
        msg_len = 1 + 4 + 4 + data_len
        msg += _htonl_str(msg_len) + msg_type + _htonl_str(piece_index) + _htonl_str(offset)
        msg += data
        self.__write_log('send piece (%d,%d)' %(piece_index, offset))
        self.__sck_send(msg)
        
    def __send_cancel(self, piece_index, offset, length):
        msg = ''
        msg_type = chr(8)
        msg_len = 13
        msg += _htonl_str(msg_len) + msg_type + _htonl_str(piece_index) + _htonl_str(offset) + _htonl_str(length)
        self.__write_log('send cancel: %s' %list(msg))
        self.__sck_send(msg)
            
    def __sck_send(self, msg):
        with self.__rlock_common:
            if self.__local_sending_queue == None:
                self.__write_log('sck send msg failed, because queue is none!')
                return
            
            # Queue the data and wake the send thread; the caller never blocks.
            self.__local_sending_queue += msg
            self.__local_sending_event.set()
            #self.__tcp_con.sendall(msg)

        
    def __local_have(self,piece_index):
        pieces = self.__task_info.get_local_have_pieces()
        return piece_index in pieces

    def __read_piecedata(self, piece_index, offset, data_len):
        return self.__task_info.read_piecedata(piece_index, offset, data_len)
    
    def __write_piecedata(self, piece_index, offset, data):
        self.__task_info.write_piecedata(piece_index, offset, data)
    
    def __notify_pieces_canceled(self, pieces):
        self.__write_log('notify taskinfo canceled pieces')
        self.__task_info.peer_pieces_canceled(self, pieces)
    
    def __notify_pieces_have(self, pieces):
        self.__write_log('notify taskinfo peeer have pieces')
        self.__task_info.peer_have_pieces(self, pieces)
        
    def __notify_pieces_completed(self):
        self.__task_info.peer_pieces_completed(self)
    
    def __notify_peer_choked(self):
        self.__task_info.peer_choked(self)
        
    def __notify_peer_unchoked(self):
        self.__task_info.peer_unchoked(self)
        
    def __notify_peer_dead(self):
        self.__task_info.peer_dead(self)
    
    def __get_local_bitfield(self):
        pieces_num = self.__task_info.get_pieces_num()
        bitfield_len = pieces_num//8
        if pieces_num%8 != 0:
            bitfield_len += 1
        bitfield = [chr(0),]*bitfield_len
        
        pieces = self.__task_info.get_local_have_pieces()
        for index in pieces:
            bit_field_index = index // 8
            bit_field_offset = index % 8
            # Piece 0 is the high bit of the first byte, matching __received_bitfield.
            byte_mask = 1<<(7-bit_field_offset)
            byte = ord(bitfield[bit_field_index])
            byte |= byte_mask
            bitfield[bit_field_index] = chr(byte)
        return ''.join(bitfield)
    
    def __write_log(self, info):
        log_info.write_log('#peer_connect[%s]# '  %self.__ip + info)

def _htonl_str(integer):
    # Encode an unsigned integer as 4 bytes in network (big-endian) order.
    msg = ''
    msg += chr((integer>>24)%0x100)
    msg += chr((integer>>16)%0x100)
    msg += chr((integer>>8)%0x100)
    msg += chr(integer%0x100)
    return msg

def _str_ntohl(msg):
    # Decode 4 bytes in network (big-endian) order into an integer.
    integer = 0
    integer += ord(msg[0])<<24
    integer += ord(msg[1])<<16
    integer += ord(msg[2])<<8
    integer += ord(msg[3])
    return integer
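The hand-written helpers `_htonl_str` and `_str_ntohl` can also be expressed with the standard `struct` module. The following Python 3 sketch shows that alternative together with a standalone version of the buffer-splitting loop that `__reveived_data` performs; the names `htonl_bytes`, `ntohl_bytes` and `split_messages` are hypothetical.

```python
import struct


def htonl_bytes(integer):
    """Big-endian 4-byte encoding, the struct equivalent of _htonl_str."""
    return struct.pack(">I", integer)


def ntohl_bytes(data):
    """Decode the first 4 bytes as a big-endian unsigned integer, like _str_ntohl."""
    return struct.unpack(">I", data[:4])[0]


def split_messages(cache):
    """Split a receive buffer into complete messages.

    Returns (messages, rest): each message is the bytes after its length
    prefix (empty for a keepalive), and rest is the unconsumed tail that
    must be kept until more data arrives.
    """
    messages = []
    while len(cache) >= 4:
        msg_len = ntohl_bytes(cache)
        if len(cache) - 4 < msg_len:
            break                       # incomplete message, wait for more data
        messages.append(cache[4:4 + msg_len])
        cache = cache[4 + msg_len:]     # keep slicing the buffer itself
    return messages, cache
```

A receive loop would call `split_messages(rest + new_data)` after every read and carry `rest` over to the next call.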

