python中多線程,多進程,多協程概念及編程上的應用


1, 多線程

 

  1.  線程是進程的一個實體,是CPU進行調度的最小單位,他是比進程更小能獨立運行的基本單位。
  2.  線程基本不擁有系統資源,只占用一點運行中的資源(如程序計數器,一組寄存器和棧),但是它可以與同屬於一個進程的其他線程共享全部的資源。
  3.  提高程序的運行速率,上下文切換快,開銷比較少,但是不夠穩定,容易丟失數據,形成死鎖。

 

直接上代碼:

import time
import threading

# 函數1用時2秒
def fun1():
    time.sleep(2)
    print(threading.current_thread().name, time.ctime())

# 函數2用時4秒
def fun2():
    time.sleep(4)
    print(threading.current_thread().name, time.ctime())

# 函數3用時6秒
def fun3():
    time.sleep(6)
    print('hello python', time.ctime())

th1 = threading.Thread(target=fun1)
th2 = threading.Thread(target=fun2)
th3 = threading.Thread(target=fun3)

th1.start()
th2.start()
th3.start()

打印結果:

Thread-1 Mon Jan  7     11:01:52 2019
Thread-2 Mon Jan  7     11:01:54 2019
hello python Mon Jan  7 11:01:56 2019

解析:從結果看出,他們同一時間 11:01:50開始執行,分別用了不同的時間結束

接着往下看,添加join阻塞線程

''''''

th1.start() th1.join() th2.start() th2.join() th3.start() th3.join()

打印結果:

Thread-1 Mon Jan 7     11:19:00 2019
Thread-2 Mon Jan 7     11:19:04 2019
hello python Mon Jan 7 11:19:10 2019

我們看到這三線程按順序依次執行。

我們接着看看線程的方法使用:

threading.enumerate()            #列舉線程,返回列表,其中里面會有一條主線程
threading.activeCount()          #查看線程運行個數
threading.current_thread().name     #查看當前運行線程名稱
join()                      #阻塞線程運行

 

我們接着看第二種開線程的方式:

import threading
import time

class MyThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm "+self.name+' @ '+str(i) #name屬性中保存的是當前線程的名字
            print(msg)

if __name__ == '__main__':
    t = MyThread()
    t.setName('yangzhenyu')
    a = t.isAlive()
    print(a)
    print(t.getName())

    t.start()
    b = t.isAlive()
    print(b)

打印結果:

False
yanzghenyu
True
I'm yanzghenyu @ 0
I'm yanzghenyu @ 1
I'm yanzghenyu @ 2

方法總結:

t.setName()    #設置運行線程名稱,不指定默認Thread-1 
t.getName()    #獲取線程名稱
t.isAlive()        #判斷線程是否運行,返回布爾類型   

 

線程間共享全局變量:

import threading
import time

n = 100

def work01():
    global n
    for i in range(3):
        n += 1
    print(n)                          //103

def work02():
    global n
    print(n)                         //103

print(n)                             //100

t1 = threading.Thread(target=work01)
t1.start()
time.sleep(
1)
t2
= threading.Thread(target=work02) t2.start()

關於線程鎖 

  1. 用threading.Lock()創建鎖,用acquire()申請鎖,每次只有一個線程獲得鎖,其他線程必須等此線程release()后才能獲得鎖
  2. RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。
  3. 注意:如果使用RLock,那么acquire和release必須成對出現,即同一線程中調用了n次acquire,必須調用n次的release才能真正釋放所占用的瑣

下面例子中我們用到的是Lock(),當加完鎖之后,該方法同一時間內只能被一個線程調用。

import threading
mylock=threading.Lock()#創建鎖
num = 0
def add_num(name):
    global num
    while True:
        mylock.acquire()#申請鎖也就是加鎖
        print('thread %s locked! num=%d'%(name,num))
        if num>=5:
            print('thread %s release! num=%d'%(name,num))
            mylock.release()#釋放鎖
            return 
        num += 1
        print('thread %s release! num = %d'%(name,num))
        mylock.release()

t1 = threading.Thread(target=add_num,args=('A',))
t2 = threading.Thread(target=add_num,args=('B',))
t1.start()
t2.start()

打印結果:

thread A locked! num=0
thread A release! num = 1
thread A locked! num=1
thread A release! num = 2
thread A locked! num=2
thread A release! num = 3
thread A locked! num=3
thread A release! num = 4
thread A locked! num=4
thread A release! num = 5
thread A locked! num=5
thread A release! num=5
thread B locked! num=5
thread B release! num=5

 

cpu io密集型適合用多線程進行開發

 


 


 

關於進程:

  • 進程是系統進行資源分配的最小單位,每個進程都有自己的獨立內存空間,不用進程通過進程間通信來通信。
  • 但是進程占據獨立空間,比較重量級,所以上下文進程間的切換開銷比較大,但是比較穩定安全。

 

進程創建:

第一種創建進程的方式:

from multiprocessing import Process
import time
import random
import os


def piao(name):
    print("%s is piaoping"%name)
    time.sleep(random.randint(0,1))
    print("%s is piao end"%name)

if __name__ == '__main__':
    print("CPU的個數是:%d"%os.cpu_count())
    p1 = Process(target=piao,args=("alex",),name="進程1")
    print(p1.name)
    p1.start()
    print("父進程!") #執行速度要遠快於建立新進程的時間

 

打印結果:

CPU的個數是:2
進程1
父進程!
alex is piaoping
alex is piao end

 

第二種創建進程的方式:

from multiprocessing import Process
import time
import random

#繼承Process類,並實現自己的run方法

class Piao(Process):
    def __init__(self,name):
        #必須調用父類的init方法
        super().__init__()
        self.name = name

    def run(self):
        print("%s is piaoing"%self.name)
        time.sleep(random.randint(1,3))
        print("%s is piaoeng"%self.name)


if __name__ == '__main__':
    p1 = Piao("Alex")
    #開辟一個新的進程實際上就是執行本進程所對應的run()方法
    p1.start()
    print("主進程!")

結果:

主進程!
Alex is piaoing
Alex is piaoeng

解析:join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成后,如果指定參數,也就是等待時間s,那么主進程將在這個時間內結束,

     用is_active()  方法即可檢測進程的狀態,不加join() 返回True,表示進程還在進行。

 

進程的方法,

start()    啟動進程實例(創建子進程);
terminate():不管任務是否完成,立即終止;
name:    當前進程實例別名,默認為Process-N,N為從1開始遞增的整數;
pid:     當前進程實例的PID值;  os.getpid()
is_alive(): 判斷進程實例是否還在執行;
join([timeout]):是否等待進程實例執行結束,或等待多少秒;

 

進程池:

  在程序實際處理問題時,忙時會有成千上萬個任務需要執行,閑時有零星任務,創建時需要消耗時間,銷毀也需要時間,

即使開啟成千上萬個進程,操作系統也不能 讓他同時執行。這里就用到了進程池,用於管理小塊內存的申請與釋放。

,

1,上代碼:

from multiprocessing.pool import Pool
from time import sleep


def fun(a):
    sleep(1)
    print(a)

if __name__ == '__main__':
    p = Pool()   # 這里不加參數,但是進程池的默認大小,等於電腦CPU的核數
            # 也是創建子進程的個數,也是每次打印的數字的個數
    for i in range(10):
        p.apply_async(fun, args=(i,))
p.close() p.join()
# 等待所有子進程結束,再往后執行 print("end")

 

2,callback 舉例:

from multiprocessing import Process,Pool


def func(i):
    i+=1
    return i#普通進程處理過的數據返回給主進程p1

def call_back(p1):
    p1+=1
    print(p1)

if __name__ == '__main__':
    p = Pool()
    for i in range(10):
        p1 = p.apply_async(func,args=(i,),callback = call_back)#p調用普通進程並且接受其返回值,將返回值給要執行的回調函數處理
    p.close()
    p.join()

解析:   1,p.apply ( func,args = ())     同步的效率,也就是說池中的進程一個一個的去執行任務

      p.apply_async( func,args = () , callback = None) : 異步的效率,也就是池中的進程一次性都去執行任務.

    2,異步處理任務時 : 必須要加上close和join. 進程池的所有進程都是守護進程(主進程代碼執行結束,守護進程就結束). 

    3,func : 進程池中的進程執行的任務函數

       4,args : 可迭代對象性的參數,是傳給任務函數的參數

       5,callback : 回調函數,就是每當進程池中有進程處理完任務了,返回的結果可以交給回調函數,

                由回調函數進行進一步處理,回調函數只異步才有,同步沒有.回調函數是父進程調用.

 

3. map( func,iterable)  (該方法經常用到爬蟲)

from multiprocessing import Pool

def func(num):
    num += 1
    print(num)
    return num

if __name__ == '__main__':
    p = Pool(2)
    res = p.map(func,[i for i in range(100)])
    # p.close()#map方法自帶這兩種功能
    # p.join()
    print('主進程中map的返回值',res)

func : 進程池中的進程執行的任務函數

iterable : 可迭代對象,是把可迭代對象那個中的每個元素一次傳給任務函數當參數.

map方法自帶close和join

 

進程間的通信:

1)隊列

from multiprocessing import Queue,Process
import os,time,random

#添加數據函數
def proc_write(queue,urls):
    print("進程(%s)正在寫入..."%(os.getpid()))
    for url in urls:
        queue.put(url)
        print("%s被寫入到隊列中"%(url))
        time.sleep(random.random()*3)

#讀取數據函數
def proc_read(queue):
    print("進程(%s)正在讀取..."%(os.getpid()))

    while True:
        url = queue.get()
        print("從隊列中提取到:%s"%(url))

if __name__ =="__main__":
queue
= Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()

生產者與消費者模式(線程間的通信):

from queue import Queue
import threading,time


class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count +1
                    msg = '生成產品'+str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了 '+queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == '__main__':
    queue = Queue()

    for i in range(500):
        queue.put('初始產品'+str(i))
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()

 

2) 進程間的通信(管道)

from multiprocessing import Pipe,Process
import random,time,os

def proc_send(pipe,urls):
    for url in urls:
        print("進程(%s)發送:%s"%(os.getpid(),url))
        pipe.send(url)
        time.sleep(random.random())

def proc_recv(pipe):
    while True:
        print("進程(%s)接收到:%s"%(os.getpid(),pipe.recv()))
        time.sleep(random.random())

if __name__ == "__main__":
    pipe = Pipe()
    p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],))   
    p2 = Process(target=proc_recv,args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()

解析:

  pipe用於兩個進程間的通信,兩個進程分別位於管道的兩端,Pipe方法返回(conn1,conn2)代表一個管道的兩端,

  Pipe方法有dumplex參數,若該參數為True,管道為全雙工模式,

  若為Fasle,conn1只負責接收消息,conn2只負責發送消息.send和recv方法分別是發送和接收消息的方法

 



 

 

協程:

協程:是更小的執行單位,是一種輕量級的線程,協程的切換只是單純的操作CPU的上下文,所以切換速度特別快,且耗能小。

gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標准庫,這一過程在啟動時通過monkey patch完成:

from gevent import monkey

monkey.patch_all()  # 用來在運行時動態修改已有的代碼,而不需要修改原始代碼。

import gevent
import requests


def f(url):
    print('GET: %s' % url)
    html = requests.get(url).text
    print(url, len(html))


gevent.joinall([
    gevent.spawn(f, 'http://i.maxthon.cn/'),  # 先執行這個函數,發送請求,等待的時候發送第二個請求
    gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'),
    gevent.spawn(f, 'http://edu.51cto.com/?jydh')])

運行結果:

GET: http://i.maxthon.cn/
GET: http://www.jianshu.com/u/3cfeb3395a95
GET: http://edu.51cto.com/?jydh
http://i.maxthon.cn/ 461786
http://edu.51cto.com/?jydh 353858
http://www.jianshu.com/u/3cfeb3395a95 597

從結果看,3個網絡操作是並發執行的,而且結束順序不同,但只有一個線程。

使用gevent,可以獲得極高的並發性能,但gevent只能在Unix/Linux下運行,在Windows下不保證正常安裝和運行。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM