Python線程
Threading用於提供線程相關的操作,線程是應用程序中工作的最小單元。
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def show(arg): time.sleep(1) print 'thread'+str(arg) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print 'main thread stop'
上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。
更多方法:
- start 線程准備就緒,等待CPU調度
- setName 為線程設置名稱
- getName 獲取線程名稱
- setDaemon 設置為后台線程或前台線程(默認)
如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止
如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止 - join 逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義
- run 線程被cpu調度后自動執行線程對象的run方法
線程鎖
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,CPU接着執行其他線程。所以,可能出現如下問題:

#!/usr/bin/env python #coding:utf-8 import threading import time gl_num = 0 lock = threading.RLock() def Func(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()
event
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
- clear:將“Flag”設置為False
- set:將“Flag”設置為True
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading def do(event): print 'start' event.wait() print 'execute' event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = raw_input('input:') if inp == 'true': event_obj.set()
Python 進程
from multiprocessing import Process import threading import time def foo(i): print 'say hi',i for i in range(10): p = Process(target=foo,args=(i,)) p.start()
注意:由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。
進程數據共享
進程各自持有一份數據,默認無法共享數據

#方法一,Array from multiprocessing import Process,Array temp = Array('i', [11,22,33,44]) def Foo(i): temp[i] = 100+i for item in temp: print i,'----->',item for i in range(2): p = Process(target=Foo,args=(i,)) p.start() #方法二:manage.dict()共享數據 from multiprocessing import Process,Manager manage = Manager() dic = manage.dict() def Foo(i): dic[i] = 100+i print dic.values() for i in range(2): p = Process(target=Foo,args=(i,)) p.start() p.join()

當創建進程時(非使用時),共享數據會被拿到子進程中,當進程中執行完畢后,再賦值給原值。

進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply
- apply_async
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Pool import time def Foo(i): time.sleep(2) return i+100 def Bar(arg): print arg pool = Pool(5) #print pool.apply(Foo,(1,)) #print pool.apply_async(func =Foo, args=(1,)).get() for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) print 'end' pool.close() pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
協程
線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。
協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;
greenlet
#!/usr/bin/env python # -*- coding:utf-8 -*- from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 gr2.switch() def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
gevent
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
遇到IO操作自動切換:
from gevent import monkey; monkey.patch_all() import gevent import urllib2 def f(url): print('GET: %s' % url) resp = urllib2.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
線程池:
方案簡介:
方案一:簡單版本的線程池,每次都要創建線程池;
方案二:支持傳函數、傳參、傳回調函數、立即終止所有線程、最大優點:線程的循環利用,節省時間和資源 ★★★★★
方案三:現有模塊,直接調用即可,不支持回調函數
方案一:
#!/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading class ThreadPool(object): def __init__(self, max_num=20): self.queue = Queue.Queue(max_num) for i in xrange(max_num): self.queue.put(threading.Thread) def get_thread(self): return self.queue.get() def add_thread(self): self.queue.put(threading.Thread) """ pool = ThreadPool(10) def func(arg, p): print arg import time time.sleep(2) p.add_thread() for i in xrange(30): thread = pool.get_thread() t = thread(target=func, args=(i, pool)) t.start() """
方案二:
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue import threading import contextlib import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() self.max_num = max_num self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果線程池已經終止,則返回True否則None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) status = True except Exception as e: status = False result = e if callback is not None: try: callback(status, result) except Exception as e: pass if self.terminal: # False event = StopEvent else: with self.worker_state(self.free_list,current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) @contextlib.contextmanager def worker_state(self,x,v): x.append(v) try: yield finally: x.remove(v) def close(self): num = len(self.generate_list) while num: self.q.put(StopEvent) num -= 1 # 終止線程(清空隊列) def terminate(self): self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() import time def work(i): time.sleep(1) print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) # pool.terminate() #立即終止所有線程
方案三、
from concurrent.futures import ThreadPoolExecutor import time def f1(a): time.sleep(2) print(a) return 1 pool=ThreadPoolExecutor(5) for i in range(30): a=pool.submit(f1,i) # x=a.result()#獲取返回值,如果有,會阻塞