一、UDP套接字
服務端
from socket import * server = socket(AF_INET,SOCK_DGRAM) server.bind(("127.0.0.1",8080)) while True: data,client_addr = server.recvfrom(1024) server.sendto(data.upper(),client_addr)
客戶端
from socket import * client = socket(AF_INET,SOCK_DGRAM) while True: msg = input(">>").strip() client.sendto(msg.encode("utf-8"),("127.0.0.1",8080)) data,server_addr = client.recvfrom(1024) print(data.decode("utf-8"))
二、進程相關定義
進程是指程序的運行過程。每個進程都擁有自己的地址空間、內存、數據棧以及其他用於跟蹤執行的輔助數據。
多道技術:內存中同時存入多個程序,cpu從一個進程快速切換到另一個,使得每個進程各自運行幾十或幾百毫秒,雖然在一個時刻,一個cpu只執行了一個任務,但1秒內,cpu卻可以運行多個進程,給人帶來並行的錯覺,即偽並發,以此來區分多處理器操作系統的真正硬件並行(多cpu共享一個內存)
進程三種狀態間的轉換:
三、python多進程編程
為了利用多核CPU資源,python中使用multiprocessing多線程模塊,python多線程無法利用多核優勢。進程之間無任何共享數據,進程修改數據僅限進程內。
Process類常用方法:
p.start() 啟動進程,調用紫禁城的p.run()方法
p.run() 進程啟動時運行的方法,調用target制定的函數,自定義類中必須實現該方法。
p.terminate() 強調終止進程p,不會進行任何清理操作,如果p創建了子進程,該進程就成為了僵屍進程。如果p還保存了一個鎖,那么鎖也不會釋放,導致死鎖。
p.is_alive() 如果p仍然運行,返回True
p.join([timeout]) 主線程等待p進程終止。timeout為可選超時時間。
Process類常用屬性 p.daemon 默認為False,設置為True后,p代表后台運行的守護進程,當p的父進程終止時,p也隨之終止。設定為True后,p不能創建子進程,且必須在p.start()前設置。 p.name 進程名 p.pid 進程pid p.exitcode 進程在運行時為None,如果為-N,表示被信號N結束 p.authkey 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功
創建子進程的兩種方式
from multiprocessing import Process import time def task(name): print("%s is running"%name) time.sleep(3) print("%s is done"%name) if __name__=="__main__": p = Process(target=task,args=("aaa",)) p.start() print("main")
from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super(MyProcess,self).__init__() self.name = name def run(self): print("%s is running"%self.name) time.sleep(3) print("%s is done"%self.name) if __name__=="__main__": p=MyProcess("進程1") p.start() print("main")
使用join方法等待子進程結束
from multiprocessing import Process import time def task(name): print("%s is running"%name) time.sleep(3) print("%s is done"%name) if __name__=="__main__": p = Process(target=task,args=("aaa",)) p.start() p.join() print("main")
守護進程
from multiprocessing import Process import time def task(name): # p = Process(target=time.sleep,args=(1,)) #守護進程無法創建子進程,會報錯。 # p.start() print("%s is running"%name) time.sleep(3) print("%s is done"%name) if __name__ == "__main__": p = Process(target=task,args=("xxx",)) p.daemon=True p.start() time.sleep(1) print("main") #主進程在此結束,守護進程也會結束。
from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__=="__main__": p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main") #主進程執行完后,p1守護進程還輸入123就已經結束,不過在p2會執行完后,主進程才會結束
互斥鎖,針對進程間需要共同操作的資源,需要添加互斥鎖
#模擬搶票程序 from multiprocessing import Process,Lock import json import time import random import os def search(): time.sleep(random.randint(1,3)) dic = json.load(open("db.txt",'r',encoding="utf-8")) print("%s查看剩余票數為%s"%(os.getpid(),dic["count"])) def get(): dic=json.load(open("db.txt",'r',encoding="utf-8")) if dic["count"]>0: dic["count"]-=1 time.sleep(random.randint(1,3)) json.dump(dic,open("db.txt",'w',encoding="utf-8")) print("%s購票成功"%os.getpid()) def task(mutex): search() mutex.acquire() get() #對操作余票的函數加鎖 mutex.release() if __name__=="__main__": mutex = Lock() for i in range(10): p=Process(target=task,args=(mutex,)) #進程間數據不互通,需用參數傳入鎖 p.start()
使用Queue實現生產者消費者模型,Queue自帶鎖。
from multiprocessing import Process,Queue import time import random def producer(name,food,q): for i in range(10): res="%s%s"%(food,i) time.sleep(random.randint(1,3)) q.put(res) print("廚師[%s]生產了[%s]"%(name,res)) def consumer(name,q): while True: res = q.get() time.sleep(random.randint(1,3)) print("吃貨[%s]吃了[%s]"%(name,res)) if __name__=="__main__": q=Queue() p1 = Process(target=producer,args=("廚師1","包子",q)) c1 = Process(target=consumer,args=("豬1",q)) p1.start() c1.start() print("main")
上面的程序中生產者做完產品后,消費者並不知道已經生產完了,仍在在等着消費,主進程阻塞無法結束。
通過使用JoinableQueue隊列可解決以上問題。
from multiprocessing import Process,JoinableQueue import time import random def producer(name,food,q): for i in range(10): res="%s%s"%(food,i) time.sleep(random.randint(1,3)) q.put(res) print("廚師[%s]生產了[%s]"%(name,res)) def consumer(name,q): while True: res = q.get() time.sleep(random.randint(1,3)) print("吃貨[%s]吃了[%s]"%(name,res)) q.task_done() if __name__=="__main__": q=JoinableQueue() p1 = Process(target=producer,args=("廚師1","包子",q)) c1 = Process(target=consumer,args=("豬1",q)) c1.daemon=True #c1中有死循環,需要設置為守護進程,主進程結束自動結束消費者。 p1.start() c1.start() p1.join() q.join() print("main")
四.線程介紹
1.線程相關定義
線程與進程類似,不過是在同一進程下執行的,並且共享一片數據空間。進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是cpu上的執行單位。
使用多線程的原因:
a)多線程共享一個進程的地址空間
b)線程比進程更輕量級,線程比進程更容易創建可撤銷,在許多操作系統中,創建一個線程比創建一個進程快10-100倍,再有大量線程需要動態和快速修改時,較有用。
c)若多線程是cpu密集型,那么並不能獲得性能上的增強,但如果存在大量的計算和大量的I/O處理,擁有多個線程允許這些活動彼此重疊運行,從而加快程序執行的速度。
2.創建線程
python中使用threading模塊創建線程,和multiprocess模塊類似。
#第一種方式 from threading import Thread import time import random def task(name): print("%s is running"%name) time.sleep(random.randint(1,3)) print("%s is done"%name) if __name__=="__main__": t1 = Thread(target=task,args=("xxx",)) t1.start() print("main") #先輸出線程中的內容
#第二種方式 from threading import Thread import time import random class MyThread(Thread): def __init__(self,name): super(MyThread,self).__init__() self.name = name def run(self): print("%s is running"%self.name) time.sleep(random.randint(1,3)) print("%s is done"%self.name) if __name__=="__main__": t1 = MyThread("xxx") t1.start() print("main") #先輸出線程中的內容
3.GIL
盡管python解釋器中可以運行多個線程,但在任意給定時刻只有一個線程會被解釋器執行。GIL本質為一個互斥鎖,將並發運行變成串行。
對於任意面向I/O的Python功能,GIL會在I/O調用前被釋放,以允許其他線程在I/O執行的時候運行。而對於沒有太多I/O操作的代碼,更傾向於在該線程整個時間片內始終占用處理器和GIL。
I/O密集型Python程序比計算密集型代碼能夠更好地利用多線程環境。
計算密集型程序,多進程效率高,主要用於金融分析。
from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__=="__main__": l=[] print(os.cpu_count()) #8核 start=time.time() for i in range(4): # p = Process(target=work) #多進程為6.448570728302002秒 p = Thread(target=work) #多線程為21.265116930007935秒 l.append(p) p.start() for p in l: p.join() stop = time.time() print("run time is %s"%(stop-start))
I/O密集型程序,多線程效率高,主要用於socket、爬蟲、web等。
from multiprocessing import Process from threading import Thread import threading import os,time def work(): time.sleep(2) print("==>") if __name__=="__main__": l=[] print(os.cpu_count()) start=time.time() for i in range(400): # p = Process(target=work) #多進程9.083257913589478秒 p = Thread(target=work) #多線程2.039461612701416秒 l.append(p) p.start() for p in l: p.join() stop = time.time() print("run time is %s"%(stop-start))