1, 多線程
- 線程是進程的一個實體,是CPU進行調度的最小單位,他是比進程更小能獨立運行的基本單位。
- 線程基本不擁有系統資源,只占用一點運行中的資源(如程序計數器,一組寄存器和棧),但是它可以與同屬於一個進程的其他線程共享全部的資源。
- 提高程序的運行速率,上下文切換快,開銷比較少,但是不夠穩定,容易丟失數據,形成死鎖。
直接上代碼:
import time import threading # 函數1用時2秒 def fun1(): time.sleep(2) print(threading.current_thread().name, time.ctime()) # 函數2用時4秒 def fun2(): time.sleep(4) print(threading.current_thread().name, time.ctime()) # 函數3用時6秒 def fun3(): time.sleep(6) print('hello python', time.ctime()) th1 = threading.Thread(target=fun1) th2 = threading.Thread(target=fun2) th3 = threading.Thread(target=fun3) th1.start() th2.start() th3.start()
打印結果:
Thread-1 Mon Jan 7 11:01:52 2019 Thread-2 Mon Jan 7 11:01:54 2019 hello python Mon Jan 7 11:01:56 2019
解析:從結果看出,他們同一時間 11:01:50開始執行,分別用了不同的時間結束
接着往下看,添加join阻塞線程
''''''
th1.start()
th1.join()
th2.start()
th2.join()
th3.start()
th3.join()
打印結果:
Thread-1 Mon Jan 7 11:19:00 2019
Thread-2 Mon Jan 7 11:19:04 2019
hello python Mon Jan 7 11:19:10 2019
我們看到這三線程按順序依次執行。
我們接着看看線程的方法使用:
threading.enumerate() #列舉線程,返回列表,其中里面會有一條主線程 threading.activeCount() #查看線程運行個數 threading.current_thread().name #查看當前運行線程名稱 join() #阻塞線程運行
我們接着看第二種開線程的方式:
import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I'm "+self.name+' @ '+str(i) #name屬性中保存的是當前線程的名字 print(msg) if __name__ == '__main__': t = MyThread() t.setName('yangzhenyu') a = t.isAlive() print(a) print(t.getName()) t.start() b = t.isAlive() print(b)
打印結果:
False yanzghenyu True I'm yanzghenyu @ 0 I'm yanzghenyu @ 1 I'm yanzghenyu @ 2
方法總結:
t.setName() #設置運行線程名稱,不指定默認Thread-1 t.getName() #獲取線程名稱 t.isAlive() #判斷線程是否運行,返回布爾類型
線程間共享全局變量:
import threading import time n = 100 def work01(): global n for i in range(3): n += 1 print(n) //103 def work02(): global n print(n) //103 print(n) //100 t1 = threading.Thread(target=work01) t1.start()
time.sleep(1)
t2 = threading.Thread(target=work02) t2.start()
關於線程鎖
- 用threading.Lock()創建鎖,用acquire()申請鎖,每次只有一個線程獲得鎖,其他線程必須等此線程release()后才能獲得鎖
- RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。
- 注意:如果使用RLock,那么acquire和release必須成對出現,即同一線程中調用了n次acquire,必須調用n次的release才能真正釋放所占用的瑣
下面例子中我們用到的是Lock(),當加完鎖之后,該方法同一時間內只能被一個線程調用。
import threading mylock=threading.Lock()#創建鎖 num = 0 def add_num(name): global num while True: mylock.acquire()#申請鎖也就是加鎖 print('thread %s locked! num=%d'%(name,num)) if num>=5: print('thread %s release! num=%d'%(name,num)) mylock.release()#釋放鎖 return num += 1 print('thread %s release! num = %d'%(name,num)) mylock.release() t1 = threading.Thread(target=add_num,args=('A',)) t2 = threading.Thread(target=add_num,args=('B',)) t1.start() t2.start()
打印結果:
thread A locked! num=0 thread A release! num = 1 thread A locked! num=1 thread A release! num = 2 thread A locked! num=2 thread A release! num = 3 thread A locked! num=3 thread A release! num = 4 thread A locked! num=4 thread A release! num = 5 thread A locked! num=5 thread A release! num=5 thread B locked! num=5 thread B release! num=5

cpu io密集型適合用多線程進行開發
關於進程:
- 進程是系統進行資源分配的最小單位,每個進程都有自己的獨立內存空間,不用進程通過進程間通信來通信。
- 但是進程占據獨立空間,比較重量級,所以上下文進程間的切換開銷比較大,但是比較穩定安全。
進程創建:
第一種創建進程的方式:
from multiprocessing import Process import time import random import os def piao(name): print("%s is piaoping"%name) time.sleep(random.randint(0,1)) print("%s is piao end"%name) if __name__ == '__main__': print("CPU的個數是:%d"%os.cpu_count()) p1 = Process(target=piao,args=("alex",),name="進程1") print(p1.name) p1.start() print("父進程!") #執行速度要遠快於建立新進程的時間
打印結果:
CPU的個數是:2 進程1 父進程! alex is piaoping alex is piao end
第二種創建進程的方式:
from multiprocessing import Process import time import random #繼承Process類,並實現自己的run方法 class Piao(Process): def __init__(self,name): #必須調用父類的init方法 super().__init__() self.name = name def run(self): print("%s is piaoing"%self.name) time.sleep(random.randint(1,3)) print("%s is piaoeng"%self.name) if __name__ == '__main__': p1 = Piao("Alex") #開辟一個新的進程實際上就是執行本進程所對應的run()方法 p1.start() print("主進程!")
結果:
主進程! Alex is piaoing Alex is piaoeng
解析:join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成后,如果指定參數,也就是等待時間s,那么主進程將在這個時間內結束,
用is_active() 方法即可檢測進程的狀態,不加join() 返回True,表示進程還在進行。
進程的方法,
start() 啟動進程實例(創建子進程); terminate():不管任務是否完成,立即終止; name: 當前進程實例別名,默認為Process-N,N為從1開始遞增的整數; pid: 當前進程實例的PID值; os.getpid() is_alive(): 判斷進程實例是否還在執行; join([timeout]):是否等待進程實例執行結束,或等待多少秒;
進程池:
在程序實際處理問題時,忙時會有成千上萬個任務需要執行,閑時有零星任務,創建時需要消耗時間,銷毀也需要時間,
即使開啟成千上萬個進程,操作系統也不能 讓他同時執行。這里就用到了進程池,用於管理小塊內存的申請與釋放。
,
1,上代碼:
from multiprocessing.pool import Pool from time import sleep def fun(a): sleep(1) print(a) if __name__ == '__main__': p = Pool() # 這里不加參數,但是進程池的默認大小,等於電腦CPU的核數 # 也是創建子進程的個數,也是每次打印的數字的個數 for i in range(10): p.apply_async(fun, args=(i,))
p.close() p.join() # 等待所有子進程結束,再往后執行 print("end")
2,callback 舉例:
from multiprocessing import Process,Pool def func(i): i+=1 return i#普通進程處理過的數據返回給主進程p1 def call_back(p1): p1+=1 print(p1) if __name__ == '__main__': p = Pool() for i in range(10): p1 = p.apply_async(func,args=(i,),callback = call_back)#p調用普通進程並且接受其返回值,將返回值給要執行的回調函數處理 p.close() p.join()
解析: 1,p.apply ( func,args = ()) 同步的效率,也就是說池中的進程一個一個的去執行任務
p.apply_async( func,args = () , callback = None) : 異步的效率,也就是池中的進程一次性都去執行任務.
2,異步處理任務時 : 必須要加上close和join. 進程池的所有進程都是守護進程(主進程代碼執行結束,守護進程就結束).
3,func : 進程池中的進程執行的任務函數
4,args : 可迭代對象性的參數,是傳給任務函數的參數
5,callback : 回調函數,就是每當進程池中有進程處理完任務了,返回的結果可以交給回調函數,
由回調函數進行進一步處理,回調函數只異步才有,同步沒有.回調函數是父進程調用.
3. map( func,iterable) (該方法經常用到爬蟲)
from multiprocessing import Pool def func(num): num += 1 print(num) return num if __name__ == '__main__': p = Pool(2) res = p.map(func,[i for i in range(100)]) # p.close()#map方法自帶這兩種功能 # p.join() print('主進程中map的返回值',res)
func : 進程池中的進程執行的任務函數
iterable : 可迭代對象,是把可迭代對象那個中的每個元素一次傳給任務函數當參數.
map方法自帶close和join
進程間的通信:
1)隊列
from multiprocessing import Queue,Process import os,time,random #添加數據函數 def proc_write(queue,urls): print("進程(%s)正在寫入..."%(os.getpid())) for url in urls: queue.put(url) print("%s被寫入到隊列中"%(url)) time.sleep(random.random()*3) #讀取數據函數 def proc_read(queue): print("進程(%s)正在讀取..."%(os.getpid())) while True: url = queue.get() print("從隊列中提取到:%s"%(url)) if __name__ =="__main__":
queue = Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()
生產者與消費者模式(線程間的通信):
from queue import Queue import threading,time class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count +1 msg = '生成產品'+str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消費了 '+queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(500): queue.put('初始產品'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
2) 進程間的通信(管道)
from multiprocessing import Pipe,Process import random,time,os def proc_send(pipe,urls): for url in urls: print("進程(%s)發送:%s"%(os.getpid(),url)) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print("進程(%s)接收到:%s"%(os.getpid(),pipe.recv())) time.sleep(random.random()) if __name__ == "__main__": pipe = Pipe() p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],)) p2 = Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.terminate()
解析:
pipe用於兩個進程間的通信,兩個進程分別位於管道的兩端,Pipe方法返回(conn1,conn2)代表一個管道的兩端,
Pipe方法有dumplex參數,若該參數為True,管道為全雙工模式,
若為Fasle,conn1只負責接收消息,conn2只負責發送消息.send和recv方法分別是發送和接收消息的方法
協程:
協程:是更小的執行單位,是一種輕量級的線程,協程的切換只是單純的操作CPU的上下文,所以切換速度特別快,且耗能小。
gevent是第三方庫,通過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標准庫,這一過程在啟動時通過monkey patch完成:
from gevent import monkey monkey.patch_all() # 用來在運行時動態修改已有的代碼,而不需要修改原始代碼。 import gevent import requests def f(url): print('GET: %s' % url) html = requests.get(url).text print(url, len(html)) gevent.joinall([ gevent.spawn(f, 'http://i.maxthon.cn/'), # 先執行這個函數,發送請求,等待的時候發送第二個請求 gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'), gevent.spawn(f, 'http://edu.51cto.com/?jydh')])
運行結果:
GET: http://i.maxthon.cn/ GET: http://www.jianshu.com/u/3cfeb3395a95 GET: http://edu.51cto.com/?jydh http://i.maxthon.cn/ 461786 http://edu.51cto.com/?jydh 353858 http://www.jianshu.com/u/3cfeb3395a95 597
從結果看,3個網絡操作是並發執行的,而且結束順序不同,但只有一個線程。
使用gevent,可以獲得極高的並發性能,但gevent只能在Unix/Linux下運行,在Windows下不保證正常安裝和運行。
