【Python3之多線程】


一、threading模塊

multiprocess模塊的完全模仿了threading模塊的接口,二者在使用層面,有很大的相似性。

1.開啟線程的兩種方式(同Process)

方法一

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('hh',))
    t.start()
    print('主線程')
主線程
hh say hello

 

方法二

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('hh')
    t.start()
    print('主線程')
主線程
hh say hello

 

2.在一個進程下開啟多個線程與在一個進程下開啟多個子進程的區別

  • 開啟速度,主進程下開啟線程速度較快。
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()
    print('主進程-->線程')
    '''
    打印結果:
    hello
    主進程-->線程
    '''

    #在主進程下開啟子進程
    t=Process(target=work)
    t.start()
    print('主進程-->子進程')
    '''
    打印結果:
    主進程-->子進程
    hello
    '''
hello
主進程-->線程
主進程-->子進程
hello

 

  • 開啟PID

在主進程下開啟多個線程,每個線程都跟主進程的pid一樣,開多個進程,每個進程都有不同的pid

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主進程-->線程pid',os.getpid())

    #part2:開多個進程,每個進程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主進程-->子進程pid',os.getpid())
hello 13002
hello 13002
主進程-->線程pid 13002
主進程-->子進程pid 13002
hello 13003
hello 13004

 

#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading

import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
    while True:
        data=conn.recv(1024)
        print(data)
        conn.send(data.upper())

if __name__ == '__main__':

    while True:
        conn,addr=s.accept()


        p=threading.Thread(target=action,args=(conn,))
        p.start()
多線程並發的socket服務端
import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if not msg:continue

    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data)
客戶端

 

三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化后的結果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

 

3.線程的join與setdaemon

與進程的方法都是類似的,其實是multiprocessing模仿threading的接口

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('h',))
    t.setDaemon(True)
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())

 

4.線程相關的其他方法補充

Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 

 

from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主進程下開啟線程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count())
    print('主線程/主進程')
MainThread
<_MainThread(MainThread, started 140736678523840)>
[<_MainThread(MainThread, started 140736678523840)>, <Thread(Thread-1, started 123145314349056)>]
2
主線程/主進程
Thread-1

 

二、 Python GIL(Global Interpreter Lock)

全局解釋器鎖。在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢。

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標准,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

 

補充:

1. cpu到底是用來做計算的,還是用來做I/O的?

1. 多cpu,意味着可以有多個核並行完成計算,所以多核提升的是計算性能

2. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什么用處

一個工人相當於cpu,此時計算相當於工人在干活,I/O阻塞相當於為工人干活提供所需原材料的過程,工人干活的過程中如果沒有原材料了,則工人干活的過程需要停止,直到等待原材料的到來。

如果你的工廠干的大多數任務都要有准備原材料的過程(I/O密集型),那么你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去干別的活,

反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高

結論:

  對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用

 

我們有四個任務需要處理,處理方式肯定是要玩出並發的效果,解決方案可以是:

方案一:開啟四個進程

方案二:一個進程下,開啟四個線程

 

單核情況下,分析結果: 

  如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷,方案二勝

  如果四個任務是I/O密集型,方案一創建進程的開銷大,且進程的切換速度遠不如線程,方案二勝

 

多核情況下,分析結果:

  如果四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

  如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

 

結論:現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

 

#計算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    # for i in range(300): #串行
    #     work()

    for i in range(300):
        # t=Thread(target=work) #多線程49.64094281196594
        t=Process(target=work) #多進程11.664679050445557
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()

    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

    print('主線程')


# run time is 49.64094281196594
# 主線程

# run time is 11.664679050445557
# 主線程

 

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模擬I/O操作,可以打開一個文件來測試I/O,與sleep是一個效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(1000):
        t=Thread(target=work) #耗時大概為2秒
        # t=Process(target=work) #耗時大概為25秒,創建進程的開銷遠高於線程,而且對於I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

 

結論:

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

 

三、同步鎖

import time
import threading

def addNum():
    global num #在每個線程中都獲取這個全局變量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 對此公共變量進行-1操作

num = 100  #設定一個共享變量

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有線程執行完畢
    t.join()

print('Result: ', num)
Result:  99

 

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調用release方法釋放鎖:

import threading

R=threading.Lock()

R.acquire()
'''
對公共數據的操作
'''
R.release()

 

補充:

GIL VS Lock

    Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什么這里還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

    然后,我們可以得出結論:保護不同的數據就應該加不同的鎖。

 最后,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),后者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock   

 

四、死鎖與遞歸鎖

死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,如下就是死鎖

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()

'''
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
然后就卡住,死鎖了
'''

 

解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

 mutexA=mutexB=threading.RLock()
 #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

 

五、信號量Semahpore

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

import threading
import time

semaphore = threading.Semaphore(5)

def func():
    if semaphore.acquire():
        print (threading.currentThread().getName() + ' get semaphore')
        time.sleep(2)
        semaphore.release()

for i in range(20):
  t1 = threading.Thread(target=func)
  t1.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

 

六、Event

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

可以考慮一種應用場景(僅僅作為說明),例如,我們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去連接Redis的服務,一般情況下,如果Redis連接不成功,在各個線程的代碼中,都會去嘗試重新連接。如果我們想要在啟動時確保Redis服務正常,才讓那些工作線程去連接Redis服務器,那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作:主線程中會去嘗試連接Redis服務,如果正常的話,觸發事件,各工作線程會嘗試連接Redis服務。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()
(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1        ) redis ready, and connect to redis server and do some work [Tue Jul 11 11:53:19 2017]
(t2        ) redis ready, and connect to redis server and do some work [Tue Jul 11 11:53:19 2017]

threading.Event的wait方法還接受一個超時參數,默認情況下如果事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數之后,如果阻塞時間超過這個參數設定的值之后,wait方法會返回。對應於上面的應用場景,如果Redis服務器一致沒有啟動,我們希望子線程能夠打印一些日志來不斷地提醒我們當前沒有一個可以連接的Redis服務,我們就可以通過設置這個超時參數來達成這樣的目的:

def conn_mysql():
    count=0
    while not e.is_set():
        print('%s 第 <%s> 次嘗試' %(threading.current_thread().getName(),count))
        count+=1
        e.wait(0.5)
    print('%s ready to conn mysql' %threading.current_thread().getName())
    time.sleep(1)

 

七、定時器

定時器,指定n秒后執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

