#首先,什么場合下用進程,什么場合下用線程:
. 計算密集型的用進程。
. IO密集型的用進程。
xSocket語法及相關
Socket Families(地址簇)
socket.AF_UNIX unix本機進程間通信
socket.AF_INET IPV4
socket.AF_INET6 IPV6
上面的這些內容代表地址簇,創建socket必須指定,默認為IPV4
Socket Types
socket.SOCK_STREAM #for tcp
socket.SOCK_DGRAM #for udp
socket.SOCK_RAW #原始套接字,普通的套接字無法處理ICMP、IGMP等網絡報文,而SOCK_RAW可以;其次,SOCK_RAW也可以處理特殊的IPv4報文;此外,利用原始套接字,可以通過IP_HDRINCL套接字選項由用戶構造IP頭。
socket.SOCK_RDM #是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在需要執行某些特殊操作時使用,如發送ICMP報文。SOCK_RAM通常僅限於高級用戶或管理員運行的程序使用。
socket.SOCK_SEQPACKET #廢棄了
Socket 方法
socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
Create a new socket using the given address family, socket type and protocol number. The address family should be AF_INET (the default), AF_INET6, AF_UNIX, AF_CAN or AF_RDS. The socket type should beSOCK_STREAM (the default), SOCK_DGRAM, SOCK_RAW or perhaps one of the other SOCK_ constants. The protocol number is usually zero and may be omitted or in the case where the address family is AF_CAN the protocol should be one of CAN_RAW or CAN_BCM. If fileno is specified, the other arguments are ignored, causing the socket with the specified file descriptor to return. Unlike socket.fromfd(), fileno will return the same socket and not a duplicate. This may help close a detached socket using socket.close().
socket.socketpair([family[, type[, proto]]])
Build a pair of connected socket objects using the given address family, socket type, and protocol number. Address family, socket type, and protocol number are as for the socket() function above. The default family is AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
()
socket.create_connection(address[, timeout[, source_address]])
Connect to a TCP service listening on the Internet address (a 2-tuple (host, port)), and return the socket object. This is a higher-level function than socket.connect(): if host is a non-numeric hostname, it will try to resolve it for both AF_INET and AF_INET6, and then try to connect to all possible addresses in turn until a connection succeeds. This makes it easy to write clients that are compatible to both IPv4 and IPv6.
Passing the optional timeout parameter will set the timeout on the socket instance before attempting to connect. If no timeout is supplied, the global default timeout setting returned by getdefaulttimeout() is used.
If supplied, source_address must be a 2-tuple (host, port) for the socket to bind to as its source address before connecting. If host or port are ‘’ or 0 respectively the OS default behavior will be used.
socket.getaddrinfo(host, port, family=0, type=0, proto=0, flags=0) #獲取要連接的對端主機地址
sk.bind(address)
s.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。
sk.listen(backlog)
開始監聽傳入連接。backlog指定在拒絕連接之前,可以掛起的最大連接數量。
backlog等於5,表示內核已經接到了連接請求,但服務器還沒有調用accept進行處理的連接個數最大為5
這個值不能無限大,因為要在內核中維護連接隊列
sk.setblocking(bool)
是否阻塞(默認True),如果設置False,那么accept和recv時一旦無數據,則報錯。
sk.accept()
接受連接並返回(conn,address),其中conn是新的套接字對象,可以用來接收和發送數據。address是連接客戶端的地址。
接收TCP 客戶的連接(阻塞式)等待連接的到來
sk.connect(address)
連接到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。
sk.connect_ex(address)
同上,只不過會有返回值,連接成功時返回 0 ,連接失敗時候返回編碼,例如:10061
sk.close()
關閉套接字
sk.recv(bufsize[,flag])
接受套接字的數據。數據以字符串形式返回,bufsize指定最多可以接收的數量。flag提供有關消息的其他信息,通常可以忽略。
sk.recvfrom(bufsize[.flag])
與recv()類似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。
sk.send(string[,flag])
將string中的數據發送到連接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容全部發送。
sk.sendall(string[,flag])
將string中的數據發送到連接的套接字,但在返回之前會嘗試發送所有數據。成功返回None,失敗則拋出異常。
內部通過遞歸調用send,將所有內容發送出去。
sk.sendto(string[,flag],address)
將數據發送到套接字,address是形式為(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。
sk.settimeout(timeout)
設置套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛創建套接字時設置,因為它們可能用於連接的操作(如 client 連接最多等待5s )
sk.getpeername()
返回連接套接字的遠程地址。返回值通常是元組(ipaddr,port)。
sk.getsockname()
返回套接字自己的地址。通常是一個元組(ipaddr,port)
sk.fileno()
套接字的文件描述符
socket.sendfile(file, offset=0, count=None)
發送文件 ,但目前多數情況下並無什么卵用。
SocketServer (每一個連接過來都創建一個線程)
The socketserver module simplifies the task of writing network servers.
There are four basic concrete server classes:
-
class
socketserver.TCPServer(server_address, RequestHandlerClass, bind_and_activate=True) -
This uses the Internet TCP protocol, which provides for continuous streams of data between the client and server. If bind_and_activate is true, the constructor automatically attempts to invoke
server_bind()andserver_activate(). The other parameters are passed to theBaseServerbase class.
-
class
socketserver.UDPServer(server_address, RequestHandlerClass, bind_and_activate=True) -
This uses datagrams, which are discrete packets of information that may arrive out of order or be lost while in transit. The parameters are the same as for
TCPServer.
-
class
socketserver.UnixStreamServer(server_address, RequestHandlerClass, bind_and_activate=True) 本機 -
class
socketserver.UnixDatagramServer(server_address, RequestHandlerClass,bind_and_activate=True) 本機 -
These more infrequently used classes are similar to the TCP and UDP classes, but use Unix domain sockets; they’re not available on non-Unix platforms. The parameters are the same as for
TCPServer.
These four classes process requests synchronously; each request must be completed before the next request can be started. This isn’t suitable if each request takes a long time to complete, because it requires a lot of computation, or because it returns a lot of data which the client is slow to process. The solution is to create a separate(獨立) process or thread(線程) to handle each request; the ForkingMixIn and ThreadingMixIn mix-in classes can be used to support asynchronous behaviour.
There are five classes in an inheritance diagram, four of which represent synchronous servers of four types:
這里有五個類,在下面的繼承圖里
+------------+
| BaseServer | +------------+ | v +-----------+ +------------------+ | TCPServer |------->| UnixStreamServer | +-----------+ +------------------+ | v +-----------+ +--------------------+ | UDPServer |------->| UnixDatagramServer | +-----------+ +--------------------+
Note that UnixDatagramServer derives from UDPServer, not from UnixStreamServer — the only difference between an IP and a Unix stream server is the address family, which is simply repeated in both Unix server classes.
-
class
socketserver.ForkingMixIn -
class
socketserver.ThreadingMixIn -
Forking and threading versions of each type of server can be created using these mix-in classes. For instance,
ThreadingUDPServeris created as follows:class ThreadingUDPServer(ThreadingMixIn, UDPServer): passThe mix-in class comes first, since it overrides a method defined in
UDPServer. Setting the various attributes also changes the behavior of the underlying server mechanism.常用的為下面的四個
-
class
socketserver.ForkingTCPServer -
class
socketserver.ForkingUDPServer -
class
socketserver.ThreadingTCPServer -
class
socketserver.ThreadingUDPServer - These classes are pre-defined using the mix-in classes.
Request Handler Objects
class socketserver.BaseRequestHandler
This is the superclass of all request handler objects. It defines the interface, given below. A concrete request handler subclass must define a new handle() method, and can override any of the other methods. A new instance of the subclass is created for each request.
setup()
Called before the handle() method to perform any initialization actions required. The default implementation does nothing.
handle() 所有客戶端的請求都是在handle處理
This function must do all the work required to service a request. The default implementation does nothing. Several instance attributes are available to it; the request is available as self.request; the client address as self.client_address; and the server instance as self.server, in case it needs access to per-server information.
The type of self.request is different for datagram or stream services. For stream services,self.request is a socket object; for datagram services, self.request is a pair of string and socket.
finish()
Called after the handle() method to perform any clean-up actions required. The default implementation does nothing. If setup() raises an exception, this function will not be called.
寫一個簡單的socketserver 和客戶端client的事例
import socketserver class MyTcpServer(socketserver.BaseRequestHandler): def handle(self): while True: print("NEW:",self.client_address) Server_recv=self.request.recv(1024) print("client:",Server_recv.decode()) self.request.send(Server_recv) if __name__ == "__main__": HOST,PORT = "localhost",5007 # 把剛才寫的類當作一個參數傳給ThreadingTCPServer這個類,下面的代碼就創建了一個多線程socket server Server=socketserver.ThreadingTCPServer((HOST,PORT),MyTcpServer) # 啟動這個server,這個server會一直運行,除非按ctrl-C停止 Server.serve_forever() #--------------看與之前寫的單線程的socket服務端有什么區別呢 import socket ip_port = ('127.0.0.1',9999) sk = socket.socket() sk.bind(ip_port) sk.listen(5) while True: print ('server waiting...') conn,addr = sk.accept()#conn表示實例,addr包含地址端口 client_data = conn.recv(1024) print (str(client_data,'utf8')) conn.sendall(bytes('不要回答,不要回答,不要回答','utf8')) conn.close() #少了等待用戶連接,獲取用戶的地址和端口以及創建實例的操作, socketserver用Server.serve_forever()實現了類似用戶連接的操作,不同的是可以多用戶連接,socketserver利用創建類在handle方法里實現用戶連接自動調用對客戶端的操作 # 之前的單線程的socket server 只能一個用戶連接,其余的連接都被阻塞了,等待上一次連接釋放 才可以進行下一次連接
client的與上一節的socketclient並無不同之處 import socket HOST,PORT=("localhost",5007) client = socket.socket() client.connect((HOST,PORT)) while True: User_input = input(">>>:") client.send(bytes(User_input,"utf8")) Server_recv=client.recv(1024) print("server:",Server_recv.decode())
上述類似於server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)這種, socketserver.ThreadingTCPServer是系統本身定義好的一個方法,我們只需要傳入參數就行,參數包含了連接地址,端口以及我們自定義的方法,自定義方法內必須指定handle函數,一般我們在handle函數里實現用戶的交互
線程與進程
Python GIL(Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,無論你啟多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只允許一個線程運行,擦。。。
首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標准,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。所以這里要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL
這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
很多同學都聽說過,現代操作系統比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任務”的操作系統。 什么叫“多任務”呢?簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務,至少同時有3個任務正在運行。還有很多任務悄悄地在后台同時運行着,只是桌面上沒有顯示而已。 現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那么,單核CPU是怎么執行多任務的呢? 答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反復執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。 真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。 對於操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開一個記事本就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。 有些進程還不止同時干一件事,比如Word,它可以同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為線程(Thread)。 由於每個進程至少要干一件事,所以,一個進程至少有一個線程。當然,像Word這種復雜的進程可以有多個線程,多個線程可以同時執行,多線程的執行方式和多進程是一樣的,也是由操作系統在多個線程之間快速切換,讓每個線程都短暫地交替運行,看起來就像同時執行一樣。當然,真正地同時執行多線程需要多核CPU才可能實現。 我們前面編寫的所有的Python程序,都是執行單任務的進程,也就是只有一個線程。如果我們要同時執行多個任務怎么辦? 有兩種解決方案: 一種是啟動多個進程,每個進程雖然只有一個線程,但多個進程可以一塊執行多個任務。 還有一種方法是啟動一個進程,在一個進程內啟動多個線程,這樣,多個線程也可以一塊執行多個任務。 當然還有第三種方法,就是啟動多個進程,每個進程再啟動多個線程,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少采用。 總結一下就是,多任務的實現有3種方式: 多進程模式; 多線程模式; 多進程+多線程模式。 同時執行多個任務通常各個任務之間並不是沒有關聯的,而是需要相互通信和協調,有時,任務1必須暫停等待任務2完成后才能繼續執行,有時,任務3和任務4又不能同時執行,所以,多進程和多線程的程序的復雜度要遠遠高於我們前面寫的單進程單線程的程序。 因為復雜度高,調試困難,所以,不是迫不得已,我們也不想編寫多任務。但是,有很多時候,沒有多任務還真不行。想想在電腦上看電影,就必須由一個線程播放視頻,另一個線程播放音頻,否則,單線程實現的話就只能先把視頻播放完再播放音頻,或者先把音頻播放完再播放視頻,這顯然是不行的。 Python既支持多進程,又支持多線程,我們會討論如何編寫這兩種多任務程序。 小結 線程是最小的執行單元,而進程由至少一個線程組成。如何調度進程和線程,完全由操作系統決定,程序自己不能決定什么時候執行,執行多長時間。 多進程和多線程的程序涉及到同步、數據共享的問題,編寫起來更復雜。
如果還是不理解,復制url訪問看 圖文並茂!
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
總結
1、線程共享創建它的進程的地址空間,進程有自己的地址空間
2、線程可以訪問進程所有的數據,線程可以相互訪問
3、線程之間的數據是獨立的
4、子進程復制線程的數據
5、子進程啟動后是獨立的 ,父進程只能殺掉子進程,而不能進行數據交換
6、修改線程中的數據,都是會影響其他的線程,而對於進程的更改,不會影響子進程
多線程實例:
創建多線程有兩種方式,一種是直接調用,一種是繼承式調用,即:通過繼承Thread類,重寫它的run方法;另一種是創建一個threading.Thread對象,在它的初始化函數(__init__)中將可調用對象作為參數傳入。下面分別舉例說明
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time def sayhi(num): #定義每個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': ''' t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例 t1.start() #啟動線程 t2.start() #啟動另一個線程 print(t1.getName()) #獲取線程名 print(t2.getName()) t1.join() #t2.wait() t2.join() #t2.wait()''' t_list = [] # 定義一個空列表,每啟動一個實例,將實例添加到列表 for i in range(10): t = threading.Thread(target=sayhi,args=[i,]) t.start() t_list.append(t) for i in t_list: 循環列表 ,等待列表中的每一個線程執行完畢 i.join() #t.join() 這種做法只是等待最后一個線程執行完畢了,不等待其他線程 print('---main---')
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading import time class MyThread(threading.Thread): def __init__(self,num): super(MyThread,self).__init__() 新式類繼承原有方法寫法 #threading.Thread.__init__(self)#經典類寫法 self.num = num def run(self):#定義每個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
Thread類還定義了以下常用方法與屬性:
Thread.getName() 獲取線程名稱
Thread.setName() 設置線程名稱
Thread.name
Thread.ident 獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法之后該屬性才有效,否則它只返回None
判斷線程是否是激活的(alive)。從調用start()方法啟動線程,到run()方法執行完畢或遇到未處理異常而中斷 這段時間內,線程是激活的
Thread.is_alive()
Thread.isAlive()
Thread.join([timeout]) 調用Thread.join將會使主調線程堵塞,直到被調用線程運行結束或超時。參數timeout是一個數值類型,表示超時時間,如果未提供該參數,那么主調線程將一直堵塞到被調線程結束(測試有疑問)
Join & Daemon
主線程A中,創建了子線程B,並且在主線程A中調用了B.setDaemon(),這個的意思是,把主線程A設置為守護線程,這時候,要是主線程A執行結束了,就不管子線程B是否完成,一並和主線程A退出.這就是setDaemon方法的含義,這基本和join是相反的。此外,還有個要特別注意的:必須在start() 方法調用之前設置,如果不設置為守護線程,程序會被無限掛起。
import time import threading def run(n): print('[%s]------running----\n' % n) time.sleep(2) print('--done--') def main(): for i in range(5): t = threading.Thread(target=run,args=[i,]) t.start() # t.join() print('starting thread', t.getName()) m = threading.Thread(target=main,args=[]) m.setDaemon(True) #將主線程設置為Daemon線程,它退出時,其它子線程會同時退出,不管是否執行完任務 m.start() # m.join() print("---main thread done----") ”“” 執行效果,首先m.start()啟動守護進程,不join main函數里又啟動了5個子線程,調用run函數 run函數里sleep了2秒鍾,因此程序執行后會直接打印---main thread done----或者在加上running..不會再有其他"""
線程鎖(互斥鎖Mutex)
一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味着每個線程可以訪問同一份數據,此時,如果2個線程同時要修改同一份數據,會出現什么狀況?
import time import threading num = 100 #設定一個共享變量 def addNum(): global num #在每個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(2) num -=1 #對此公共變量進行-1操作 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num ) """ 獲取到的get num每次都是100 是因為100個線程同事執行,獲取的都是100,線程內部都排隊去吃 每次取的值-1,但是在2.7中就會出錯,最后print final num 的值不為1 """
正常來講,這個num結果應該是0, 但在python 2.7上多運行幾次,會發現,最后打印出來的num結果不總是0,為什么每次運行的結果不一樣呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操作, 由於2個線程是並發同時運行的,所以2個線程很有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量后,結果就都是99。那怎么辦呢? 很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉后才能再訪問此數據。
*注:不要在3.x上運行,不知為什么,3.x上的結果總是正確的,可能是自動加了鎖
加鎖版本
import time import threading def addNum(): global num #在每個線程中都獲取這個全局變量 print('--get num:',num ) time.sleep(1) lock.acquire() #修改數據前加鎖 num -=1 #對此公共變量進行-1操作 lock.release() #修改后釋放 num = 100 #設定一個共享變量 thread_list = [] lock = threading.Lock() #生成全局鎖 for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('final num:', num )
RLock(遞歸鎖)
說白了就是在一個大鎖中還要再包含子鎖
import threading,time def run1(): print("grab the first part data") lock.acquire() global num num +=1 lock.release() return num def run2(): print("grab the second part data") lock.acquire() global num2 num2+=1 lock.release() return num2 def run3(): lock.acquire() res = run1() print('--------between run1 and run2-----') res2 = run2() lock.release() print(res,res2) if __name__ == '__main__': num,num2 = 0,0 lock = threading.RLock() for i in range(10): t = threading.Thread(target=run3) t.start() while threading.active_count() != 1: print(threading.active_count()) else: print('----all threads done---') print(num,num2)
Rlock與Lock的區別:
RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。否則會出現死循環,程序不知道解哪一把鎖。注意:如果使用RLock,那么acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所占用的鎖
Semaphore(信號量)
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去
import threading,time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s\n" %n) semaphore.release() if __name__ == '__main__': num= 0 semaphore = threading.BoundedSemaphore(3) #最多允許3個線程同時運行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start() while threading.active_count() != 1: pass #print threading.active_count() else: print('----all threads done---') print(num)
Events
Python提供了Event對象用於線程間通信,它是由線程設置的信號標志,如果信號標志位真,則其他線程等待直到信號接觸。
Event對象實現了簡單的線程通信機制,它提供了設置信號,清除信號,等待等用於實現線程間的通信。
event = threading.Event() 創建一個event
1 設置信號
event.set()
使用Event的set()方法可以設置Event對象內部的信號標志為真。Event對象提供了isSet()方法來判斷其內部信號標志的狀態。當使用event對象的set()方法后,isSet()方法返回真
2 清除信號
event.clear()
使用Event對象的clear()方法可以清除Event對象內部的信號標志,即將其設為假,當使用Event的clear方法后,isSet()方法返回假
3 等待
event.wait()
Event對象wait的方法只有在內部信號為真的時候才會很快的執行並完成返回。當Event對象的內部信號標志位假時,則wait方法一直等待到其為真時才返回。也就是說必須set新號標志位真
下面用紅綠燈事件來講解event
#!/usr/bin/env python # -*- coding:utf-8 -*- import threading,time import random def light(): # 設定一個燈函數 if not event.isSet(): # 檢查event有沒有被設定 ,如果沒設定 就設定 event.set() #wait就不阻塞 #綠燈狀態 count = 0 while True: if count < 10: # 綠燈 print('\033[42;1m--green light on---\033[0m') elif count <13:# 黃燈3秒 print('\033[43;1m--yellow light on---\033[0m') elif count <20:#7秒紅燈 if event.isSet(): event.clear() #清楚setwait阻塞 print('\033[41;1m--red light on---\033[0m') else: count = 0 event.set() #打開綠燈 time.sleep(1) count +=1 def car(n): #no bug version while 1: time.sleep(1) #sleep(1)在這里是讓車出現的慢一點 沒其他作用 if event.isSet(): #綠燈 print("car [%s] is running.." % n) else:#紅燈 print("car [%s] is waiting for the red light.." %n) event.wait() if __name__ == '__main__': event = threading.Event() # 創建event事件 Light = threading.Thread(target=light) Light.start() for i in range(3): t = threading.Thread(target=car,args=(i,)) t.start() """ 首先創建一個事件,啟動燈事件的實例 燈函數 第一次判斷event有沒有被設定,如果沒設定就設定,第一次為綠燈 設定一個隨機數count 用來定義燈的時間,否則每次count+1大於20了就一直為紅燈了 循環每次count+1 設定紅綠燈的判斷值,大於20就重新設定count值,再次event.set()打開綠燈 車函數 循環,每過一秒出現一輛車,總共為三輛車,下面的for循環創建了三個車的實例 判斷event.set()有沒有被設定,即:count的值大於13小於20 設定了即為綠或者黃燈,否則為紅燈,event.wait在這里的作用是車等待,即不在循環了 直到event被設定 """
多進程multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
多進程模塊使用一個API類似線程模塊。多處理方案啊提供了本地和遠程的並發性,有效的避開了全局解釋器GIL鎖的線程,因此,multiprocessing允許程序員利用多個處理器在一台特定的機器上,它運行在windows和linux
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=info, args=('bob',)) p2 = Process(target=info, args=('bob',)) p.start() p2.start() p.join()
進程間通訊
不同進程間內存是不共享的,要想實現兩個進程間的數據交換,可以用以下方法:
Queues
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': que = Queue() p = Process(target=f, args=(que,)) p2 = Process(target=f, args=(que,)) p.start() p2.start() print('from parent:',que.get()) # prints "[42, None, 'hello']" print('from parent2:',que.get()) # prints "[42, None, 'hello']" p.join()
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
管道函數返回一個對在管道連接的連接對象,默認情況下是雙工(雙向)。例如:
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) #用傳過來的實例send到另一端的地址 也就是parent_conn conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,))#傳入管道一端的實例, p2 = Process(target=f, args=(child_conn,)) p2.start() p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" #打印對端傳出過來的數據 print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Managers
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
Python中進程間共享數據,處理基本的queue,pipe和value+array外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。
Manager()返回的manager對象控制了一個server進程,此進程包含的python對象可以被其他的進程通過proxies來訪問。從而達到多進程間數據通信且安全。
Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
from multiprocessing import Process, Manager def f(d, l,n): d[n] =n d['2'] = 2 d[0.25] = None l.append(n) print(l) print(d) if __name__ == '__main__': with Manager() as manager: d = manager.dict() #使用manager生成一個dict,對dict進行封裝 l = manager.list(range(5))#使用manager生成一個list,對list進行封裝 """ 如果不用manager對數據進行封裝,效果就是會生成10個列表,而不是對一個列表進行修改數據 p_list = [] for i in range(10): p = Process(target=f, args=(d, l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
進程同步
Without using the lock output from the different processes is liable to get all mixed up.
(不使用鎖的輸出不同的過程很容易得到全搞混了。)和線程鎖基本是一樣的
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply 同步執行(串行)
- apply_async (異步執行 並行)
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Process,Pool,freeze_support import time def Foo(i): time.sleep(2) print('exec...') return i+100 def Bar(arg): print('-->exec done:',arg) if __name__ == '__main__': freeze_support() # 這塊代碼是為了防止在windows下出錯而定義,freeze_support模塊需導入 pool = Pool(3) #進程池設定三個進程 for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #啟動進程,callback為回調機制,表示執行完foo函數后會把值傳送 給bar函數,apply_async表示異步 #pool.apply(func=Foo, args=(i,)) #apply表示同步(串行),一個個執行,,同步的時候不能callback print('end') pool.close()# 必須先關閉在join pool.join()#進程池中進程執行完畢后再關閉,如果注釋,那么程序直接關閉。
