1、基本概念
並發和並行的區別:
1)並行,parallel
同時做某些事,可以互不干擾的同一時刻做幾件事。(解決並發的一種方法)
高速公路多個車道,車輛都在跑。同一時刻。
2)並發 concurrency
同時做某些事,一個時段內有事情要處理。(遇到的問題)
高並發,同一時刻內,有很多事情要處理。
2、並發的解決
1)隊列、緩沖區
排隊就是把人排成隊列,先進先出,解決了資源使用的問題。
排成的隊列,其實就是一個緩沖地帶,就是緩沖區。
Queue模塊的類queue、lifoqueue、priorityqueue。
2)爭搶的
會有一個人占據窗口,其他人會繼續爭搶,可以鎖定窗口,窗口不在為其他人服務,這就是鎖機制。(鎖的概念,排他性鎖,非排他性鎖)。
3)預處理
一種提前加載用戶需要的數據的思路,預處理思想,緩存常用。
4)並行
日常可以通過購買更多的服務器,或者開多線程,實現並行處理,來解決並發問題。
水平擴展思想。
如果在但CPU上處理,就不是並行了。
但是多數服務都是多CPU的,服務的部署就是多機、分布式的,都是並行處理。
(串行比並行快)
5)提速
提高單個CPU性能,或單個服務器安裝更多的CPU
這就是一種垂直擴展思想。
6)消息中間件
例如地跌站外的九曲回腸的走廊,緩沖人流。
常見的消息中間件有RabbitMQ,ActiveMQ(Apache)、RocketMQ(Apache)。
3、進程和線程
在實現了線程的操作系統中,線程是操作系統能夠進行運算調度的最小單位。他包含在進程中,是進程中的實際運作單位。一個程序執行實例就是一個進程。
進程(process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。
(可執行,可運行的加載到內存中。程序是有一定格式的,Python解釋器加載,所有進程都是有入口的。偏移多少位。主線程達不到要求,就會啟用多線程。
多核。調度到不同的CPU上面去,虛擬的計算單元。)
資源爭搶問題:鎖,排他性鎖。隊列,不爭搶的人排隊。預加載,減少數據處理速度,提前加載到內存中。一變多。
進程和程序的關系
程序是源代碼編譯后的文件,而這些文件存放在磁盤上。當程序被操作系統加載到內存
中,就是進程,進程中存放着指令和數據(資源),也是線程的容器。
Linux進程有父進程、子進程,Windows的進程是平等關系。
線程,有時被稱為輕量級進程,是程序執行流的最小單元,一個標准的線程由線程ID,當前指令指針(pc),寄存器集合和堆棧組成。每個線程有自己獨立的棧。
在許多系統中,創建一個線程比創建一個進程快10-100倍。
進程、線程的理解
現代操作系統提出的進程的概念,每一個進程都認為自己是獨占所有的計算機硬件資源。
進程就是獨立的王國,進程間不可以隨便的共享數據。
線程就是省份,同一個進程內的線程可以共享進程的資源,每一個線程擁有自己獨立的堆棧。
4、線程狀態
| 狀態 |
含義 |
| 就緒(ready) |
線程能夠運行,但在等待被調度,可能線程剛剛創建啟動,或剛剛從阻塞恢復,或者被其他線程搶占。 |
| 運行(running) |
線程正在運行 |
| 阻塞(Blocked) |
線程等待外部事件發生而無法運行,如I/O操作。 |
| 終止(Terminated) |
線程完成,或退出,或被取消。 |
5、Python中的線程和進程
進程會啟動一個解釋器進程,線程會共享一個解釋器進程。
1)Python的線程開發
Python的線程開發使用標准庫threading
2)Thread類
簽名:
def __init__(self, group: None = ...,
target: Optional[Callable[..., None]] = ...,
name: Optional[str] = ...,
args: Iterable = ...,
kwargs: Mapping[str, Any] = ...,
*, daemon: Optional[bool] = ...) -> None: ...
| 參數名 |
含義 |
| target |
線程調用對象,就是目標函數 |
| name |
為線程起名字 |
| args |
為目標函數傳遞實參,元組 |
| Kwargs |
為目標函數關鍵詞傳參,字典 |
3)線程啟動
import threading
import time
def worker():
print('before')
time.sleep(3)
print('finished')
t = threading.Thread(target=worker) #線程對象
t.start() #啟動
通過threading.Thread創建一個線程對象,target是目標函數,name可以指定名稱。
需要調用start方法啟動函數。
線程之所以執行函數,是因為線程中就是用來執行代碼的,所以還是函數調用。
函數執行完畢后,線程也就退出了。
如果想讓一個線程一直工作,不讓線程退出就要利用到while循環。
import threading
import time
def worker():
count = 0
while True:
count += 1
print('before')
time.sleep(3)
if count >5:
print('finished')
break
t = threading.Thread(target=worker) #線程對象
t.start() #啟動
4)線程退出
Python中沒有提供終止線程的方法。線程在下面情況下退出。
(1)線程函數內語句執行完畢
(2)線程函數中拋出未處理的異常。
import threading
import time
def worker():
count = 0
while True:
if count >5:
break
#return
#raise RuntimeError(count)
time.sleep(3)
print('before')
count += 1
print('finished')
t = threading.Thread(target=worker) #線程對象
t.start() #啟動
print('end')
線程沒有優先級,沒有線程組的概念。也不能被銷毀、停止、掛起,那么就是沒有恢復和中斷了。
5)線程的傳參
import threading
import time
def add(x,y):
print('{}+{}={}'.format(x,y,x+y))
t1 = threading.Thread(target=add,name='1',args=(4,5))
t1.start()
time.sleep(2)
t2 = threading.Thread(target=add,name = '2',args=(4,),kwargs={'y':6})
t2.start()
time.sleep(2)
t3 = threading.Thread(target=add,name='3',kwargs={'x':4,'y':7})
t3.start()
線程中的傳參,和函數傳參沒有什么區別,本質上就是函數傳承。
6)threading的屬性和方法
| 名稱 |
含義 |
| current_thread() |
返回當前主線程 |
| main_thread() |
返回主線程對象 |
| active_count() |
當前處於alive狀態的線程個數 |
| enumerate() |
返回所有活着的線程的列表,不包括已經終止的線程和未開始的線程 |
| git_ident() |
返回當前線程的ID,非0整數。 |
active_count、enumerate方法返回的值還包括主線程。
import threading
import time
def showinfo():
print('currentthread = {}'.format(threading.current_thread()))
print('main thread = {}'.format(threading.main_thread()))
print('active count = {}'.format(threading.active_count()))
def worker():
count = 0
showinfo()
while True:
if count>5:
break
time.sleep(5)
count += 1
print('finsh')
t = threading.Thread(target=worker,name='work')
showinfo()
t.start()
print('end')
currentthread = <_MainThread(MainThread, started 4048)>
main thread = <_MainThread(MainThread, started 4048)>
active count = 1
currentthread = <Thread(work, started 9084)>
end
main thread = <_MainThread(MainThread, stopped 4048)>
active count = 2
finsh
finsh
finsh
finsh
finsh
Finsh
| 名稱 |
含義 |
| Name |
他只是一個名字,只是一個標識符,名字可以重名,getname()獲取,setname()設置這個名詞 |
| Ident |
線程id,是非0的整數,線程啟動后才會有ID,否則為None,線程退出,此id依舊可以訪問,此id可以重復訪問。 |
| Is_alive() |
返回線程是否或者 |
線程的name只是一個名稱,可以重復;id必須唯一,但可以在線程退出后在利用。
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(2)
count += 1
print(threading.current_thread().name)
t = threading.Thread(name='work',target=worker)
print(t.ident)
t.start()
while True:
time.sleep(1)
if t.is_alive():
print('{}{}alive'.format(t.name,t.ident))
else:
print('{}{}dead'.format(t.name,t.ident))
| 名稱 |
含義 |
| Start() |
啟動線程,每一個線程必須且只能執行該方法一次 |
| Run() |
運行線程函數 |
Start()啟動線程,只能執行一次。操作系統。開辟新的線程。
Run()直接做的是主線程。函數調用。
(1)start()
import threading
import time
def worker():
count = 0
while True:
if count > 5:
break
time.sleep(3)
count += 1
print('running')
class Mythread(threading.Thread):
def start(self):
print('start----')
super().start()
def run(self):
print('run----')
super().run()
t = Mythread(target=worker,name='work')
t.start()
start方法運行結果是start----
run----
Running
按照線程進行執行。
(2)run()
import threading
import time
def worker():
count = 0
while True:
if count>3:
break
time.sleep(2)
count += 1
print('runing')
class Mythread(threading.Thread):
def start(self):
print('start----')
super().start()
def run(self):
print('run----')
super().run()
t = Mythread(target=worker,name='work1')
t.run()
# run----
# runing
總結:run()執行結果就是直接是函數,調用,調用run函數。
Start()方法會調用run()方法,而run()方法可以運行函數。
(3)start和run的區別
Start方法啟動線程,啟動了一個新的線程,名字叫做worker運行,但是run方法,並沒有啟動新的線程,只是在主線程內調用了一個普通的函數。
7)多線程
多線程,一個進程中如果有多個線程,就是多線程,是先一種並發。
import threading
import time
def worker():
count = 0
while True:
if count>3:
break
time.sleep(2)
count += 1
print('runing')
print(threading.current_thread().name,threading.current_thread().ident)
class Mythread(threading.Thread):
def start(self):
print('start----')
super().start()
def run(self):
print('run----')
super().run()
t1 = Mythread(target=worker,name='work1')
t2 = Mythread(target=worker,name='work2')
# t1.run()
# t2.run()
####runing
# MainThread 1380
# runing
# MainThread 1380
# runing
# MainThread 1380
t1.start()
t2.start()
# start----
# run----
# start----
# run----
# runing
# work2 5048
# runing
# work1 9048
Start()方法work1和work2交替執行。啟動線程后,進程內多個活動的線程並行工作,就是多線程。
Run()方法中沒有開啟新的線程,就是普通函數調用,所以執行完t1.run()
,然后執行t2.run(),run()方法就不是多線程。
一個進程中至少有一個線程,並作為程序的入口,這個線程就是主線程,一個線程必須有一個主線程。
其他線程成為工作線程。
8)線程安全
import threading
def worker():
for x in range(100):
print('{}is running'.format(threading.current_thread().name))
for x in range(1,4):
name = 'worker{}'.format(x)
t = threading.Thread(name=name,target=worker)
t.start()
利用ipython執行的結果是不是一行行的打印,而是很多字符串打印在了一起。
這樣說明了print函數被打斷了,被線程切換打斷了,print函數分為兩步,第一步是打印字符串,第二部是換行,就在這個期間,發生了線程的切換,說明了print函數是線程不安全的。
線程安全:線程執行一段代碼,不會產生不確定的結果,那么這段代碼是線程安全的。
也是要用鎖,進程的鎖是管進程內的線程。獨占資源。
解決上面打印的問題:
(1)不讓print打印換行
import threading
def worker():
for x in range(100):
print('{} is running.\n'.format(threading.current_thread().name),end='')
for x in range(1,5):
name = 'worker{}'.format(x)
t = threading.Thread(name=name,target=worker)
t.start()
利用字符串是不可變類型,可以作為一個整體不可分割輸出,end=’’就不在print輸出換行了。
(2)使用logging
標准庫里面的logging模塊,是日志處理模塊,線程安全的,生產環境代碼都使用logging。
import threading
import logging
def worker():
for x in range(100):
# print('{} is running.\n'.format(threading.current_thread().name),end='')
logging.warning('{}is running'.format(threading.current_thread().name))
for x in range(1,5):
name = 'worker{}'.format(x)
t = threading.Thread(name=name,target=worker)
t.start()
9)daemon線程和non-daemon線程
daemon不是Linux里面的守護進程。
進程靠線程執行代碼,至少有一個主線程,其他線程是工作線程。
主線程是第一個啟動的線程。
父線程:如果A中啟動了一個線程B,那么A就是B的父線程。
子線程:B就是A的子線程。
源碼Thread的__init__ 方法中。
If deamon is not None:
Self._daemonic = daemon
else:
Self._daemonic = current_thread().daemon
Self._ident = None
線程daemon屬性,如果設定就是用戶的設置,否則,就取當前線程的daemon的值。
主線程是non-daemon線程,即daemon = False。
import time
import threading
def foo():
time.sleep(5)
for i in range(20):
print(i)
t = threading.Thread(target=foo,daemon=False)
t.start()
print('end')
daemon設置False值,主線程執行完畢后,等待工作線程。
import time
import threading
def foo():
time.sleep(5)
for i in range(20):
print(i)
t = threading.Thread(target=foo,daemon=True)
t.start()
print('end')
Daemon值改為true,主線程執行完畢后直接退出。
| 名稱 |
含義 |
| Daemon |
表示線程是否是daemon,這個值必須在start()之前設置,否則引發RuntimeError異常 |
| IsDaemon() |
是否是daemon線程 |
| SetDaemon |
設置daemon線程,必須在start方法之前設置。 |
總結:線程具有一個daemon屬性,可以顯示設置為True或者False,也可以不設置,則取默認值None。
如果不設置daemon,就取當前線程的daemon來設置他。
主線程是non-daemon線程,即daemon = False。
從主線程創建的所有線程的不設置daemon屬性,則默認daemon = False,也就是non-daemon線程。
程序在沒有活着的non-daemon線程運行時推出,也是就剩下的只是daemon線程,主線程才能推出。否則主線程只能等待。
構造線程的時候,可以設置daemon屬性,這個屬性必須在start方法前設置好。
daemon=True主線程不等。工作線程
daemon=False主線程等。只要有一個non-daemon就會等待。
控制一個屬性的。
在start之前。
只是有一個non-daemon就會等待,沒有的話直接不等,直接結束線程。
總結:
線程具有daemon屬性,可以設置為True或者False。
(激活的non-daemon,主線程才會等待工作線程。)
import time
import threading
def bar():
time.sleep(10)
print('bar')
def foo():
for i in range(20):
print(i)
t = threading.Thread(target=bar,daemon=False)
t.start()
t = threading.Thread(target=foo,daemon=True)
t.start()
print('end')
這樣不會執行bar的,因為主線程的daemon設置的值為True,改為False就好了。
活着讓主線程sleep幾秒。
import time
import threading
def bar():
time.sleep(10)
print('bar')
def foo(n):
for i in range(n):
print(i)
t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t2 = threading.Thread(target=foo,args=(20,),daemon=False)
t2.start()
time.sleep(6)
print('end')
如果non-daemon線程的時候,主線程退出,也不會結束所有的daemon線程,直到所有的non-daemon線程全部結束,如果還有daemon線程,主線程需要退出,會結束所有的daemon線程,退出。
主線程是non-daemon。其他線程靠傳參。
決定的是是否需要等待。如果有激活的non-daemon,就需要等待,沒有激活的,主線程直接退出。
10)join方法
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo,args=(10,),daemon=True)
t1.start()
t1.join()
利用join,主線程被迫等待他。把當前線程阻塞住了,x.join就等待誰。保證代碼的執行順序。
使用了join方法后,daemon線程執行完了,主線程才退出了。
Join(timeout= None),是線程的標准方法之一。
一個線程中調用另一個線程的join方法,調用者將被阻塞,直到被調用線程終止。
一個線程可以被join多次。
Timeout參數指定調用者等待多久,沒有設置超時的,就會一直等待到調用的線程結束。
調用誰的join方法,就是join誰,就要等睡。
11)daemon線程應用場景
簡單來說,本來並沒有daemon Thread,這個概念唯一的作用是,當把一個線程設置為daemon,他會隨着主線程的退出而退出。
主要應用場景為:
(1)后台任務。發送心跳包,監控。
(2)主線程工作才有用的線程。如主線程中維護着公共的資源,主線程已經清理了,准備退出,而工作線程使用這些資源工作沒有意義了,一起退出最合適。
(3)隨時可以被終止的線程。
如果主線程退出,想所有其他工作線程一起退出,就使用daemon=True來創建工作線程。
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
t2 = threading.Thread(target=bar)
t2.start()
print('t2 daemon = {}'.format(t2.isDaemon()))
t1 = threading.Thread(target=foo,daemon=True)
t1.start()
time.sleep(3)
print('Main end')
改造成一直執行的:
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print('t1 daemon = {}'.format(threading.current_thread().isDaemon()))
t2 = threading.Thread(target=bar)
t2.start()
t2.join()
print('t2 daemon = {}'.format(t2.isDaemon()))
t1 = threading.Thread(target=foo,daemon=True)
t1.start()
t1.join()
time.sleep(3)
print('Main end')
Daemon線程,簡化了手動關閉線程的工作。
12)threading.local 類
局部變量的實現:
import threading
import time
def worker():
x = 0
for i in range(10):
time.sleep(0.01)
x += 1
print(threading.current_thread(),x)
for i in range(10):
threading.Thread(target=worker).start()
利用全局變量實現:
import threading
import time
globals_data = threading.local()
def worker():
globals_data.x = 0
for i in range(10):
time.sleep(0.01)
globals_data.x += 1
print(threading.current_thread(),globals_data.x)
for i in range(10):
threading.Thread(target=worker).start()
import threading
X = 'abc'
ctx = threading.local()
ctx.x = 123
print(ctx,type(ctx),ctx.x)
def worker():
print(X)
print(ctx)
print(ctx.x) #打印的時候出錯,表示x不能跨線程
print('working')
worker()
print()
threading.Thread(target=worker).start() #另一個線程啟動
threading.local類構建了一個大字典,其元素是每一線程實例地址為key和線程對象引用線程單獨的字典的映射。
通過threading.local實例就可在不同的線程中,安全的使用線程獨有的數據,做到了線程間數據隔離,如同本地變量一樣安全。
Local和線程相關的大字典,每次利用的時候利用線程的小字典來頂替local實例的大字典。
不利用的話,全局變量的話直接就是threading.local和本地線程相關的數據。
13)定時器timer延遲執行
Threading.Timer繼承自thread,這個類用來另一多久執行一個函數。
Class threading.Timer(interval,function,args=None,kwargs=None)
Start方法執行以后,Timer對象會處於等待狀態,等待了interval之后,開始執行function函數的。如果在執行函數之前的等待階段,使用了cancel方法,就會跳過執行函數結果。
本質上就是一個Thread,只是沒有提供name,daemon。
import threading
import logging
import time
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5,worker)
t.start() #啟動
print(threading.enumerate())
t.cancel() #取消
time.sleep(1)
print(threading.enumerate())
[<_MainThread(MainThread, started 7512)>, <Timer(Thread-1, started 6644)>]
[<_MainThread(MainThread, started 7512)>]
import threading
import logging
import time
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5,worker)
t.cancel() #取消
t.start() #啟動
print(threading.enumerate())
time.sleep(1)
print(threading.enumerate())
[<_MainThread(MainThread, started 7512)>]
[<_MainThread(MainThread, started 7512)>]
二、線程同步
1、概念
線程同步,線程間協同,通過某種技術,讓一個線程訪問某些數據時候,其他線程不能訪問這些數據,直到該線程完成對數據的操作。
不同操作系統實現技術有所不同,有臨界區、互斥量、信號量、事件Event。
2、Event
Event事件,是線程間通信機制中最簡單的實現,使用一個內部的標記flag,通過flag的True或False的變化來進行操作。
| 名稱 |
含義 |
| set() |
標記為True |
| clear() |
標記為False |
| is_set() |
標記是否為True |
| Wait(timeout=None) |
設置等待標記為True的時長,None為無限等待,等到返回True,未等到超時了返回False。 |
課堂練習:老板雇佣了一個工人,讓他生產杯子,老板一直等着這個工人,直到上產了十個杯子。
1)利用join
import threading
import time
import logging
def worker(count=10):
cups = []
while len(cups)<count:
logging.info('wprking')
time.sleep(0.01)
cups.append(1)
print(len(cups))
logging.info('I am finished')
w = threading.Thread(target=worker)
w.start()
w.join()
2)利用event
import threading
import logging
import time
def boss(event:threading.Event):
logging.info('I am boss,waiting')
event.wait()
logging.info('good job')
def worker(event:threading.Event,count=10):
logging.info('I am working for u')
cups = []
while True:
logging.info('makeing')
time.sleep(1)
cups.append(1)
if len(cups) >= count:
print(len(cups))
event.set()
break
logging.info('finished my job.cups={}'.format(cups))
event = threading.Event()
w = threading.Thread(target=worker,args=(event,))
b = threading.Thread(target=boss,args=(event,))
w.start()
b.start()
3)wait的應用
import threading
import logging
logging.basicConfig(level=logging.INFO)
def do(event:threading.Event,interval:int):
while not event.wait(interval): #沒有置set,所以是False。 不是False的時候就不能進入循環了。
logging.info('do sth') #沒三秒打印一次。 not False執行此語句
e = threading.Event()
threading.Thread(target=do,args=(e,10)).start()
e.wait(12) #整體停留了十秒。
e.set() #重置為True。
print('end')
4)練習,實現timer。
總結:
使用同一個Event用來做標記。
Event的wait優於time.sleep,更快的切換到其他線程,提高並發效率。
import threading
import time
class MyTimer:
def __init__(self,interval,function,args,kwargs):
self.interval = interval
self.target = function
self.args = args
self.kwargs = kwargs
self.event = threading.Event()
self.thread = threading.Thread(target=self.target,args=self.args,kwargs=self.kwargs)
def start(self):
self.event.wait(self.interval)
if not self.event.is_set(): #如果沒有置False,那么就是False,not False為True,執行run語句。
self.run()
def run(self):
self.start()
self.event.set()
def cancel(self):
self.event.set()
Lock鎖
1)鎖,凡是存在共享資源爭搶的地方都可以使用鎖。從而保證只有一個使用者可以完全使用這個資源。
lock.acquire 上鎖 lock.release 解鎖
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
cups = []
def worker(count=10):
logging.info('i am work')
while len(cups) < count:
time.sleep(0.1)
cups.append(1)
logging.info('i am finsh.cups={}'.format(len(cups)))
for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()
2018-05-26 15:38:25,913 Thread-1 32 i am work
2018-05-26 15:38:25,913 Thread-2 4332 i am work
2018-05-26 15:38:25,913 Thread-3 9992 i am work
2018-05-26 15:38:25,914 Thread-4 8464 i am work
2018-05-26 15:38:25,914 Thread-5 9968 i am work
2018-05-26 15:38:25,915 Thread-6 8712 i am work
2018-05-26 15:38:25,915 Thread-7 4412 i am work
2018-05-26 15:38:25,915 Thread-8 8456 i am work
2018-05-26 15:38:25,915 Thread-9 8316 i am work
2018-05-26 15:38:25,915 Thread-10 9772 i am work
2018-05-26 15:38:35,925 Thread-8 8456 i am finsh.cups=1000
2018-05-26 15:38:36,023 Thread-7 4412 i am finsh.cups=1001
2018-05-26 15:38:36,023 Thread-1 32 i am finsh.cups=1002
2018-05-26 15:38:36,023 Thread-6 8712 i am finsh.cups=1003
2018-05-26 15:38:36,024 Thread-5 9968 i am finsh.cups=1004
2018-05-26 15:38:36,024 Thread-4 8464 i am finsh.cups=1005
2018-05-26 15:38:36,024 Thread-10 9772 i am finsh.cups=1006
2018-05-26 15:38:36,024 Thread-2 4332 i am finsh.cups=1007
2018-05-26 15:38:36,025 Thread-3 9992 i am finsh.cups=1008
2018-05-26 15:38:36,025 Thread-9 8316 i am finsh.cups=1009
運行結果來看,多線程調度,導致了判斷失誤,多生產了杯子只有用到了鎖。
Lock,鎖,一旦線程獲得鎖,其他要獲得鎖的線程將被阻塞。
| 名稱 |
含義 |
| acquire(blocking=True,timeout=-1) |
默認阻塞,阻塞可以設置超時時間,非阻塞時,timeout禁止設置,成果獲取鎖,返回True,否則返回None |
| Release |
釋放鎖,可以從任何線程調用釋放, 已上鎖的鎖,會被重置到unlocked未上鎖的鎖上調用,拋出RuntimeError異常。 |
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
cups = []
lock = threading.Lock()
def worker(count=10):
logging.info('i am work')
lock.acquire()
while len(cups) < count:
print(threading.current_thread(),len(cups))
time.sleep(0.000001)
cups.append(1)
logging.info('i am finsh.cups={}'.format(len(cups)))
lock.release()
for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()
上鎖位置不對,由一個線程搶占,並獨自占鎖並完成任務。
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
cups = []
lock = threading.Lock()
def worker(count=10):
logging.info('i am work')
flag= False
while True:
lock.acquire() #獲取鎖
if len(cups) >= count:
flag = True
# print(threading.current_thread(),len(cups))
time.sleep(0.000001)
if not flag:
cups.append(1)
print(threading.current_thread(),len(cups))
lock.release() #追加后釋放鎖
if flag:
break
logging.info('i am finsh.cups={}'.format(len(cups)))
for _ in range(10):
threading.Thread(target=worker,args=(1000,)).start()
鎖保證了數據完整性,但是性能下降好多。
If flag:break是為了保證release方法被執行,否則就出現了死鎖,得到鎖的永遠沒有釋放。
計數器類,可以加可以減。
2)加鎖、解鎖
一般加鎖就需要解鎖,但是加鎖后解鎖前,還要有一些代碼執行,就有可能拋出異常,一旦出現異常鎖是無法釋放的,但是當前線程可能因為這個就異常終止了,這就產生了死鎖。
加鎖。解鎖常用語句:
(1)使用try...finally語句保證鎖的釋放。
(2)With上下文管理,鎖對象支持上下文管理。
import threading
import time
class Counter:
def __init__(self):
self._val = 0
self.__lock = threading.Lock()
@property
def value(self):
return self._val
def inc(self):
try:
self.__lock.acquire()
self._val += 1
finally:
self.__lock.release()
def dec(self):
with self.__lock:
self._val -= 1
def run(c:Counter,count=1000):
for _ in range(10):
for i in range(-50,50):
if i<0:
c.dec()
else:
c.inc()
c = Counter()
c1 = 10
c2 = 10
for i in range(c1):
threading.Thread(target=run,args=(c,c2)).start()
while True:
time.sleep(1)
if threading.active_count() == 1:
print(threading.enumerate())
print(c.value)
break
else:
print(threading.enumerate())
不影響其他線程的切換,但是上鎖后其他線程被阻塞了。只能等待。
3)鎖的應用場景
適用於訪問和修改同一個共享資源的時候,讀寫同一個資源的時候。
全部是讀取同一個共享資源需要鎖嗎?
因為共享資源是不可變的,每一次讀取都是一樣的值,所以不用加鎖。
使用鎖的注意事項:
少用鎖必要時用鎖,使用了鎖,多線程訪問被鎖的資源時候,就成了串行,要么排隊執行,要么爭搶執行。
加鎖時間越短越好,不需要拍就立即釋放鎖。
一定要避免死鎖。(死鎖,打不開,解不開,A有鎖,B也鎖,占有這把鎖的人遲遲不釋放鎖。沒有使用上下文,持有鎖的的線程異常退出了)
不使用鎖,有了效率,但是結果是錯的。
使用了鎖,效率低下,但是結果是對的。
4)非阻塞鎖使用
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
def worker(tasks):
for task in tasks:
time.sleep(0.01)
if task.lock.acquire(False):
logging.info('{}{}begin to start'.format(threading.current_thread(),task.name))
else:
logging.info('{}{}is working'.format(threading.current_thread(),task.name))
class Task:
def __init__(self,name):
self.name = name
self.lock = threading.Lock()
tasks = [Task('task-{}'.format(x))for x in range(10)]
for i in range(5):
threading.Thread(target=worker,name='worker-{}'.format(i),args=(tasks,)).start()
5)可重入鎖RLock:
是線程相關的鎖
線程A可重復鎖,並可以多次成功獲取,不會阻塞 ,最后要在線程A中做和acquire次數相同的release。
拿到這把鎖的線程可以多次使用。
別的線程拿到的話也是被阻塞的。
一個線程占用鎖的時候,其他線程不能拿到,只能的是阻塞。直到當前線程次有的鎖全部釋放完,其他線程才可以獲取。
可重入鎖,與線程相關,可在一個線程中獲取鎖,並可繼續在同一線程中不阻塞獲取鎖,當鎖未釋放完,其他線程獲取鎖就會阻塞。直到當前持有鎖的線程釋放完了鎖。
四、Condition
構造方法:condition(lock=None),可以傳入一個lock對象或Rlock對象,默認是Rlock。
| 名稱 |
含義 |
| Acquire(*args) |
獲取鎖 |
| Wait(self,timeout=None) |
等待超時 |
| Notify(n=1) |
喚醒之多指定書目個數的等待的線程,沒有等待的線程就沒有任何操作 |
| Notify_all() |
喚醒所有等待的線程。 |
用於生產者、消費者模型,為了解決生產者消費者速度匹配的問題:
import threading
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
logging.info(data)
self.data = data
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
data = self.data
logging.info('recieved{}'.format(data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')
c = threading.Thread(target=d.consume,name='consume')
c.start()
p.start()
消費者采用主動消費,消費者浪費了大量的時間,主動來查看有沒有數據。換成通知的機制。
import threading
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()
def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1)
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait()
logging.info('recieved{}'.format(self.data))
self.data = None
self.event.wait(0.5)
d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')
c = threading.Thread(target=d.consume,name='consume')
c.start()
p.start()
如果是一個生產者,多個消費者呢:
import threading
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
class Dispatcher:
def __init__(self):
self.data = None
self.event = threading.Event()
self.cond = threading.Condition()
def produce(self,total):
for _ in range(total):
data = random.randint(0,100)
with self.cond:
logging.info(data)
self.data = data
self.cond.notify_all()
self.event.wait(1) #模擬生產速度
self.event.set()
def consume(self):
while not self.event.is_set():
with self.cond:
self.cond.wait() #阻塞等通知
logging.info('recieved{}'.format(self.data))
self.event.wait(0.5) #模擬消費 的速度
d = Dispatcher()
p = threading.Thread(target=d.produce,args=(10,),name='producer')
for i in range(5):
c = threading.Thread(target=d.consume, name='consume{}'.format(i))
c.start()
p.start()
Self.cond.notify_all()發通知:
修改為self.cond.notify(n=2) 隨機通知兩個消費者。
Condition總結:
用於生產者消費者模型中,解決生產者,消費者速度匹配的問題。
采用了通知機制,非常有效率。
使用方式:
使用condition,必須先acquire,用完了要release。因為內部實現了鎖,默認使用了RLock鎖。最好的方式就是使用上下文。
消費者wait,等待通知。
生產者生產好消息,對消費者發出通知,可以使用notify或者notify_all方法。
操作系統中基本單位是進程,進程是獨立的王國,操作系統中不可調用線程,線程是輕量級進程,有獨立自己棧。資源就是獨立的棧。
加載到內存中是進程管理,子系統之一。變成為一個實例,進程ID號。
驅動管理,
協議,
Tcp udp,
http協議。
Linux:
Unix:b語言基礎上c語言寫的。
Windows:
數據在哪里,計算就在哪里。
五、Barrier
1、柵欄,屏障、為路障、道閘
達到一定的條件,才會打開barrier。
| 名稱 |
含義 |
| Barrier(parties,action=None,timeout=None) |
構建barrier對象,指定參與方數目,timeout是wait方法未指定超時的默認值。 |
| n_waiting |
當前在屏障中等待的線程數 |
| Parties |
各方數,就是需要多少個等待 |
| Wait(timeout=None) |
等待通過屏障,返回0到線程數-1的整數,每個線程返回不同,如果wait方法設置了超時,並超時發送,屏障將處於broken狀態。 |
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
def worker(barrier:threading.Barrier):
logging.info('waiting for {}threads'.format(barrier.n_waiting))
try:
barrier_id = barrier.wait()
logging.info('after barrier{}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier')
barrier = threading.Barrier(3)
for x in range(3):
threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,)).start()
logging.info('started')
2018-06-11 21:29:45,173 worker-0 8804 waiting for 0threads
2018-06-11 21:29:45,198 worker-1 2668 waiting for 1threads
2018-06-11 21:29:45,199 worker-2 2716 waiting for 2threads
2018-06-11 21:29:45,199 MainThread 10160 started
2018-06-11 21:29:45,199 worker-2 2716 after barrier2
2018-06-11 21:29:45,199 worker-0 8804 after barrier0
2018-06-11 21:29:45,199 worker-1 2668 after barrier1
如果Barrier()的值設置為3,開啟5個線程,前三個線程執行后,后面兩個線程不夠三個線程,所以一直在等待,直到湊到三個barrier才打開。
上面的運行結果,所有線程沖到了barrier前等待,直到到達parties的數目,屏障才打開,所有線程停止等待,繼續執行。
再有線程wait,屏障就就緒等待到達參數方數目。
例如就是賽馬需要的馬匹全部就位,開閘,下一批陸續來到繼續等待比賽。。
| 名稱 |
含義 |
| Broken |
如果屏障處於打破的狀態 返回true。 |
| Abort() |
將屏障至於broken狀態,等待中的線程或者調用等待方法的線程中都會拋出brokenbarriererror異常,直達reset方法來恢復屏障 |
| Reset() |
重置,恢復屏障,重新開始攔截 |
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
def worker(barrier:threading.Barrier):
logging.info('waiting for {}threads'.format(barrier.n_waiting))
try:
barrier_id = barrier.wait()
logging.info('after barrier{}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier')
barrier = threading.Barrier(3)
for x in range(9):
if x == 2:
barrier.abort()
elif x == 6:
barrier.reset()
threading.Event().wait(1)
threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,)).start()
logging.info('started')
2018-06-11 21:42:20,952 worker-0 8460 waiting for 0threads
2018-06-11 21:42:21,953 worker-1 8484 waiting for 1threads
2018-06-11 21:42:21,953 worker-1 8484 Broken Barrier
2018-06-11 21:42:21,954 worker-0 8460 Broken Barrier
2018-06-11 21:42:22,954 worker-2 1500 waiting for 0threads
2018-06-11 21:42:22,955 worker-2 1500 Broken Barrier
2018-06-11 21:42:23,956 worker-3 1200 waiting for 0threads
2018-06-11 21:42:23,956 worker-3 1200 Broken Barrier
2018-06-11 21:42:24,958 worker-4 6652 waiting for 0threads
2018-06-11 21:42:24,958 worker-4 6652 Broken Barrier
2018-06-11 21:42:25,959 worker-5 3212 waiting for 0threads
2018-06-11 21:42:25,959 worker-5 3212 Broken Barrier
2018-06-11 21:42:26,961 worker-6 6344 waiting for 0threads
2018-06-11 21:42:27,962 worker-7 9732 waiting for 1threads
2018-06-11 21:42:28,964 worker-8 6068 waiting for 2threads
2018-06-11 21:42:28,964 worker-8 6068 after barrier2
2018-06-11 21:42:28,965 worker-6 6344 after barrier0
2018-06-11 21:42:28,965 MainThread 9768 started
2018-06-11 21:42:28,965 worker-7 9732 after barrier1
屏障等待了兩個,屏障就被break,waiting的線程跑出了brokenbarriererror異常,新wait的線程也是拋出異常,直到屏障恢復,才繼續按照parties數目要求繼續攔截線程。
非broken狀態情況下才可以繼續等待。
2、wait方法超時實例
import threading
import logging
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
def worker(barrier:threading.Barrier,i:int):
logging.info('waiting for {}threads'.format(barrier.n_waiting))
try:
# barrier_id = barrier.wait()
# logging.info('after barrier{}'.format(barrier_id))
logging.info(barrier.broken) #是否是broken
if i < 3:
barrier_id = barrier.wait(1)#超時后,屏障broken
else:
if i == 6:
barrier.reset() #恢復屏障
barrier_id = barrier.wait()
logging.info('after barrier{}'.format(barrier_id))
except threading.BrokenBarrierError:
logging.info('Broken Barrier.run.')
barrier = threading.Barrier(3)
for x in range(9):
# if x == 2:
# barrier.abort()
# elif x == 6:
# barrier.reset()
threading.Event().wait(2)
threading.Thread(target=worker,name='worker-{}'.format(x),args=(barrier,x)).start()
logging.info('started')
2018-06-11 21:53:47,966 worker-0 8656 waiting for 0threads
2018-06-11 21:53:47,967 worker-0 8656 False
2018-06-11 21:53:48,967 worker-0 8656 Broken Barrier.run.
2018-06-11 21:53:49,968 worker-1 168 waiting for 0threads
2018-06-11 21:53:49,968 worker-1 168 True
2018-06-11 21:53:49,969 worker-1 168 Broken Barrier.run.
2018-06-11 21:53:51,969 worker-2 6448 waiting for 0threads
2018-06-11 21:53:51,970 worker-2 6448 True
2018-06-11 21:53:51,970 worker-2 6448 Broken Barrier.run.
2018-06-11 21:53:53,970 worker-3 6192 waiting for 0threads
2018-06-11 21:53:53,970 worker-3 6192 True
2018-06-11 21:53:53,971 worker-3 6192 Broken Barrier.run.
2018-06-11 21:53:55,972 worker-4 6380 waiting for 0threads
2018-06-11 21:53:55,972 worker-4 6380 True
2018-06-11 21:53:55,973 worker-4 6380 Broken Barrier.run.
2018-06-11 21:53:57,973 worker-5 3228 waiting for 0threads
2018-06-11 21:53:57,973 worker-5 3228 True
2018-06-11 21:53:57,974 worker-5 3228 Broken Barrier.run.
2018-06-11 21:53:59,975 worker-6 3924 waiting for 0threads
2018-06-11 21:53:59,975 worker-6 3924 True
2018-06-11 21:54:01,975 worker-7 6636 waiting for 1threads
2018-06-11 21:54:01,975 worker-7 6636 False
2018-06-11 21:54:03,976 worker-8 9684 waiting for 2threads
2018-06-11 21:54:03,976 worker-8 9684 False
2018-06-11 21:54:03,977 worker-8 9684 after barrier2
2018-06-11 21:54:03,977 worker-7 6636 after barrier1
2018-06-11 21:54:03,977 MainThread 10036 started
2018-06-11 21:54:03,978 worker-6 3924 after barrier0
3、Barrier應用
並發初始化:
所有線程都必須初始化完成后,才能繼續工作。運行前加載數據、檢查,如果這些工作沒完成,就開始運行,將不能正常工作。
10個線程做10種准備工作,每一個線程負責一種工作,只有這10個線程都完成后,才能繼續工作,先完成的要等待后完成的線程。
啟動一個程序,需要先加載磁盤文件、緩存預熱、初始化連接池等工作,這些工作可以齊頭並進,不過只有都滿足了,程序才會繼續向后執行,假設數據庫連接失敗,則初始化工作失敗,就要abort,所有線程收到異常退出。
4、semaphore信號量
和lock很像,信號量對象內部維護一個倒計數器,每一次acquire都會減一,當acquire方法發現技術為0就阻塞請求的線程,直到其他線程對信號量release后,計數器大於0,恢復阻塞的線程。
| 名稱 |
含義 |
| Semaphore(value=1) |
構造方法,value小於0,拋出valueerror異常 |
| Acquire(blocking=True,timeout=None) |
獲取信號量,計數器減一,獲取成功返回true |
| Release() |
釋放信號量,計數器加1 |
計數器永遠不會低於0,因為acquire的時候,發現是0,都會被阻塞。
用with語法。
Boundedsemaphore。
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
def worker(s:threading.Semaphore):
logging.info('in sub thread')
logging.info(s.acquire())
logging.info('sub thread over')
s = threading.Semaphore(3)
logging.info(s.acquire())
print(s._value)
logging.info(s.acquire())
print(s._value)
logging.info(s.acquire())
print(s._value)
threading.Thread(target=worker,args=(s,)).start()
time.sleep(2)
logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3))
logging.info('relesed')
s.release()
2018-06-11 22:36:17,646 MainThread 9340 True
2
2018-06-11 22:36:17,647 MainThread 9340 True
1
2018-06-11 22:36:17,647 MainThread 9340 True
0
2018-06-11 22:36:17,647 Thread-1 8124 in sub thread
2018-06-11 22:36:19,647 MainThread 9340 False
2018-06-11 22:36:22,648 MainThread 9340 False
2018-06-11 22:36:22,648 MainThread 9340 relesed
2018-06-11 22:36:22,648 Thread-1 8124 True
2018-06-11 22:36:22,649 Thread-1 8124 sub thread over
5、連接池
連接池,因為資源有限,且開啟一個連接成本較高,所以利用連接池。
一個簡單的連接池,連接池應該有容量(總數),有一個工廠方法可以獲取連接,能夠把不用的連接返回。供其他調用者使用。
class Conn:
def __init__(self,name):
self.name = name
class Pool:
def __init__(self,count:int):
self.count = count
#池中是連接對象的列表
self.pool = [self._connect('conn-{}'.format(x))for x in range(self.count)]
def _connect(self,conn_name):
#創建連接的方法,返回一個名稱
return Conn(conn_name)
def get_conn(self):
#從池中拿走一個連接
if len(self.pool) > 0:
return self.pool.pop()
def return_conn(self,conn:Conn):
#向池中添加一個連接
self.pool.append(conn)
上面的例子只是一個簡單的功能實現。Get_conn()方法在多線程的時候也會有線程安全問題。
import threading
import logging
import random
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT,level=logging.INFO)
class Conn:
def __init__(self,name):
self.name = name
def __repr__(self):
return self.name
class Pool:
def __init__(self,count:int):
self.count = count
self.pool = [self._connect('conn-{}'.format(x))for x in range(count)]
self.semaphore = threading.Semaphore(count)
def _connect(self,conn_name):
return Conn(conn_name)
def get_conn(self):
print('----')
self.semaphore.acquire()
print('======')
conn = self.pool.pop()
return conn
def return_conn(self,conn:Conn):
self.pool.append(conn)
self.semaphore.release()
pool = Pool(3)
def worker(pool:Pool):
conn = pool.get_conn()
logging.info(conn)
threading.Event().wait(random.randint(1,2))
pool.return_conn(conn)
for i in range(6):
threading.Thread(target=worker,name='worker{}'.format(i),args=(pool,)).start()
----
2018-06-12 20:13:21,119 worker0 8392 conn-2
======
----
======
2018-06-12 20:13:21,120 worker1 8240 conn-1
----
======
----
2018-06-12 20:13:21,120 worker2 3216 conn-0
----
----
======
2018-06-12 20:13:22,120 worker3 5700 conn-0
2018-06-12 20:13:23,119 worker4 7448 conn-2
======
2018-06-12 20:13:23,120 worker5 2556 conn-1
======
上例中,使用信號量解決資源有限的問題,如果池中有資源,請求者獲取資源時候信號量減1,拿走資源,當請求超過資源數,請求者只能等待,當使用者用完歸還資源后信號量加1,等待線程就可以被喚醒拿走資源。
容器,預加載,懶加載。
6、問題
Self.conns.append(conn)是否需要加鎖。
1)邏輯分析處理
還沒有使用信號量,就release,。
import logging
import threading
sema = threading.Semaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
sema.acquire()
logging.warning('~~~~~~~')
logging.warning(sema.__dict__)
for i in range(4):
sema.release()
logging.warning(sema.__dict__)
for i in range(3):
sema.acquire()
logging.warning('~~~~')
logging.warning(sema.__dict__)
sema.acquire()
logging.warning('~~~~~')
logging.warning(sema.__dict__)
WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 3}
WARNING:root:~~~~~~~
WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 0}
WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 4}
WARNING:root:~~~~
WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 1}
WARNING:root:~~~~~
WARNING:root:{'_cond': <Condition(<unlocked _thread.lock object at 0x000000C13D6C0B98>, 0)>, '_value': 0}
計數器超過了4,超過了設置的最大值,需要解決問題:
Boundedsemaphore類
有界的信號量,不允許使用release超出初始值的范圍,否則就會拋出valueerror異常。
保證了多歸還連接拋出異常。
2)如果使用了信號量,還沒用完。
計數器還差一個就滿了,有三個線程ABC都執行了第一句,都沒有來得及release,這個時候輪到 A release,正常的release,然后輪到線程C先release,一定會出現問題,超屆,會拋出異常,信號量可以保證,一定不能多歸還。
3)許多線程用完了信號量
沒有信號量的線程都被阻塞,沒有線程和歸還的線程爭搶,當append后才release,這個時候才能等待的線程被喚醒,才能pop,沒有獲取信號量的不能pop,這樣才是安全的。
7、信號量和鎖
鎖,只允許同一個時間一個線程獨占資源,他是特殊的信號量,即信號量初始值為1.
信號量,可以多個線程訪問共享資源,但這個共享資源屬相有限,鎖,可以看做是特殊的信號量。
Event lock sem 三個必須要會用。
8、數據結構和gill
Queue是線程安全的,里面用到了鎖,還有condition,用的是lock。
Queue是標准庫模塊,提供FIFO的queue,lifo的隊列,有限隊列。
Queue是線程安全的,適用於線程間的安全交換數據,內部使用了lock和condition。
原子操作,一堆操作中要么全部做完,要么全部做不完。
Guarantee 保證。
嚴格要注意的事項:
9、gil全局解釋器鎖
CPython在解釋器進程中大鎖,叫做gill全局解釋器鎖。大鎖解決進程內的所有線程的問題,在CPU上只有一個線程被調度使用。
同一個時間內同一個進程內只有一個線程在工作,在執行字節碼,甚至在多核的CPU的情況下,也是如此。
cPython中:
IO密集型:由於線程阻塞,就會調度其他線程。
CPU密集型:不訪問網絡,當前線程可能會連續的獲得gill,導致其他線程幾乎無法使用CPU。
在cPython中由於gill存在,IO密集型,使用多線程較為合算,CPU密集型,使用多進程,要繞開gill。
新版cPython正在努力優化gill的問題,但是不是移除的問題,
Python中絕大多數內置數據結構的讀、寫操作都是原子操作。
由於gill的存在,Python的內置數據類型在多線程編程的時候變成了安全的,但是實際上本身不是線程和安全類型。
移除gill,會降低cPython單線程的執行效率。
本身不安全,有全局解釋器鎖gill,都是由線程操作的,線程安全的。
