Python自動化運維之16、線程、進程、協程、queue隊列


一、線程

1、什么是線程

  線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。

一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務

2、基本使用

(1)創建線程的兩種方式

直接調用(常用)

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def f1(arg):   # 定義每個線程要執行的函數
    time.sleep(0.1)
    print(arg,threading.current_thread())    # threading.current_thread()詳細的線程信息

for i in range(10):    # 創建10個線程並發執行函數
    t = threading.Thread(target=f1,args=('python',))   # args是函數的參數,元組最后一個必須要逗號,
    t.start()   # 啟動線程

print(t.getName())  # 可以獲取主線程的名字

繼承調用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

class MyThread(threading.Thread):   # 繼承threading.Thread類
    def __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread,self).__init__()  # 執行父類的構造方法

    def run(self):   # run()方法,是cpu調度線程會使用的方法,必須是run()方法
        self.func(self.args)

def f2(arg):
    time.sleep(0.1)
    print(arg,threading.current_thread())

for i in range(10):   # 創建10個線程
    obj = MyThread(f2,123)
    obj.start()

(2)更多方法  

自己還可以為線程自定義名字,通過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此之外,Thread還有一下一些方法

t.join(n)       表示主線程等待子線程多少時間,n表示主線程等待子線程的超時時間,如果在n時間內子線程未完成,主線程不在等待,執行后面的代碼
t.run()         線程被cpu調度后自動執行線程對象的run方法(一般我們無需設置,除非自己定義類調用)
t.start()       線程准備就緒,等待CPU調度
t.getName()     獲取線程的名稱
t.setName()     設置線程的名稱 
t.name          獲取或設置線程的名稱
t.is_alive()    判斷線程是否為激活狀態
t.isAlive()     判斷線程是否為激活狀態
t.isDaemon()    判斷是否為守護線程
t.setDaemon     設置True或False(默認)
                   True表示主線程不等待子線程全部完成就執行后面的代碼
                   False默認值,標識主線程等待子線程全部執行完后繼續執行后面的代碼
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def f1(arg):
    time.sleep(5)
    print(arg)

t = threading.Thread(target=f1,args=('python',))
t.setDaemon(True) # 默認是False,設置為true表示主線程不等子線程
t.start()  
t.join(2)  # 表示主線程到此,等待子線程執行完畢,2表示主線程最多等待2秒

print('end') # 默認主線程在等待子線程結束
print('end')
print('end')

3、線程鎖(Lock、RLock)  

  由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。這里使用Rlock,而不使用Lock,因為Lock如果多次獲取鎖的時候會出錯,而RLock允許在同一線程中被多次acquire,但是需要用n次的release才能真正釋放所占用的瑣,一個線程獲取了鎖在釋放之前,其他線程只有等待。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

NUM = 10

def func(l):
    global NUM
    # 上鎖
    l.acquire()
    NUM -=1
    time.sleep(0.1)
    print(NUM,threading.current_thread())
    # 開鎖
    l.release()

# lock = threading.Lock()
lock = threading.RLock()  # 遞歸鎖

for j in range(10):
    t = threading.Thread(target=func,args=(lock,))
    t.start()

4、信號量(Semaphore)

  互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

NUM = 30

def func(i,l):
    global NUM
    # 上鎖
    l.acquire()
    NUM -=1
    time.sleep(1)
    print(NUM,i,threading.current_thread())
    # 開鎖
    l.release()

lock = threading.BoundedSemaphore(5)  # 設置信號量5,表示同時5個線程同時執行

for i in range(30):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()

5、事件(event)

  python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個"Flag",如果"Flag"值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果"Flag"值為True,那么event.wait 方法時便不再阻塞。

  • clear:將"Flag"設置為False
  • set:  將"Flag"設置為True
  • wait: 檢測當前"Flag",如果"Flag"值為 False,那么當線程執行 event.wait 方法時就會阻塞,如果"Flag"值為True,那么event.wait 方法時便不再阻塞。

下面是一個紅綠燈的例子,主線程做交通指揮燈,生成幾個線程做車輛,車輛行駛按紅燈停,綠燈行的規則。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading

def func(i,e):
    print(i)
    e.wait()  # 檢測是什么燈,如果是True紅燈,停;綠燈False行,默認是紅燈
    print(i+100)


event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func,args=(i,event,))
    t.start()

event.clear()  # 主動設置成紅燈,默認是紅燈,此句可以不寫
inp = input('>>>')
if inp == '1':
    event.set()  # 設置成綠燈,就會執行func()函數中print(i+100)語句

6、條件(Condition)

使得線程等待,只有滿足某條件時,才釋放n個線程

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading

def func(i,con):
    print(i)
    con.acquire()  # 固定寫法acquire,wait
    con.wait()
    print(i+100)
    con.release()

c = threading.Condition()  # 設置條件

for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()


while True:
    inp = input('>>>')
    if inp == 'q':
        break
    c.acquire() # 這里是固定寫法,acquire,notify,release
    c.notify(int(inp))
    c.release()

第二種

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading

def condition():
    ret = False
    r = input('>>>')
    if r == 'true':
        ret = True
    else:
        ret = False
    return ret

def func(i,con):
    print(i)
    con.acquire()
    con.wait_for(condition)  # 和上一個例子的差別在這里
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func,args=(i,c,))
    t.start()

6、Timer

定時器,指定n秒后執行某操作

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from threading import Timer

def hello():
    print("hello python")

t = Timer(1,hello)
t.start() 

