一、paramiko
二、進程、與線程區別
三、python GIL全局解釋器鎖
四、線程
- 語法
- join
- 線程鎖之Lock\Rlock\信號量
- 將線程變為守護進程
- Event事件
- queue隊列
- 生產者消費者模型
一、paramiko
用於遠程連接並執行簡單的命令
使用用戶名密碼連接:
1 import paramiko 2 3 # 創建SSH對象 4 ssh = paramiko.SSHClient() 5 # 允許連接不在know_hosts文件中的主機 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 連接服務器 8 ssh.connect(hostname='172.16.5.163', port=22, username='root', password='111111') 9 10 # 執行命令 11 stdin, stdout, stderr = ssh.exec_command('df') 12 # 獲取命令結果 13 result = stdout.read() 14 print(result.decode()) 15 16 # 關閉連接 17 ssh.close() 18 19 20 結果 21 Filesystem 1K-blocks Used Available Use% Mounted on 22 /dev/mapper/VolGroup-lv_root 51606140 1518048 47466652 4% / 23 tmpfs 510172 0 510172 0% /dev/shm 24 /dev/sda1 495844 33461 436783 8% /boot 25 /dev/mapper/VolGroup-lv_home 2059640248 203016 1954813516 1% /home
使用公鑰連接
import paramiko private_key = paramiko.RSAKey.from_private_key_file('id_rsa.txt') #創建SSH對象 ssh = paramiko.SSHClient() #允許連接不在know_host文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #連接服務器 ssh.connect(hostname='172.16.5.163',port=22,username='root',pkey=private_key) #執行命令 stdin,stdout,stderr = ssh.exec_command('df -h') #獲取命令結果 restult = stdout.read() #打印執行結果 print(restult.decode()) #關閉連接 ssh.close()
SFTPClient使用用戶名密碼完成上傳下載
1 import paramiko 2 3 transport = paramiko.Transport(('172.16.5.163',22)) 4 transport.connect(username='root',password='111111') 5 6 sftp = paramiko.SFTPClient.from_transport(transport) 7 # 將location.py 上傳至服務器 /tmp/test.py 8 sftp.put('D:\\test1\\put.txt', '/tmp/put.txt') 9 # 將remove_path 下載到本地 local_path 10 sftp.get('/tmp/get.txt', 'D:\\test1\\get.txt') 11 12 transport.close()
SFTPClient使用公鑰完成上傳下載
1 import paramiko 2 3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa.txt') 4 5 transport = paramiko.Transport(('172.16.5.163', 22)) 6 transport.connect(username='root', pkey=private_key ) 7 8 sftp = paramiko.SFTPClient.from_transport(transport) 9 # 將location.py 上傳至服務器 /tmp/test.py 10 sftp.put('D:\\test1\\put.txt', '/tmp/put.txt') 11 # 將remove_path 下載到本地 local_path 12 sftp.get('/tmp/get.txt', 'D:\\test1\\get.txt') 13 14 transport.close()
二、進程、與線程區別
線程:是操作系統的最小調度單元,一堆指令的集合。
進程:操作CPU,必須先創建一個線程
進程和線程的區別 啟動一個線程比啟動一個進程快,運行速度沒有可比性。 先有一個進程然后才能有線程。 1、進程包含線程 2、線程共享內存空間 3、進程內存是獨立的(不可互相訪問) 4、進程可以生成子進程,子進程之間互相不能互相訪問(相當於在父級進程克隆兩個子進程) 5、在一個進程里面線程之間可以交流。兩個進程想通信,必須通過一個中間代理來實現 6、創建新線程很簡單,創建新進程需要對其父進程進行克隆。 7、一個線程可以控制或操作同一個進程里面的其它線程。但進程只能操作子進程。 8、父進程可以修改不影響子進程,但不能修改。
三、python GIL全局解釋器鎖
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行,擦。。。,那這還叫什么多線程呀?莫如此早的下結結論,聽我現場講。
首先需要明確的一點是GIL
並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標准,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL
歸結為Python語言的缺陷。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL
詳細說明:http://www.dabeaz.com/python/UnderstandingGIL.pdf
其實就是為了解決同一時間內多個線程處理一個運算(浪費資源),全局解釋器鎖解決同一時間如果有線程執行過了,其它線程就不重復運算。
四、線程
- 語法
寫法一:
1 import threading 2 3 #創建一個函數,每個線程要使用的函數 4 def fun(n): 5 print(n) 6 7 n1 = threading.Thread(target=fun,args=('n1',))#生成一個線程 8 n2 = threading.Thread(target=fun,args=('n2',))#再生成一個線程 9 10 n1.start()#啟動n1這個線程 11 n2.start()#啟動n2這個線程 12 13 print(n1.getName())#獲取線程名 14 print(n2.getName())#獲取線程名
寫法二:
import threading,time class mythread(threading.Thread):#繼承thread.Thread這個函數 def __init__(self,n): super(mythread,self).__init__() self.n = n def run(self):#這里這個方法名必須命名為run,否則失敗 print('運行線程',self.n) n1 = mythread("n1") n2 = mythread("n2") n1.start() n2.start()
2、join
用法例如線程n1和n2,想要等n1的結果在執行n2,就需要n1.join。具體如下
1 import threading,time 2 3 class mythread(threading.Thread):#繼承thread.Thread這個函數 4 def __init__(self,n,sleep_time): 5 super(mythread,self).__init__() 6 self.n = n 7 self.sleep_time = sleep_time 8 9 10 def run(self):#這里這個方法名必須命名為run,否則失敗 11 print("開始運行線程",self.n) 12 time.sleep(self.sleep_time)#執行完一個線程后停止2秒 13 print('運行線程結束',self.n,threading.current_thread()) 14 15 16 n1 = mythread("n1",2) 17 n2 = mythread("n2",4) 18 19 n1.start() 20 n2.start() 21 n1.join 22 print('這里是主程序',threading.current_thread())
1 開始運行線程 n1 2 開始運行線程 n2 3 這里是主程序 <_MainThread(MainThread, started 12624)>#一個進程啟動收首先會打印主線程 4 運行線程結束 n1 <mythread(Thread-1, started 4020)>#這里是普通線程 5 運行線程結束 n2 <mythread(Thread-2, started 9088)>#這里是普通線程
3、線程鎖之Lock\Rlock\信號量
線程鎖:保證同一時間內只有一個線程可以修改一個數據(避免同一時間內多個線程修改同一個數據)
4、將線程變為守護進程
1 import threading 2 import time 3 4 def run(n): 5 print("task",n) 6 time.sleep(2) 7 print('停') 8 9 10 11 start_time = time.time() 12 t_obj = [] 13 for i in range(50): 14 t = threading.Thread(target=run,args=("t--%s"%i,)) 15 t.setDaemon(True)#把當前線程線程設置為守護進程,必須要寫在start前面,否則報錯 16 t.start() 17 t_obj.append(t) 18 19 20 print("主線程執行")#主線程 21 print("cost:",time.time() - start_time) 22 23 #非守護進程退出了,就全部退出(這里是主線程執行完畢后,就不等子線程了,主線程退出了,就都退出了)
線程鎖(互斥鎖Mutex)
一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味着每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什么狀況?
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每個線程中都獲取這個全局變量 6 print('--get num:',num ) 7 time.sleep(1) 8 num -=1 #對此公共變量進行-1操作 9 10 num = 100 #設定一個共享變量 11 thread_list = [] 12 for i in range(100): 13 t = threading.Thread(target=addNum) 14 t.start() 15 thread_list.append(t) 16 17 for t in thread_list: #等待所有線程執行完畢 18 t.join() 19 20 21 print('final num:', num )
正常來講,這個num結果應該是0, 但在python 2.7上多運行幾次,會發現,最后打印出來的num結果不總是0,為什么每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由於2個線程是並發同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量后,結果就都是99。那怎么辦呢? 很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉后才能再訪問此數據。
*注:不要在3.x上運行,不知為什么,3.x上的結果總是正確的,可能是自動加了鎖
加鎖版本
import time import threading def addNum(): global num #在每個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改數據前加鎖 num -=1 #對此公共變量進行-1操作 lock.release() #修改后釋放 num = 100 #設定一個共享變量 thread_list = [] lock = threading.Lock() #生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num )
GIL VS Lock
機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 注意啦,這里的lock是用戶級的lock,跟那個GIL沒關系 ,具體我們通過下圖來看一下+配合我現場講給大家,就明白了。
RLock(遞歸鎖)
一把大鎖中還有很多小鎖
1 import threading,time 2 3 def run1(): 4 print("grab the first part data") 5 lock.acquire() 6 global num 7 num +=1 8 lock.release() 9 return num 10 def run2(): 11 print("grab the second part data") 12 lock.acquire() 13 global num2 14 num2+=1 15 lock.release() 16 return num2 17 def run3(): 18 lock.acquire() 19 res = run1() 20 print('--------between run1 and run2-----') 21 res2 = run2() 22 lock.release() 23 print(res,res2) 24 25 26 if __name__ == '__main__': 27 28 num,num2 = 0,0 29 lock = threading.RLock() 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 print(threading.active_count()) 36 else: 37 print('----all threads done---') 38 print(num,num2)
信號量:同一時間內,最大允許多個線程運行
1 import threading,time 2 3 def run(n): 4 semaphore.acquire() 5 time.sleep(1) 6 print("run the thread: %s\n" %n) 7 semaphore.release() 8 9 10 semaphore = threading.BoundedSemaphore(5) #最多允許5個線程同時運行 11 for i in range(19): 12 t = threading.Thread(target=run,args=(i,)) 13 t.start() 14 15 while threading.active_count() != 1: 16 pass #print threading.active_count() 17 else: 18 print('----all threads done---')
5、Event事件
通過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。
#event.wait()等待
#event.clear() #把標志位清空
#event.is_set()設定標志位

1 import threading,time 2 event = threading.Event() 3 4 def lighter(): 5 count = 0 6 event.set() #先設定綠燈 7 while True: 8 if count >5 and count < 10:#改為紅燈 9 event.clear() #把標志位清空 10 print("\033[41;1m \033[0m") 11 elif count > 10: 12 event.set() #變綠燈 13 count = 0 14 else: 15 print("\033[42;1m \033[0m") 16 time.sleep(1) 17 count += 1 18 def car(name): 19 while True: 20 if event.is_set():#代表綠燈 21 print("[%s] running"%name) 22 time.sleep(1) 23 else: 24 print("[%s] 紅燈稍等"%name) 25 event.wait() 26 print("033[34;1m[%s] green light is on,start going...\033[0m"%name) 27 lighter = threading.Thread(target=lighter,) 28 lighter.start() 29 car1 = threading.Thread(target=car,args=("Tesla",)) 30 car1.start()
6、queue隊列
提高運行效率,完成程序的解耦。
1 >>> import queue 2 >>> q = queue.Queue()#生成實例 3 >>> q.put('d1')#存第一個硬盤 4 >>> q.put('d2')#存第二個硬盤 5 >>> q.put('d3')#存第三個硬盤 6 >>> q.qsize()#獲取隊列大小 7 3 8 >>> q.get()#取值(先存先取,沒有就卡住),可以通過異常,來獲取 9 'd1' 10 >>> 11 >>> q.get() 12 'd2' 13 >>> q.get()
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
-
class
queue.
Queue
(maxsize=0) #先入先出
-
class
queue.
LifoQueue
(maxsize=0) #last in fisrt out -
class
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
-
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by
sorted(list(entries))[0]
). A typical pattern for entries is a tuple in the form:(priority_number, data)
.
-
exception
queue.
Empty
-
Exception raised when non-blocking
get()
(orget_nowait()
) is called on aQueue
object which is empty.
-
exception
queue.
Full
-
Exception raised when non-blocking
put()
(orput_nowait()
) is called on aQueue
object which is full.
-
Queue.
qsize
()
-
Queue.
empty
() #return True if empty
-
Queue.
full
() # return True if full
-
Queue.
put
(item, block=True, timeout=None) -
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the
Full
exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise theFull
exception (timeout is ignored in that case).
-
Queue.
put_nowait
(item) -
Equivalent to
put(item, False)
.
-
Queue.
get
(block=True, timeout=None) -
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the
Empty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).
-
Queue.
get_nowait
() -
Equivalent to
get(False)
.
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
-
Queue.
task_done
() -
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each
get()
used to fetch a task, a subsequent call totask_done()
tells the queue that the processing on the task is complete.If a
join()
is currently blocking, it will resume when all items have been processed (meaning that atask_done()
call was received for every item that had beenput()
into the queue).Raises a
ValueError
if called more times than there were items placed in the queue.
-
Queue.
join
() block直到queue被消費完畢
7、生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
1 import threading,time 2 3 import queue 4 5 q = queue.Queue(maxsize=10) 6 7 def Producer(name): 8 count = 1 9 while True: 10 q.put("骨頭%s" % count) 11 print("生產了骨頭",count) 12 count +=1 13 time.sleep(0.1) 14 15 16 17 def Consumer(name): 18 #while q.qsize()>0: 19 while True: 20 print("[%s] 取到[%s] 並且吃了它..." %(name, q.get())) 21 time.sleep(1) 22 23 24 25 p = threading.Thread(target=Producer,args=("Alex",)) 26 c = threading.Thread(target=Consumer,args=("ChengRonghua",)) 27 c1 = threading.Thread(target=Consumer,args=("王森",)) 28 29 30 p.start() 31 c.start() 32 c1.start()