2018-9-23 15:57:51
終於把多進程和多線程東西整理完了!
下次整理那個數據庫->前端->Django
慢慢來! 發現學了好多東西!
1. 僵屍進程,孤兒進程
1.1 僵屍進程:子進程結束,父類未結束
1.2 孤兒進程:父類進程over.,子進程未結束
''' 多線程的學習 ''' #使用線程的第一種方式,直接調用threading里面的Thread類 from threading import Thread import time def test(): print("--====昨晚喝多了,下次少喝點!!!") time.sleep(1) def main(): '''創建線程''' for i in range(5): #創建一個線程 t = Thread(target=test) t.start() if __name__ == '__main__': main() #使用線程的第二種方式,封裝成一個類, import threading import time class MyThread1(threading.Thread): """創建一個類繼承多線程類""" def run(self): for i in range(3): time.sleep(1) msg = "I'm" +slef.name+'@'+str(1) #name屬性中保存的是當前線程的名字 print(msg) if __name__ == '__main__': t = MyThread1() t.start()
2. 線程和進程區別
進程之間不能共享全局變量
線程之間共享全局變量,線程函數中的局部變量不是共享的!!
from threading import Thread import time g_num = 100 def work1(): global g_num for i in range(3): g_num += 1 print("-----in work1,g_num is %d-----"%g_num) def work2(): global g_num print("-------in work2 ,g_num is %d-----"%g_num) print("---------線程創建之前g_num is %d---"%g_num) t1 = Thread(target=work1) t1.start() #延時一會,保證t1線程中的事情做完 time.sleep(1) t2 = Thread(target=work2) t2.start()
3. 互斥鎖
關於互斥鎖,解決線程占用的問題!!!
上鎖的代碼越少越好!該加的時候才加!
#--*utf- 8*-- from threading import Thread ,Lock import time g_num = 0 def work1(): global g_num #上鎖, 這個線程和work2線程都在搶着 對這個鎖進行上鎖,如果由1方成功的上鎖 #那么導致一方會堵塞(一直等待)到這個鎖被解開位置 mutex.acquire() for i in range(100000): g_num += 1 #解鎖 #用來對mutex指向的這個鎖, 進行解鎖,,只要開了鎖,那么接下來會讓所有 #因為這個鎖 被鎖上的鎖 而堵塞的線程,進行搶着上鎖 mutex.release() print("-----in work1,g_num is %d-----"%g_num) def work2(): global g_num #上鎖 mutex.acquire() for i in range(100000): g_num += 1 #解鎖 mutex.release() print("-------in work2 ,g_num is %d-----"%g_num) #創建一把互斥鎖,這個鎖默認是沒有上鎖的 mutex =Lock() def main(): t1 = Thread(target=work1) t1.start() #延時一會,保證t1線程中的事情做完 # time.sleep(1) #取消屏蔽之后,再次運行程序 t2 = Thread(target=work2) t2.start() if __name__ == '__main__': main()
4. 同步應用:
同步的應用!就是一個接着一個
#同步的應用! from threading import Thread,Lock from time import sleep class Task1(Thread): def run(self): while True : if lock1.acquire(): print("------Task1----------") sleep(0.5) #給Task2開鎖! lock2.release() class Task2(Thread): def run(self): while True: if lock2.acquire(): print("------Task2----------") sleep(0.5) #給Task3開鎖 lock3.release() class Task3(Thread): def run(self): while True: if lock3.acquire(): print("------Task3----------") sleep(0.5) lock1.release() #使用Lock創建出的鎖默認沒有"鎖上" lock1 = Lock() #創建另外一個鎖,並且鎖上 lock2 = Lock() lock2.acquire() #創建另外一個鎖,並且"鎖上" lock3 = Lock() lock3.acquire() t1 = Task1() t2 = Task2() t3 = Task3() t1.start() t2.start() t3.start()
5.降低代碼的耦合性
#解決耦合的問題 #用隊列解決這種問題,起到了緩沖的作用 import threading import time # #python2中 # from Queue import Queue #python3中 from queue import Queue class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count +1 msg = '生成產品'+str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消費了 '+queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(500): queue.put('初始產品'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
反思: 多看別人的代碼,多問自己是否能寫出來這樣優雅代碼! 代碼一定要低耦合,高優雅!!!敲代碼要多思考!
6.python中的GIL,線程鎖
在python中,多進程效率遠大於多線程效率
python中存在GIL這個"線程鎖",
關鍵地方可以使用c語言解決 GIL問題 然后可以提高cpu占用效率
7.異步的實現!
同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你們一起去
異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去做別的了。
from multiprocessing import Pool import time import os def test(): print("---進程池中的進程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid())) for i in range(3): print("----%d---"%i) time.sleep(1) return "hahah" def test2(args): print("---callback func--pid=%d"%os.getpid()) print("---callback func--args=%s"%args) pool = Pool(3) pool.apply_async(func=test,callback=test2) time.sleep(5) print("----主進程-pid=%d----"%os.getpid())
8.反射
# 通過反射 # 對象名 獲取對象屬性 和普通方法 # 類名 獲取靜態屬性 和類方法 和靜態方法 # 普通方法 self # 靜態方法 @staticmethod # 類方法 @classmethod # 屬性方法@ property # 繼承 # 封裝 # 反射,, 應用:從類里面獲取字符串 # 在python中萬物皆對象 class Teacher: dic = {"查看學生信息":"show_student","查看講師信息":"show_teacher"} def show_student(): print("show_student") def show_teacher(): print("show_teacher") if hasattr(Teacher,"dic"): #如果類中有dic 則直接調用 ret = getattr(Teacher,"dic") #Teacher.dic #類也是對象 # print(ret) alex = Teacher() for k in Teacher.dic: print(k) key = input("輸入需求:") func = getattr(alex,Teacher.dic[key]) func()
9.一個_rcv面試題
#類調用屬性,屬性沒有,用__getatrr__魔法方法! #目的打印出 think different itcast class Foo(object): def __init__(self): pass def __getattr__(self,item): print(item, end="") return self def __str__(self): return "" print(Foo().think.different.itcast) 10.python多線程和多進程詳細總結 # python多進程 #!@Author TrueNewBee import time import os from multiprocessing import Process def func(args): print(args) print(54321) time.sleep(1) print("子進程:",os.getpid()) print('子進程的父進程:', os.getppid()) print(12345) if __name__ =="__main__": #如果函數需要傳參,就可以如下,args=() <--這是一個元組一個參數得加, #函數名不加括號 p = Process(target=func,args=(54321,)) #注冊 #p是一個進程對象,還沒有啟動進程 #異步(不是同時進行的) p.start() #開啟了一個子進程 print('*'*10) print('父進程:',os.getpid()) #查看當前進程號 print('父進程的父進程:', os.getppid()) #其實就是pycharm進程數 #進程的生命周期 #主進程 從運行到運行結束 #子進程 從start()開始,函數運行完結束 #開啟了子進程的主進程: #自己的代碼如果長,等待自己的代碼執行結束才結束, #子進程的執行時間長,主進程會在主進程代碼執行完畢后等待子進程執行完畢后 主進程結束
10.多進程異步
import time from multiprocessing import Process #多進程打印* 異步 def func(filename,content): """定義一個函數執行打印""" print('*'*arg1) time.sleep(5) print('*'*arg2) if __name__ =="__main__": # 異步,並不是按順序同時打印 # for i in range(10): # # p = Process(target=func,args=(10*i,20*i)) # # p.start() p_list = [] #創建一個進程對象的列表 for i in range(10): p = Process(target=func,args=(10*i,20*i)) p_list.append(p) p.start() #讓"運行完了最后打印,是個進程都是異步,在此之前所有子進程全部進行完了,然后才是同步 # for p in p_list:p.join() 列表推導式還原,其實就是一個for循環 [p.join() for p in p_list] #列表推導式 print("運行完了") import time from multiprocessing import Process def func(arg1,arg2): """定義一個函數執行打印""" print('*'*arg1) time.sleep(5) print('*'*arg2) # 先打印了 "運行完了" 5秒 再打印 arg2 實現了異步效果 if __name__ =="__main__": p = Process(target=func ,args=(10,20)) p.start() print("hhhhhhh") #join()和start()之間的部分仍然是異步,在join()下面才是同步 # p.join() #是感知一個子進程的結束,將異步的程序變為同步 """實現了異步效果""" print('========:運行完了')
11.python進程鎖(Demo)
#鎖 # 火車票 import json import time from multiprocessing import Process from multiprocessing import Lock def show(i): with open('ticket') as f: dic = json.load(f) print('余票: %s'%dic['ticket']) def buy_ticket(i,lock): lock.acquire() #拿鑰匙進門 with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket'] > 0 : dic['ticket'] -= 1 print('\033[32m%s買到票了\033[0m'%i) else: print('\033[31m%s沒買到票\033[0m'%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) lock.release() # 還鑰匙 if __name__ == '__main__': for i in range(10): p = Process(target=show,args=(i,)) p.start() lock = Lock() for i in range(10): p = Process(target=buy_ticket, args=(i,lock)) p.start()
12.進程 信號量
回顧:
多進程代碼
from multiprocessing import Process
方法
進程對象.start() 開啟一個子進程
進程對象.join() 感知一個子進程的結束
進程對象.terminate() 結束一個子進程
進程對象.is_alive() 查看某個子進程是否還在運行
屬性
進程對象.name 進程名
進程對象.pid 進程號
進程對象.daemob 值為True的時候,表現新的子進程是一個守護進程
守護進程 隨着主進程代碼的執行結束而結束
一定在start之前設置
rom multiprocessing import Lock l = Lock() l.acquire() #拿鑰匙 # 會造成數據不安全的操作 l.release() #換鑰匙 # 信號量 # 多進程中的組件 # 一套資源 同一時間 只能被n個人訪問 # 某一段代碼 同一時間 只能被n個進程執行 import time import random from multiprocessing import Process from multiprocessing import Semaphore def ktv(arg, sem1): sem1.acquire() # 獲取鑰匙 """模擬唱歌""" print("%s走進ktv"%arg) time.sleep(random.randint(1, 5)) print("%s走出ktv"%arg) sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(20): p = Process(target=ktv, args=(i, sem)) p.start()
13.事件
事件:通過一個信號 來控制 多個進程 同時 執行或者阻塞
一個信號可以使所有的進程都進入阻塞狀態
也可以控制所有進程的解除阻塞
一個事件被創建之后,默認是阻塞狀態
set 和 clear
分別用來修改一個事件的狀態 True或者False
is_set 用來查看一個事件的狀態
wait 是根據事件的狀態來解決自己是否在wait處阻塞
False阻塞 True不阻塞
from multiprocessing import Event def event(): e = Event() # 創建一個事件 print(e.is_set()) # 查看一個事件的狀態,默認設置成阻塞 e.set() # 將這個事件的狀態改為True print(e.is_set()) e.wait() # 是依據e.is_set()的值來決定是否阻塞 print(123456) e.clear() # 將這個事件的狀態改為False print(e.is_set()) e.wait() # 等待事件的信號被編程True print('*'*10) if __name__ == '__main__': event() # 紅綠燈事件 # 用事件控制信號,控制進程 import time import random from multiprocessing import Process from multiprocessing import Event def cars(e2, i1): """創建一個車""" if not e2.is_set(): print("car%i1在等待" % i1) e2.wait() # 阻塞,直到得到一個 事件狀態變成True的信號 print("car%s通過" % i1) def light(e1): """燈是獨立的進程""" while True: if e1.is_set(): time.sleep(2) e1.clear() print("綠燈亮了") else: e1.set() print("紅燈亮了") time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light, args=(e, )) traffic.start() for i in range(20): car = Process(target=cars, args=(e, i)) car.start() time.sleep(random.random())
14.隊列
import time from multiprocessing import Queue q = Queue(5) q.put(1) # 向隊列里面放值 q.put(2) q.put(3) q.put(4) q.put(5) # 如果隊列滿了再添加則出現堵塞 print(q.full()) # 隊列是否滿了 print(q.get()) # 取出來數 print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 如果隊列全取出了再去將出現阻塞 print(q.empty()) # 判斷隊列是否為空 while True: """檢查隊列是否為空""" try: q.get_nowait() except : print("隊列已空") time.sleep(1) from multiprocessing import Process from multiprocessing import Queue # 隊列生產和消費數據 def produce(q1): """隊列存儲數據""" q1.put('hello') def consume(q2): """隊列消費數據""" print(q2.get()) if __name__ == '__main__': q = Queue() p = Process(target=produce,args=(q, )) p.start() c = Process(target=consume, args=(q, )) c.start()
15.生產者和消費者 joinableQueue模型:
# 隊列 # 生產者消費者模型 解決供需不平衡的問題 import time import random from multiprocessing import Process, JoinableQueue def consumer(name, q1): """負責消費生產的東西""" while True: food = q1.get() if food is None: # 判斷為空則停止這個循環 print('%s獲取到一個空' % name) break print('\033[31m%s消費了%s\033[0m' % (name, food)) time.sleep(random.randint(0, 2)) q1.task_done() # 提交回執 count - 1 def producer(name, food, q1): """負責生產包子""" for i in range(10): time.sleep(random.randint(0, 2)) f = '%s生產了%s%s' % (name, food, i) print(f) q1.put(f) q1.join() # 阻塞,直到一個隊列中的數據 全部被執行完畢 if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer, args=('Egon', '包子', q)) p2 = Process(target=producer, args=('WuSir', '泔水', q)) c1 = Process(target=consumer, args=('jinBoss', q)) c2 = Process(target=consumer, args=('alex', q)) p1.start() p2.start() c1.daemon = True # 成為守護進程,主進程中的代碼執行完畢之后,子進程自動結束 c2.daemon = True c1.start() c2.start() p1.join() # 感知一個進程結束 p2.join() # 在消費者這一端: # 每次獲取一個數據 # 處理一個數據 # 發送一個記號:標志一個數據被處理成功 # 在生產者這一端 # 每一次生產一個數據 # 且每一次生產的數據放在隊列中 # 在隊列中刻上一個記號 # 當生產者全部生產完畢后 # join信號: 已經停止生產數據了 # 且要等待之前被刻上的記號都被消費完 # 當數據都被處理完時,join阻塞結束 # consumer 中把所有的任務消耗完 # produce 端的join感知到,停止阻塞 # 所有的producer進程結束 # 主進程中的p.join結束 # 守護進程(消費者進程)結束
16.生產者和消費者 Queue模型:
# 隊列 # 生產者消費者模型 解決供需不平衡的問題 import time import random from multiprocessing import Queue, Process def consumer(name, q1): """負責消費生產的東西""" while True: food = q1.get() if food is None: # 判斷為空則停止這個循環 print('%s獲取到一個空' % name) break print('\033[31m%s消費了%s\033[0m' % (name, food)) time.sleep(random.randint(0, 2)) def producer(name, food, q1): """負責生產包子""" for i in range(10): time.sleep(random.randint(0, 2)) f = '%s生產了%s%s' % (name, food, i) print(f) q1.put(f) if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer, args=('Egon', '包子', q)) p2 = Process(target=producer, args=('WuSir', '泔水', q)) c1 = Process(target=consumer, args=('jinBoss', q)) c2 = Process(target=consumer, args=('alex', q)) p1.start() p2.start() c1.start() c2.start() p1.join() # 感知一個進程結束 p2.join() q.put(None) q.put(None)
17.復習總結內容:
復習:
信號量 Semaphore
from multiprocessing import Semaphore
用鎖的原理實現的,內置了一個計數器
在同一時間,只能有指定數量的進程執行某一段被控制的代碼
事件
wait 阻塞收到時間狀態控制的同步組件
狀態 True False is_set
true->false clear()
false->true set()
wait 狀態為True不阻塞 狀態為False的時候阻塞
上面都是為了同步!!!(讓代碼按順序執行)
隊列
Queue
put 當隊列滿的時候阻塞等待隊列有空位置
get 當隊列空的時候阻塞等待隊列有數據
full empty 不完全准確
JoinableQueue
get task_done 這兩個聯用
put join 這兩個聯用
18. python進程間數據共享
# 管道 # 數據共享 Manager # 進程池和回調函數 ! # 管道 實現兩進程間的信息傳遞 from multiprocessing import Pipe, Process # 一種使用管道方式 def func(conn3, conn4): conn4.close() # 把多余通道關閉 while True: try: msg = conn3.recv() # 接收消息 print(msg) except EOFError: # 沒有數據可以取的時候拋出異常 conn3.close() break if __name__ == '__main__': conn1, conn2 = Pipe() # 接收兩個參數 兩個管道 Process(target=func, args=(conn1, conn2)).start() conn1.close() # 關閉多余通道 for i in range(20): conn2.send('吃了嗎') # 發送消息 conn2.close()
# pipe 數據不安全性 這是一個例子,下面有另外一個解決方案的代碼加上鎖! # IPC import time import random from multiprocessing import Process, Pipe def producer(con1, pro1, name, food): """通過管道把生產東西傳給消費者""" con1.close() for i in range(4): time.sleep(random.randint(1, 3)) f = '%s生產%s%s' % (name, food, i) print(f) pro1.send(f) pro1.close() def consumer(con2, pro2, name1): pro2.close() while True: try: food = con2.recv() print('%s吃了%s' % (name1, food)) time.sleep(random.randint(1, 3)) except EOFError: con2.close() break if __name__ == '__main__': con, pro = Pipe() p = Process(target=producer, args=(con, pro, 'a', '泔水')) c1 = Process(target=consumer, args=(con, pro, 'b')) c2 = Process(target=consumer, args=(con, pro, 'c')) c3 = Process(target=consumer, args=(con, pro, 'd')) p.start() c1.start() c2.start() c3.start() con.close() pro.close()
# 加上鎖解決pipe數據不安全問題 from multiprocessing import Process, Pipe, Lock def consumer(p, name, lock1): produce1, consume1 = p produce1.close() while True: lock1.acquire() food=consume1.recv() lock.release() if food: print('%s 收到包子:%s' %(name,baozi)) else: consume1.close() break def producer(p, n): produce2, consume2=p consume2.close() for i in range(n): produce2.send(i) produce2.send(None) produce2.send(None) produce2.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1 = Process(target=consumer, args=((produce, consume), 'c1', lock)) c2 = Process(target=consumer, args=((produce, consume), 'c2', lock)) p1 = Process(target=producer, args=((produce, consume), 10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主進程')
加鎖來控制操作管道的行為 來避免進程之間爭搶數據造成的數據不安全現象
隊列 進程之間數據安全的
管道 + 鎖
犧牲效率 保障了數據安全
from multiprocessing import Manager, Process, Lock def main(dic1, lock1): lock1.acquire() # 加上所也是為了數據不安全問題 dic1['count'] -= 1 lock1.release() if __name__ == '__main__': m = Manager() lock = Lock() dic = m.dict({'count': 100}) p_list = [] for i in range(50): p = Process(target=main, args=(dic, lock)) p.start() p_list.append(p) for i in p_list: p.join() print('主進程:', dic)
19. python進程池
主要方法 常用的就是 from multiprocessing import Pool map() 同步 apply() 同步 apply_async() 異步 手動 close() join() 學的逐漸的由淺入深 為什么會有進程池的概念 效率 每開啟進程,開啟屬於這個進程的內存空間 寄存器 堆棧 文件 進程過多 操作系統的調度 進程池 python中的 先創建一個屬於進程的池子 這個池子指定能放多少個進程 先將這些進程創建好 更高級的進程池 可以根據用戶需求改變進程數量 自帶join方法,里面是異步 map(func,range) 里面傳入的參數只能傳入可迭代的 range ,列表,字典等
import time from multiprocessing import Pool, Process def func(n): for a in range(10): print(n+1) if __name__ == '__main__': start = time.time() pool = Pool(5) # 5個進程 pool.map(func, range(100)) # 100個任務 異步 t1 = time.time() - start p_list = [] for i in range(100): p = Process(target=func, args=(i, )) p_list.append(p) p.start() for i in p_list:p.join() t2 = time.time()-start print(t1, t2) # apply() 方法為同步的 # apply_async() 方法為異步的 一般都是用這個 import time import os from multiprocessing import Pool def func(n): print('start func%s' % n, os.getpid()) time.sleep(1) print('end func%s' % n, os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): # p.apply(func, args=(i, )) # 該方法為同步的 p.apply_async(func, args=(i,)) # 該方法為異步的 # 使用apply_async 必須加的兩句話 p.close() # 結束進程接收任務 p.join() # 感知進程池中的任務執行結束 # p = Pool() # p.map(funcName, iterable) 默認異步的執行任務,且自帶close和join # p.apply 同步調用的 # p.apply_async 異步調用 和主進程完全異步 需要手動close 和join import time from multiprocessing import Pool def func(i1): time.sleep(0.5) return i1*i1 if __name__ == '__main__': p = Pool() # res_list = [] # 儲存res對象 到后面一塊被接收 # for i in range(10): # res = p.apply_async(func, args=(i, )) # apply_async的結果就是func的返回值 # res_list.append(res) # # res.get() # get() 等着func的計算結果,阻塞了(同步) # for res in res_list: # print(res.get()) # 每五個返回一次數據 讓get()變成了異步 # map() ret = p.map(func, range(100)) print(ret) # 整體返回所有數據
20. 回調函數
import os from multiprocessing import Pool def func1(n): print('in func1',os.getpid()) return n*n def func2(nn): print('in func2', os.getpid()) print(nn) if __name__ == '__main__': print('主進程:', os.getpid()) # 回調函數在主進程中執行的 p = Pool(5) for i in range(10): p.apply_async(func1, args=(10, ), callback=func2) # 回調func2返回值編程參數傳給func1 p.close() p.join()
21.進程池方式的 socket
server端
# 用進程池子方法讓服務端接就收多個客戶端發來消息 # apply_async() 異步方法 import socket from multiprocessing import Pool def func(conn1): conn1.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn1.close() if __name__ == '__main__': p = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1', 8080)) sk.listen() while True: # 需要異步 多進程就收客戶端發來消息 conn, add = sk.accept() p.apply_async(func, args=(conn, )) sk.close()
client 端
import socket sk = socket.socket() sk.connect(('127.0.0.1', 8080)) ret = sk.recv(1024).decode('utf-8') print(ret) msg = input('>>>').encode('utf-8') sk.send(msg) sk.close()
22. 上述復習和總結:
22.1: 進程池所有內容: 管道 數據的共享 Manager dic list 進程池 cpu個數+1 ret = map(func, iterable) 異步 自帶close和join 所有結果的[] apply() 同步的 : 只有當func執行完之后,才會繼續向下執行其他的代碼 apply(func, args=()) 返回值就是func的return apply_async 異步的:當func被注冊進入一個進程之后,程序就繼續向下執行 apply_async(func, args()) 返回值: apply_async返回的對象 為了用戶能從中獲取func的返回對象 obj.get() get會阻塞直到對應的func執行完畢拿到結果 使用apply_async給進程池分配任務,需要線close()后join來保持多進程和主進程代碼的同步性
22.2:回調函數 # 回調函數 from multiprocessing import Pool def func1(n): return n+1 def func2(m): print(m) if __name__ == '__main__': p = Pool(5) for i in range(10, 20): p.apply_async(func1, args=(i, ), callback=func2) p.close() p.join()
22.3線程:
import os import time from threading import Thread """多線程並發,都在同一個進程運行""" def func(n): time.sleep(1) # 全部線程並發睡1s 然后打印 print(n, os.getpid()) print('主線程:', os.getpid()) for i in range(10): t = Thread(target=func, args=(i, )) t.start() """使用面向對象方式開啟線程""" class MyTread(Thread): def __init__(self, arg): super().__init__() self.arg = arg def run(self): time.sleep(1) print(self.arg) t = MyTread(10) t.start()
threading模塊:
multiprocessing模塊的完全模仿threading模塊的接口,
二者在使用層面上有很大的相似地方
修改全局變量
在同一個進程多個線程之間的數據是共享的
def func1(a): global g g = 0 print(g, a, os.getpid()) g = 100 t_list = [] for i in range(10): t = Thread(target=func1, args=(i, )) t.start() t_list.append(t) for t in t_list: t.join() print(g)
進程 是最小的內存分配單位
線程 是操作系統調度的最小單位
線程被cpu執行了
進程內至少含有一個線程
進程中可以開啟多個線程
開啟一個線程所需要的時間要遠遠小於開啟一個進程
多個線程內部有自己的數據棧,數據不共享
全局變量在多個線程之間是共享的
在CPython解釋器下的python程序 在同一時刻 多線程只能有一個線程cpu被執行
高CPU(用多進程處理): 計算類 -----高CPU利用率 不占優勢
高IO(用多線程處理): 爬取網頁 200個網頁
qq聊天 send recv
處理日志文件 讀文件
處理web請求
讀取數據庫 寫數據庫
import time from threading import Thread from multiprocessing import Process """多線程與多進程時間對比""" def func(n): n+1 if __name__ == '__main__': start = time.time() t_list = [] for i in range(100): t = Thread(target=func, args=(i, )) t.start() t_list.append(t) for t in t_list: t.join() t1 = time.time() - start start1 = time.time() p_list = [] for i in range(100): p = Process(target=func, args=(i, )) p.start() p_list.append(t) for p in p_list: p.join() t2 = time.time() - start1 print(t1, t2)
22.4線程模塊中其他方法
import threading import time def func(n): time.sleep(0.5) # 查看線程名字和id print(n, threading.current_thread(), threading.get_ident()) for i in range(10): threading.Thread(target=func, args=(i, )).start() print(threading.current_thread()) print(threading.active_count()) # 查看所有線程數 11 加上主線程 print(threading.enumerate())
23. 多線程寫 socket sever
sever端
import socket from threading import Thread def chat(conn1): conn1.send(b'hello') msg = conn1.recv(1024).decode('utf-8') print(msg) inp = input(">>").encode('utf-8') conn1.send(inp) conn1.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1', 8080)) sk.listen() while True: conn, add = sk.accept() # 創建一個多線程實現多線程通訊 Thread(target=chat, args=(conn, )).start() sk.close()
client端 (多線程中可以用input 而多進程中不可以用input)
import socket sk = socket.socket() sk.connect(('127.0.0.1', 8080)) msg = sk.recv(1024) print(msg) inp = input('>>>>').encode('utf-8') # 多線程可以用input,多進程不可以用 sk.send(inp) sk.close()
25. 還是復習和總結:
25.1.復習:
正確的學習方法 input output correct 糾正 線程: 1.線程是進程中的執行單位 2.線程是cpu執行的最小單位 3.線程之間資源共享 4.線程的開啟和關閉以及切換的時間開銷遠遠小於進程 5.線程本身可以在同一時間使用多個cpu python 與 線程 CPython解釋器在解釋代碼過程中容易產生數據不安全的問題 GIL全局解釋器鎖 鎖的是線程 threading
25.2.守護線程:
import time from threading import Thread def func1(): while True: print('*'*10) time.sleep(1) def func2(): print('in func2') time.sleep(5) if __name__ == '__main__': t = Thread(target=func1, ) t.daemon = True t.start() t2 = Thread(target=func2, ) t2.start() t2.join() print('主線程') # 守護進程隨着主進程代碼的結束而結束(進程間資源不共享,所以想咋結束咋結束) # 守護線程會在主線程結束之后等待其他子線程的結束才結束(線程間資源共享,所以不能主線程不能立馬結束) # 主進程在執行完自己的代碼之后不會立即結束,而是等待子進程結束之后 揮手子進程的資源 # import time # from multiprocessing import Process # def func(): # time.sleep(5) # if __name__ == '__main__': # Process(target=func, ).start()
25.3.鎖
# import time # from threading import Thread, Lock # def func(lock1): # global n # lock1.acquire() # 加上一個鎖 # # n = 1 python內部就是下面執行的 # temp = n # time.sleep(0.2) # n = temp - 1 # 9 剛取回來還沒來得及賦值又被別人拿走了,所以得自己加個鎖不讓被人拿走 # lock1.release() # 換鑰匙 # # # n = 10 # t_list = [] # lock = Lock() # for i in range(10): # t = Thread(target=func, args=(lock, )) # t.start() # t_list.append(t) # for t in t_list: # t.join() # print(n) # 不加鎖是9 加鎖是 0 # 科學家吃面問題 經典死鎖問題 # noodle_lock = Lock() # fork_lock = Lock() # 互斥鎖 # # # def eat1(name): # noodle_lock.acquire() # print('%s拿到面條啦' % name) # fork_lock.acquire() # print('%s拿到叉子啦' % name) # print('%s吃面' % name) # fork_lock.release() # noodle_lock.release() # # # def eat2(name): # fork_lock.acquire() # print('%s拿到叉子啦' % name) # time.sleep(1) # noodle_lock.acquire() # print('%s拿到面條啦' % name) # print('吃面') # noodle_lock.release() # fork_lock.release() # # # if __name__ == '__main__': # Thread(target=eat1, args=('alex', )).start() # Thread(target=eat2, args=('Egon',)).start() # Thread(target=eat1, args=('bossJin',)).start() # Thread(target=eat2, args=('zeZha',)).start() import time from threading import RLock, Thread fork_lock = noodle_lock = RLock() # 一個鑰匙串上的兩把鑰匙 # 遞歸鎖 為了解決死鎖問題,可以acquire()多次, def eat1(name): noodle_lock.acquire() # 一把鑰匙 print('%s拿到面條啦' % name) fork_lock.acquire() print('%s拿到叉子啦' % name) print('%s吃面' % name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s拿到叉子啦' % name) time.sleep(1) noodle_lock.acquire() print('%s拿到面條啦' % name) print('%s吃面' % name) noodle_lock.release() fork_lock.release() if __name__ == '__main__': Thread(target=eat1, args=('alex', )).start() Thread(target=eat2, args=('Egon',)).start() Thread(target=eat1, args=('bossJin',)).start() Thread(target=eat2, args=('zeZha',)).start()
25.4.條件和定時器
import time from threading import Semaphore, Thread def func(sem1, a, b): # 同一時間就讓四個線程執行代碼 sem1.acquire() time.sleep(1) print(a+b) sem1.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(10): t = Thread(target=func, args=(sem, i, i+5)) t.start()
25.5.事件
# 事件被創建的時候 # False狀態 # wait() 阻塞 # True狀態 # wait() 非阻塞 # clear 設置狀態為False # set 設置狀態為True # 數據庫- 文件夾 # 文件夾里有好多excel表格 # 1.能夠更方便的對數據進行增刪改查 # 2.安全訪問的機制 # 偽代碼 看現象: # 起兩個線程 # 第一個線程:連接數據庫 # 等待一個信號,告訴我我們之間的網絡是通的 # 連接數據庫 # 第二個線程:檢測與數據庫之間的網絡情況是否連通 # time.sleep(0,2) # 將事件狀態設置為True import time import random from threading import Thread, Event def connect_db(e1): count = 0 while count < 3: # 連接三次 e1.wait(0.5) # 狀態是False的時候,我只等待1s if e1.is_set() == True: print('連接成功') break else: count += 1 print('第%s次連接失敗' % count) else: # 連接三次都都沒連上,主動拋出異常 raise TimeoutError('數據庫連接超時') def check_web(e2): time.sleep(random.randint(0, 3)) e2.set() if __name__ == '__main__': e = Event() t1 = Thread(target=connect_db, args=(e, )) t2 = Thread(target=check_web, args=(e, )) t1.start() t2.start()
25.6.條件
# 條件 # 鎖 # acquire release # 一個條件被創建之初 默認有一個False狀態 # False狀態 會影響wait()一直處於等待狀態 # notify(int數據類型) 制造一串鑰匙 # 不大重要,只能停留在面試中 from threading import Condition, Thread def func(con1, i1): con1.acquire() con1.wait() # 等鑰匙 print('在第%s個循環里' % i1) con.release() con = Condition() for i in range(10): Thread(target=func, args=(con, i)).start() while True: num = int(input('>>>')) con.acquire() con.notify(num) # 造鑰匙 con.release()
25.7.定時器
# 用的不多,知道這個組件就好了 import time from threading import Timer def func(): print('時間同步') if __name__ == '__main__': while True: Timer(5, func).start() # 非阻塞的 time.sleep(5)
25.8.隊列
# queue import queue q = queue.Queue() # 隊列先進先出 # q.put() # q.get() # q.put_nowait() # q.get_nowait() queue.LifoQueue() # 棧 先進后出 q.put((50, 'a')) q.put((30, 'r')) q.put((1, 'z')) q.put((1, 'd')) print(q.get())
25.9.池
import time from concurrent.futures import ThreadPoolExecutor def func(n): time.sleep(2) print(n) return n*n tpool= ThreadPoolExecutor(max_workers=5) # 默認 不要超過cpu個數*5 t_list = [] for i in range(20): t = tpool.submit(func, i) # 異步提交任務 t_list.append(t) tpool.shutdown() # close+join print('主線程') for t in t_list: print('****', t.result())
26. IO多路復用和協程復習
26.1協程:
進程 啟動多個進程 進程之間是由操作系統負責調用
線程 啟動多個線程 真正被CPU執行的最小單位實際是線程
開啟一個線程 創建一個線程 寄存器 堆棧
關閉一個線程
協程
本質上是一個線程
能夠在多個任務之間切換來節省一些IO時間
實現並發的手段
def consumer(): """創建一個生成器""" while True: x = yield print('處理了數據', x) def producer(): c = consumer() next(c) for i in range(10): print('生產了數據:', i) c.send(i) producer()
真正的協程模塊就是使用greenlet完成的切換
from greenlet import greenlet def eat(): print('eating start') g2.switch() # 切換到play print('eating end') g2.switch() def play(): print('playing start ') g1.switch() print('playing end') if __name__ == '__main__': # 用於切換線程 g1 = greenlet(eat) g2 = greenlet(play) g1.switch()
放在開頭,是為了識別time (IO)
from gevent import monkey; monkey.patch_all() import time import gevent import threading def eat(): print(threading.current_thread()) # 查看線程名字 print('eating start') time.sleep(1) # gevent 檢測到停1s,則調到另外一個函數中 print('eating end') def play(): print(threading.current_thread()) print('playing start ') time.sleep(1) print('playing end') if __name__ == '__main__': g1 = gevent.spawn(eat) # 開啟協程 g2 = gevent.spawn(play) g1.join() g2.join()
進程和線程的任務切換由操作系統完成
協程任務之間的切換由程序(代碼)完成 只有遇到協程模塊能識別的IO操作的時候,程序才會進行任務切換實現並發效果
同步 和 異步 (網絡操作常用協程)
from gevent import monkey; monkey.patch_all() import time import gevent def task(): time.sleep(1) print(12345) def sync(): for i in range(10): task() def async(): g_list = [] for i in range(10): g = gevent.spawn(task) g_list.append(g) gevent.joinall(g_list) # for g in g_list :g.join() if __name__ == '__main__': sync() async()
協程 : 能夠在一個線程中實現並發效果的概念
能夠規避一些任務中的IO操作
在任務的執行過程中,檢測到IO就切換到其他任務
多線程 被弱化了
協程: 在一個線程上,提高cpu的利用率
協程相比於多線程的優勢 切換的效率更快了
爬蟲例子(正則基礎)
請求過程中的IO等待
from gevent import monkey;monkey.patch_all() import gevent from urllib.request import urlopen def get_url(url1): response = urlopen(url1) content = response.read().decode('utf-8') # 有各式的 return len(content) url = { 'http://www.baidu.com', 'http://www.taobao.com', 'http://www.hao123.com', } g_list = [] for i in url: g = gevent.spawn(get_url, i) g_list.append(g) gevent.joinall(g_list) for g in g_list: print(g.value) 26.2用協程寫 socket_demo socket # 用協程寫 socket # 用協程是最快最方便的 最省時間占用最小,代碼間的轉換 from gevent import monkey; monkey.patch_all() import socket import gevent def talk(conn1): conn1.send(b'hello') rec = conn.recv(1024).decode('utf-8') print(rec) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn, add = sk.accept() gevent.spawn(talk, conn) sk.close() client import socket sk = socket.socket() sk.connect(('127.0.0.1', 8080)) rec = sk.recv(1024).decode('utf-8') print(rec) msg = input('>>>>').encode('utf-8') sk.send(msg) sk.close()
26.3IO模型 筆記
同步 : 提交一個任務之后要等待這個任務執行完畢 異步 : 只管提交任務,不等待這個任務執行完畢就可以做其他事情 阻塞 : input urlopen() 在socket里面:recv() recvfrom() accept 非阻塞 : 除了阻塞的其他都是非阻塞 阻塞 線程 運行狀態 --> 阻塞狀態-->就緒 非阻塞 IO多路復用 select機制 Windows和linux 都是操作系統輪詢每一個被監聽的項,看是否讀操作 poll機制 linux 它可以監聽的對象比select機制可以監聽的多 隨着監聽項的增多,導致效率降低 epoll機制 linux
26.4.非阻塞模型
socket
# 非阻塞IO模型 # 單線程中非阻塞!(沒有用協程!) import socket sk = socket.socket() sk.bind(('127.0.0.1', 8080)) sk.setblocking(False) # 默認True阻塞, False非阻塞 sk.listen() conn_list = [] del_conn = [] # 存入失效連接的列表 while True: # 接收異常 BlockingIOError 完成非阻塞 try: conn, add = sk.accept() # 不阻塞,但沒人連我會報錯 print('建立連接了', add) # msg = conn.recv(1024) # 不阻塞,但沒有消息會報錯 # print(msg) conn_list.append(conn) except BlockingIOError: # 循環列表連接 看看是否有人發消息 for con in conn_list: try: msg = con.recv(1024) # 不阻塞,但沒有消息會報錯 if msg == b'': del_conn.append(con) # 把失效的連接存到del_conn中 continue print(msg) con.send(b'bye bye') except BlockingIOError: pass for con in del_conn: con.close() conn_list.remove(con) # 在conn_list中刪除失效連接 del_conn.clear() # 清空刪除列表
client
# 非阻塞IO 多線程並發socket IO import time import socket import threading def func(): sk = socket.socket() sk.connect(('127.0.0.1', 8080)) sk.send(b'hello') time.sleep(1) msg = sk.recv(1024) print(msg) sk.close() for i in range(20): threading .Thread(target=func).start()
26.5.IO多路復用
socket
# IO多路復用 多並發! import select import socket sk = socket.socket() sk.bind(('127.0.0.1', 8080)) sk.setblocking(False) sk.listen() read_list = [sk] # 儲存監聽對象 while True: # [sk, conn] sk,發送鏈接 conn監聽發送消息 r_list, w_list, x_list = select.select(read_list, [], []) for i in r_list: if i is sk: conn, add = i.accept() # 沒有sk, 有conn則會報錯 read_list.append(conn) else: ret = i.recv(1024) if ret == b'': i.close() read_list.remove(i) continue print(ret) i.send(b'goodbye')
client
import socket import threading import time def func(): sk = socket.socket() sk.connect(('127.0.0.1', 8080)) sk.send(b'hello') time.sleep(1) sk.recv(1024) sk.close() for i in range(20): threading .Thread(target=func).start()
26.6selector_dome
# 服務端 from socket import * import selectors sel = selectors.DefaultSelector() def accept(server_fileobj, mask): conn, addr = server_fileobj.accept() sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): try: data = conn.recv(1024) if not data: print('closing', conn) sel.unregister(conn) conn.close() return conn.send(data.upper()+b'_SB') except Exception: print('closing', conn) sel.unregister(conn) conn.close() sk = socket(AF_INET, SOCK_STREAM) sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sk.bind(('127.0.0.1', 8088)) sk.listen(5) sk.setblocking(False) # 設置socket的接口為非阻塞 # 相當於網select的讀列表里append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept sel.register(sk, selectors.EVENT_READ, accept) # 說白了就是,如果有人請求連接sk,就調用accept方法 while True: events = sel.select() # 檢測所有的sk,conn,是否有完成wait data的 for sel_obj, mask in events: # [sk] callback = sel_obj.data # callback = accept callback(sel_obj.fileobjmask) # accept(server_fileobj,1)
#客戶端 # 基於selectors模塊實現聊天 from socket import * c=socket(AF_INET, SOCK_STREAM) c.connect(('127.0.0.1',8088)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data = c.recv(1024) print(data.decode('utf-8'))
2018-9-23 16:18:40 終於整理完了
下次開始整理數據庫內容 目前已經把博客python知識整理完了!好有成就感!
下次整理 7.26數據庫 (1)