7、線程池,點擊這里  

二、進程  

  線程的上一級就是進程,進程可包含很多線程,進程和線程的區別是進程間的數據不共享,多進程也可以用來處理多任務,不過多進程很消耗資源,計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核心數保持一致。  

1、線程與進程的區別

1、線程共享創建它的進程的地址空間,進程有自己的地址空間。
2、線程是直接可以訪問線程之間的數據;進程需要復制父進程的數據才能訪問。
3、線程可以直接與其他線程的通信過程,進程必須使用進程間通信和同胞交流過程。
4、新創建一個線程很容易;新創建一個進程需要復制父進程。
5、主線程可以控制相當大的線程在同一進程中;進程只能控制子進程。
6、主線程變更(取消、優先級變化等)可能會影響進程的其他線程的行為;父進程的變化不會影響子進程。

2、基本使用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process

def foo(i):
    print('say hi',i)

for i in range(10):
    p = Process(target=foo,args=(i,))
    #p.daemon = True  # 和線程t.setdaemon是一樣的
    p.start()
    #p.join()

注意:由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。其他使用方法和線程threading.Thread是一樣的  

3、進程數據共享

進程各自持有一份數據,默認無法共享數據;queues,Array,Manager.dict,pipe這些方法都能實現數據共享

(1)特殊隊列queues()

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())

li = queues.Queue(20,ctx=multiprocessing)

for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()

(2)數組Array()

數組和列表很像,但是數組中的元素在內存中的地址是一段連續的空間地址,而列表中的元素則不是一段連續的的地址,是通過鏈表的形式找到下一個元素

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process
from multiprocessing import Array

def foo(i,arg):
    arg[i] = i+100
    for item in arg:
        print(item)

li = Array('i',10)  # 指定數組時需要指定類型
for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
類型對應表

(3)Manager.dict()

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process
from multiprocessing import Manager

def foo(i,arg):
    arg[i] = i +100
    print(arg.values())

obj = Manager()
li = obj.dict()
for i in range(10):
    p = Process(target=foo,args=(i,li,))
    p.start()
    p.join()

(4)pipe()

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())   # 父進程可以收到子進程的共享信息prints "[42, None, 'hello']" 
p.join()

4、進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply
  • apply_async
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

pool = Pool(5)

for i in range(30):  # 定義30個任務
    #pool.apply(func=f1,args=(i,))  # 所有進程串行執行沒有多大意義
    pool.apply_async(func=f1,args=(i,)) # 異步並行執行

pool.close() #等待所有的任務執行完畢
#time.sleep(1)
#pool.terminate()  # 立即終止子進程的任務,主進程繼續執行
pool.join() # 執行pool.join時必須先執行pool.close或者pool.terminate
            # 進程池中進程執行完畢后在關閉,如果注釋,那么程序直接關閉close,terminate也無效
print('end')

三、協程

  線程和進程的操作是由程序觸發系統接口,最后的執行者是系統;協程的操作則是程序員。

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;由greenlet,gevent實現,gevent是調用greenlet進行封裝;需要安裝pip install greenlet;pip install gevent;

greenlet  

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
 
from greenlet import greenlet
 
 
def test1():
    print 12
    gr2.switch()
    print 34
    gr2.switch()
 
 
def test2():
    print 56
    gr1.switch()
    print 78
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

遇到IO操作自動切換:此操作在python2.x中執行的,urllib2不支持python3.x

from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
    print('GET: %s' % url)
    resp = urllib2.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

四、queue隊列  

queue有哪些隊列? 

  • queue.Queue(maxsize) 先進先出隊列
  • queue.LifoQueue(maxsize) 后進先出
  • queue.PriorityQueue(maxsize) 優先級隊列
  • queue.deque(maxsize) 雙向隊列

先進先出

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue

q = queue.Queue(5)  # 默認maxsize=0無限接收,最大支持的個數
print(q.empty())    # 查看隊列是否為空
q.put(11)           # put防數據,是否阻塞默認是阻塞block=True,timeout超時時間
q.put(22)
q.put(33,block=False,timeout=2)
print(q.full())     # 查看隊列是否已經放滿
print(q.qsize())    # 隊列中多少元素
print(q.maxsize)    # 隊列最大支持的個數
print(q.get(block=False,timeout=2))  # get取數據,是否阻塞默認是阻塞block=True,timeout超時時間
print("*" * 10)

print(q.get())
q.task_done()       # join配合task_done,隊列中有任務就會阻塞進程,當隊列中的任務執行完畢之后,不在阻塞
print(q.get())
q.task_done()
q.join()            # 隊列中還有元素的話,程序就不會結束程序,只有元素被取完配合task_done執行,程序才會結束

后進先出

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue

q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())

優先級隊列 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue

q = queue.PriorityQueue()
q.put((1,'python1'))
q.put((5,'python'))
q.put((3,'python3'))
print(q.get())

雙向隊列  

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue

q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
print(q.pop())
print(q.popleft())

更多請查看官方文檔

生產者消費者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import time
q = queue.Queue()

def productor(arg):
    while True:
        q.put(str(arg))
        print('%s 號窗口有票' %str(arg))
        time.sleep(1)

for i in range(3):
    t = threading.Thread(target=productor,args=(i,))
    t.start()


def consumer(arg):
    while True:
        print('第 %s 人取 %s 號窗口票' %(str(arg),q.get()))
        time.sleep(1)

for j in range(300):
    t = threading.Thread(target=consumer,args=(j,))
    t.start()

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM