python並行編程


一、編程思想

並行編程的思想:分而治之,有兩種模型

1.MapReduce:將任務划分為可並行的多個子任務,每個子任務完成后合並得到結果

例子:統計不同形狀的個數。

先通過map進行映射到多個子任務,分別統計個數,然后在用reduce進行歸納一下。

2.流水:將任務分為串行的多個子任務,每個子任務並行。ProductConsume

例子:

多個生產者進行並行,多個消費者進行並行。生產者生產出來東西放到隊列里;隊列里有東西時,消費者就可以進行消費,這樣雙方沒有太大的依賴關系。

 

為什么要並行編程呢?

多核,雲計算,使得實現並行編程的條件更容易滿足。

大數據(導致數據多),機器學習(復雜),高並發,使得並行編程很必要。

 

為什么很少用呢?

任務分割,共享數據的訪問,死鎖,互斥,信號量,利用管道,隊列通信。線程,進程的管理。

這些概念使得並行編程的實現看上去很難

 

怎么學並行編程?

庫:  Threading,實現多線程

    Multiprocess,實現多進程

    Parallepython,實現分布式計算,同時解決CPU和網絡資源受限問題。

    Celery+RabbitMQ/Redis,可實現分布式任務隊列 Django和它搭配可實現異步任務隊列

    Gevent,可實現高效異步IO,協成

 

2.進程和線程

CPU同一時刻只能調度一個進程,進程之間memory獨立,進程內線程共享memory。

 

我們主要解決的問題是:

進程間通信問題;

線程間同步問題

 

例子:計算10000000000自減到0,然后用多進程和多線程計算,看看他們用時多久

 

# -*- coding: utf-8 -*-
# CopyRight by heibanke

import time
from threading import Thread
from multiprocessing import Process

def countdown(n):
    while n > 0:
        n -= 1
        
        
COUNT = 100000000 # 1億

def thread_process_job(n, Thread_Process, job):
    """
    n: 多線程或多進程數
    Thread_Process: Thread/Process類 
    job: countdown任務
    """
    local_time=time.time()

    #實例化多線程或多進程
    threads_or_processes = [Thread_Process(target=job,args=(COUNT//n,)) for i in xrange(n)]#學習這種寫法,很高大上,把不同的類放到列表里邊
    #threads_or_processes中保存了三個Thread_process個對象
    
    for t in threads_or_processes:
        t.start() #開始線程或進程,必須調用
    for t in threads_or_processes:
        t.join() #等待直到該線程或進程結束
        #join的作用是阻塞進程,直到所有的線程執行完畢之后,才可以執行后邊的語句
    
    print n,Thread_Process.__name__," run job need ",time.time()-local_time
    

if __name__=="__main__":
    print "Multi Threads"
    for i in [1,2,4]:
        thread_process_job(i,Thread, countdown)
        
    print "Multi Process"
    for i in [1,2,4]:
        thread_process_job(i,Process, countdown)        

輸出結果:

從結果中看出來,多線程時,隨着線程的增多,時間反而更多;多進程隨着進程的增多,時間變少。原因是python的GIL機制

GIL

當有多個線程的時候,並不是真的是並行運行的,實際上有一個鎖,誰申請到了誰運行

在python的原始解釋器CPython中存在着GIL(Global Interpreter Lock,全局解釋器鎖),因此在解釋執行python代碼時,會產生互斥鎖來限制線程對共享資源的訪問,直到解釋器遇到I/O操作或者操作次數達到一定數目時才會釋放GIL。

所以,雖然CPython的線程庫直接封裝了系統的原生線程,但CPython整體作為一個進程,同一時間只會有一個獲得GIL的線程在跑,其他線程則處於等待狀態。這就造成了即使在多核CPU中,多線程也只是做着分時切換而已。

所以它更適合處理I/O密集型的任務,不適合處理CPU密集型的任務。

不過muiltprocessing的出現,已經可以讓多進程的python代碼編寫簡化到了類似多線程的程度了。(鏈接:https://www.zhihu.com/question/23474039/answer/35418893)



這是兩個線程在運行,並不是並行,而是串行,紅色的線表示在申請cpu

 

四個線程在運行

 

進程可以快,而線程反而慢的原因是,我的電腦有多個核,進程可以進行並行的,而線程在python里邊還是串行的,申請cpu也需要花費時間的

 

 

三、I/O密集型任務

I/O密集型任務是諸如頻繁的磁盤讀取,或者通過網絡進行獲取數據,如爬蟲

比如,第一個線程運行,然后遇到I/O請求,這個I/O請求不會滿上滿足你,所以就切換到線程2上進行,過了會兒,線程2也有I/O請求,所以切換到線程3,然后線程3也有I/O請求,此時線程1的I/O請求完成,然后切換到線程1運行……

 

舉例:對韓寒博客進行爬取

步驟:

1.獲取urls;

2.將urls分給不同的進程或線程(相當於分配子任務);

3.多進程/線程抓取

 

分析:韓寒網站是:http://blog.sina.com.cn/s/articlelist_1191258123_0_1.html

如圖示,第幾頁,對應的相應的網址的箭頭部位就變成幾,所以我們用下面語句獲取每一頁的內容:

    for i in xrange(7):
        #這里的extend要注意,是在list后邊接上
        #list.extend(seq):在列表末尾一次性追加另一個序列中的多個值(用新列表擴展原來的列表)
        #str(i+1),因為我們從網站中可以看到,每一頁的變化,第一頁網站的此處就是1,第二頁就是2,以此類推,一共七頁
        urls.extend(parseTarget('http://blog.sina.com.cn/s/articlelist_1191258123_0_'+str(i+1)+'.html'))

 

 總的程序如下所示:

#!/usr/bin/env python
# coding: utf-8
#copyRight by heibanke
 
import urllib
import os
import re
from threading import Thread
from multiprocessing import Process
import time

def downloadURL(urls,dirpath):
    """
    urls: 需要下載的url列表
    dirpath: 下載的本地路徑
    """
    for url in urls:
        if len(url)>0:
            #print "current process id is ",os.getpid()
            content = urllib.urlopen(url).read()
            if not os.path.exists(dirpath):
                os.makedirs(dirpath)
            #dirpath實際上是文件夾,例如是1Process,+后邊的文件夾中的文件名字
            #把從網址中讀到的所有信息存到該文件中
            #url[-26:]是取url的倒數26個,'w'是寫模式
            open(dirpath+r'/'+url[-26:],'w').write(content)
            
def parseTarget(url):
    """
    根據目標url獲取文章列表的urls
    """
    urls=[]
    content=urllib.urlopen(url).read()
    #{.*} -- 盡可能多的吸取匹配字符串 (貪婪模式)
    #{.*?} -- 只要一匹配到,就不再往后吸取字符 (懶惰模式) 
    pattern = r'<a title=(.*?) href="(.*?)">'
    hrefs = re.findall(pattern,content)
         
    for href in hrefs:
        urls.append(href[1])#只把href讀取出來
 
    return urls   
    
def thread_process_job(n, Thread_or_Process, url_list, job):
    """
    n: 多線程或多進程數
    Thread_Process: Thread/Process類 
    job: countdown任務
    """
    local_time=time.time()
    threads_or_processes = [Thread_or_Process(target=job,args=(url_list[i],str(n)+Thread_or_Process.__name__)) for i in xrange(n)]
    for t in threads_or_processes:
        t.start()
    for t in threads_or_processes:
        t.join()
    
    print n,Thread_or_Process.__name__," run job need ",time.time()-local_time
    
if __name__=="__main__":

    t=time.time()

    urls=[]#urls是列表,和numpy中的array要區分開
    for i in xrange(7):
        #這里的extend要注意,是在list后邊接上
        #list.extend(seq):在列表末尾一次性追加另一個序列中的多個值(用新列表擴展原來的列表)
        #str(i+1),因為我們從網站中可以看到,每一頁的變化,第一頁網站的此處就是1,第二頁就是2,以此類推,一共七頁
        urls.extend(parseTarget('http://blog.sina.com.cn/s/articlelist_1191258123_0_'+str(i+1)+'.html'))
       
    url_len = len(urls)
    
    print "total urls number is ",url_len
    
    for n in [8,4,2,1]:
        #將urls分割到url_list
        url_list=[]

        #從Python2.2開始,增加了一個操作符 // ,以執行地板除://除法不管操作數為何種數值類型,
        #總是會舍去小數部分,返回數字序列中比真正的商小的最接近的數字。
        url_split_len = url_len//n
        for i in xrange(n):
            if i==n-1:
                #和上邊的extend區別開來
                #list.append(obj):在列表末尾添加新的對象,所以它被作為單獨整體加入的
                #extend是飛散成一個一個的被加入的,這里注意區別
                url_list.append(urls[i*url_split_len:url_len])
            else:
                url_list.append(urls[i*url_split_len:(i+1)*url_split_len])
        #分割任務后創建線程
        #url_list_len=len(url_list)
        #print "total urls_list number is ",url_list_len
        thread_process_job(n,Thread, url_list, downloadURL)
        thread_process_job(n,Process, url_list, downloadURL)

    print "All done in ",time.time()-t

輸出結果顯示:

>>> 
total urls number is  315
8 Thread  run job need  33.6749999523
8 Process  run job need  33.5950000286
4 Thread  run job need  40.2200000286
4 Process  run job need  90.7750000954
2 Thread  run job need  86.0289998055
2 Process  run job need  87.0989999771
1 Thread  run job need  131.422999859
1 Process  run job need  123.995000124
All done in  629.394000053
>>> 

由於網速等原因,時間上會有起伏。

 

上述代碼有一個地方有問題,就是在生成目錄的時候,有可能會發生一個進程/線程在創建着目錄,另一個進程/線程發現沒有目錄,然后也創建目錄的情況。

 

四、LOCK鎖

我們可以用lock鎖來保護公共資源。

還是上邊用的生產者和消費者模型

 

沒有lock的時候:

#!/usr/bin/env python
# coding: utf-8
#copyRight by heibanke

import time
import random
import threading
 
#當還剩下0個產品時,則不進行消費,待生產者生產
#當生產了100個產品時,則不進行生產,待消費者消費
 

#生產者
class Producer(threading.Thread):
    def __init__(self, product,filename):
        self.product = product
        self.file = filename
        threading.Thread.__init__(self)
 
    def run(self):
        while len(self.product)<100:
            tmp = random.randint(0,10)
            self.product.append(tmp)
            print "add %d, product = %s" %(tmp,str(self.product))
            fp=open(self.file,'a')
            fp.write("add %d, product = %s\n" %(tmp,str(self.product)))
            fp.close()
            time.sleep(0.1)
            #time.sleep(random.randrange(5))
 

#消費者
class Consumer(threading.Thread):
    def __init__(self, product, filename):
        self.product = product
        self.file = filename
        threading.Thread.__init__(self)
 
    def run(self):
        while True:
                if len(self.product)>0:
                    tmp = self.product[0]
                    del self.product[0]
                    print 'consum %d, product = %s'%(tmp,str(self.product))
                    fp=open(self.file,'a')
                    fp.write('consum %d, product = %s\n'%(tmp,str(self.product)))
                    fp.close()
                time.sleep(0.1)
                #time.sleep(random.randrange(4))

 
if __name__ == '__main__':
    product = [] #產品初始化時為0

    for i in range(5):#五個生產者
        p = Producer(product,'log.txt')
        p.start()
 
    for i in range(3):#三個消費者
        s = Consumer(product,'log.txt')
        s.start()

會出錯。

 

有鎖的時候:

 1 #!/usr/bin/env python
 2 # coding: utf-8
 3 #copyRight by heibanke
 4 
 5 import time
 6 import random
 7 import threading
 8  
 9 #當還剩下0個產品時,則不進行消費,待生產者生產
10 #當生產了100個產品時,則不進行生產,待消費者消費
11  
12 lock = threading.Condition()
13 
14 #生產者
15 class Producer(threading.Thread):
16     def __init__(self, lock, product,filename):
17         self._lock = lock
18         self.product = product
19         self.file = filename
20         threading.Thread.__init__(self)
21  
22     def run(self):
23         while True:
24             if self._lock.acquire():
25                 if len(self.product) >= 100:
26                     self._lock.wait()
27                 else:
28                     tmp = random.randint(0,10)
29                     self.product.append(tmp)
30                     print "add %d, product = %s" %(tmp,str(self.product))
31                     fp=open(self.file,'a')
32                     fp.write("add %d, product = %s\n" %(tmp,str(self.product)))
33                     fp.close()
34                 self._lock.notify()
35                 self._lock.release()
36                 time.sleep(0.1)
37                 #time.sleep(random.randrange(5))
38  
39 
40 #消費者
41 class Consumer(threading.Thread):
42     def __init__(self, lock, product, filename):
43         self._lock = lock
44         self.product = product
45         self.file=filename
46         threading.Thread.__init__(self)
47  
48     def run(self):
49         while True:
50             if self._lock.acquire():
51                 if len(self.product)== 0:
52                     self._lock.wait()
53                 else:
54                     tmp = self.product[0]
55                     del self.product[0]
56                     print 'consum %d, product =%s'%(tmp,str(self.product))
57                     fp=open(self.file,'a')
58                     fp.write('consum %d, product = %s\n'%(tmp,str(self.product)))
59                     fp.close()
60                 self._lock.notify()
61                 self._lock.release()
62                 time.sleep(0.1)
63                 #time.sleep(random.randrange(4))
64 
65  
66 if __name__ == '__main__':
67     product = [] #產品初始化時為0
68     for i in range(5):
69         p = Producer(lock,product,'log_lock.txt')
70         p.start()
71  
72     for i in range(3):
73         s = Consumer(lock,product,'log_lock.txt')
74         s.start()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM