Python學習筆記九(UDP套接字和並發編程)


一、UDP套接字

服務端

from socket import *
server = socket(AF_INET,SOCK_DGRAM)
server.bind(("127.0.0.1",8080))
while True:
    data,client_addr = server.recvfrom(1024)
    server.sendto(data.upper(),client_addr)

客戶端

from socket import *
client = socket(AF_INET,SOCK_DGRAM)
while True:
    msg = input(">>").strip()
    client.sendto(msg.encode("utf-8"),("127.0.0.1",8080))
    data,server_addr = client.recvfrom(1024)
    print(data.decode("utf-8"))

二、進程相關定義

進程是指程序的運行過程。每個進程都擁有自己的地址空間、內存、數據棧以及其他用於跟蹤執行的輔助數據。

多道技術:內存中同時存入多個程序,cpu從一個進程快速切換到另一個,使得每個進程各自運行幾十或幾百毫秒,雖然在一個時刻,一個cpu只執行了一個任務,但1秒內,cpu卻可以運行多個進程,給人帶來並行的錯覺,即偽並發,以此來區分多處理器操作系統的真正硬件並行(多cpu共享一個內存)

進程三種狀態間的轉換:

三、python多進程編程

為了利用多核CPU資源,python中使用multiprocessing多線程模塊,python多線程無法利用多核優勢。進程之間無任何共享數據,進程修改數據僅限進程內。

Process類常用方法:
p.start()   啟動進程,調用紫禁城的p.run()方法
p.run()    進程啟動時運行的方法,調用target制定的函數,自定義類中必須實現該方法。
p.terminate()  強調終止進程p,不會進行任何清理操作,如果p創建了子進程,該進程就成為了僵屍進程。如果p還保存了一個鎖,那么鎖也不會釋放,導致死鎖。
p.is_alive()  如果p仍然運行,返回True
p.join([timeout]) 主線程等待p進程終止。timeout為可選超時時間。
Process類常用屬性
p.daemon 默認為False,設置為True后,p代表后台運行的守護進程,當p的父進程終止時,p也隨之終止。設定為True后,p不能創建子進程,且必須在p.start()前設置。
p.name 進程名
p.pid    進程pid
p.exitcode  進程在運行時為None,如果為-N,表示被信號N結束
p.authkey  進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功

創建子進程的兩種方式

from multiprocessing import Process
import time
def task(name):
    print("%s is running"%name)
    time.sleep(3)
    print("%s is done"%name)
if __name__=="__main__":
    p = Process(target=task,args=("aaa",))
    p.start()
    print("main")
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super(MyProcess,self).__init__()
        self.name = name
    def run(self):
        print("%s is running"%self.name)
        time.sleep(3)
        print("%s is done"%self.name)
if __name__=="__main__":
    p=MyProcess("進程1")
    p.start()
    print("main")

使用join方法等待子進程結束

from multiprocessing import Process
import time
def task(name):
    print("%s is running"%name)
    time.sleep(3)
    print("%s is done"%name)
if __name__=="__main__":
    p = Process(target=task,args=("aaa",))
    p.start()
    p.join()
    print("main")

守護進程

from multiprocessing import Process
import time
def task(name):
    # p = Process(target=time.sleep,args=(1,))  #守護進程無法創建子進程,會報錯。
    # p.start()
    print("%s is running"%name)
    time.sleep(3)
    print("%s is done"%name)
if __name__ == "__main__":
    p = Process(target=task,args=("xxx",))
    p.daemon=True
    p.start()
    time.sleep(1)
    print("main")   #主進程在此結束,守護進程也會結束。
from multiprocessing import Process
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")
def bar():
    print(456)
    time.sleep(3)
    print("end456")
if __name__=="__main__":
    p1=Process(target=foo)
    p2=Process(target=bar)
    p1.daemon=True
    p1.start()
    p2.start()
    print("main")   #主進程執行完后,p1守護進程還輸入123就已經結束,不過在p2會執行完后,主進程才會結束

互斥鎖,針對進程間需要共同操作的資源,需要添加互斥鎖

#模擬搶票程序
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search():
    time.sleep(random.randint(1,3))
    dic = json.load(open("db.txt",'r',encoding="utf-8"))
    print("%s查看剩余票數為%s"%(os.getpid(),dic["count"]))
def get():
    dic=json.load(open("db.txt",'r',encoding="utf-8"))
    if dic["count"]>0:
        dic["count"]-=1
        time.sleep(random.randint(1,3))
        json.dump(dic,open("db.txt",'w',encoding="utf-8"))
        print("%s購票成功"%os.getpid())
def task(mutex):
    search()
    mutex.acquire()
    get()     #對操作余票的函數加鎖
    mutex.release()
if __name__=="__main__":
    mutex = Lock()
    for i in range(10):
        p=Process(target=task,args=(mutex,)) #進程間數據不互通,需用參數傳入鎖
        p.start()

使用Queue實現生產者消費者模型,Queue自帶鎖。

from multiprocessing import Process,Queue
import time
import random

def producer(name,food,q):
    for i in range(10):
        res="%s%s"%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("廚師[%s]生產了[%s]"%(name,res))
def consumer(name,q):
    while True: 
        res = q.get()
        time.sleep(random.randint(1,3))
        print("吃貨[%s]吃了[%s]"%(name,res))
if __name__=="__main__":
    q=Queue()
    p1 = Process(target=producer,args=("廚師1","包子",q))
    c1 = Process(target=consumer,args=("豬1",q))
    p1.start()
    c1.start()
    print("main")

上面的程序中生產者做完產品后,消費者並不知道已經生產完了,仍在在等着消費,主進程阻塞無法結束。

通過使用JoinableQueue隊列可解決以上問題。

from multiprocessing import Process,JoinableQueue
import time
import random

def producer(name,food,q):
    for i in range(10):
        res="%s%s"%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print("廚師[%s]生產了[%s]"%(name,res))
def consumer(name,q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print("吃貨[%s]吃了[%s]"%(name,res))
        q.task_done()
if __name__=="__main__":
    q=JoinableQueue()
    p1 = Process(target=producer,args=("廚師1","包子",q))
    c1 = Process(target=consumer,args=("豬1",q))
    c1.daemon=True     #c1中有死循環,需要設置為守護進程,主進程結束自動結束消費者。
    p1.start()
    c1.start()
    p1.join()
    q.join()
    print("main")

四.線程介紹

1.線程相關定義

線程與進程類似,不過是在同一進程下執行的,並且共享一片數據空間。進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。

使用多線程的原因:

a)多線程共享一個進程的地址空間

b)線程比進程更輕量級,線程比進程更容易創建可撤銷,在許多操作系統中,創建一個線程比創建一個進程快10-100倍,再有大量線程需要動態和快速修改時,較有用。

c)若多線程是cpu密集型,那么並不能獲得性能上的增強,但如果存在大量的計算和大量的I/O處理,擁有多個線程允許這些活動彼此重疊運行,從而加快程序執行的速度。

2.創建線程

python中使用threading模塊創建線程,和multiprocess模塊類似。

#第一種方式
from threading import Thread
import time
import random

def task(name):
    print("%s is running"%name)
    time.sleep(random.randint(1,3))
    print("%s is done"%name)
if __name__=="__main__":
    t1 = Thread(target=task,args=("xxx",))
    t1.start()
    print("main")  #先輸出線程中的內容
#第二種方式
from threading import Thread
import time
import random
class MyThread(Thread):
    def __init__(self,name):
        super(MyThread,self).__init__()
        self.name = name
    def run(self):
        print("%s is running"%self.name)
        time.sleep(random.randint(1,3))
        print("%s is done"%self.name)
if __name__=="__main__":
    t1 = MyThread("xxx")
    t1.start()
    print("main")  #先輸出線程中的內容

3.GIL

盡管python解釋器中可以運行多個線程,但在任意給定時刻只有一個線程會被解釋器執行。GIL本質為一個互斥鎖,將並發運行變成串行。

 對於任意面向I/O的Python功能,GIL會在I/O調用前被釋放,以允許其他線程在I/O執行的時候運行。而對於沒有太多I/O操作的代碼,更傾向於在該線程整個時間片內始終占用處理器和GIL。

I/O密集型Python程序比計算密集型代碼能夠更好地利用多線程環境。

 計算密集型程序,多進程效率高,主要用於金融分析。

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i
if __name__=="__main__":
    l=[]
    print(os.cpu_count())   #8核
    start=time.time()
    for i in range(4):
        # p = Process(target=work)    #多進程為6.448570728302002秒
        p = Thread(target=work)   #多線程為21.265116930007935秒
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print("run time is %s"%(stop-start))

I/O密集型程序,多線程效率高,主要用於socket、爬蟲、web等。

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print("==>")
if __name__=="__main__":
    l=[]
    print(os.cpu_count())
    start=time.time()
    for i in range(400):
        # p = Process(target=work)  #多進程9.083257913589478秒
        p = Thread(target=work)     #多線程2.039461612701416秒
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print("run time is %s"%(stop-start))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM