Threading模塊 包括Thread、Condition、Event、Lock、Rlock、Semaphore等類。
1、Thread類可以實例化一個線程t,(target=) t.start()
Thread方法如下:
getName:返回線程t的名稱、setName設置線程t的名稱
isAlive:判斷一個線程是否是活動的,也就是線程的狀態在t.start和t.run之間
isDaemon/setDaemon:如果線程t是一個駐留程序 setDaemon(True)
join:其他調用線程(非t)會直接掛起,直到t結束,只能在t.start之后調用t.join
start:讓線程t活動
run:run是運行線程t主函數的方法,thread的子類通常會覆蓋run,如果不被覆蓋,run將調用在創建t時傳遞的target參數
2、線程同步對象:
timeout:
lock/rlock:
Python的線程池實現
http://www.cnblogs.com/nsnow/archive/2010/04/18/1714596.html
1 #coding:utf-8
2
3 #Python的線程池實現
4
5 import Queue
6 import threading
7 import sys
8 import time
9 import urllib
10
11 #替我們工作的線程池中的線程
12 class MyThread(threading.Thread):
13 def __init__(self, workQueue, resultQueue,timeout=30, **kwargs):
14 threading.Thread.__init__(self, kwargs=kwargs)
15 #線程在結束前等待任務隊列多長時間
16 self.timeout = timeout
17 self.setDaemon(True)
18 self.workQueue = workQueue
19 self.resultQueue = resultQueue
20 self.start()
21
22 def run(self):
23 while True:
24 try:
25 #從工作隊列中獲取一個任務
26 callable, args, kwargs = self.workQueue.get(timeout=self.timeout)
27 #我們要執行的任務
28 res = callable(args, kwargs)
29 #報任務返回的結果放在結果隊列中
30 self.resultQueue.put(res+" | "+self.getName())
31 except Queue.Empty: #任務隊列空的時候結束此線程
32 break
33 except :
34 print sys.exc_info()
35 raise
36
37 class ThreadPool:
38 def __init__( self, num_of_threads=10):
39 self.workQueue = Queue.Queue()
40 self.resultQueue = Queue.Queue()
41 self.threads = []
42 self.__createThreadPool( num_of_threads )
43
44 def __createThreadPool( self, num_of_threads ):
45 for i in range( num_of_threads ):
46 thread = MyThread( self.workQueue, self.resultQueue )
47 self.threads.append(thread)
48
49 def wait_for_complete(self):
50 #等待所有線程完成。
51 while len(self.threads):
52 thread = self.threads.pop()
53 #等待線程結束
54 if thread.isAlive():#判斷線程是否還存活來決定是否調用join
55 thread.join()
56
57 def add_job( self, callable, *args, **kwargs ):
58 self.workQueue.put( (callable,args,kwargs) )
59
60 def test_job(id, sleep = 0.001 ):
61 html = ""
62 try:
63 time.sleep(1)
64 conn = urllib.urlopen('http://www.google.com/')
65 html = conn.read(20)
66 except:
67 print sys.exc_info()
68 return html
69
70 def test():
71 print 'start testing'
72 tp = ThreadPool(10)
73 for i in range(50):
74 time.sleep(0.2)
75 tp.add_job( test_job, i, i*0.001 )
76 tp.wait_for_complete()
77 #處理結果
78 print 'result Queue\'s length == %d '% tp.resultQueue.qsize()
79 while tp.resultQueue.qsize():
80 print tp.resultQueue.get()
81 print 'end testing'
82 if __name__ == '__main__':
83 test()
condition:
wait:
event:
二、Queue模塊,threading一般都是和Queue模塊配合使用的,Queue產生一個隊列,隊列模式有三種:
FIFO先進先出隊列: Queue.Queue(maxsize) 如果maxsize小於1就表示隊列長度無限
LIFO先進后出隊列:Queue.LifoQueue(maxsize)
優先級隊列級別越低越先出來:Queue.PriorityQueue(maxsize)
1、FIFO先進先出隊列的方法:
Queue.qsize(): 返回隊列的大小
Queue.empty():如果隊列為空,返回True,反之False
Queue.full():如果隊列滿了,返回True,反之False
Queue.get() //從隊頭刪除並返回一個item,可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有item可用。如果隊列為空且block為false,隊列將引發Empty異常。
Queue.get_nowait():相當於Queue.get(False)
Queue.put() //在隊尾插入一個item,put()有兩個參數,第一個item為必需的,第二個block為可選參數,默認為1.如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發full異常。
Queue.put_nowait():相當於Queue.put(item,False)
Queue.task_done():在完成一項工作后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join():等到隊列為空時,再執行其他的操作
以下是一些基本觀點和概念:
1.多線程采用的是分時復用技術,即不存在真正的多線程,cpu做的事是快速地切換線程,以達到類似同步運行的目的,因為高密集運算方面多線程是沒有用的,但是對於存在延遲的情況(延遲IO,網絡等)多線程可以大大減少等待時間,避免不必要的浪費。
2.原子操作:這件事情是不可再分的,如變量的賦值,不可能一個線程在賦值,到一半切到另外一個線程工作去了……但是一些數據結構的操作,如棧的push什么的,並非是原子操作,比如要經過棧頂指針上移、賦值、計數器加1等等,在其中的任何一步中斷,切換到另一線程再操作這個棧時,就會產生嚴重的問題,因此要使用鎖來避免這樣的情況。比如加鎖后的push操作就可以認為是原子的了……
3.阻塞:所謂的阻塞,就是這個線程等待,一直到可以運行為止。最簡單的例子就是一線程原子操作下,其它線程都是阻塞狀態,這是微觀的情況。對於宏觀的情況,比如服務器等待用戶連接,如果始終沒有連接,那么這個線程就在阻塞狀態。同理,最簡單的input語句,在等待輸入時也是阻塞狀態。
4.在創建線程后,執行p.start(),這個函數是非阻塞的,即主線程會繼續執行以后的指令,相當於主線程和子線程都並行地執行。所以非阻塞的函數立刻返回值的~
對於資源,加鎖是個重要的環節。因為python原生的list,dict等,都是not thread safe的。而Queue,是線程安全的,因此在滿足使用條件下,建議使用隊列。
隊列適用於 “生產者-消費者”模型。雙方無論數量多少,產生速度有何差異,都可以使用queue。