python之路多進程和多線程總結(四)


2018-9-23 15:57:51

終於把多進程和多線程東西整理完了!

下次整理那個數據庫->前端->Django

慢慢來! 發現學了好多東西!

 

1. 僵屍進程,孤兒進程
1.1 僵屍進程:子進程結束,父類未結束
1.2 孤兒進程:父類進程over.,子進程未結束

'''
多線程的學習
'''
#使用線程的第一種方式,直接調用threading里面的Thread類
from threading import Thread
import time

def test():
    print("--====昨晚喝多了,下次少喝點!!!")
    time.sleep(1)

def main():
    '''創建線程'''
    for i in range(5):
        #創建一個線程
        t = Thread(target=test)
        t.start()

if __name__ == '__main__':
    main()


#使用線程的第二種方式,封裝成一個類,
import threading
import time

class MyThread1(threading.Thread):
    """創建一個類繼承多線程類"""
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "I'm" +slef.name+'@'+str(1) #name屬性中保存的是當前線程的名字
            print(msg)

if __name__ == '__main__':
    t = MyThread1()
    t.start()

2. 線程和進程區別
進程之間不能共享全局變量
線程之間共享全局變量,線程函數中的局部變量不是共享的!!

from threading import Thread
import time 

g_num = 100
def  work1():
    global g_num
    for i in range(3):
        g_num += 1
    print("-----in work1,g_num is %d-----"%g_num)

def work2():
    global g_num
    print("-------in work2 ,g_num is %d-----"%g_num)


print("---------線程創建之前g_num is %d---"%g_num)
t1 = Thread(target=work1)
t1.start()
#延時一會,保證t1線程中的事情做完
time.sleep(1)
t2 = Thread(target=work2)
t2.start()

3. 互斥鎖
關於互斥鎖,解決線程占用的問題!!!
上鎖的代碼越少越好!該加的時候才加!

#--*utf- 8*--
from threading import Thread ,Lock
import time 

g_num = 0

def  work1():
    global g_num
    #上鎖, 這個線程和work2線程都在搶着 對這個鎖進行上鎖,如果由1方成功的上鎖
    #那么導致一方會堵塞(一直等待)到這個鎖被解開位置
    mutex.acquire()
    for i in range(100000):
        g_num += 1
    #解鎖
    #用來對mutex指向的這個鎖, 進行解鎖,,只要開了鎖,那么接下來會讓所有
    #因為這個鎖 被鎖上的鎖 而堵塞的線程,進行搶着上鎖
    mutex.release()
    print("-----in work1,g_num is %d-----"%g_num)

def work2():
    global g_num
    #上鎖
    mutex.acquire()
    for i in range(100000):
        g_num += 1
    #解鎖
    mutex.release()
    print("-------in work2 ,g_num is %d-----"%g_num)


#創建一把互斥鎖,這個鎖默認是沒有上鎖的
mutex =Lock()
def main():
    t1 = Thread(target=work1)
    t1.start()
    #延時一會,保證t1線程中的事情做完
    # time.sleep(1) #取消屏蔽之后,再次運行程序
    t2 = Thread(target=work2)
    t2.start()

if __name__ == '__main__':
    main()

4. 同步應用:
同步的應用!就是一個接着一個

#同步的應用!
from threading import Thread,Lock
from time import sleep

class Task1(Thread):
    def  run(self):
        while True :
            if lock1.acquire():
                print("------Task1----------")
                sleep(0.5)
                #給Task2開鎖!
                lock2.release()


class Task2(Thread):
    def run(self):
        while True:
            if lock2.acquire():
                print("------Task2----------")
                sleep(0.5)
                #給Task3開鎖
                lock3.release()


class Task3(Thread):
    def run(self):
        while True:
            if lock3.acquire():
                print("------Task3----------")
                sleep(0.5)
                lock1.release()

#使用Lock創建出的鎖默認沒有"鎖上"
lock1 = Lock()
#創建另外一個鎖,並且鎖上
lock2 = Lock()
lock2.acquire()
#創建另外一個鎖,並且"鎖上"
lock3 = Lock()
lock3.acquire()

t1 = Task1()
t2 = Task2()
t3 = Task3()

t1.start()
t2.start()
t3.start()

5.降低代碼的耦合性

#解決耦合的問題
#用隊列解決這種問題,起到了緩沖的作用
import threading
import time
# #python2中
# from Queue import Queue

#python3中
from queue import Queue

class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count +1
                    msg = '生成產品'+str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)


class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了 '+queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == '__main__':
    queue = Queue()
    for i in range(500):
        queue.put('初始產品'+str(i))
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()

反思: 多看別人的代碼,多問自己是否能寫出來這樣優雅代碼!  代碼一定要低耦合,高優雅!!!敲代碼要多思考!

6.python中的GIL,線程鎖
在python中,多進程效率遠大於多線程效率
python中存在GIL這個"線程鎖",
關鍵地方可以使用c語言解決 GIL問題 然后可以提高cpu占用效率

 

7.異步的實現!
同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你們一起去
異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去做別的了。

from multiprocessing import Pool
import time
import os

def test():
    print("---進程池中的進程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
    for i in range(3):
        print("----%d---"%i)
        time.sleep(1)
    return "hahah"

def test2(args):
    print("---callback func--pid=%d"%os.getpid())
    print("---callback func--args=%s"%args)

pool = Pool(3)
pool.apply_async(func=test,callback=test2)
time.sleep(5)
print("----主進程-pid=%d----"%os.getpid())

8.反射

    # 通過反射
    # 對象名 獲取對象屬性 和普通方法
    # 類名  獲取靜態屬性 和類方法 和靜態方法
    # 普通方法 self
    # 靜態方法 @staticmethod
    # 類方法  @classmethod
    # 屬性方法@ property
    # 繼承
    # 封裝
    # 反射,, 應用:從類里面獲取字符串
    # 在python中萬物皆對象
class Teacher:
    dic = {"查看學生信息":"show_student","查看講師信息":"show_teacher"}
    def  show_student():
        print("show_student")
 
    def show_teacher():
        print("show_teacher")
    if hasattr(Teacher,"dic"):
        #如果類中有dic 則直接調用
        ret = getattr(Teacher,"dic")  #Teacher.dic  #類也是對象
# print(ret)
alex = Teacher()
for k in Teacher.dic:
    print(k)
key = input("輸入需求:")
func = getattr(alex,Teacher.dic[key])
func()

9.一個_rcv面試題

#類調用屬性,屬性沒有,用__getatrr__魔法方法!
#目的打印出 think different itcast

class Foo(object):
    def  __init__(self):
        pass

    def __getattr__(self,item):
        print(item, end="")
        return self
    def __str__(self):
        return ""

print(Foo().think.different.itcast)

10.python多線程和多進程詳細總結
# python多進程
#!@Author   TrueNewBee

import time
import os
from multiprocessing import Process


def func(args):
    print(args)
    print(54321)
    time.sleep(1)
    print("子進程:",os.getpid())
    print('子進程的父進程:', os.getppid())
    print(12345)


if __name__ =="__main__":
    #如果函數需要傳參,就可以如下,args=() <--這是一個元組一個參數得加,
    #函數名不加括號
    p = Process(target=func,args=(54321,)) #注冊
    #p是一個進程對象,還沒有啟動進程
    #異步(不是同時進行的)
    p.start()       #開啟了一個子進程
    print('*'*10)
    print('父進程:',os.getpid()) #查看當前進程號
    print('父進程的父進程:', os.getppid()) #其實就是pycharm進程數

#進程的生命周期
    #主進程  從運行到運行結束
    #子進程  從start()開始,函數運行完結束
    #開啟了子進程的主進程:
        #自己的代碼如果長,等待自己的代碼執行結束才結束,
        #子進程的執行時間長,主進程會在主進程代碼執行完畢后等待子進程執行完畢后 主進程結束

10.多進程異步

import  time
from multiprocessing import  Process

#多進程打印*   異步
def func(filename,content):
    """定義一個函數執行打印"""
    print('*'*arg1)
    time.sleep(5)
    print('*'*arg2)


if __name__ =="__main__":
    # 異步,並不是按順序同時打印
    # for i in  range(10):
    #     # p = Process(target=func,args=(10*i,20*i))
    #     # p.start()
    p_list = [] #創建一個進程對象的列表
    for i in range(10):
        p = Process(target=func,args=(10*i,20*i))
        p_list.append(p)
        p.start()
        #讓"運行完了最后打印,是個進程都是異步,在此之前所有子進程全部進行完了,然后才是同步
    # for p in p_list:p.join()  列表推導式還原,其實就是一個for循環
    [p.join() for p in p_list]  #列表推導式
    print("運行完了")

import  time
from multiprocessing import  Process


def func(arg1,arg2):
    """定義一個函數執行打印"""
    print('*'*arg1)
    time.sleep(5)
    print('*'*arg2)

# 先打印了 "運行完了"  5秒 再打印 arg2  實現了異步效果
if __name__ =="__main__":
    p = Process(target=func ,args=(10,20))
    p.start()
    print("hhhhhhh") #join()和start()之間的部分仍然是異步,在join()下面才是同步
    # p.join()  #是感知一個子進程的結束,將異步的程序變為同步
    """實現了異步效果"""
    print('========:運行完了')

11.python進程鎖(Demo)

#
# 火車票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock

def show(i):
    with open('ticket') as f:
        dic = json.load(f)
    print('余票: %s'%dic['ticket'])

def buy_ticket(i,lock):
    lock.acquire() #拿鑰匙進門
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('\033[32m%s買到票了\033[0m'%i)
    else:
        print('\033[31m%s沒買到票\033[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()      # 還鑰匙

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=show,args=(i,))
        p.start()
    lock = Lock()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i,lock))
        p.start()

12.進程 信號量

回顧:
多進程代碼
from multiprocessing import Process
方法
進程對象.start() 開啟一個子進程
進程對象.join() 感知一個子進程的結束
進程對象.terminate() 結束一個子進程
進程對象.is_alive() 查看某個子進程是否還在運行
屬性
進程對象.name 進程名
進程對象.pid 進程號
進程對象.daemob 值為True的時候,表現新的子進程是一個守護進程
守護進程 隨着主進程代碼的執行結束而結束
一定在start之前設置

rom  multiprocessing import Lock
l = Lock()
l.acquire()    #拿鑰匙
# 會造成數據不安全的操作
l.release()    #換鑰匙

# 信號量
# 多進程中的組件
# 一套資源  同一時間 只能被n個人訪問
# 某一段代碼 同一時間 只能被n個進程執行
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore


def ktv(arg, sem1):
    sem1.acquire()   # 獲取鑰匙
    """模擬唱歌"""
    print("%s走進ktv"%arg)
    time.sleep(random.randint(1, 5))
    print("%s走出ktv"%arg)
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=ktv, args=(i, sem))
        p.start()

13.事件

事件:通過一個信號 來控制 多個進程 同時 執行或者阻塞
一個信號可以使所有的進程都進入阻塞狀態
也可以控制所有進程的解除阻塞
一個事件被創建之后,默認是阻塞狀態

set 和 clear
分別用來修改一個事件的狀態 True或者False
is_set 用來查看一個事件的狀態
wait 是根據事件的狀態來解決自己是否在wait處阻塞
False阻塞 True不阻塞

from multiprocessing import Event


def event():
    e = Event()  # 創建一個事件
    print(e.is_set())  # 查看一個事件的狀態,默認設置成阻塞
    e.set()  # 將這個事件的狀態改為True
    print(e.is_set())
    e.wait()  # 是依據e.is_set()的值來決定是否阻塞
    print(123456)
    e.clear()  # 將這個事件的狀態改為False
    print(e.is_set())
    e.wait()    # 等待事件的信號被編程True
    print('*'*10)


if __name__ == '__main__':
    event()


# 紅綠燈事件
# 用事件控制信號,控制進程
import time
import random
from multiprocessing import Process
from multiprocessing import Event


def cars(e2, i1):
    """創建一個車"""
    if not e2.is_set():
        print("car%i1在等待" % i1)
        e2.wait()  # 阻塞,直到得到一個 事件狀態變成True的信號
    print("car%s通過" % i1)


def light(e1):
    """燈是獨立的進程"""
    while True:
        if e1.is_set():
            time.sleep(2)
            e1.clear()
            print("綠燈亮了")
        else:
            e1.set()
            print("紅燈亮了")
        time.sleep(2)


if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light, args=(e, ))
    traffic.start()
    for i in range(20):
        car = Process(target=cars, args=(e, i))
        car.start()
        time.sleep(random.random())

14.隊列

import time
from multiprocessing import Queue


q = Queue(5)
q.put(1)    # 向隊列里面放值
q.put(2)
q.put(3)
q.put(4)
q.put(5)
# 如果隊列滿了再添加則出現堵塞
print(q.full())  # 隊列是否滿了
print(q.get())  # 取出來數
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 如果隊列全取出了再去將出現阻塞
print(q.empty())    # 判斷隊列是否為空
while True:
    """檢查隊列是否為空"""
    try:
        q.get_nowait()
    except :
        print("隊列已空")
        time.sleep(1)

from multiprocessing import Process
from multiprocessing import Queue


# 隊列生產和消費數據
def produce(q1):
    """隊列存儲數據"""
    q1.put('hello')


def consume(q2):
    """隊列消費數據"""
    print(q2.get())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q, ))
    p.start()
    c = Process(target=consume, args=(q, ))
    c.start()

15.生產者和消費者 joinableQueue模型:

# 隊列
# 生產者消費者模型  解決供需不平衡的問題
import time
import random
from multiprocessing import Process, JoinableQueue


def consumer(name, q1):
    """負責消費生產的東西"""
    while True:
        food = q1.get()
        if food is None:     # 判斷為空則停止這個循環
            print('%s獲取到一個空' % name)
            break
        print('\033[31m%s消費了%s\033[0m' % (name, food))
        time.sleep(random.randint(0, 2))
        q1.task_done()  # 提交回執 count - 1


def producer(name, food, q1):
    """負責生產包子"""
    for i in range(10):
        time.sleep(random.randint(0, 2))
        f = '%s生產了%s%s' % (name, food, i)
        print(f)
        q1.put(f)
    q1.join()   # 阻塞,直到一個隊列中的數據 全部被執行完畢


if __name__ == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer, args=('Egon', '包子', q))
    p2 = Process(target=producer, args=('WuSir', '泔水', q))
    c1 = Process(target=consumer, args=('jinBoss', q))
    c2 = Process(target=consumer, args=('alex', q))
    p1.start()
    p2.start()
    c1.daemon = True  # 成為守護進程,主進程中的代碼執行完畢之后,子進程自動結束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()   # 感知一個進程結束
    p2.join()


# 在消費者這一端:
#     每次獲取一個數據
#     處理一個數據
#     發送一個記號:標志一個數據被處理成功

# 在生產者這一端
#     每一次生產一個數據
#     且每一次生產的數據放在隊列中
#     在隊列中刻上一個記號
#     當生產者全部生產完畢后
#     join信號: 已經停止生產數據了
#     且要等待之前被刻上的記號都被消費完
#     當數據都被處理完時,join阻塞結束

# consumer 中把所有的任務消耗完
# produce 端的join感知到,停止阻塞
# 所有的producer進程結束
# 主進程中的p.join結束
# 守護進程(消費者進程)結束

16.生產者和消費者 Queue模型:

# 隊列
# 生產者消費者模型  解決供需不平衡的問題
import time
import random
from multiprocessing import Queue, Process


def consumer(name, q1):
    """負責消費生產的東西"""
    while True:
        food = q1.get()
        if food is None:    # 判斷為空則停止這個循環
            print('%s獲取到一個空' % name)
            break
        print('\033[31m%s消費了%s\033[0m' % (name, food))
        time.sleep(random.randint(0, 2))


def producer(name, food, q1):
    """負責生產包子"""
    for i in range(10):
        time.sleep(random.randint(0, 2))
        f = '%s生產了%s%s' % (name, food, i)
        print(f)
        q1.put(f)


if __name__ == '__main__':
    q = Queue(20)
    p1 = Process(target=producer, args=('Egon', '包子', q))
    p2 = Process(target=producer, args=('WuSir', '泔水', q))
    c1 = Process(target=consumer, args=('jinBoss', q))
    c2 = Process(target=consumer, args=('alex', q))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()   # 感知一個進程結束
    p2.join()
    q.put(None)
    q.put(None)

17.復習總結內容:

復習:
信號量 Semaphore
from multiprocessing import Semaphore
用鎖的原理實現的,內置了一個計數器
在同一時間,只能有指定數量的進程執行某一段被控制的代碼

事件
wait 阻塞收到時間狀態控制的同步組件
狀態 True False is_set
true->false clear()
false->true set()
wait 狀態為True不阻塞 狀態為False的時候阻塞

上面都是為了同步!!!(讓代碼按順序執行)

隊列
Queue
put 當隊列滿的時候阻塞等待隊列有空位置
get 當隊列空的時候阻塞等待隊列有數據
full empty 不完全准確

JoinableQueue
get task_done 這兩個聯用
put join 這兩個聯用

 

18. python進程間數據共享

# 管道
# 數據共享  Manager
# 進程池和回調函數 !
# 管道  實現兩進程間的信息傳遞  
from multiprocessing import Pipe, Process


# 一種使用管道方式
def func(conn3, conn4):
    conn4.close()   # 把多余通道關閉
    while True:
        try:
            msg = conn3.recv()  # 接收消息
            print(msg)
        except EOFError:    # 沒有數據可以取的時候拋出異常
            conn3.close()
            break


if __name__ == '__main__':
    conn1, conn2 = Pipe()  # 接收兩個參數 兩個管道
    Process(target=func, args=(conn1, conn2)).start()
    conn1.close()  # 關閉多余通道
    for i in range(20):
        conn2.send('吃了嗎')   # 發送消息
    conn2.close()
# pipe 數據不安全性  這是一個例子,下面有另外一個解決方案的代碼加上鎖!
# IPC
import time
import random
from multiprocessing import Process, Pipe


def producer(con1, pro1, name, food):
    """通過管道把生產東西傳給消費者"""
    con1.close()
    for i in range(4):
        time.sleep(random.randint(1, 3))
        f = '%s生產%s%s' % (name, food, i)
        print(f)
        pro1.send(f)
    pro1.close()


def consumer(con2, pro2, name1):
    pro2.close()
    while True:
        try:
            food = con2.recv()
            print('%s吃了%s' % (name1, food))
            time.sleep(random.randint(1, 3))
        except EOFError:
            con2.close()
            break


if __name__ == '__main__':
    con, pro = Pipe()
    p = Process(target=producer, args=(con, pro, 'a', '泔水'))
    c1 = Process(target=consumer, args=(con, pro, 'b'))
    c2 = Process(target=consumer, args=(con, pro, 'c'))
    c3 = Process(target=consumer, args=(con, pro, 'd'))
    p.start()
    c1.start()
    c2.start()
    c3.start()
    con.close()
    pro.close()
# 加上鎖解決pipe數據不安全問題
from multiprocessing import Process, Pipe, Lock


def consumer(p, name, lock1):
    produce1, consume1 = p
    produce1.close()
    while True:
        lock1.acquire()
        food=consume1.recv()
        lock.release()
        if food:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume1.close()
            break


def producer(p, n):
    produce2, consume2=p
    consume2.close()
    for i in range(n):
        produce2.send(i)
    produce2.send(None)
    produce2.send(None)
    produce2.close()


if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1 = Process(target=consumer, args=((produce, consume), 'c1', lock))
    c2 = Process(target=consumer, args=((produce, consume), 'c2', lock))
    p1 = Process(target=producer, args=((produce, consume), 10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')

加鎖來控制操作管道的行為 來避免進程之間爭搶數據造成的數據不安全現象
隊列 進程之間數據安全的
管道 + 鎖

犧牲效率 保障了數據安全

from multiprocessing import Manager, Process, Lock


def main(dic1, lock1):
    lock1.acquire()  # 加上所也是為了數據不安全問題
    dic1['count'] -= 1
    lock1.release()


if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count': 100})
    p_list = []
    for i in range(50):
        p = Process(target=main, args=(dic, lock))
        p.start()
        p_list.append(p)
    for i in p_list:
        p.join()
    print('主進程:', dic)

19. python進程池

主要方法  常用的就是   from multiprocessing import Pool
map() 同步
apply() 同步
apply_async()  異步  手動 close()  join()   學的逐漸的由淺入深

為什么會有進程池的概念
  效率
  每開啟進程,開啟屬於這個進程的內存空間
寄存器 堆棧 文件
進程過多 操作系統的調度

進程池
  python中的 先創建一個屬於進程的池子
  這個池子指定能放多少個進程
  先將這些進程創建好

更高級的進程池
  可以根據用戶需求改變進程數量
    自帶join方法,里面是異步
    map(func,range) 里面傳入的參數只能傳入可迭代的  range ,列表,字典等
import time
from multiprocessing import Pool, Process


def func(n):
    for a in range(10):
        print(n+1)


if __name__ == '__main__':
    start = time.time()
    pool = Pool(5)                  # 5個進程
    pool.map(func, range(100))      # 100個任務  異步
    t1 = time.time() - start
    p_list = []
    for i in range(100):
        p = Process(target=func, args=(i, ))
        p_list.append(p)
        p.start()
    for i in p_list:p.join()
    t2 = time.time()-start
    print(t1, t2)


# apply() 方法為同步的
# apply_async() 方法為異步的 一般都是用這個
import time
import os
from multiprocessing import Pool


def func(n):
    print('start func%s' % n, os.getpid())
    time.sleep(1)
    print('end func%s' % n, os.getpid())


if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        # p.apply(func, args=(i, ))     # 該方法為同步的
        p.apply_async(func, args=(i,))  # 該方法為異步的
    # 使用apply_async 必須加的兩句話
    p.close()   # 結束進程接收任務
    p.join()    # 感知進程池中的任務執行結束


# p = Pool()
# p.map(funcName, iterable)     默認異步的執行任務,且自帶close和join
# p.apply 同步調用的
# p.apply_async 異步調用 和主進程完全異步 需要手動close 和join
import time
from multiprocessing import Pool


def func(i1):
    time.sleep(0.5)
    return i1*i1


if __name__ == '__main__':
    p = Pool()
    # res_list = []   # 儲存res對象 到后面一塊被接收
    # for i in range(10):
    #     res = p.apply_async(func, args=(i, ))   # apply_async的結果就是func的返回值
    #     res_list.append(res)
    #     # res.get()    # get() 等着func的計算結果,阻塞了(同步)
    # for res in res_list:
    #     print(res.get())   # 每五個返回一次數據 讓get()變成了異步

    # map()
    ret = p.map(func, range(100))
    print(ret)  # 整體返回所有數據

20. 回調函數

import os
from multiprocessing import Pool


def func1(n):
    print('in func1',os.getpid())
    return n*n


def func2(nn):
    print('in func2', os.getpid())
    print(nn)


if __name__ == '__main__':
    print('主進程:', os.getpid())  # 回調函數在主進程中執行的
    p = Pool(5)
    for i in range(10):
        p.apply_async(func1, args=(10, ), callback=func2)  # 回調func2返回值編程參數傳給func1
    p.close()
    p.join()

21.進程池方式的 socket

server端

# 用進程池子方法讓服務端接就收多個客戶端發來消息
# apply_async() 異步方法
import socket
from multiprocessing import Pool


def func(conn1):
    conn1.send(b'hello')
    print(conn.recv(1024).decode('utf-8'))
    conn1.close()


if __name__ == '__main__':
    p = Pool(5)
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8080))
    sk.listen()
    while True:
        # 需要異步 多進程就收客戶端發來消息
        conn, add = sk.accept()
        p.apply_async(func, args=(conn, ))
    sk.close()

client 端

import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
ret = sk.recv(1024).decode('utf-8')
print(ret)
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

22. 上述復習和總結:

22.1:
        進程池所有內容:
        管道
        數據的共享 Manager dic list
        進程池
          cpu個數+1
          ret = map(func, iterable)
              異步 自帶close和join
              所有結果的[]
          apply()
              同步的 : 只有當func執行完之后,才會繼續向下執行其他的代碼
              apply(func, args=())
              返回值就是func的return
          apply_async
              異步的:當func被注冊進入一個進程之后,程序就繼續向下執行
              apply_async(func, args())
              返回值: apply_async返回的對象
                  為了用戶能從中獲取func的返回對象 obj.get()
              get會阻塞直到對應的func執行完畢拿到結果
              使用apply_async給進程池分配任務,需要線close()后join來保持多進程和主進程代碼的同步性
  22.2:回調函數

                # 回調函數
                from multiprocessing import Pool


                def func1(n):
                    return n+1


                def func2(m):
                    print(m)


                if __name__ == '__main__':
                    p = Pool(5)
                    for i in range(10, 20):
                        p.apply_async(func1, args=(i, ), callback=func2)
                    p.close()
                    p.join()

22.3線程:

import os
import time
from threading import Thread
"""多線程並發,都在同一個進程運行"""

def func(n):
    time.sleep(1)   # 全部線程並發睡1s 然后打印
    print(n, os.getpid())


print('主線程:', os.getpid())
for i in range(10):
    t = Thread(target=func, args=(i, ))
    t.start()
"""使用面向對象方式開啟線程"""
class MyTread(Thread):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        time.sleep(1)
        print(self.arg)

t = MyTread(10)
t.start()    

threading模塊:
multiprocessing模塊的完全模仿threading模塊的接口,
二者在使用層面上有很大的相似地方
修改全局變量
在同一個進程多個線程之間的數據是共享的

def func1(a):
    global g
    g = 0
    print(g, a, os.getpid())

g = 100
t_list = []
for i in range(10):
    t = Thread(target=func1, args=(i, ))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(g)

進程 是最小的內存分配單位
線程 是操作系統調度的最小單位
線程被cpu執行了
進程內至少含有一個線程
進程中可以開啟多個線程
開啟一個線程所需要的時間要遠遠小於開啟一個進程
多個線程內部有自己的數據棧,數據不共享
全局變量在多個線程之間是共享的
在CPython解釋器下的python程序 在同一時刻 多線程只能有一個線程cpu被執行
高CPU(用多進程處理): 計算類 -----高CPU利用率 不占優勢
高IO(用多線程處理): 爬取網頁 200個網頁
qq聊天 send recv
處理日志文件 讀文件
處理web請求
讀取數據庫 寫數據庫

import time
from threading import Thread
from multiprocessing import Process
"""多線程與多進程時間對比"""

def func(n):
    n+1


if __name__ == '__main__':
    start = time.time()
    t_list = []
    for i in range(100):
        t = Thread(target=func, args=(i, ))
        t.start()
        t_list.append(t)
    for t in t_list:
        t.join()
    t1 = time.time() - start

    start1 = time.time()
    p_list = []
    for i in range(100):
        p = Process(target=func, args=(i, ))
        p.start()
        p_list.append(t)
    for p in p_list:
        p.join()
    t2 = time.time() - start1
    print(t1, t2)

     22.4線程模塊中其他方法

import threading
import time


def func(n):
    time.sleep(0.5)
    # 查看線程名字和id
    print(n, threading.current_thread(), threading.get_ident())


for i in range(10):
    threading.Thread(target=func, args=(i, )).start()
print(threading.current_thread())
print(threading.active_count())  # 查看所有線程數  11 加上主線程
print(threading.enumerate())

23. 多線程寫 socket sever 

sever端

import socket
from threading import Thread


def chat(conn1):
    conn1.send(b'hello')
    msg = conn1.recv(1024).decode('utf-8')
    print(msg)
    inp = input(">>").encode('utf-8')
    conn1.send(inp)
    conn1.close()


if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8080))
    sk.listen()
    while True:
        conn, add = sk.accept()
        # 創建一個多線程實現多線程通訊
        Thread(target=chat, args=(conn, )).start()
    sk.close()

client端  (多線程中可以用input  而多進程中不可以用input)

import socket

sk = socket.socket()
sk.connect(('127.0.0.1', 8080))

msg = sk.recv(1024)
print(msg)
inp = input('>>>>').encode('utf-8')  # 多線程可以用input,多進程不可以用
sk.send(inp)
sk.close()

25. 還是復習和總結:

25.1.復習:

正確的學習方法
input
output
correct 糾正

線程:
1.線程是進程中的執行單位
2.線程是cpu執行的最小單位
3.線程之間資源共享
4.線程的開啟和關閉以及切換的時間開銷遠遠小於進程
5.線程本身可以在同一時間使用多個cpu
python 與 線程
CPython解釋器在解釋代碼過程中容易產生數據不安全的問題
GIL全局解釋器鎖 鎖的是線程
threading

25.2.守護線程:

import time
from threading import Thread


def func1():
    while True:
        print('*'*10)
        time.sleep(1)


def func2():
    print('in func2')
    time.sleep(5)


if __name__ == '__main__':
    t = Thread(target=func1, )
    t.daemon = True
    t.start()
    t2 = Thread(target=func2, )
    t2.start()
    t2.join()
    print('主線程')

# 守護進程隨着主進程代碼的結束而結束(進程間資源不共享,所以想咋結束咋結束)
# 守護線程會在主線程結束之后等待其他子線程的結束才結束(線程間資源共享,所以不能主線程不能立馬結束)

# 主進程在執行完自己的代碼之后不會立即結束,而是等待子進程結束之后 揮手子進程的資源

# import time
# from multiprocessing import Process


# def func():
#     time.sleep(5)


# if __name__ == '__main__':
#     Process(target=func, ).start()    

25.3.鎖

# import time
# from threading import Thread, Lock


# def func(lock1):
#     global n
#     lock1.acquire()  # 加上一個鎖
#     # n = 1  python內部就是下面執行的
#     temp = n
#     time.sleep(0.2)
#     n = temp - 1    # 9 剛取回來還沒來得及賦值又被別人拿走了,所以得自己加個鎖不讓被人拿走
#     lock1.release()  # 換鑰匙
#
#
# n = 10
# t_list = []
# lock = Lock()
# for i in range(10):
#     t = Thread(target=func, args=(lock, ))
#     t.start()
#     t_list.append(t)
# for t in t_list:
#     t.join()
# print(n)    # 不加鎖是9 加鎖是 0

# 科學家吃面問題 經典死鎖問題
# noodle_lock = Lock()
# fork_lock = Lock()
# 互斥鎖
#
#
# def eat1(name):
#     noodle_lock.acquire()
#     print('%s拿到面條啦' % name)
#     fork_lock.acquire()
#     print('%s拿到叉子啦' % name)
#     print('%s吃面' % name)
#     fork_lock.release()
#     noodle_lock.release()
#
#
# def eat2(name):
#     fork_lock.acquire()
#     print('%s拿到叉子啦' % name)
#     time.sleep(1)
#     noodle_lock.acquire()
#     print('%s拿到面條啦' % name)
#     print('吃面')
#     noodle_lock.release()
#     fork_lock.release()
#
#
# if __name__ == '__main__':
#     Thread(target=eat1, args=('alex', )).start()
#     Thread(target=eat2, args=('Egon',)).start()
#     Thread(target=eat1, args=('bossJin',)).start()
#     Thread(target=eat2, args=('zeZha',)).start()
import time
from threading import RLock, Thread


fork_lock = noodle_lock = RLock()   # 一個鑰匙串上的兩把鑰匙
# 遞歸鎖   為了解決死鎖問題,可以acquire()多次,


def eat1(name):
    noodle_lock.acquire()   # 一把鑰匙
    print('%s拿到面條啦' % name)
    fork_lock.acquire()
    print('%s拿到叉子啦' % name)
    print('%s吃面' % name)
    fork_lock.release()
    noodle_lock.release()


def eat2(name):
    fork_lock.acquire()
    print('%s拿到叉子啦' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s拿到面條啦' % name)
    print('%s吃面' % name)
    noodle_lock.release()
    fork_lock.release()


if __name__ == '__main__':
    Thread(target=eat1, args=('alex', )).start()
    Thread(target=eat2, args=('Egon',)).start()
    Thread(target=eat1, args=('bossJin',)).start()
    Thread(target=eat2, args=('zeZha',)).start()

25.4.條件和定時器

import time
from threading import Semaphore, Thread


def func(sem1, a, b):
    # 同一時間就讓四個線程執行代碼
    sem1.acquire()
    time.sleep(1)
    print(a+b)
    sem1.release()


if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(10):
        t = Thread(target=func, args=(sem, i, i+5))
        t.start()

25.5.事件

# 事件被創建的時候
# False狀態
#       wait() 阻塞
# True狀態
#       wait() 非阻塞
# clear 設置狀態為False
# set   設置狀態為True


# 數據庫- 文件夾
# 文件夾里有好多excel表格
#   1.能夠更方便的對數據進行增刪改查
#   2.安全訪問的機制


# 偽代碼 看現象:
# 起兩個線程
# 第一個線程:連接數據庫
#   等待一個信號,告訴我我們之間的網絡是通的
#   連接數據庫
# 第二個線程:檢測與數據庫之間的網絡情況是否連通
#       time.sleep(0,2)
#       將事件狀態設置為True

import time
import random
from threading import Thread, Event


def connect_db(e1):
    count = 0
    while count < 3:
        # 連接三次
        e1.wait(0.5)  # 狀態是False的時候,我只等待1s
        if e1.is_set() == True:
            print('連接成功')
            break
        else:
            count += 1
            print('第%s次連接失敗' % count)
    else:
        # 連接三次都都沒連上,主動拋出異常
        raise TimeoutError('數據庫連接超時')


def check_web(e2):
    time.sleep(random.randint(0, 3))
    e2.set()


if __name__ == '__main__':
    e = Event()
    t1 = Thread(target=connect_db, args=(e, ))
    t2 = Thread(target=check_web, args=(e, ))
    t1.start()
    t2.start()

25.6.條件

# 條件
#
# acquire release
# 一個條件被創建之初 默認有一個False狀態
# False狀態 會影響wait()一直處於等待狀態
# notify(int數據類型) 制造一串鑰匙
# 不大重要,只能停留在面試中
from threading import Condition, Thread


def func(con1, i1):
    con1.acquire()
    con1.wait()     # 等鑰匙
    print('在第%s個循環里' % i1)
    con.release()


con = Condition()
for i in range(10):
    Thread(target=func, args=(con, i)).start()
while True:
    num = int(input('>>>'))
    con.acquire()
    con.notify(num)     # 造鑰匙
    con.release()

25.7.定時器

# 用的不多,知道這個組件就好了
import time
from threading import Timer


def func():
    print('時間同步')


if __name__ == '__main__':
    while True:
        Timer(5, func).start()  # 非阻塞的
        time.sleep(5)

25.8.隊列

# queue
import queue


q = queue.Queue()   # 隊列先進先出
# q.put()
# q.get()
# q.put_nowait()
# q.get_nowait()
queue.LifoQueue()   # 棧 先進后出
q.put((50, 'a'))
q.put((30, 'r'))
q.put((1, 'z'))
q.put((1, 'd'))
print(q.get())

25.9.池

import time
from concurrent.futures import ThreadPoolExecutor


def func(n):
    time.sleep(2)
    print(n)
    return n*n


tpool= ThreadPoolExecutor(max_workers=5)   # 默認 不要超過cpu個數*5
t_list = []
for i in range(20):
    t = tpool.submit(func, i)   # 異步提交任務
    t_list.append(t)
tpool.shutdown()  # close+join
print('主線程')
for t in t_list:
    print('****', t.result())

 

26. IO多路復用和協程復習

26.1協程:

進程    啟動多個進程 進程之間是由操作系統負責調用
線程    啟動多個線程 真正被CPU執行的最小單位實際是線程
        開啟一個線程 創建一個線程 寄存器 堆棧
        關閉一個線程
協程
        本質上是一個線程
        能夠在多個任務之間切換來節省一些IO時間
實現並發的手段
def consumer():
    """創建一個生成器"""
    while True:
        x = yield
        print('處理了數據', x)


def producer():
    c = consumer()
    next(c)
    for i in range(10):
        print('生產了數據:', i)
        c.send(i)


producer()

真正的協程模塊就是使用greenlet完成的切換

from greenlet import greenlet
def eat():
    print('eating start')
    g2.switch()     # 切換到play
    print('eating end')
    g2.switch()


def play():
    print('playing start ')
    g1.switch()
    print('playing end')


if __name__ == '__main__':
    # 用於切換線程
    g1 = greenlet(eat)
    g2 = greenlet(play)
    g1.switch()

放在開頭,是為了識別time (IO)

from gevent import monkey; monkey.patch_all()
import time
import gevent
import threading


def eat():
    print(threading.current_thread())   # 查看線程名字
    print('eating start')
    time.sleep(1)   # gevent 檢測到停1s,則調到另外一個函數中
    print('eating end')


def play():
    print(threading.current_thread())
    print('playing start ')
    time.sleep(1)
    print('playing end')


if __name__ == '__main__':
    g1 = gevent.spawn(eat)  # 開啟協程
    g2 = gevent.spawn(play)
    g1.join()
    g2.join()

進程和線程的任務切換由操作系統完成
協程任務之間的切換由程序(代碼)完成 只有遇到協程模塊能識別的IO操作的時候,程序才會進行任務切換實現並發效果

同步 和 異步 (網絡操作常用協程)

from gevent import monkey; monkey.patch_all()
import time
import gevent


def task():
    time.sleep(1)
    print(12345)


def sync():
    for i in range(10):
        task()


def async():
    g_list = []
    for i in range(10):
        g = gevent.spawn(task)
        g_list.append(g)
    gevent.joinall(g_list)  # for g in g_list :g.join()


if __name__ == '__main__':
    sync()
    async()
協程 : 能夠在一個線程中實現並發效果的概念
       能夠規避一些任務中的IO操作
       在任務的執行過程中,檢測到IO就切換到其他任務

多線程  被弱化了
協程: 在一個線程上,提高cpu的利用率
協程相比於多線程的優勢  切換的效率更快了


爬蟲例子(正則基礎)
請求過程中的IO等待

 

from gevent import monkey;monkey.patch_all()
import gevent
from urllib.request import urlopen


def get_url(url1):
    response = urlopen(url1)
    content = response.read().decode('utf-8')   # 有各式的
    return len(content)


url = {
    'http://www.baidu.com',
    'http://www.taobao.com',
    'http://www.hao123.com',
}
g_list = []
for i in url:
    g = gevent.spawn(get_url, i)
    g_list.append(g)
gevent.joinall(g_list)
for g in g_list:
    print(g.value)

    26.2用協程寫 socket_demo
socket 

# 用協程寫 socket
# 用協程是最快最方便的  最省時間占用最小,代碼間的轉換
from gevent import monkey; monkey.patch_all()
import socket
import gevent


def talk(conn1):
    conn1.send(b'hello')
    rec = conn.recv(1024).decode('utf-8')
    print(rec)
    conn.close()


if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn, add = sk.accept()
        gevent.spawn(talk, conn)
    sk.close()

client
import socket


sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
rec = sk.recv(1024).decode('utf-8')
print(rec)
msg = input('>>>>').encode('utf-8')
sk.send(msg)
sk.close()

26.3IO模型 筆記

同步 : 提交一個任務之后要等待這個任務執行完畢
異步 : 只管提交任務,不等待這個任務執行完畢就可以做其他事情
阻塞 : input  urlopen()  在socket里面:recv() recvfrom() accept
非阻塞 : 除了阻塞的其他都是非阻塞

阻塞  線程  運行狀態 --> 阻塞狀態-->就緒
非阻塞

IO多路復用
  select機制   Windows和linux            都是操作系統輪詢每一個被監聽的項,看是否讀操作
  poll機制      linux   它可以監聽的對象比select機制可以監聽的多
                          隨着監聽項的增多,導致效率降低
  epoll機制     linux

  26.4.非阻塞模型

socket

# 非阻塞IO模型
# 單線程中非阻塞!(沒有用協程!)
import socket

sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.setblocking(False)   # 默認True阻塞, False非阻塞
sk.listen()
conn_list = []
del_conn = []   # 存入失效連接的列表
while True:
    # 接收異常 BlockingIOError 完成非阻塞
    try:
        conn, add = sk.accept()     # 不阻塞,但沒人連我會報錯
        print('建立連接了', add)
        # msg = conn.recv(1024)       # 不阻塞,但沒有消息會報錯
        # print(msg)
        conn_list.append(conn)
    except BlockingIOError:
        # 循環列表連接 看看是否有人發消息
        for con in conn_list:
            try:
                msg = con.recv(1024)    # 不阻塞,但沒有消息會報錯
                if msg == b'':
                    del_conn.append(con)    # 把失效的連接存到del_conn中
                    continue
                print(msg)
                con.send(b'bye bye')
            except BlockingIOError:
                pass
        for con in del_conn:
            con.close()
            conn_list.remove(con)   # 在conn_list中刪除失效連接
        del_conn.clear()    # 清空刪除列表

client

# 非阻塞IO 多線程並發socket IO
import time
import socket
import threading


def func():
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8080))
    sk.send(b'hello')
    time.sleep(1)
    msg = sk.recv(1024)
    print(msg)
    sk.close()


for i in range(20):
    threading .Thread(target=func).start()

26.5.IO多路復用 

socket

# IO多路復用 多並發!
import select
import socket

sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.setblocking(False)
sk.listen()

read_list = [sk]  # 儲存監聽對象
while True:  # [sk, conn]  sk,發送鏈接    conn監聽發送消息
    r_list, w_list, x_list = select.select(read_list, [], [])
    for i in r_list:
        if i is sk:
            conn, add = i.accept()  # 沒有sk, 有conn則會報錯
            read_list.append(conn)
        else:
            ret = i.recv(1024)
            if ret == b'':
                i.close()
                read_list.remove(i)
                continue
            print(ret)
            i.send(b'goodbye')

client

import socket
import threading
import time


def func():
    sk = socket.socket()
    sk.connect(('127.0.0.1', 8080))
    sk.send(b'hello')
    time.sleep(1)
    sk.recv(1024)
    sk.close()


for i in range(20):
    threading .Thread(target=func).start()

26.6selector_dome

# 服務端
from socket import *
import selectors
sel = selectors.DefaultSelector()


def accept(server_fileobj, mask):
    conn, addr = server_fileobj.accept()
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    try:
        data = conn.recv(1024)
        if not data:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


sk = socket(AF_INET, SOCK_STREAM)
sk.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sk.bind(('127.0.0.1', 8088))
sk.listen(5)
sk.setblocking(False)  # 設置socket的接口為非阻塞
# 相當於網select的讀列表里append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept
sel.register(sk, selectors.EVENT_READ, accept)
# 說白了就是,如果有人請求連接sk,就調用accept方法

while True:
    events = sel.select()  # 檢測所有的sk,conn,是否有完成wait data的
    for sel_obj, mask in events:    # [sk]
        callback = sel_obj.data  # callback = accept
        callback(sel_obj.fileobjmask)  # accept(server_fileobj,1)
#客戶端
# 基於selectors模塊實現聊天
from socket import *
c=socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

2018-9-23 16:18:40 終於整理完了

下次開始整理數據庫內容 目前已經把博客python知識整理完了!好有成就感!

下次整理 7.26數據庫 (1)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM