一、線程
1、什么是線程
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。
一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務
2、基本使用
(1)創建線程的兩種方式
直接調用(常用)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1(arg): # 定義每個線程要執行的函數
time.sleep(0.1)
print(arg,threading.current_thread()) # threading.current_thread()詳細的線程信息
for i in range(10): # 創建10個線程並發執行函數
t = threading.Thread(target=f1,args=('python',)) # args是函數的參數,元組最后一個必須要逗號,
t.start() # 啟動線程
print(t.getName()) # 可以獲取主線程的名字
繼承調用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
class MyThread(threading.Thread): # 繼承threading.Thread類
def __init__(self,func,args):
self.func = func
self.args = args
super(MyThread,self).__init__() # 執行父類的構造方法
def run(self): # run()方法,是cpu調度線程會使用的方法,必須是run()方法
self.func(self.args)
def f2(arg):
time.sleep(0.1)
print(arg,threading.current_thread())
for i in range(10): # 創建10個線程
obj = MyThread(f2,123)
obj.start()
(2)更多方法
自己還可以為線程自定義名字,通過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此之外,Thread還有一下一些方法
t.join(n) 表示主線程等待子線程多少時間,n表示主線程等待子線程的超時時間,如果在n時間內子線程未完成,主線程不在等待,執行后面的代碼
t.run() 線程被cpu調度后自動執行線程對象的run方法(一般我們無需設置,除非自己定義類調用)
t.start() 線程准備就緒,等待CPU調度
t.getName() 獲取線程的名稱
t.setName() 設置線程的名稱
t.name 獲取或設置線程的名稱
t.is_alive() 判斷線程是否為激活狀態
t.isAlive() 判斷線程是否為激活狀態
t.isDaemon() 判斷是否為守護線程
t.setDaemon 設置True或False(默認)
True表示主線程不等待子線程全部完成就執行后面的代碼
False默認值,標識主線程等待子線程全部執行完后繼續執行后面的代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1(arg):
time.sleep(5)
print(arg)
t = threading.Thread(target=f1,args=('python',))
t.setDaemon(True) # 默認是False,設置為true表示主線程不等子線程
t.start()
t.join(2) # 表示主線程到此,等待子線程執行完畢,2表示主線程最多等待2秒
print('end') # 默認主線程在等待子線程結束
print('end')
print('end')
3、線程鎖(Lock、RLock)
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。這里使用Rlock,而不使用Lock,因為Lock如果多次獲取鎖的時候會出錯,而RLock允許在同一線程中被多次acquire,但是需要用n次的release才能真正釋放所占用的瑣,一個線程獲取了鎖在釋放之前,其他線程只有等待。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
NUM = 10
def func(l):
global NUM
# 上鎖
l.acquire()
NUM -=1
time.sleep(0.1)
print(NUM,threading.current_thread())
# 開鎖
l.release()
# lock = threading.Lock()
lock = threading.RLock() # 遞歸鎖
for j in range(10):
t = threading.Thread(target=func,args=(lock,))
t.start()
4、信號量(Semaphore)
互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
NUM = 30
def func(i,l):
global NUM
# 上鎖
l.acquire()
NUM -=1
time.sleep(1)
print(NUM,i,threading.current_thread())
# 開鎖
l.release()
lock = threading.BoundedSemaphore(5) # 設置信號量5,表示同時5個線程同時執行
for i in range(30):
t = threading.Thread(target=func,args=(i,lock,))
t.start()
5、事件(event)
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個"Flag",如果"Flag"值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果"Flag"值為True,那么event.wait 方法時便不再阻塞。
- clear:將"Flag"設置為False
- set: 將"Flag"設置為True
- wait: 檢測當前"Flag",如果"Flag"值為 False,那么當線程執行 event.wait 方法時就會阻塞,如果"Flag"值為True,那么event.wait 方法時便不再阻塞。
下面是一個紅綠燈的例子,主線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def func(i,e):
print(i)
e.wait() # 檢測是什么燈,如果是True紅燈,停;綠燈False行,默認是紅燈
print(i+100)
event = threading.Event()
for i in range(10):
t = threading.Thread(target=func,args=(i,event,))
t.start()
event.clear() # 主動設置成紅燈,默認是紅燈,此句可以不寫
inp = input('>>>')
if inp == '1':
event.set() # 設置成綠燈,就會執行func()函數中print(i+100)語句
6、條件(Condition)
使得線程等待,只有滿足某條件時,才釋放n個線程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def func(i,con):
print(i)
con.acquire() # 固定寫法acquire,wait
con.wait()
print(i+100)
con.release()
c = threading.Condition() # 設置條件
for i in range(10):
t = threading.Thread(target=func,args=(i,c,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
c.acquire() # 這里是固定寫法,acquire,notify,release
c.notify(int(inp))
c.release()
第二種
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def condition():
ret = False
r = input('>>>')
if r == 'true':
ret = True
else:
ret = False
return ret
def func(i,con):
print(i)
con.acquire()
con.wait_for(condition) # 和上一個例子的差別在這里
print(i+100)
con.release()
c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func,args=(i,c,))
t.start()
6、Timer
定時器,指定n秒后執行某操作
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from threading import Timer
def hello():
print("hello python")
t = Timer(1,hello)
t.start()
7、線程池,點擊這里
二、進程
線程的上一級就是進程,進程可包含很多線程,進程和線程的區別是進程間的數據不共享,多進程也可以用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心數保持一致。
1、線程與進程的區別
1、線程共享創建它的進程的地址空間,進程有自己的地址空間。 2、線程是直接可以訪問線程之間的數據;進程需要復制父進程的數據才能訪問。 3、線程可以直接與其他線程的通信過程,進程必須使用進程間通信和同胞交流過程。 4、新創建一個線程很容易;新創建一個進程需要復制父進程。 5、主線程可以控制相當大的線程在同一進程中;進程只能控制子進程。 6、主線程變更(取消、優先級變化等)可能會影響進程的其他線程的行為;父進程的變化不會影響子進程。
2、基本使用
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
def foo(i):
print('say hi',i)
for i in range(10):
p = Process(target=foo,args=(i,))
#p.daemon = True # 和線程t.setdaemon是一樣的
p.start()
#p.join()
注意:由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。其他使用方法和線程threading.Thread是一樣的
3、進程數據共享
進程各自持有一份數據,默認無法共享數據;queues,Array,Manager.dict,pipe這些方法都能實現數據共享
(1)特殊隊列queues()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize())
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
(2)數組Array()
數組和列表很像,但是數組中的元素在內存中的地址是一段連續的空間地址,而列表中的元素則不是一段連續的的地址,是通過鏈表的形式找到下一個元素
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Array
def foo(i,arg):
arg[i] = i+100
for item in arg:
print(item)
li = Array('i',10) # 指定數組時需要指定類型
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
(3)Manager.dict()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Manager
def foo(i,arg):
arg[i] = i +100
print(arg.values())
obj = Manager()
li = obj.dict()
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
p.join()
(4)pipe()
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 父進程可以收到子進程的共享信息prints "[42, None, 'hello']"
p.join()
4、進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply
- apply_async
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time
def f1(arg):
time.sleep(1)
print(arg)
pool = Pool(5)
for i in range(30): # 定義30個任務
#pool.apply(func=f1,args=(i,)) # 所有進程串行執行沒有多大意義
pool.apply_async(func=f1,args=(i,)) # 異步並行執行
pool.close() #等待所有的任務執行完畢
#time.sleep(1)
#pool.terminate() # 立即終止子進程的任務,主進程繼續執行
pool.join() # 執行pool.join時必須先執行pool.close或者pool.terminate
# 進程池中進程執行完畢后在關閉,如果注釋,那么程序直接關閉close,terminate也無效
print('end')
三、協程
線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。
協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;由greenlet,gevent實現,gevent是調用greenlet進行封裝;需要安裝pip install greenlet;pip install gevent;
greenlet
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from greenlet import greenlet
def test1():
print 12
gr2.switch()
print 34
gr2.switch()
def test2():
print 56
gr1.switch()
print 78
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
gevent
import gevent
def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
遇到IO操作自動切換:此操作在python2.x中執行的,urllib2不支持python3.x
from gevent import monkey; monkey.patch_all()
import gevent
import urllib2
def f(url):
print('GET: %s' % url)
resp = urllib2.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
四、queue隊列
queue有哪些隊列?
- queue.Queue(maxsize) 先進先出隊列
- queue.LifoQueue(maxsize) 后進先出
- queue.PriorityQueue(maxsize) 優先級隊列
- queue.deque(maxsize) 雙向隊列
先進先出
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
q = queue.Queue(5) # 默認maxsize=0無限接收,最大支持的個數
print(q.empty()) # 查看隊列是否為空
q.put(11) # put防數據,是否阻塞默認是阻塞block=True,timeout超時時間
q.put(22)
q.put(33,block=False,timeout=2)
print(q.full()) # 查看隊列是否已經放滿
print(q.qsize()) # 隊列中多少元素
print(q.maxsize) # 隊列最大支持的個數
print(q.get(block=False,timeout=2)) # get取數據,是否阻塞默認是阻塞block=True,timeout超時時間
print("*" * 10)
print(q.get())
q.task_done() # join配合task_done,隊列中有任務就會阻塞進程,當隊列中的任務執行完畢之后,不在阻塞
print(q.get())
q.task_done()
q.join() # 隊列中還有元素的話,程序就不會結束程序,只有元素被取完配合task_done執行,程序才會結束
后進先出
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.LifoQueue() q.put(123) q.put(456) print(q.get())
優先級隊列
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.PriorityQueue() q.put((1,'python1')) q.put((5,'python')) q.put((3,'python3')) print(q.get())
雙向隊列
#!/usr/bin/env python # -*- coding:utf-8 -*- import queue q = queue.deque() q.append(123) q.append(333) q.appendleft(456) print(q.pop()) print(q.popleft())
更多請查看官方文檔
生產者消費者模型
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import time
q = queue.Queue()
def productor(arg):
while True:
q.put(str(arg))
print('%s 號窗口有票' %str(arg))
time.sleep(1)
for i in range(3):
t = threading.Thread(target=productor,args=(i,))
t.start()
def consumer(arg):
while True:
print('第 %s 人取 %s 號窗口票' %(str(arg),q.get()))
time.sleep(1)
for j in range(300):
t = threading.Thread(target=consumer,args=(j,))
t.start()