八、線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

 

  • class queue.Queue(maxsize=0) #先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

 

  • class queue.LifoQueue(maxsize=0) #last in fisrt out 
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(后進先出):
third
second
first
'''

 

  • class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

九、paramiko模塊

1.定義

paramiko是一個用於做遠程控制的模塊,使用該模塊可以對遠程服務器進行命令或文件操作,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。

 

2.下載安裝

pycrypto,由於 paramiko 模塊內部依賴pycrypto,所以先下載安裝pycrypto
pip3 install pycrypto
pip3 install paramiko
注:如果在安裝pycrypto2.0.1時發生如下錯誤
        command 'gcc' failed with exit status 1...
可能是缺少python-dev安裝包導致
如果gcc沒有安裝,請事先安裝gcc

 

3.使用

SSHClient

用於連接遠程服務器並執行基本命令

  • 基於用戶名密碼連接:
import paramiko
  
# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
  
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
  
# 關閉連接
ssh.close()
  • SSHClient 封裝 Transport
import paramiko

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='123')

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
print stdout.read()

transport.close()

 

  • 基於公鑰密鑰連接:

客戶端文件名:id_rsa

服務端必須有文件名:authorized_keys

import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 
# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
 
# 執行命令
stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果
result = stdout.read()
 
# 關閉連接
ssh.close()

 

  • SSHClient 封裝 Transport
import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')

transport.close()

 

 

  • 基於私鑰字符串進行連接
import paramiko
from io import StringIO

key_str = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
+MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
-----END RSA PRIVATE KEY-----"""

private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
transport = paramiko.Transport(('10.0.1.40', 22))
transport.connect(username='wupeiqi', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read()

transport.close()

print(result)

 

 

SFTPClient

用於連接遠程服務器並執行上傳下載

基於用戶名密碼上傳下載

import paramiko
 
transport = paramiko.Transport(('hostname',22))
transport.connect(username='wupeiqi',password='123')
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
 
transport.close()
  • 基於公鑰密鑰上傳下載
import paramiko
 
private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
 
transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key )
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path
sftp.get('remove_path', 'local_path')
 
transport.close()

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import paramiko
import uuid

class Haproxy(object):

    def __init__(self):
        self.host = '172.16.103.191'
        self.port = 22
        self.username = 'wupeiqi'
        self.pwd = '123'
        self.__k = None

    def create_file(self):
        file_name = str(uuid.uuid4())
        with open(file_name,'w') as f:
            f.write('sb')
        return file_name

    def run(self):
        self.connect()
        self.upload()
        self.rename()
        self.close()

    def connect(self):
        transport = paramiko.Transport((self.host,self.port))
        transport.connect(username=self.username,password=self.pwd)
        self.__transport = transport

    def close(self):

        self.__transport.close()

    def upload(self):
        # 連接,上傳
        file_name = self.create_file()

        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        # 將location.py 上傳至服務器 /tmp/test.py
        sftp.put(file_name, '/home/wupeiqi/tttttttttttt.py')

    def rename(self):

        ssh = paramiko.SSHClient()
        ssh._transport = self.__transport
        # 執行命令
        stdin, stdout, stderr = ssh.exec_command('mv /home/wupeiqi/tttttttttttt.py /home/wupeiqi/ooooooooo.py')
        # 獲取命令結果
        result = stdout.read()


ha = Haproxy()
ha.run()

 

 

十、協程

1.定義

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

 

需要強調的是:

  1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其他線程運行)

  2. 單線程內開啟協程,一旦遇到io,從應用程序級別(而非操作系統)控制切換

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換,優點如下:

  1.  協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

  2. 單線程內就可以實現並發的效果,最大限度地利用cpu

 

要實現協程,關鍵在於用戶程序自己控制程序切換,切換之前必須由用戶程序自己保存協程上一次調用時的狀態,如此,每次重新調用時,能夠從上次的位置繼續執行

(詳細的:協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)

 

為此,我們之前已經學習過一種在單線程下可以保存程序運行狀態的方法,即yield,我們來簡單復習一下:

  1. yiled可以保存狀態,yield的狀態保存與操作系統的保存線程狀態很像,但是yield是代碼級別控制的,更輕量級
  2. send可以把一個函數的結果傳給另外一個函數,以此實現單線程內程序之間的切換
#不用yield:每次函數調用,都需要重復開辟內存空間,即重復創建名稱空間,因而開銷很大
import time
def consumer(item):
    # print('拿到包子%s' %item)
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333

    pass
def producer(target,seq):
    for item in seq:
        target(item) #每次調用函數,會臨時產生名稱空間,調用結束則釋放,循環100000000次,則重復這么多次的創建和釋放,開銷非常大

start_time=time.time()
producer(consumer,range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #30.132838010787964


#使用yield:無需重復開辟內存空間,即重復創建名稱空間,因而開銷小
import time
def init(func):
    def wrapper(*args,**kwargs):
        g=func(*args,**kwargs)
        next(g)
        return g
    return wrapper

@init
def consumer():
    x=11111111111
    x1=12111111111
    x3=13111111111
    x4=14111111111
    y=22222222222
    z=33333333333
    while True:
        item=yield
        # print('拿到包子%s' %item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item) #無需重新創建名稱空間,從上一次暫停的位置繼續,相比上例,開銷小

start_time=time.time()
producer(consumer(),range(100000000))
stop_time=time.time()
print('run time is:%s' %(stop_time-start_time)) #21.882073879241943

缺點:

協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程

協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

 

協程的定義(滿足1,2,3就可稱為協程):

  1. 必須在只有一個單線程里實現並發
  2. 修改共享數據不需加鎖
  3. 用戶程序里自己保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

yield切換在沒有io的情況下或者沒有重復開辟內存空間的操作,對效率沒有什么提升,甚至更慢,為此,可以用greenlet來為大家演示這種切換

 

十一、Greenlet

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可以使你在任意函數之間隨意切換,而不需把這個函數先聲明為generator

from greenlet import greenlet

def test1():
    print('test1,first')
    gr2.switch()
    print('test1,sencod')
    gr2.switch()
def test2():
    print('test2,first')
    gr1.switch()
    print('test2,sencod')


gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch()
import time
from greenlet import greenlet
def eat(name):
    print('%s eat food 1' %name)
    gr2.switch('alex飛飛飛')
    print('%s eat food 2' %name)
    gr2.switch()
def play_phone(name):
    print('%s play 1' %name)
    gr1.switch()
    print('%s play 2' %name)

gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='egon啦啦啦')#可以在第一次switch時傳入參數,以后都不需要

 

十二、Gevent

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

 

g1=gevent.spawn()創建一個協程對象g1,

spawn括號內第一個參數是函數名,如eat,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

 遇到IO阻塞時會自動切換任務

import gevent
import time


def eat():
    print('eat food 1')
    gevent.sleep(2) #等飯來
    print('eat food 2')

def play_phone():
    print('play phone 1')
    gevent.sleep(1) #網卡了
    print('play phone 2')



# gevent.spawn(eat)
# gevent.spawn(play_phone)
# print('主') # 直接結束


#因而也需要join方法,進程或現場的jion方法只能join一個,而gevent的join方法可以join多個

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前

或者我們干脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

from gevent import monkey;monkey.patch_all()

import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play_phone():
    print('play phone 1')
    time.sleep(1)
    print('play phone 2')



g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')

同步與異步

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print('Synchronous:')
synchronous()
 
print('Asynchronous:')
asynchronous()

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,后者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。

#gevent線程的一些用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

 

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,'https://www.python.org/'),
    gevent.spawn(get_page,'https://www.yahoo.com/'),
    gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
協程應用:爬蟲

通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)

 

from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print('client %s:%s msg: %s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
服務端
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客戶端
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    c=socket(AF_INET,SOCK_STREAM)
    c.connect((server_ip,port))

    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8080))
        t.start()
多線程並發多個客戶端

 

 

作業

題目:簡單主機批量管理工具

需求:

  1. 主機分組
  2. 主機信息配置文件用configparser解析
  3. 可批量執行命令、發送文件,結果實時返回,執行格式如下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主機用戶名密碼、端口可以不同
  5. 執行遠程命令使用paramiko模塊
  6. 批量命令需使用multiprocessing並發


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM