【Day34】Pyhotn之路——網絡編程


  • 守護進程

    (1)守護進程在主進程代碼運行結束后,就自動終止

    (2)守護進程內無法開啟子進程,否則報錯AssertionError: daemonic processes are not allowed to have children

    注意:進程之間相互獨立的,主進程運行完畢后,守護進程隨即結束

from multiprocessing import Process
import os,time, random
def func():
    print('%s ruing '%os.getpid())
    time.sleep(0.2)
    print('%s end'%os.getpid())
    # p = Process(target=func) #在子進程內創建以個子進程,報錯
    # p.start()

if __name__ == '__main__':
    P = Process(target=func)
    P.daemon = True
    P.start()
    # P.join()# 使主進程必須等到子進程運行完畢
    print('')

 

from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    p1=Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    print("main-------")#打印出“main”意味着主進程結束,守護進程也就結束了,然而p2並不是守護進行,主進程需等到p2運行完畢,才能結束,否則會產生孤兒進程
迷惑人的例子
  • 守護線程

守護線程需要等到其他非守護線程執行完,才結束。

from threading import Thread
import time, os, random

def func():
    print('%s is ruing'% os.getpid())
    time.sleep(2)
    print("%s end"% os.getpid())
    t = Thread(target=time.sleep,args=(3,))
    t.start()

if __name__ == '__main__':
    t = Thread(target=func)
    t.start()
    print('')
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2 = Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")
迷惑人的例子
  • 互斥鎖:把並發變成了串行,犧牲效率提高安全性

進程同步鎖

def func():
    # mutex.acquire()
    print('%s is runing'%os.getpid())
    time.sleep(2)
    print('%s end'%os.getpid())
    time.sleep(random.randint(1,3))
    # mutex.release()

if __name__ == '__main__':
    p = Process(target=func)
    p1 = Process(target=func)
    p2= Process(target=func)
    p3 = Process(target=func)
    p.start()
    p1.start()
    p2.start()
    p3.start()
    # mutex = Lock()
    # p = Process(target=func,args=(mutex,))
    # p1 = Process(target=func,args=(mutex,))
    # p2 = Process(target=func,args=(mutex,))
    # p3 = Process(target=func,args=(mutex,))
    # p.start()
    # p1.start()
    # p2.start()
    # p3.start()

模擬搶票:

from multiprocessing import Process,Lock
import os,time
import json
from random import randint
# def sehch():
#     with open('text.txt',encoding='utf-8')as f:
#         dic = json.load(f)
#     print('%s 剩余票數%s'%(os.getpid(),dic['count']))
# def get():
#
#     with open('text.txt',encoding='utf-8')as f:
#         dic = json.load(f)
#     if dic['count'] > 0:
#         dic['count'] -=1
#         time.sleep(randint(1,3))
#         with open("text.txt",'w',encoding='utf-8')as f:
#             json.dump(dic,f)
#             print('購票成功%s'%os.getpid())
# def task(mutex):
#     sehch()
#     mutex.acquire()
#     get()
#     mutex.release()
# if __name__ == '__main__':
#     mutex = Lock()
#     for i in range(20):
#         p = Process(target=task,args=(mutex,))
#         p.start()
模擬搶票
  •  互斥鎖——線程
from threading import Thread
import time, os, random
n = 100
def func():
    # global n
    # mutex.acquire()
    # tem = n
    # n = tem -1
    # mutex.release()
    global n
    # with mutex:
    tem = n
    time.sleep(0.1)
    n = tem -1
if __name__ == '__main__':
    li = []
    # mutex = Lock()
    for i in range(100):
        t = Thread(target=func)
        li.append(t)
        t.start()  #在創建進程和線程時,使用的Thread, Process 時需要使用start進行啟動
    for t in li:        #使用with時,可以省略使用acquire,和release 兩個函數
        t.join()
    print(n)
結果為99
from threading import Thread
import time, os, random
n = 100
def func():
    # global n
    # mutex.acquire()
    # tem = n
    # n = tem -1
    # mutex.release()
    global n
    with mutex:
        tem = n
        n = tem -1
if __name__ == '__main__':
    li = []
    mutex = Lock()
    for i in range(100):
        t = Thread(target=func)
        li.append(t)
        t.start()  #在創建進程和線程時,使用的Thread, Process 時需要使用start進行啟動
    for t in li:        #使用with時,可以省略使用acquire,和release 兩個函數
        t.join()
    print(n)
jion 和Lock的區別:lock 好處想鎖那一塊就鎖那一塊,可以只鎖修改共享數據那一塊,讓局部變成串行。
互斥鎖:把並發變成串行,保證大家修改同一塊數據的安全,
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去,如果指定信號量為3,那么來一個人獲得一把鎖,計數加1,當計數等於3時,后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖 信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process,Semaphore
import time, os, random
def  task(sm):
    with sm:
        print("%s 快遞"%os.getpid())
        time.sleep(1)
if __name__ == '__main__':
    sm = Semaphore(3)
    for i in range(10):
        p =Process(target=task, args=(sm,))
        p.start()
  • 進程Queue 與線程queue

Queue(3)可以限制放入的數量,但是也可以限制取出的數量,如果取出的數量大於放入的數量,則會處於停頓狀態,一直等待放入數據

(2)Queue 是先進先出

from multiprocessing import    Queue
# q = Queue(3)
# q.put({"a":1})
# q.put({"a":2})
# print(q.get())
  • 線程queue隊列
import queue
q = queue.Queue(3)
q.put('a')
print(q.get())
  • 優先級隊列:
import queue
q = queue.PriorityQueue(3)
q.put((1,21))               #put()參數為元組,元組第一個為優先級,數越小,優先級越高

print(q.get())

“”“
(1, 21)
”“”
  • 堆棧:后進先出
import queue
q = queue.LifoQueue(3)
q.put((1,21))

print(q.get())
"""(1, 21)"""
  •  生產者消費者模型
from multiprocessing import Process, Queue
import time, os, random

def gieve(q):
    for i in range(10):
        res = '包子%s'%i
        time.sleep(0.5)
        q.put(res)
        print('%s 生產了 包子%s'%(os.getpid(),res))
    q.put(None)
def get_e(q):
    while True:
        res = q.get()
        if res == None:
            break
        print('%s 吃了 %s'%(os.getpid(),res))
        time.sleep(random.randint(1,3))

if __name__ == '__main__':
    q = Queue()
    a = Process(target=gieve,args=(q,))
    s = Process(target=get_e,args=(q,))
    a.start()
    s.start()
    # a.join()
    # q.put(None)
    print('')
from multiprocessing import Process, Queue
import time, os, random

def gieve(q):
for i in range(10):
res = '包子%s'%i
time.sleep(0.5)
q.put(res)
print('%s 生產了 包子%s'%(os.getpid(),res))
q.put(None)
def get_e(q):
while True:
res = q.get()
if res == None:
break
print('%s 吃了 %s'%(os.getpid(),res))
time.sleep(random.randint(1,3))

if __name__ == '__main__':
q = Queue()
a = Process(target=gieve,args=(q,))
s = Process(target=get_e,args=(q,))
a.start()
s.start()
# a.join()
# q.put(None)
print('')
  • 信號量
信號量。信號量是用於維持有限資源訪問的信號。它們和鎖類似,除了它們可以允許某個限制下的多個訪問。
它就像電梯一樣只能夠容納幾個人。一旦達到了限制,想要使用資源的進程就必須等待。其它進程釋放了信號量之后,它才可以獲得。 例如,假設有許多進程需要讀取中心數據庫服務器的數據。如果過多的進程同時訪問它,它就會崩潰,所以限制連接數量就是個好主意。
如果數據庫只能同時支持N
=2的連接,我們就可以以初始值N=2來創建信號量。
>>> from threading import Semaphore
>>> db_semaphore = Semaphore(2) # set up the semaphore
>>> database = []
>>> def insert(data):
        db_semaphore.acquire() # try to acquire the semaphore
        database.append(data)  # if successful, proceed
        db_semaphore.release() # release the semaphore
>>> insert(7)
>>> insert(8)
>>> insert(9)

信號量的工作機制是,所有進程只在獲取了信號量之后才可以訪問數據庫。只有N=2個進程可以獲取信號量,其它的進程都需要等到其中一個進程釋放了信號量,之后在訪問數據庫之前嘗試獲取它。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM