python之線程和進程(並發編程)


python的GIL

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行

線程

1同步鎖

2死鎖遞歸鎖

3:信號量和同步對象(了解)

4隊列------生產者消費者模型

5進程

 

線程的基本調用

 

<python的線程與threading模塊>

import threading  # 線程
import time


def Hi(num):
    print('hello %d' % num)
    time.sleep(3)

if __name__ == '__main__':
    # 第一個參數是要執行的函數名,第二個是函數的參數(必須是可迭代對象)
    t1 = threading.Thread(target=Hi, args=(10, ))  # 創建一個線程對象
    t1.start()  # 開啟線程
    t2 = threading.Thread(target=Hi, args=(9, ))  # 創建一個線程對象
    t2.start()  # 開啟線程
    print('ending....')
#   這就是並發的現象

#   並行:指的是兩個或者多個事件在同一時刻發生(同時調用多核)
#   並發:指的是兩個或者多個事件在同一時間間隔內發生(在一個核內快速的切換)

第二種調用方式

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):#定義每個線程要運行的函數

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
    
    print("ending......")

join和setDaemon

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-22 下午8:45
# @Author  : LK
# @File    : lesson2.py
# @Software: PyCharm

import threading
import time

def music():
    print('begin listen music %s'%time.ctime())
    # t = input('請輸入內容>>>')
    # time.sleep(3)
    print('stop listen music %s'%time.ctime())
    # print(t)


def game():
    print('begin to game %s'%time.ctime())
    time.sleep(5)
    print('stop to game %s'%time.ctime())

if __name__ == '__main__':
    t1 = threading.Thread(target=music)
    t2 = threading.Thread(target=game)

    t1.start()
    t2.start()

    # t1.join()   #   join就是等待的意思,讓該線程執行完畢后,在執行主線程
    # t2.join()   # 注意如果注釋這一句,結果是什么

    print('ending....')

    print(t1.getName())  #  獲取線程名,
join
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-22 下午9:26
# @Author  : LK
# @File    : 守護線程.py
# @Software: PyCharm

#   守護線程 就是:和主線程一起退出,如果主線程結束了,那么不管守護的線程,有沒有執行完畢,都退出
import threading
import time


def music():
    print('begin listen music %s' % time.ctime())
    time.sleep(3)
    print('stop listen music %s' % time.ctime())


def game():
    print('begin to game %s' % time.ctime())
    time.sleep(5)
    print('stop to game %s' % time.ctime())


t1 = threading.Thread(target=music)
t2 = threading.Thread(target=game)
threads = []
threads.append(t1)
threads.append(t2)
if __name__ == '__main__':

    # t1.setDaemon(True)
    t2.setDaemon(True)
    for t in threads:
        t.start()
        # t.setDaemon(True) # 守護線程, 就是和主線程一塊結束
    print('ending....')
setDaemon

 

join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。

setDaemon(True):

         將線程聲明為守護線程,必須在start() 方法調用之前設置, 如果不設置為守護線程程序會被無限掛起。這個方法基本和join是相反的。

         當我們 在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成

         想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是 只要主線程

         完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦

其他方法

# run():  線程被cpu調度后自動執行線程對象的run方法
# start():啟動線程活動。
# isAlive(): 返回線程是否活動的。
# getName(): 返回線程名。
# setName(): 設置線程名。

threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

同步鎖lock

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-23 下午4:42
# @Author  : LK
# @File    : 同步鎖.py
# @Software: PyCharm
import threading, time

#   用多線程去調用一個每次減1的函數,里面讓他停留一會就會切換cpu,模擬線程安全問題,引出同步鎖(互斥鎖)


def sub():
    global num
    #  這樣坐是0,因為num-=1 這個執行的很快,沒有到達時間片切換就執行完了,每次取的都是不同的值
    # num -= 1
    # 如果這里讓他停一會,就會發生,前面的一些拿到的值為100,然后開始停下來,讓其他線程取值
    # 取得值還可能是100, 再過幾個線程,取的時候就是前面的線程執行完了(到達時間片切換了)
    #  取得是就可能是其他值(前面幾個線程執行后的結果),最終結果就不在是0
    #   這就是線程安全問題,多個線程都在處理同一個數據,
    # 如果處理的太慢(sleep)就會發生多個線程處理的值都是原始的值(多個線程處理一個值)

    #   處理方法: 就是在執行這三條語句時,不讓cpu切換,知道處理完成時,再切換(同步鎖)
    '''
    定義一個鎖:lock = threading.Lock()
    lock.acquire()  #   鎖起來(獲取)
    這里是要執行的語句,
    lock.release()   #釋放
    這里的不執行完,不切換cpu
    '''
    lock.acquire()
    temp = num
    time.sleep(0.001)
    num = temp - 1
    lock.release()


num = 100
l = []
lock = threading.Lock()
for i in range(100):
    t = threading.Thread(target=sub)
    l.append(t)
    t.start()

for t in l:
    t.join()
print(num)
同步鎖(互斥鎖)

死鎖(遞歸鎖)

#   產生死鎖:
'''
第一個線程執行doLockA, 然后lockB, 執行完actionA, 都釋放了
開始執行actionB,對B上鎖,不允許切換cpu,與此同時第二個線程開始執行actionA,對A上鎖
不允許切換cpu,兩個線程同時需要對方的資源釋放但是都沒有釋放,所以死鎖
(就像你問我要個香蕉, 我問你要個蘋果, 你說我先給你,我說你先給我,於是就死鎖了)
'''
'''
解決方法:
用threading.RLock() 去替換鎖A和鎖B
rlock就是在內部實現一個計數器,在同一個線程中,加鎖就+1,釋放就減一,只要鎖數大於0,
其他線程就不能申請鎖,
就是執行過程就是:當A函數執行完之后,所有線程開始搶占資源,誰搶到誰開始執行,
比如線程2搶到, 線程2開始執行A函數,A沒有執行完之前其他線程不能夠繼續執行
'''
 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-23 下午8:33
# @Author  : LK
# @File    : 遞歸鎖_死鎖.py
# @Software: PyCharm
import threading, time

#   產生死鎖:
'''
第一個線程執行doLockA, 然后lockB, 執行完actionA, 都釋放了
開始執行actionB,對B上鎖,不允許切換cpu,與此同時第二個線程開始執行actionA,對A上鎖
不允許切換cpu,兩個線程同時需要對方的資源釋放但是都沒有釋放,所以死鎖
(就像你問我要個香蕉, 我問你要個蘋果, 你說我先給你,我說你先給我,於是就死鎖了)
'''


class myThread(threading.Thread):
    def actionA(self):
        lockA.acquire()
        print(self.name, 'doLockA', time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name, 'doLockB', time.ctime())
        lockB.release()
        lockA.release()

    def actionB(self):
        lockB.acquire()
        print(self.name, 'doLockB', time.ctime())
        time.sleep(1)
        lockA.acquire()
        print(self.name, 'doLockA', time.ctime())
        lockA.release()
        lockB.release()

    def run(self):
        self.actionA()
        time.sleep(0.5)
        self.actionB()


if __name__ == '__main__':
    lockA = threading.Lock()
    lockB = threading.Lock()
    r_lock = threading.RLock()
    '''
    解決方法:
    用threading.RLock() 去替換鎖A和鎖B
    rlock就是在內部實現一個計數器,在同一個線程中,加鎖就+1,釋放就減一,只要鎖數大於0,
    其他線程就不能申請鎖,
    就是執行過程就是:當A函數執行完之后,所有線程開始搶占資源,誰搶到誰開始執行,
    比如線程2搶到, 線程2開始執行A函數,A沒有執行完之前其他線程不能夠繼續執行
    '''

    threads = []
    for i in range(5):
        '''創建5個線程對象'''
        threads.append(myThread())
    for t in threads:
        t.start()
    for i in threads:
        t.join()
    print('ending.....')
死鎖(遞歸鎖)

信號量與同步對象

信號量, 也是一種鎖的機制, 在那個遞歸鎖中的同時事件的安全問題中
里面是多個線程同時搶占資源, 但是這個是只允許指定的個數,去搶占
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-24 下午4:45
# @Author  : LK
# @File    : 信號量.py
# @Software: PyCharm

'''
信號量, 也是一種鎖的機制, 在那個遞歸鎖中的同時事件的安全問題中
里面是多個線程同時搶占資源, 但是這個是只允許指定的個數,去搶占
'''
import threading, time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(3)
            semaphore.release()

if __name__ == '__main__':

    #   信號量, 參數的意思是同時允許幾個線程去執行
    semaphore = threading.Semaphore(5)

    threads = []
    for i in range(10):
        threads.append(myThread())

    for t in threads:
        t.start()
    for t in threads:
        t.join()
信號量

同步對象

event

'''
event 同步對象標志位,就是在一個線程中設定后,在其他線程中也能捕獲到
需求:一個boss類,一個worker類,
當boss對象執行后,worker才執行,就需要設置一個同步的標志位,用來判斷
該執行那個線程
'''
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-23 下午9:38
# @Author  : LK
# @File    : 同步對象.py
# @Software: PyCharm

'''
event 同步對象標志位,就是在一個線程中設定后,在其他線程中也能捕獲到
需求:一個boss類,一個worker類,
當boss對象執行后,worker才執行,就需要設置一個同步的標志位,用來判斷
該執行那個線程
'''
'''
執行過程:
剛開始有5個worker線程和1個boss線程執行,
當worker執行時,wait沒有被設定,一直在阻塞,但是boss可以執行
於是設定event並開始等5秒, 同時5個worker中的標志位變了,可以繼續執行了
1秒后,worker清除event標志,繼續阻塞,又過了4秒boss又設置了,
同時worker又可以執行了
'''
import threading, time
class Boss(threading.Thread):
    def run(self):
        print(self.name,'boss今天大家要加班到22點')
        print(event.isSet())  # false,判斷是否標志
        event.set()
        time.sleep(5)
        print(self.name,'boss: <22:00>可以下班了')
        print(event.isSet())
        event.set()


class Worker(threading.Thread):
    def run(self):
        event.wait()  # 一旦event被設定 就等同於pass, 不設定就阻塞等待被設定
        print(self.name,'worker:哎呀生活苦呀,')
        time.sleep(1)
        event.clear()
        event.wait()
        print(self.name,'worker: oyear下班了')


#   如果不用event的話,就可能導致worker先說話,但是需求是讓boss先說話,
#   因為cpu是搶占的方式執行的,不會按照需求走,所以就要引入同步標志位event
if __name__ == '__main__':
    event = threading.Event()
    threads =[]

    #   創建5個worker線程,1個boss線程
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())

    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('ending....')
同步對象

 

隊列----多線程常用方法

隊列的引入

'''
用多個線程同時對列表進行刪除最后一個值,
會報錯, 是因為可能會有多個線程同時取到最后一個值,都進行刪除,
就會報錯, 解決方法:可以用鎖,將其鎖住,(相當於串行的執行), 或者用隊列
'''
'''
l = [1,2,3,4,5]
def fun():
    while 1:
        if not l:
            break
        a = l[-1]
        print(a)
        time.sleep(1)
        l.remove(a)
if __name__ == '__main__':
    t1 = threading.Thread(target=fun, args=())
    t2 = threading.Thread(target=fun, args=())
    t1.start()
    t2.start()
'''
隊列的引入
Python Queue模塊有三種隊列及構造函數:
1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize)
2、LIFO類似於堆,即先進后出。 class queue.LifoQueue(maxsize)
3、還有在一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize)
import queue

#   創建一個隊列,里面最多可以存放3個數據
q = queue.Queue(maxsize=3)  # FIFO,先進先出
# 將一個值入隊
q.put(10)
q.put(9)
q.put(8)
#   如果這里在加上一個put, 就是有4個數據入隊,但是空間不夠,就會一直阻塞,直到,隊列有空間時(從隊列中取出get)
#   block參數默認是True, 如果改成False,當隊列滿的時候如果繼續入隊,就不會堵塞而是報錯queue.Full
# q.put(9, block=False)
while 1:
    '''
    將一個值從隊列中取出
    q.get()
    調用隊列對象的get()
    方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,
    get()
    就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
    '''
    # 取完之后將會阻塞, block=Fales,如果隊列為空時,繼續出隊,就會報錯,queue.Empty
    #   get類似與put
    data = q.get(block=False)
    print(data)
調用一
# 先進后出  LIFO 類似與棧
import queue
# 先進后出  LIFO 類似與棧
q = queue.LifoQueue(maxsize=4)
q.put('你好')
q.put(123)
q.put({"name":"lucky"})

while 1:
    data = q.get()
    print(data)
調用二
# 還有在一種是優先級隊列級別越低越先出來1級最低。class queue.PriorityQueue(maxsize)
import queue
q = queue.PriorityQueue(maxsize=3)
q.put([2,'你好'])
q.put([3,123,'aa'])
q.put([1,{"name":"lucky"}])

while 1:
    data = q.get()
    # print(data)
    print(data[1])  #   只顯示數據,
調用三
import queue
q = queue.Queue(maxsize=4)
q.put('a')
q.put('b')
q.put('c')
print(q.qsize())  # 打印當前隊列的大小,現在隊列中有幾個值
print(q.full())  # 判斷是否滿
print(q.empty())  # 判斷是否為空
# q.put_nowait(3)  #  相當於q.put(3, block=Flase)
# q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
# q.join() 實際上意味着等到隊列為空,再執行別的操作
while 1:
    data = q.get()
    print(data)
    print(q.qsize())  # 打印當前隊列的大小,現在隊列中有幾個值
    print(q.full())   # 判斷是否滿
    print(q.empty())  # 判斷是否為空
隊列的其他方法

 

進程的基本使用

多進程模塊 multiprocessing

進程的使用和線程的使用和調用的方法基本一樣

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-25 下午5:31
# @Author  : LK
# @File    : 測試進程和線程.py
# @Software: PyCharm
from threading import Thread
from multiprocessing import Process
import time

def fun():
    print(time.ctime())
if __name__ == '__main__':
    threads = []
    for i in range(1000):
        # t = Thread(target=fun)  # 線程
        t = Process(target=fun)   # 進程
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
from  multiprocessing import Process
import time

def f(name):
    time.sleep(1)
    print('hello  %s'%name, time.ctime())

if __name__ == '__main__':
    p_list = []
    for t in range(3):
        p = Process(target=f, args=('luck',))
        p_list.append(p)

    #   三個進程時在同一時刻開啟的
    for p in p_list:
        p.start()
    for p in p_list:
        p.join()

    print('ending....')
進程的調用方法一
class myProcess(Process):
    def __init__(self):
        # 父類初始化, 不傳參數不用寫
        super(myProcess, self).__init__()
    def run(self):
        time.sleep(1)
        print(self.name,'hello ',time.ctime())

if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p_list.append(myProcess())
    for p in p_list:
        p.start()
    for p in p_list:
        p.join()
進程的調用方法二

查看進程號

#   查查看進程的id
#   os.getppid()獲得當前進程的父進程,os.getpid()獲得當前進程
#   os.getppid 是pycharm的進程號
'''
from multiprocessing import Process
import os
import time


def info(title):
    print("title:", title)
    print('父進程 id:', os.getppid())
    print('當前進程 id:', os.getpid())
    # time.sleep(3)


if __name__ == '__main__':
    info('主進程')
    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('lucky子進程',))
    p.start()
    p.join()
    p_list = []
    for i in range(3):
        p = Process(target=info, args=('子進程%d' % i,))
        p_list.append(p)
    for p in p_list:
        p.start()
    for p in p_list:
        p.join()
查看進程號,進程之間的關系理解
Process 類
構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程准備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:和線程的setDeamon功能一樣

  name:進程名字。


'''

from multiprocessing import Process
import time

class myProcess(Process):
    def __init__(self, args):
        super(myProcess,self).__init__()
        self.args = args
    def fun(self):
        time.sleep(1)
        print(self.name, self.is_alive(), self.args, self.pid)
        time.sleep(1)

    def run(self):
        self.fun()



if __name__ == '__main__':
    p_list = []
    for i in range(10):
        # p = Process(target=fun, args=('子進程%d' % i,))
        p_list.append(myProcess('子進程%d' % i))
    # p_list[-2].daemon = True
    # 因為這三個進程是同時執行的,所以這個屬性不明顯
    for p in p_list:
        p.start()
    for p in p_list:
        p.join()
進程模塊的其他方法

進程間的通信

隊列,這里的隊列和線程里的隊列調用方法不一樣

#   什么是進程通信,為什么要進程通信
#   因為進程與進程之間是相互獨立的,不像線程可以信息共享,要想讓進程間相互信息共享,就要傳參數(利用隊列)

from multiprocessing import Process, Queue # 進程隊列
import time
import queue   # 線程隊列

#   主進程get數據,子進程put數據,
#   阻塞的原因是,這個隊列的數據不共享,在子進程中隊列和主進程的隊列信息不共享,沒有任何關系
#   結局方法,將隊列作為參數傳過去
#   注意在win上面不傳參數不能運行,但是在linux下能運行
#  所以輕易不要用多進程,進行進程間通信時,會涉及到copy資源
# def fun(q):
# '''  隊列
def fun():
    q.put(1)
    q.put(2)
    q.put(3)
    print(id(q))
    # print(q.empty())
# def fun2():
def fun2(q):
    while 1:
        # print(q.empty())
        data = q.get()
        print(data)
        print(id(q))

if __name__ == '__main__':
    # q = queue.Queue()  # 線程隊列,
    q = Queue()          # 進程隊列
    # p = Process(target=fun, args=(q,))  # 傳參進行了copy占用資源
    p = Process(target=fun)
    p.start()
    p.join()
    p2 = Process(target=fun2)
    p2 = Process(target=fun2, args=(q,))
    p2.start()
    p2.join()

 

管道

from multiprocessing import Pipe
def fun(child_conn):
    child_conn.send([12, {"name":"yuan"}, 'hello'])
    print('主進程說:',child_conn.recv())
    print('子進程id',id(child_conn))

if __name__ == '__main__':
    prepare_conn, child_conn = Pipe()  # 雙向管道
    print('id:',id(prepare_conn), id(child_conn))
    p = Process(target=fun, args=(child_conn,))
    p.start()
    print('子進程說:',prepare_conn.recv())
    prepare_conn.send('你好')
    print('主進程id',id(prepare_conn))
    p.join()

Managers

Queue和pipe只是實現了數據交互,並沒有實現數據共享,即一個進程去更改另外一個進程的數據

 
Managers支持的數據類型
# A manager returned by Manager() will support types list, dict, Namespace,
# Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier,
#  Queue, Value and Array. For example:

from multiprocessing import Manager
# Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據。
# 所以利用Managers 進行數據共享, 同樣也需要傳參

def fun(d, l, i):
    d[i]='字典'+ str(i)
    l.append(i)
    # pass
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #  d = {} 創建一個字典
        l = manager.list(range(5)) # l = range(5)
        p_list = []
        for i in range(5):
            p = Process(target=fun,args=(d,l,i,))
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()

        print(d)
        print(l)
Managers模塊

進程同步

#   進程鎖, lock
from multiprocessing import Process, Lock
import time

def fun(i):
    lock.acquire()
    print('%d'%i)
    lock.release()
if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=fun, args=(i,)).start()
進程同步,鎖

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,
如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
apply 同步
apply_async 異步(常用)

在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多台主機,並行操作可以節約大量的時間。多進程是實現並發的手段之一,需要注意的問題是:

  1. 很明顯需要並發執行的任務通常要遠大於核數
  2. 一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
  3. 進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行)

例如當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

我們就可以通過維護一個進程池來控制進程數目,比如httpd的進程模式,規定最小進程數和最大進程數... 
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

 

'''
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,
如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
apply   同步
apply_async   異步(常用)
'''
from multiprocessing import Process, Pool
import time, os

def foo(i):
    time.sleep(1)
    print('子進程:',os.getpid(),i)
    return 'Hello %s'%i
def bar(args):
    # print('',os.getpgid())
    print(args)
    print('主進程:',os.getpid())

if __name__ == '__main__':
    pool = Pool(5)   #  進程池中開5個進程, 有100個任務需要執行, 每次執行5個
    print('main :',os.getpid())
    for i in range(100):
        #   回調函數:某個動作或者 函數執行成功后,再去執行的函數
        # pool.apply_async(func=foo, args=(i,))
        #   bar作為回調函數, 每次執行一個進程后都會執行回調函數, 回調函數是主進程中調用的
        #   func中函數的返回值,傳給回調函數做參數
        pool.apply_async(func=foo, args=(i,), callback=bar)
    #   close和join必須加,而且順序固定
    pool.close()
    pool.join()
進程池

回調函數

 回掉函數:

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到進程池中,然后指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了
進程池應用

協程

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 18-5-26 上午10:19
# @Author  : LK
# @File    : 協程.py
# @Software: PyCharm
from greenlet import greenlet


#   優缺點
'''
def test1():
    print(12)
    g2.switch()  #  切換到g2 
    print(24)
    g2.switch()  #  切換到g2
def test2():
    print(56)
    g1.switch()  #  切換到g1
    print(65)
    g1.switch()

if __name__ ==  '__main__':
    g1 = greenlet(test1)
    g2 = greenlet(test2)
    g2.switch()
    
'''
'''
import gevent

import requests,time


start=time.time()

def f(url):
    print('GET: %s' % url)
    resp =requests.get(url)
    data = resp.text
    f = open('new.html', 'w', encoding='utf-8')
    f.write(data)
    f.close()
    print('%d bytes received from %s.' % (len(data), url))

#   這就相當於一個協程, 每次進行讀寫操作阻塞時,都會切換cpu
gevent.joinall([

        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://www.baidu.com/'),
        gevent.spawn(f, 'https://www.sina.com.cn/'),

])

# f('https://www.python.org/')
#
# f('https://www.yahoo.com/')
#
# f('https://baidu.com/')
#
# f('https://www.sina.com.cn/')

print("cost time:",time.time()-start)

'''
# import threading, gevent
# def fun():
#     a = input('>>>')
#     print(a)
# def fun2():
#     print('\nhello')
# # gevent.joinall([
# #
# #         gevent.spawn(fun()),
# #         gevent.spawn(fun2()),
# # ])
# if __name__ == '__main__':
#     # t1 = threading.Thread(target=fun).start()
#     # t2 = threading.Thread(target=fun2).start()
#     gevent.joinall([
# 
#             gevent.spawn(fun),
#             gevent.spawn(fun2),
#     ])
未完整

協程參考:

http://www.cnblogs.com/linhaifeng/articles/7429894.html

協程,協作式----- 非搶占式的程序

自己調動 

Yield(協程)

用戶態的切換呢

關鍵點在哪一步切換

協程主要解決的是io操作

協程:本質上就是一個線程

協程的優勢:

1:沒有切換的消耗

2:沒有鎖的概念

但是不能用多核,所以用多進程+協程,是一個很好的解決並發的方案

 

驅動事件

 

 

@font-face{ font-family:"Times New Roman"; } @font-face{ font-family:"宋體"; } p.MsoNormal{ mso-style-name:正文; mso-style-parent:""; margin:0pt; margin-bottom:.0001pt; mso-pagination:none; text-align:left; font-family:'Times New Roman'; font-size:12.0000pt; mso-font-kerning:1.0000pt; } span.msoIns{ mso-style-type:export-only; mso-style-name:""; text-decoration:underline; text-underline:single; color:blue; } span.msoDel{ mso-style-type:export-only; mso-style-name:""; text-decoration:line-through; color:red; } @page{mso-page-border-surround-header:no; mso-page-border-surround-footer:no;}@page Section0{ } div.Section0{page:Section0;}

Io多路復用

Select 觸發方式

1:水平處罰

2:邊緣觸發

3:IO多路復用優勢:同時可以監聽多個鏈接

 

Select  負責監聽

 

IO多路復用:

Select  那個平台都有

Poll

Epoll 效率最高

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM