python並發編程(並發與並行,同步和異步,阻塞與非阻塞)


  最近在學python的網絡編程,學了socket通信,並利用socket實現了一個具有用戶驗證功能,可以上傳下載文件、可以實現命令行功能,創建和刪除文件夾,可以實現的斷點續傳等功能的FTP服務器。但在這當中,發現一些概念區分起來很難,比如並發和並行,同步和異步,阻塞和非阻塞,但是這些概念卻很重要。因此在此把它總結下來。

1. 並發 & 並行

  並發:在操作系統中,是指一個時間段中有幾個程序都處於已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機上運行,但任一個時刻點上只有一個程序在處理機上運行。簡言之,是指系統具有處理多個任務的能力。

  並行:當系統有一個以上CPU時,則線程的操作有可能非並發。當一個CPU執行一個線程時,另一個CPU可以執行另一個線程,兩個線程互不搶占CPU資源,可以同時進行,這種方式我們稱之為並行(Parallel)。簡言之,是指系統具有同時處理多個任務的能力。

  下面我們來兩個例子:

import threading #線程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    music()
    game()
    print('ending.....')
View Code

music的時間為3秒,game的時間為5秒,如果按照我們正常的執行,直接執行函數,那么將按順序順序執行,整個過程8秒。

import threading #線程
import time


def music():
    print('begin to listen music {}'.format(time.ctime()))
    time.sleep(3)
    print('stop to listen music {}'.format(time.ctime()))


def game():
    print('begin to play game {}'.format(time.ctime()))
    time.sleep(5)
    print('stop to play game {}'.format(time.ctime()))


if __name__ == '__main__':
    t1 = threading.Thread(target=music) #創建一個線程對象t1 子線程
    t2 = threading.Thread(target=game) #創建一個線程對象t2 子線程

    t1.start()
    t2.start()

    # t1.join() #等待子線程執行完 t1不執行完,誰也不准往下走
    t2.join()

    print('ending.......') #主線程
    print(time.ctime())
View Code

  在這個例子中,我們開了兩個線程,將music和game兩個函數分別通過線程執行,運行結果顯示兩個線程同時開始,由於聽音樂時間3秒,玩游戲時間5秒,所以整個過程完成時間為5秒。我們發現,通過開啟多個線程,原本8秒的時間縮短為5秒,原本順序執行現在是不是看起來好像是並行執行的?看起來好像是這樣,聽音樂的同時在玩游戲,整個過程的時間隨最長的任務時間變化。但真的是這樣嗎?那么下面我來提出一個GIL鎖的概念。

GIL(全局解釋器鎖):無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行。
import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()

add()
mul() #串行比多線程還快

print('cost time %s'%(time.time()-start))
View Code

 

import time
from threading import Thread


def add():
    sum = 0
    i = 1
    while i<=1000000:
        sum += i
        i += 1
    print('sum:',sum)


def mul():
    sum2 = 1
    i = 1
    while i<=100000:
        sum2 = sum2 * i
        i += 1
    print('sum2:',sum2)


start = time.time()
t1 = Thread(target=add)
t2 = Thread(target=mul)

l = []
l.append(t1)
l.append(t2)

for t in l:
   t.start()

for t in l:
    t.join()

print('cost time %s'%(time.time()-start))
View Code

 

  哎吆,這是怎么回事,串行執行比多線程還快?不符合常理呀。是不是顛覆了你的人生觀,這個就和GIL鎖有關,同一時刻,系統只允許一個線程執行,那么,就是說,本質上我們之前理解的多線程的並行是不存在的,那么之前的例子為什么時間確實縮短了呢?這里有涉及到一個任務的類型。

--任務: 1.IO密集型(會有cpu空閑的時間)  注:sleep等同於IO操作, socket通信也是IO  
2.計算密集型
  而之前那個例子恰好是IO密集型的例子,后面這個由於涉及到了加法和乘法,屬於計算密集型操作,那么,就產生了一個結論,多線程對於IO密集型任務有作用,
而計算密集型任務不推薦使用多線程。
 
 而其中我們還可以得到一個結論:由於GIL鎖,多線程不可能真正實現並行,所謂的並行也只是宏觀上並行微觀上並發,本質上是由於遇到io操作不斷的cpu切換
所造成並行的現象。由於cpu切換速度極快,所以看起來就像是在同時執行。
  --問題:沒有利用多核的優勢
    --這就造成了多線程不能同時執行,並且增加了切換的開銷,串行的效率可能更高。

2. 同步 & 異步

  對於一次IO訪問(以read舉例),數據會先被拷貝到操作系統內核的緩沖區中,然后才會從操作系統內核的緩沖區拷貝到應用程序的地址空間。所以說,當一個read操作發生時,它會經歷兩個階段:
     1. 等待數據准備 (Waiting for the data to be ready)
     2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
同步:當進程執行IO(等待外部數據)的時候,-----等。同步(例如打電話的時候必須等)
異步:當進程執行IO(等待外部數據)的時候,-----不等,去執行其他任務,一直等到數據接收成功,再回來處理。異步(例如發短信)
當我們去爬取一個網頁的時候,要爬取多個網站,有些人可能會發起多個請求,然后通過函數順序調用。執行順序也是先調用先執行。效率非常低。
下面我們看一下異步的一個例子:
import socket
import select

"""
########http請求本質,IO阻塞########
sk = socket.socket()
#1.連接
sk.connect(('www.baidu.com',80,)) #阻塞
print('連接成功了')
#2.連接成功后發送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

#3.等待服務端響應
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n區分響應頭和影響體

#關閉連接
sk.close()
"""
"""
########http請求本質,IO非阻塞########
sk = socket.socket()
sk.setblocking(False)
#1.連接
try:
    sk.connect(('www.baidu.com',80,)) #非阻塞,但會報錯
    print('連接成功了')
except BlockingIOError as e:
    print(e)

#2.連接成功后發送消息
sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

#3.等待服務端響應
data = sk.recv(8096)#阻塞
print(data) #\r\n\r\n區分響應頭和影響體

#關閉連接
sk.close()
"""


class HttpRequest:
    def __init__(self,sk,host,callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        return self.socket.fileno()


class HttpResponse:
    def __init__(self,recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None

        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h,encoding='utf-8')
            v = h_str.split(':',1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = [] # 用於檢測是否已經連接成功

    def add_request(self,host,callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host,80))
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk,host,callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):

        while True:
            rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05)
            for w in wlist:
                print(w.host,'連接成功...')
                # 只要能循環到,表示socket和服務器端已經連接成功
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n"  %(w.host,)
                w.socket.send(bytes(tpl,encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r,是HttpRequest
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        recv_data += chunck
                    except Exception as e:
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print('保存到文件',response.header_dict)


def f2(response):
    print('保存到數據庫', response.header_dict)


url_list = [
    {'host':'www.youku.com','callback': f1},
    {'host':'v.qq.com','callback': f2},
    {'host':'www.cnblogs.com','callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'],item['callback'])

req.run()
View Code

  我們可以看到,三個請求發送順序與返回順序,並不一樣,這樣就體現了異步請求。即我同時將請求發送出去,哪個先回來我先處理哪個。

  即我們可以理解為:我打電話的時候只允許和一個人通信,和這個人通信結束之后才允許和另一個人開始。這就是同步。

           我們發短信的時候發完可以不去等待,去處理其他事情,當他回復之后我們再去處理,這樣就大大解放了我們的時間。這就是異步。

  體現在網頁請求上面就是我請求一個網頁時候等待他回復,否則不接收其它請求,這就是同步。另一種就是我發送請求之后不去等待他是否回復,而去處理其它請求,當處理完其他請求之后,某個請求說,我的回復了,然后程序轉而去處理他的回復數據。這就是異步請求。所以,異步可以充分cpu的效率。

3. 阻塞 & 非阻塞

  調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還准備數據的情況下會立刻返回。
下面我們通過socket實現一個命令行功能來感受一下。
  
#服務端
from socket import *
import subprocess
import struct

ip_port = ('127.0.0.1', 8000)
buffer_size = 1024
backlog = 5

tcp_server = socket(AF_INET, SOCK_STREAM)
tcp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
tcp_server.bind(ip_port)
tcp_server.listen(backlog)

while True:
    conn, addr = tcp_server.accept()
    print('新的客戶端鏈接:', addr)
    while True:
        try:
            cmd = conn.recv(buffer_size)
            print('收到客戶端命令:', cmd.decode('utf-8'))

            #執行命令cmd,得到命令的結果cmd_res
            res = subprocess.Popen(cmd.decode('utf-8'),shell=True,
                                   stderr=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stdin=subprocess.PIPE,
                                   )
            err = res.stderr.read()
            if err:
                cmd_res = err
            else:
                cmd_res = res.stdout.read()
            if not cmd_res:
                cmd_res = '執行成功'.encode('gbk')
            #解決粘包
            length = len(cmd_res)
            data_length = struct.pack('i',length)
            conn.send(data_length)
            conn.send(cmd_res)
        except Exception as e:
            print(e)
            break
    conn.close()


#客戶端
from socket import *

ip_port = ('127.0.0.1',8000)
buffer_size = 1024
backlog = 5

tcp_client = socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    cmd = input('>>:').strip()
    if not cmd:
        continue
    if cmd == 'quit':
        break
    tcp_client.send(cmd.encode('utf-8'))

    #解決粘包
    length = tcp_client.recv(4)
    length = struct.unpack('i',length)[0]

    recv_size = 0
    recv_msg = b''
    while recv_size < length:
        recv_msg += tcp_client.recv(buffer_size)
        recv_size = len(recv_msg)

    print(recv_msg.decode('gbk'))
View Code

開啟了服務器和一個客戶端之后,我們在客戶端輸入一些命令,然后正確顯示,功能實現。這是在我再打開一個客戶端,輸入命令,發現服務器遲遲沒有響應。

這個就是當一個客戶端在請求的時候,當這個客戶端沒有結束的時候,服務器不會去處理其他客戶端的請求。這時候就阻塞了。

如何讓服務器同時處理多個客戶端請求呢?

#服務端
import socketserver


class Myserver(socketserver.BaseRequestHandler):
    """socketserver內置的通信方法"""
    def handle(self):
        print('conn is:',self.request)  #conn
        print('addr is:',self.client_address)  #addr

        while True:
            try:
                #發消息
                data = self.request.recv(1024)
                if not data:break
                print('收到的客戶端消息是:',data.decode('utf-8'),self.client_address)

                #發消息
                self.request.sendall(data.upper())
            except Exception as e:
                print(e)
                break


if __name__ == '__main__':
    s = socketserver.ThreadingTCPServer(('127.0.0.1',8000), Myserver)  #通信循環
    # s = socketserver.ForkingTCPServer(('127.0.0.1',8000), Myserver)  #通信循環
    print(s.server_address)
    print(s.RequestHandlerClass)
    print(Myserver)
    print(s.socket)
    s.serve_forever()

#客戶端

from socket import *

ip_port = ('127.0.0.1',8000)
buffer_size = 1024
backlog = 5

tcp_client = socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    msg = input('>>:').strip()
    if not msg:continue
    if msg == 'quit':break

    tcp_client.send(msg.encode('utf-8'))

    data = tcp_client.recv(buffer_size)
    print(data.decode('utf-8'))

tcp_client.close()
View Code

這段代碼通過socketserver模塊實現了socket的並發。這個過程中,當一個客戶端在向服務器請求的時候,另一個客戶端也可以正常請求。服務器在處理一個客戶端請求的時候,另一個請求沒有被阻塞。

總結:只要有一丁點阻塞,就是阻塞IO。

   異步IO的特點就是全程無阻塞。

   有些人常把同步阻塞和異步非阻塞聯系起來,但實際上經過分析,阻塞與同步,非阻塞和異步的定義是不一樣的。同步和異步的區別是遇到IO請求是否等待。阻塞和非阻塞的區別是數據沒准備好的情況下是否立即返回。關於這兩點的區別,看了很多博客,最后總結如下。

阻塞和非阻塞指的是執行一個操作是等操作結束再返回,還是馬上返回。

比如餐館的服務員為用戶點菜,當有用戶點完菜后,服務員將菜單給后台廚師,此時有兩種方式:

  • 第一種:就在出菜窗口等待,直到廚師炒完菜后將菜送到窗口,然后服務員再將菜送到用戶手中;
  • 第二種:等一會再到窗口來問廚師,某個菜好了沒?如果沒有先處理其他事情,等會再去問一次;

第一種就是阻塞方式,第二種則是非阻塞的。

  同步和異步又是另外一個概念,它是事件本身的一個屬性。還拿前面點菜為例,服務員直接跟廚師打交道,菜出來沒出來,服務員直接指導,但只有當廚師將菜送到服務員手上,這個過程才算正常完成,這就是同步的事件。同樣是點菜,有些餐館有專門的傳菜人員,當廚師炒好菜后,傳菜員將菜送到傳菜窗口,並通知服務員,這就變成異步的了。其實異步還可以分為兩種:帶通知的和不帶通知的。前面說的那種屬於帶通知的。有些傳菜員干活可能主動性不是很夠,不會主動通知你,你就需要時不時的去關注一下狀態。這種就是不帶通知的異步。

對於同步的事件,你只能以阻塞的方式去做。而對於異步的事件,阻塞和非阻塞都是可以的。非阻塞又有兩種方式:主動查詢和被動接收消息。被動不意味着一定不好,在這里它恰恰是效率更高的,因為在主動查詢里絕大部分的查詢是在做無用功。對於帶通知的異步事件,兩者皆可。而對於不帶通知的,則只能用主動查詢。

  但是對於非阻塞和異步的概念有點混淆,非阻塞只是意味着方法調用不阻塞,就是說作為服務員的你不用一直在窗口等,非阻塞的邏輯是"等可以讀(寫)了告訴你",但是完成讀(寫)工作的還是調用者(線程)服務員的你等菜到窗口了還是要你親自去拿。而異步意味這你可以不用親自去做讀(寫)這件事,你的工作讓別人(別的線程)來做,你只需要發起調用,別人把工作做完以后,或許再通知你,它的邏輯是“我做完了 告訴/不告訴 你”,他和非阻塞的區別在於一個是"已經做完"另一個是"可以去做"。

這也是NIO和AIO最大的區別,就是NIO在有通知時可以進行相關操作,而AIO有通知時則代表操作已經完成

再舉一個例子:

  去書店借一本書,同步就是我要親自到書店,問老板有沒有這本書,阻塞就是老板查詢的時候(讀寫)我只能在那等着,老板找到書后把書交給我,這就是同步阻塞。

  我親自到書店借書,老板在找這本書的時候,我可以去干別的,然后每隔一段時間去問老板書找到了沒有,也可以等老板找到書以后通知我,這就是同步非阻塞。

  我想借本書,找個人幫我去借,借到書以后再通知我,這就是異步,我只發起調用,但是本身並不參與這個事件,而是讓別的線程去做這個事。

  同步與異步是對應的,它們是線程之間的關系,兩個線程之間要么是同步的,要么是異步的。

  阻塞與非阻塞是對同一個線程來說的,在某個時刻,線程要么處於阻塞,要么處於非阻塞。

  幫我借書的那個人有沒有借到書,我可以打電話問他(輪詢),也可以等他通知我,這是異步的通知;在借書的過程中借書的那個人可以輪詢的方式查看書是否已經找到(緩沖區有沒有數據),找到了你可以把它拿走,也可以等老板找到書后通知我,這是非阻塞的通知與輪詢。

  這里面其實涉及到的知識點還很多,這里只是憑我的記憶簡單總結了一下,以后會補充更多。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM