Python之路【第七篇續】:進程、線程、協程


Socket Server模塊

SocketServer內部使用 IO多路復用 以及 “多線程” 和 “多進程” ,從而實現並發處理多個客戶端請求的Socket服務端。即:每個客戶端請求連接到服務器時,Socket服務端都會在服務器是創建一個“線程”或者“進 程” 專門負責處理當前客戶端的所有請求。

socket server 和 select & epoll 還是不太一樣他的本質是:客戶端第一次鏈接的時候,只要一進來,我服務端有個while循環為你創建一個
線程和進程,客戶端就和服務端直接創建通信,以后傳送數據什么的就不會通過server端了,直接他倆通過線程或者進程通信就可以了!

如果在多進程的時候,client1和client2他們同時傳輸10G的文件都是互相不影響!
如果在多線程的時候,python中的多線程,在同一時間只有一個線程在工作,他底層會自動進行上下文切換,client1傳一點,client2傳一點。

知識回顧:
python中的多線程,有一個GIL在同一時間只有一個線程在工作,他底層會自動進行上下文切換.
這樣會導致python的多線程效率會很低,也就是人們經常說的python多線程問題

如下圖:

第一次連接后,數據通訊就通過線程或進程進行數據交換(紅色箭頭)

ThreadingTCPServer

ThreadingTCPServer實現的Soket服務器內部會為每個client創建一個 “線程該線程用來和客戶端進行交互。

1、ThreadingTCPServer基礎

使用ThreadingTCPServer:

  • 創建一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱為 handle 的方法
  • 啟動ThreadingTCPServer
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self): #定義handle方法
        # print self.request,self.client_address,self.server
        conn = self.request #如果連接請求過來,獲取client端對象
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.') #發送一個信息
        Flag = True #並把Flag設置為True
        while Flag:當Flag為True的時候執行
            data = conn.recv(1024) #接收client端數據
            if data == 'exit': #判斷如果data  == 'exit' 退出
                Flag = False #並把Flag設置為Flase
            elif data == '0': #如果為 == ‘0’ 
                conn.sendall('通過可能會被錄音.balabala一大推') #發送數據
            else:#上面的都沒匹配上,發送請重新輸入
                conn.sendall('請重新輸入.') 


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer) #實例化對象,設置啟動的IP/PORT並把自己定義的類寫上作為SocketServer.ThreadingTCPServer的構造函數
    server.serve_forever() #調用對象中的啟動方法
Socket Server
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()
Socket Client

2、ThreadingTCPServer源碼剖析

 

學會看源碼非常重要!不能僅僅光會用!大贊~ 知道他的過程和實現~ 怎么學會看源碼呢?多看然后畫類圖,如上圖!!!

在理解的時候可以把他們想象為,把所有需要用的方法,都抓到ThreadingTCPServer中

內部調用流程為:

  • 啟動服務端程序
  • 執行 TCPServer.__init__ 方法,創建服務端Socket對象並綁定 IP 和 端口
  • 執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass
  • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...
  • 當客戶端連接到達服務器
  • 執行 ThreadingMixIn.process_request 方法,創建一個 “線程” 用來處理請求
  • 執行 ThreadingMixIn.process_request_thread 方法
  • 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass()  即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

精簡源碼:

模擬Socekt Server的簡化版本:

import socket
import threading
import select


def process(request, client_address): #模擬定義的handle()方法,這個方法內的代碼是socket server與Client端交互代碼
    print request,client_address
    conn = request
    conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('通過可能會被錄音.balabala一大推')
        else:
            conn.sendall('請重新輸入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:  #這里一個while循環循環監控sk文件句柄
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r: #當sk文件句柄發生變化的時候說明是新的客戶端連接過來了
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address)) #創建一個線程,並調用自己定義的process方法執行~然后樣客戶端與之交互
        t.daemon = False
        t.start()

sk.close()

如精簡代碼可以看出,SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 selectThreading 兩個東西,其實本質上就是在服務器端為每一個客戶端創建一個線程,當前線程用來處理對應客戶端的請求,所以,可以支持同時n個客戶端鏈接(長連接)。

ForkingTCPServer

ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別為請求者建立 “線程”  和 “進程”。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('通過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請重新輸入.')


if __name__ == '__main__':
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()
socket server - multiprocess
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()
client

以上ForkingTCPServer只是將 ThreadingTCPServer 實例中的代碼:

server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
變更為:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)

 Python線程

Threading用於提供線程相關的操作,線程是應用程序中工作的最小單元。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
  
def show(arg):
    time.sleep(2)
    print 'thread'+str(arg)
  
for i in range(10):
    t = threading.Thread(target=show, args=(i,))  #這里實例化對象的時候傳的兩個參數第一個參數是,線程需要執行的方法,第二個參數方法的參數
    t.start()
  
print 'main thread stop'

上述代碼創建了10個“前台”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。

再次回顧:這里為什么是分片執行?

python中的多線程,有一個GIL(Global Interpreter Lock 全局解釋器鎖 )在同一時間只有一個線程在工作,他底層會自動進行上下文切換.這個線程執行點,那個線程執行點!

更多方法:

  • start       線程准備就緒,等待CPU調度
  • setName     為線程設置名稱
  • getName     獲取線程名稱
  • setDaemon   設置為后台線程或前台線程(默認)
  •             如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止
  •             如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
  • join        逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義
  • run         線程被cpu調度后執行Thread類對象的run方法

線程鎖

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,CPU接着執行其他線程。所以,可能出現如下問題:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print 'main thread stop'
thread nolock

設置線程鎖

#!/usr/bin/env python
#coding:utf-8
   
import threading
import time
   
gl_num = 0
   
lock = threading.RLock() #實例化調用線程鎖
   
def Func():
    lock.acquire() #獲取線程鎖
    global gl_num
    gl_num +=1
    time.sleep(1)
    print gl_num
    lock.release() #釋放線程鎖,這里注意,在使用線程鎖的時候不能把鎖,寫在代碼中,否則會造成阻塞,看起來“像”單線程
       
for i in range(10):
    t = threading.Thread(target=Func)
    t.start()

event

他的作用就是:用主線程控制子線程合適執行,他可以讓子線程停下來,也可以讓線程繼續!
他實現的機制就是:標志位“Flag”

事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。

  • clear:將“Flag”設置為False
  • set:將“Flag”設置為True
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import threading
 
 
def do(event):
    print 'start'
    event.wait() #執行對象weit方法,然后他們停下來,等待“Flag”為True
    print 'execute'
 
 
event_obj = threading.Event() #創建事件的對象

for i in range(10):
    t = threading.Thread(target=do, args=(event_obj,)) #吧對象傳到每個線程里面了~
    t.start()
 
event_obj.clear()  #設置"Flag"為Flase

inp = raw_input('input:')
if inp == 'true':
    event_obj.set()
    
#thread enent 就是這個3個方法的使用

Python進程

from multiprocessing import Process
import threading
import time
  
def foo(i):
    print 'say hi',i
  
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

注意:由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。並且python不能再Windows下創建進程!

並且在使用多進程的時候,最好是創建多少個進程?:和CPU核數相等

默認的進程之間相互是獨立,如果想讓進程之間數據共享,就得有個特殊的數據結構,這個數據結構就可以理解為他有穿牆的功能
如果你能穿牆的話兩邊就都可以使用了
使用了3種方法


默認的進程無法進行數據共享:

#!/usr/bin/env python
#coding:utf-8
 
from multiprocessing import Process
from multiprocessing import Manager
 
import time
 
li = []
 
def foo(i):
    li.append(i)
    print 'say hi',li
  
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()
     
print 'ending',li

使用特殊的數據類型,來進行穿牆:

默認的進程之間相互是獨立,如果想讓進程之間數據共享,就得有個特殊的數據結構,這個數據結構就可以理解為他有穿牆的功能
如果你能穿牆的話兩邊就都可以使用了
使用了3種方法


第一種方法:

#通過特殊的數據結構:數組(Array)

from multiprocessing import Process,Array

#創建一個只包含數字類型的數組(python中叫列表)
#並且數組是不可變的,在C,或其他語言中,數組是不可變的,之后再python中數組(列表)是可以變得
#當然其他語言中也提供可變的數組
#在C語言中數組和字符串是一樣的,如果定義一個列表,如果可以增加,那么我需要在你內存地址后面再開辟一塊空間,那我給你預留多少呢?
#在python中的list可能用鏈表來做的,我記錄了你前面和后面是誰。   列表不是連續的,數組是連續的

'''
上面不是列表是“數組"數組是不可變的,附加內容是為了更好的理解數組!
'''

temp = Array('i', [11,22,33,44]) #這里的i是C語言中的數據結構,通過他來定義你要共享的內容的類型!點進去看~
 
def Foo(i):
    temp[i] = 100+i
    for item in temp:
        print i,'----->',item
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    
第二種方法:
#方法二:manage.dict()共享數據
from multiprocessing import Process,Manager  #這個特殊的數據類型Manager
 
manage = Manager()
dic = manage.dict() #這里調用的時候,使用字典,這個字典和咱們python使用方法是一樣的!
 
def Foo(i):
    dic[i] = 100+i
    print dic.values()
 
for i in range(2):
    p = Process(target=Foo,args=(i,))
    p.start()
    p.join()

OK那么問題來了,既然進程之間可以進行共享數據,如果多個進程同時修改這個數據是不是就會造成臟數據?是不是就得需要鎖!

進程的鎖和線程的鎖使用方式是非常一樣的知識他們是用的類是在不同地方的

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Array, RLock

def Foo(lock,temp,i):
    """
    將第0個數加100
    """
    lock.acquire()
    temp[0] = 100+i
    for item in temp:
        print i,'----->',item
    lock.release()

lock = RLock()
temp = Array('i', [11, 22, 33, 44])

for i in range(20):
    p = Process(target=Foo,args=(lock,temp,i,))
    p.start()
進程鎖

 進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有兩個方法:

  • apply
  • apply_async
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from  multiprocessing import Process,Pool
import time
  
def Foo(i):
    time.sleep(2)
    return i+100
  
def Bar(arg):
    print arg
  
pool = Pool(5) #創建一個進程池
#print pool.apply(Foo,(1,))#去進程池里去申請一個進程去執行Foo方法
#print pool.apply_async(func =Foo, args=(1,)).get()
  
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)
  
print 'end'
pool.close()
pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。

'''
apply 主動的去執行
pool.apply_async(func=Foo, args=(i,),callback=Bar) 相當於異步,當申請一個線程之后,執行FOO方法就不管了,執行完之后就在執行callback ,當你執行完之后,在執行一個方法告訴我執行完了
callback 有個函數,這個函數就是操作的Foo函數的返回值!
'''

協程

首先要明確,線程和進程都是系統幫咱們開辟的,不管是thread還是process他內部都是調用的系統的API
而對於協程來說它和系統毫無關系!
他就和程序員有關系,對於線程和進程來說,調度是由CPU來決定調度的!
對於協程來說,程序員就是上帝,你想讓誰執行到哪里他就執行到哪里

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。

適用場景:其實在其他語言中,協程的其實是意義不大的多線程即可已解決I/O的問題,但是在python因為他有GIL(Global Interpreter Lock 全局解釋器鎖 )在同一時間只有一個線程在工作,所以:如果一個線程里面I/O操作特別多,協程就比較適用

greenlet

收先要明確,線程和進程都是系統幫咱們開辟的,不管是thread還是process他內部都是調用的系統的API
而對於協程來說它和系統毫無關系!
他就和程序員有關系,對於線程和進程來說,是不是有CPU來決定調度的!
對於協程來說,程序員就是上帝,你想讓誰執行到哪里他就執行到哪里

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
 
from greenlet import greenlet
 
 
def test1():
    print 12
    gr2.switch()#切換到協程2執行
    print 34 #2切回來之后,在這里和yield類似
    gr2.switch() 
 
 
def test2():
    print 56
    gr1.switch()#上面執行了一句,在切換到協程1里去了
    print 78
 
gr1 = greenlet(test1) #創建了一個協程
gr2 = greenlet(test2)

gr1.switch() #執行test1 

'''
比I/O操作,如果10個I/O,我程序從上往下執行,如果同時發出去了10個I/O操作,那么返回的結果如果同時回來了2個
,是不是就節省了很多時間?

如果一個線程里面I/O操作特別多,使用協程是不是就非常適用了!

如果一個線程訪問URL通過協程來做,協程告訴它你去請求吧,然后繼續執行,但是如果不用協程就得等待第一個請求完畢之后返回之后才
繼續下一個請求。

協程:把一個線程分成了多個協程操作,每個協程做操作
多線程:是把每一個操作,分為多個線程做操作,但是python中,在同一時刻只能有一個線程操作,並且有上下文切換。但是如果上下文切換非常頻繁的話
是非常耗時的,但對於協程切換就非常輕便了~


'''

協程就是對線程的分片,上面的例子需要手動操作可能用處不是很大了解原理,看下面的例子:

上面的greenlet是需要認為的制定調度順序的,所以又出了一個gevent他是對greenlet功能進行封裝

 遇到I/O自動切換

from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
    print('GET: %s' % url)
    resp = urllib2.urlopen(url) #當遇到I/O操作的時候就會調用協程操作,然后繼續往下走,然后這個協程就卡在這里等待數據的返回
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),  #這里的f是調用這個方法,第二個是調用方的參數
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
]) 

'''
gevent.spawn(f, 'https://www.python.org/'),  #這里的f是調用這個方法,第二個是調用方的參數

當函數f里的代碼遇到I/O操作的時候,函數就卡在哪里等待數據的返回,但是協程不會等待而是繼續操作!
'''

 

更多請參考:http://www.cnblogs.com/wupeiqi/articles/5040823.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM