[Reposted]
Processes and Threads
What is a process?
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
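A program cannot run on its own. It runs only after it has been loaded into memory and the system has allocated resources to it, and a program in execution is called a process. The difference between a program and a process is that a program is a collection of instructions, a static text describing how the process runs, whereas a process is one execution of a program, a dynamic concept.
In multiprogramming, multiple programs can be loaded into memory at the same time and run concurrently under the scheduling of the operating system. This design greatly improves CPU utilization. Processes give each user the feeling of having the CPU to themselves; in other words, the process was introduced to make multiprogramming on the CPU possible.
Given processes, why do we need threads?
Processes have many advantages: they provide multiprogramming, make each of us feel we own the CPU and other resources, and improve the utilization of the computer. So many people wonder: if processes are this good, why do we need threads? Look closely, though, and you will find that processes still have shortcomings, mainly in two respects:
- A process can do only one thing at a time; if you want it to do two or more things at once, it can't.
- If a process blocks during execution, for example while waiting for input, the whole process is suspended, and even work in the process that does not depend on that input cannot run.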
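For example, when you chat on QQ, QQ runs as an independent process. If it could only do one thing at a time, how could it listen for keyboard input, listen for messages other people send you, and display those messages on the screen, all at the same moment? You might say the operating system has time-sharing. But, my friend, time-sharing happens between different processes: the operating system works on your QQ task for a while and then switches to your Word document. Each time a CPU time slice is given to QQ, QQ can still do only one thing at a time.
To put it more plainly: an operating system is like a factory with many workshops, each producing a different product, and each workshop is a process. Your factory is poor and short of power, so it can supply only one workshop at a time. To let every workshop produce, the electrician has to give power to the workshops in turn. But when it is the QQ workshop's turn, there is only one worker in it, so productivity is terribly low. What should you do? Right, you have guessed it: add more workers so that several of them work in parallel. Each of those workers is a thread!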
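The QQ example can be sketched with two threads in one process. This is a minimal illustration, assuming a `threading.Event` as a stand-in for keyboard input (the function names are hypothetical): one thread blocks waiting for "input" while the other keeps rendering messages.

```python
import threading
import time

keyboard_input = threading.Event()  # hypothetical stand-in for "the user typed something"
rendered = []                       # messages the second thread has "displayed"


def wait_for_input():
    # Blocks only this thread, the way waiting on the keyboard would.
    keyboard_input.wait()


def render_messages():
    # Keeps working while the other thread is blocked.
    for i in range(5):
        rendered.append("message %d" % i)
        time.sleep(0.01)


t_input = threading.Thread(target=wait_for_input)
t_render = threading.Thread(target=render_messages)
t_input.start()
t_render.start()

t_render.join()                     # the renderer finishes on its own...
still_blocked = t_input.is_alive()  # ...while the input thread is still waiting
keyboard_input.set()                # the "key press" finally arrives
t_input.join()
print("input thread was still blocked:", still_blocked)
print("rendered meanwhile:", rendered)
```

A process with a single thread would have been stuck in `wait_for_input()` and rendered nothing.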
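What is a thread?
A thread is the smallest unit that the operating system can schedule for execution. It is contained within a process and is the actual unit of work in that process. A thread is a single sequential flow of control within a process; a process can run multiple threads concurrently, each performing a different task.
What is the difference between a process and a thread?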
- Threads share the address space of the process that created them; processes have their own address space.
- Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process.
- Threads can directly communicate with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
- New threads are easily created; new processes require duplication of the parent process.
- Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
- Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
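A minimal sketch of the first two differences: a thread modifies the module-level `counter` of its own process, while a child process modifies only its own copy (`counter` and `increment` are illustrative names). The `if __name__ == "__main__":` guard is needed because on some platforms the child process re-imports the module.

```python
import multiprocessing
import threading

counter = 0  # module-level data, shared by every thread of this process


def increment():
    global counter
    counter += 1


if __name__ == "__main__":
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    after_thread = counter   # 1: the thread changed this process's data

    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    after_process = counter  # still 1: the child incremented its own copy
    print("after thread:", after_thread, "after process:", after_process)
```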
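Python GIL (Global Interpreter Lock)
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) The core point: no matter how many threads you start or how many CPUs you have, CPython calmly lets only one thread execute Python bytecode at any given moment. This applies to CPython; other interpreters need not have a GIL.
The first thing to be clear about is that the GIL is not a feature of the Python language; it is a concept introduced when implementing the Python interpreter CPython. By analogy, C++ is a language (syntax) standard, but it can be compiled into executable code by different compilers, such as GCC, Intel C++, and Visual C++. Python is the same: the same code can run on different Python implementations such as CPython, PyPy, and Jython, and Jython has no GIL. However, CPython is the default Python implementation in most environments, so many people think of CPython as Python and take it for granted that the GIL is a flaw of the Python language. So, to be clear from the start: the GIL is not a feature of Python, and Python does not have to depend on a GIL.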
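The Python threading module
Threads can be used in two ways:
Direct invocation
import threading
import time
import os
import ctypes


def sayhi(num):  # the function each thread runs
    print("running on number:%s" % num)
    time.sleep(3)


if __name__ == '__main__':
    for i in range(10):
        t1 = threading.Thread(target=sayhi, args=(i,))  # create a thread instance
        t1.start()  # start the thread
        print(t1.name)  # thread name (getName() is deprecated since Python 3.10)
        print("thread parent id:", os.getpid())  # os.getpid() returns the process ID, not a thread ID
        # Linux x86_64 only: syscall 186 is gettid(). It is called here in the main
        # thread, so it returns the main thread's ID, which equals the process ID.
        print("thread child id:", ctypes.CDLL('libc.so.6').syscall(186))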
Output (produced under Python 2, which prints the tuples; truncated, and interleaved because the threads print concurrently):
('thread parent id:', 11301)
('thread child id:', 11301)
Thread-7running on number:6
('thread parent id:', 11301)
('thread child id:', 11301)
Thread-8running on number:7
('thread parent id:', 11301)
('thread child id:', 11301)
running on number:8Thread-9
('thread parent id:', 11301)
('thread child id:', 11301)
running on number:9
Thread-10
('thread parent id:', 11301)
('thread child id:', 11301)
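The comment in the code above says only the process ID can be obtained. On Python 3.8+, `threading.get_native_id()` returns the OS thread ID on the major platforms, so a sketch like the following (an alternative to the Linux-only `syscall(186)` call, not the original author's code) shows that threads share one process ID but have distinct thread IDs. The `Barrier` keeps all three threads alive at the same time so the OS cannot reuse an ID.

```python
import os
import threading

ids = {}                        # thread name -> (process id, OS thread id)
lock = threading.Lock()
barrier = threading.Barrier(3)  # all three threads stay alive until each has recorded its ID


def record():
    with lock:
        ids[threading.current_thread().name] = (os.getpid(), threading.get_native_id())
    barrier.wait()


threads = [threading.Thread(target=record, name="worker-%d" % i) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_tid = threading.get_native_id()
for name, (pid, tid) in sorted(ids.items()):
    print(name, "pid:", pid, "tid:", tid, "main thread tid:", main_tid)
```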
Subclassing Thread

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the code each thread runs
        print("running on number:%s" % self.num)
        time.sleep(3)


if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()
Join & Daemon

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "richardzgt"

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n, sleep_time):
        super(MyThread, self).__init__()
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print("run task", self.n)
        time.sleep(self.sleep_time)
        print("task done", self.n)


t1 = MyThread("t1", 1)
t2 = MyThread("t2", 3)
t1.start()
t2.start()
t1.join()
t2.join()  # wait for all child threads to finish
print("main thread starting......")

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "richardzgt"
# daemon threads
import threading
import time


def run(sleep_time):
    print("start task ", sleep_time)
    time.sleep(sleep_time)
    print("task done", sleep_time)


t_job = []
for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    # Mark it as a daemon thread (setDaemon() is deprecated since 3.10). Daemon threads
    # are killed when the main thread exits; the main thread does not wait for them.
    t.daemon = True
    t.start()
    t_job.append(t)

time.sleep(5)  # the main thread exits after 5 s; daemon tasks still sleeping (about 5-9) are killed
print("main thread done........")
Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
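A minimal sketch of the graceful alternative the note recommends: a non-daemon worker that checks an `Event` and returns on its own when asked to stop (`worker` and `ticks` are illustrative names).

```python
import threading
import time

stop_event = threading.Event()
ticks = []


def worker():
    # A non-daemon loop that checks the event instead of being killed at shutdown.
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        stop_event.wait(0.01)  # sleep, but wake up at once if a stop is requested
    # Cleanup (closing files, committing transactions, ...) would go here.


t = threading.Thread(target=worker)  # daemon=False is the default
t.start()
time.sleep(0.05)
stop_event.set()  # ask the worker to finish
t.join()          # and wait until it actually has
print("worker stopped after", len(ticks), "iterations")
```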
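Thread lock (mutex)
A process can start multiple threads, and they all share the parent process's memory space, which means every thread can access the same data. What happens if two threads try to modify the same data at the same time?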

import time
import threading


def addNum():
    global num  # every thread accesses this global variable
    print('--get num:', num)
    time.sleep(1)
    num -= 1  # decrement the shared variable (not an atomic operation)


num = 100  # the shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)
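Normally num should end up as 0, but run this a few times on Python 2.7 and you will find the final value is not always 0. Why do the results differ from run to run? Suppose threads A and B both want to subtract 1 from num. Because they run concurrently, both may read the initial value num=100 before either has written its result back. A computes 99, B also computes 99, and after both results are assigned to num, num is 99 rather than 98: one decrement is lost. The fix is simple: whenever a thread modifies shared data, it puts a lock on the data so that nobody else can modify it while it is halfway through; any other thread that wants to modify the data must wait until the lock is released.
*Note: on Python 3.x this example usually prints the correct result. That is not because a lock is added automatically: since Python 3.2 the GIL switches threads on a time interval (see sys.getswitchinterval(), 5 ms by default) instead of every 100 bytecode instructions, so a switch in the middle of num -= 1 is much rarer. The statement is still not atomic, so shared data still needs a lock.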
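To see why `num -= 1` can lose updates, here is a sketch using the standard `dis` module: the statement compiles to separate load, subtract, and store instructions, and a thread switch can happen between the load and the store. Exact opcode names vary between Python versions (for example `INPLACE_SUBTRACT` before 3.11 and `BINARY_OP` after); only the separate `LOAD_GLOBAL` and `STORE_GLOBAL` steps matter here.

```python
import dis

num = 100


def decrement():
    global num
    num -= 1


# The read (LOAD_GLOBAL) and the write (STORE_GLOBAL) are separate instructions,
# so another thread may run in between and its update can be overwritten.
opnames = [ins.opname for ins in dis.get_instructions(decrement)]
print(opnames)
dis.dis(decrement)
```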
Locked version

import threading
import time


def run(n):
    global num
    with lock:  # only one thread can hold the lock at a time; released even on error
        print("start task and lock acquire", n)
        num += 1
        time.sleep(1)
        print("task done and lock release", n)


lock = threading.Lock()
num = 0
t_job = []
for i in range(3):
    t = threading.Thread(target=run, args=(i,))
    t.start()
    t_job.append(t)

for each in t_job:
    each.join()

print("main thread done........", num)
GIL VS Lock
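Python already has a GIL that allows only one thread to execute at a time, so why do we still need a lock here? Note: this lock is a user-level lock and has nothing to do with the GIL. The GIL only guarantees that one bytecode instruction runs at a time; the interpreter can still switch threads between the instructions that make up num -= 1, so protecting a multi-step update is up to your own code.
If user programs already have their own locks, why does CPython still need the GIL? Mainly to keep the interpreter simple. When you write Python you don't have to think about freeing memory, because the interpreter does it for you: CPython counts how many references point to each object (reference counting, plus a periodic collector for reference cycles) and frees an object as soon as its count drops to zero. Every thread updates these counts constantly, and the updates are not thread-safe. Without a global lock, two threads could update the same object's count at the same time, the count could end up wrong, and an object could be freed while another thread is still using it (or never freed at all). The interpreter's answer was crude but simple: one big lock, so that while one thread runs, nobody else can touch the interpreter's state. This can be considered a legacy of early Python versions.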
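A small illustration of the reference counts mentioned above, using `sys.getrefcount` (a CPython-specific function; the absolute numbers are implementation details and include temporary references, so only the difference is meaningful):

```python
import sys


def refcount_demo():
    data = []
    before = sys.getrefcount(data)  # count of references to the list right now
    alias = data                    # one more reference to the same object
    after = sys.getrefcount(data)
    return before, after, alias is data


before, after, same = refcount_demo()
print("before:", before, "after:", after, "same object:", same)
```

Every assignment like `alias = data` changes a counter inside the object; the GIL is what keeps such updates from different threads from colliding.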
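RLock (recursive lock)
Put simply, it is a lock that the thread already holding it can acquire again (a lock nested inside a lock). It counts the acquisitions and is actually released only after the owning thread has released it the same number of times.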

import threading


def run1():
    global num
    print("grab the first part data")
    with lock:
        num += 1
        return num  # read the value while still holding the lock


def run2():
    global num2
    print("grab the second part data")
    with lock:
        num2 += 1
        return num2


def run3():
    with lock:  # outer acquisition; run1() and run2() acquire the same RLock again
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()  # with a plain Lock, run3 would deadlock on the inner acquire
    threads = []
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
        threads.append(t)
    for t in threads:  # wait for every thread to finish
        t.join()
    print('----all threads done---')
    print(num, num2)
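A minimal sketch of the difference between Lock and RLock: acquiring a plain `Lock` a second time from the same thread would block forever, so the sketch uses `acquire(blocking=False)` to show that the attempt fails, while an `RLock` simply increments its count (`try_reacquire` is a hypothetical helper).

```python
import threading


def try_reacquire(lock):
    """Acquire `lock`, then try to acquire it again from the same thread without blocking."""
    lock.acquire()
    try:
        second = lock.acquire(blocking=False)  # with blocking=True a plain Lock would deadlock
        if second:
            lock.release()                     # RLock: undo the inner acquisition
        return second
    finally:
        lock.release()


print("Lock:", try_reacquire(threading.Lock()))    # Lock: False
print("RLock:", try_reacquire(threading.RLock()))  # RLock: True
```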
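Semaphore
A mutex allows only one thread to modify data at a time, whereas a Semaphore allows a fixed number of threads to do so at the same time. Think of a restroom with three stalls: at most three people can use it at once, and anyone else has to wait until someone comes out.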

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "richardzgt"

import threading
import time


def run(n):
    with semaphore:  # take a slot; it is released automatically
        time.sleep(1)
        print("run the thread: ", n)


semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to work at once
threads = []
for i in range(10):
    t = threading.Thread(target=run, args=(i,))
    t.start()
    threads.append(t)

for t in threads:  # wait for every thread to finish
    t.join()
print("------ all threads done -------")
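A sketch that checks the "three stalls" rule from the analogy above: it records how many threads are inside the semaphore at once (the counters `active` and `max_active` are illustrative), and then shows that a `BoundedSemaphore` raises `ValueError` when released more times than it was acquired, which is what distinguishes it from a plain `Semaphore`.

```python
import threading
import time

LIMIT = 3
sem = threading.BoundedSemaphore(LIMIT)
state_lock = threading.Lock()
active = 0
max_active = 0


def use_stall(n):
    global active, max_active
    with sem:  # at most LIMIT threads get past this line at once
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)
        with state_lock:
            active -= 1


threads = [threading.Thread(target=use_stall, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("max concurrent:", max_active)

# A BoundedSemaphore refuses to be released more times than it was acquired.
try:
    sem.release()
    over_release_error = None
except ValueError as e:
    over_release_error = e
print("over-release raised:", type(over_release_error).__name__)
```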
Timer
This class represents an action that should be run only after a certain amount of time has passed.
Timers are started, as with threads, by calling their start() method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.
from threading import Timer


def hello():
    print("hello, world")


t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
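A sketch of `cancel()` from the description above: a timer cancelled before its interval has elapsed never runs its function, while a short one runs as expected (`fired` is just a list that records calls).

```python
import threading

fired = []


def hello():
    fired.append("hello, world")


t = threading.Timer(30.0, hello)
t.start()
t.cancel()  # stop it before the 30 s are up, so hello() never runs
t.join()    # the timer thread exits promptly after cancel()

quick = threading.Timer(0.01, hello)
quick.start()
quick.join()  # wait for the short timer to fire
print(fired)  # ['hello, world'] (only the second timer ran)
```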
Events
An event is a simple synchronization object; the event represents an internal flag, and threads can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
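A minimal sketch of the three rules above, with hypothetical `client` threads: `wait()` with a timeout returns `False` while the flag is clear, one `set()` releases every waiting thread, and once the flag is set `wait()` returns `True` immediately.

```python
import threading
import time

event = threading.Event()

# Flag cleared: wait() blocks; with a timeout it gives up and returns False.
timed_out = event.wait(timeout=0.05)

results = []


def client(n):
    ok = event.wait()  # blocks until the flag is set
    results.append((n, ok))


clients = [threading.Thread(target=client, args=(i,)) for i in range(3)]
for c in clients:
    c.start()
time.sleep(0.05)
event.set()  # releases every waiting thread at once
for c in clients:
    c.join()

# Flag set: wait() returns True immediately.
immediate = event.wait(timeout=0)
print(timed_out, sorted(results), immediate)
```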
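An Event can be used for interaction between two or more threads. Below is a traffic-light example: one thread acts as the traffic light, and several threads act as cars that follow the rule "stop on red, go on green".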

import threading
import time
import random


def light():
    if not event.is_set():
        event.set()  # flag set: wait() does not block (green light)
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.is_set():
                event.clear()  # red light: clear the flag
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # switch back to green
        time.sleep(1)
        count += 1


def car(n):
    while True:
        time.sleep(random.randrange(10))
        if event.is_set():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)


if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()
queue (queues)
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
- class queue.Queue(maxsize=0)  # FIFO: first in, first out
- class queue.LifoQueue(maxsize=0)  # LIFO: last in, first out
- class queue.PriorityQueue(maxsize=0)  # a queue whose entries are stored with a priority
- exception queue.Empty
  Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.
- exception queue.Full
  Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.
- Queue.qsize()
- Queue.empty()  # return True if empty
- Queue.full()  # return True if full
- Queue.put(item, block=True, timeout=None)
- Queue.put_nowait(item)
  Equivalent to put(item, False).
- Queue.get(block=True, timeout=None)
  Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).
- Queue.get_nowait()
  Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

- Queue.task_done()
  Indicate that a formerly enqueued task is complete; a consumer calls it once for each item it got with get(). If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue.
- Queue.join()  # block until every item in the queue has been taken out and marked done with task_done()
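A quick sketch exercising the API listed above: retrieval order for the three queue classes, the `Empty` and `Full` exceptions from the non-blocking calls, and `task_done()`/`join()`. The values are arbitrary.

```python
import queue

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for n in (3, 1, 2):
    fifo.put(n)
    lifo.put(n)
    prio.put((n, "task %d" % n))  # (priority, data): the lowest number comes out first

fifo_order = [fifo.get() for _ in range(3)]     # first in, first out
lifo_order = [lifo.get() for _ in range(3)]     # last in, first out
prio_order = [prio.get()[1] for _ in range(3)]  # by priority
print(fifo_order, lifo_order, prio_order)

# Non-blocking get() on an empty queue raises queue.Empty.
try:
    fifo.get_nowait()
    saw_empty = False
except queue.Empty:
    saw_empty = True

# Non-blocking put() on a full bounded queue raises queue.Full.
small = queue.Queue(maxsize=1)
small.put("a")
try:
    small.put_nowait("b")
    saw_full = False
except queue.Full:
    saw_full = True

# join() returns once task_done() has been called for every item that was put().
small.get()
small.task_done()
small.join()  # returns at once: the only item has been marked done
print(saw_empty, saw_full)
```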
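The producer-consumer model
Using the producer-consumer pattern in concurrent programming solves a large class of concurrency problems. The pattern improves the overall data-processing speed of a program by balancing the work capacity of the producing threads and the consuming threads.
Why use the producer-consumer pattern
In the world of threads, a producer is a thread that produces data and a consumer is a thread that consumes data. In multithreaded development, if the producer is fast and the consumer is slow, the producer has to wait for the consumer to finish before it can produce more data. Likewise, if the consumer is faster than the producer, the consumer has to wait for the producer. The producer-consumer pattern was introduced to solve this problem.
What is the producer-consumer pattern
The producer-consumer pattern uses a container to remove the tight coupling between producers and consumers. Producers and consumers don't communicate directly; they communicate through a blocking queue. After producing data, a producer doesn't wait for a consumer to process it but simply puts it into the blocking queue; a consumer doesn't ask the producer for data but takes it straight from the blocking queue. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.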

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = "richardzgt"

import threading
import time
import queue

q = queue.Queue(maxsize=10)  # bounded: put() blocks once 10 items are waiting


def Producer():
    count = 1
    while True:
        q.put("g%s" % count)
        print("producer queue g%s" % count)
        count += 1
        time.sleep(1)


def Consumer():
    while True:
        good = q.get()  # blocks until an item is available
        print("Consumer queue ", good)
        time.sleep(5)


p = threading.Thread(target=Producer)
c = threading.Thread(target=Consumer)
p.start()
c.start()

import time
import random
import queue
import threading

q = queue.Queue()


def Producer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1


def Consumer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
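Both examples above either run forever or stop after a fixed number of attempts. A common way to let a consumer finish cleanly is to send a sentinel value through the queue and use `task_done()`/`join()`. The following is a minimal sketch of that approach; the `SENTINEL` marker and the item count are assumptions, not part of the original examples.

```python
import queue
import threading

SENTINEL = None              # marker telling the consumer there is nothing more to eat
q = queue.Queue(maxsize=5)   # bounded: a fast producer blocks instead of running ahead
eaten = []


def producer(n):
    for i in range(n):
        q.put(i)             # blocks while the queue is full
    q.put(SENTINEL)


def consumer():
    while True:
        item = q.get()       # blocks while the queue is empty
        try:
            if item is SENTINEL:
                return
            eaten.append(item)
        finally:
            q.task_done()    # runs for the sentinel too, before returning


p = threading.Thread(target=producer, args=(20,))
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
q.join()                     # every item, including the sentinel, has been marked done
c.join()
print(eaten)
```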
A Condition variable lets a thread stop and wait until other threads have made some "condition" true, and continue only once it holds.

#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
Created on 2015-08-27
@author: Administrator
'''
import threading
import time

# the product
product = None
# the condition variable
con = threading.Condition()


# producer
def produce():
    global product
    with con:
        while True:
            if product is None:
                print('produce...')
                product = 'anything'
                # tell the consumer that a product is ready
                con.notify()
            # wait to be notified (the lock is released while waiting)
            con.wait()
            time.sleep(2)


# consumer
def consume():
    global product
    with con:
        while True:
            if product is not None:
                print('consume...')
                product = None
                # tell the producer that the product is gone
                con.notify()
            # wait to be notified
            con.wait()
            time.sleep(2)


t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()
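On Python 3 the check-then-wait loop above can be written more compactly with `Condition.wait_for()`, which re-checks a predicate each time the thread is woken. A sketch of the same produce/consume hand-off with a fixed number of items (the `log` list is only there to record the order):

```python
import threading

product = None
con = threading.Condition()
log = []


def produce(items):
    global product
    for item in items:
        with con:
            con.wait_for(lambda: product is None)      # wait until the slot is empty
            product = item
            log.append("produce " + item)
            con.notify_all()


def consume(count):
    global product
    for _ in range(count):
        with con:
            con.wait_for(lambda: product is not None)  # wait until something is there
            log.append("consume " + product)
            product = None
            con.notify_all()


items = ["a", "b", "c"]
t1 = threading.Thread(target=produce, args=(items,))
t2 = threading.Thread(target=consume, args=(len(items),))
t2.start()
t1.start()
t1.join()
t2.join()
print(log)
```

Because each side waits for the other's state change, the log always alternates produce/consume, whichever thread starts first.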
Multiprocessing (multiprocessing)
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
Multiprocessing gives true parallelism, which multithreading cannot provide under CPython's GIL. Use multiple processes for CPU-bound work.
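A minimal sketch of CPU-bound work spread over a pool, assuming a prime-counting function as a stand-in workload (`count_primes` is hypothetical, not from the original). `Pool.map` splits the inputs across worker processes, each with its own interpreter and its own GIL.

```python
import math
import multiprocessing
import os


def count_primes(limit):
    """Deliberately CPU-bound: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count, os.getpid()


if __name__ == "__main__":
    limits = [10000, 10000, 10000, 10000]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(count_primes, limits)  # each call may run in a different process
    counts = [c for c, _ in results]
    worker_pids = {pid for _, pid in results}
    print(counts, "computed in", len(worker_pids), "worker process(es); parent is", os.getpid())
```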
import time
import multiprocessing


def f(name):
    time.sleep(2)
    print('hello', name)


if __name__ == '__main__':
    p = multiprocessing.Process(target=f, args=('bob',))
    p.start()
    p.join()
Checking that multiprocessing really starts separate processes
from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("\n\n")


def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)


if __name__ == '__main__':
    p_list = []
    info('main process line')
    for i in range(10):
        p = Process(target=f, args=('bob%s' % i,))
        p.start()
        p_list.append(p)
    for j in p_list:
        j.join()

Output (produced under Python 2, truncated; the output of the different processes interleaves):
('module name:', '__main__')
('parent process:', 10340)
('process
function f
function f
('module name:', '__main__')
('parent process:', 10340)
('process id:', 10353)
('hello', 'bob9')
function f
('module name:', '__main__')
('parent process:', 10340)
('process id:', 10352)
('hello', 'bob8')
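Combining multiprocessing with requests to call a service concurrently (common)
import multiprocessing
import requests

url = "http://10.0.0.219:4444"


def get_data(url, i, q):
    try:
        r = requests.get(url)
        q.put("try count:[%s];result [%s]" % (i, r.status_code))
    except requests.RequestException as e:
        # always report back, so the parent never waits forever for this child
        q.put("try count:[%s];error [%s]" % (i, e))


if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = []
    for i in range(100):
        p = multiprocessing.Process(target=get_data, args=(url, i, q))
        p.start()
        procs.append(p)
    # Read exactly one result per process; q.empty() can be True before the children have reported.
    for _ in procs:
        print(q.get())
    for p in procs:
        p.join()

Output:
try count:[0];result [200]
try count:[2];result [503]
try count:[3];result [503]
try count:[6];result [503]
try count:[4];result [503]
try count:[5];result [503]
.........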
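Process pools
A process pool maintains a set of worker processes internally. When work is submitted, a process is taken from the pool; if no process in the pool is available, the program waits until one becomes free.
A process pool has two methods for submitting a single call:
- apply: blocks until the result is ready
- apply_async: returns immediately; the result can be fetched later or passed to a callback
from multiprocessing import Pool
import time


def Foo(i):
    time.sleep(2)
    return i + 100


def Bar(arg):  # callback: runs in the parent process with Foo's return value
    print('-->exec done:', arg)


if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
        # pool.apply(func=Foo, args=(i,))
    print('end')
    pool.close()  # no more tasks will be submitted
    pool.join()  # wait for the pool's processes to finish; without this the program exits immediately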
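The advantage of p.apply_async(func[, args[, kwds[, callback]]]) is that it is non-blocking and provides a callback hook; collecting the results with print([res.get() for res in result]) blocks instead. Note that args must be a tuple, e.g. args=('test',) (just as with threading.Thread), and kwds must be a dict of keyword arguments.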
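A minimal sketch of the callback interface described above, using a hypothetical `square` task. It also uses `error_callback` (available since Python 3.2, not mentioned in the original text) so that a failing task is reported instead of silently dropped.

```python
from multiprocessing import Pool


def square(x):
    if x < 0:
        raise ValueError("negative input: %d" % x)
    return x * x


collected = []  # filled by callbacks, which run in the parent process
errors = []

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        pending = [
            pool.apply_async(square, args=(i,),  # args must be a tuple
                             callback=collected.append,
                             error_callback=errors.append)
            for i in (1, 2, 3, -1)
        ]
        pool.close()
        pool.join()  # wait for every task, and its callback, to finish
    print(sorted(collected), [str(e) for e in errors])
    print(pending[0].get())  # .get() also works, but blocks until that result is ready
```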
Typical usage:
import logging
import re
from multiprocessing import Pool

# Assumed: ZabbixMetric and ZabbixSender come from the py-zabbix package.
from pyzabbix import ZabbixMetric, ZabbixSender

logger = logging.getLogger(__name__)
# WebStatus, get_zbx_items() and ZBX_HOST are project-specific and not defined here.


def worker(item):
    value = 0  # default, so the return below works even if an exception occurs
    try:
        reg_func = re.split(r'[\[\]]', item)  # e.g. 'webCheck.HttpCode[<url>]'
        pname, func = reg_func[0].split('.')  # 'webCheck', 'HttpCode'
        url = str(reg_func[1])
        ws = WebStatus(url)
        ws.get()
        value = ws.request_value()[func] or 0
    except Exception as e:
        logger.error(str(e))
    return {'item': item, 'value': value}


def callback_zbx_sender(kwargs):  # runs in the parent process
    logger.debug(kwargs)
    packet = [ZabbixMetric(ZBX_HOST, kwargs['item'], kwargs['value'])]
    result = ZabbixSender(use_config=True, zabbix_port=10051).send(packet)
    logger.debug(result)
    if result.failed:
        logger.error(result)


def run():
    pool = Pool(processes=10)
    ret_zbx_items = get_zbx_items()
    for item in ret_zbx_items:
        pool.apply_async(func=worker, args=(item,), callback=callback_zbx_sender)
    pool.close()
    pool.join()
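To use a pool inside a class, first define a module-level proxy function. Everything sent to the worker processes must be picklable: in Python 2 bound methods cannot be pickled, and an instance that holds a Pool cannot be pickled either (hence the __getstate__ below, which drops it). Even so, this approach doesn't always work, especially when queues are involved, because a multiprocessing.Queue can only be shared with child processes through inheritance, not pickled into a task.
Example 1
import time
from multiprocessing import Pool


# proxy function: a module-level function can be pickled and sent to the workers
def run(cls_instance, i):
    return cls_instance.func(i)


class Runner(object):
    def __init__(self):
        self.pool = Pool(processes=2)
        for i in range(2):
            self.pool.apply_async(run, (self, i), callback=self.callback)
        self.pool.close()
        self.pool.join()

    def func(self, i):
        print(i)
        time.sleep(i)
        return i + 10

    def callback(self, v):  # runs in the parent process
        print("get return===", v)

    def __getstate__(self):
        # Pool objects can't be pickled, so leave it out when self is sent to a worker
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)


if __name__ == '__main__':
    runner = Runner()
Example 2
import logging
import multiprocessing
import os

logger = logging.getLogger(__name__)


# proxy function; salt_local() and get_salt_token() are methods of the caller's own class
def multiSalt(cls_instance, token, i):
    logger.info("starting subprocess %s" % os.getpid())
    return cls_instance.salt_local(token, **i)


def multiRun_2(cls, t_list):
    result = []
    token = cls.get_salt_token()
    p = multiprocessing.Pool(5)
    for i in t_list:
        result.append(p.apply_async(multiSalt, (cls, token, i)))
    p.close()
    p.join()
    # The part below isn't really necessary: passing a callback to apply_async would do.
    for ret_task in result:
        task_ret = ret_task.get()
        ret_task_id = task_ret.get('taskid')
        for task in t_list:
            if ret_task_id == task.get('taskid'):
                task.update(task_ret)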
Inter-process communication
Memory is not shared between different processes. To exchange data between two processes, you can use the following methods:
Queues
Used much like the queue in threading.
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
Pipes
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
Managers
Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)


if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)
Process synchronization
Without using the lock, output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock


def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()


if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
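Coroutines
Coroutines are also called micro-threads or fibers; the English name is Coroutine. In one sentence: a coroutine is a lightweight thread that lives in user space.
A coroutine has its own register context and stack. When the scheduler switches away from a coroutine, its register context and stack are saved elsewhere; when it switches back, the saved register context and stack are restored. Therefore:
A coroutine keeps the state of its previous call (a particular combination of all its local state). Each time it is re-entered, it resumes from that state; in other words, it continues from the point in its logic flow where it last left off.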
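The state retention described above is exactly what a Python generator gives you. A minimal sketch (the `averager` name is illustrative): the local variables `total` and `count` survive between calls because the generator is suspended at `yield`, not torn down.

```python
def averager():
    """Coroutine that remembers its running total between calls."""
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average  # pause here; resume with the next value sent in
        total += value
        count += 1
        average = total / count


avg = averager()
next(avg)            # advance to the first yield ("prime" the coroutine)
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0
print(avg.send(60))  # 30.0
```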
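Advantages of coroutines:
- No thread context-switching overhead
- No overhead for locking and synchronizing atomic operations
- (An atomic operation needs no synchronization. It is an operation that cannot be interrupted by the thread scheduler: once it starts, it runs to the end without any context switch to another thread. It can be a single step or several steps, but their order cannot be shuffled, nor can they be cut so that only part of them runs. Being indivisible as a whole is the essence of atomicity.)
- Easy switching of control flow, which simplifies the programming model
- High concurrency, high scalability, low cost: a single CPU can easily support tens of thousands of coroutines, which makes them well suited to highly concurrent workloads.
Disadvantages:
- Cannot use multiple cores: a coroutine is essentially single-threaded and cannot use several cores of a CPU at the same time; to run on multiple CPUs, coroutines have to be combined with processes. Most everyday applications don't need that, unless they are CPU-bound.
- A blocking operation (such as blocking IO) blocks the entire program.
Implementing coroutines with yield (example)
import time


def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield  # pause until producer() sends a value
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    next(con)  # advance both generators to their first yield
    next(con2)
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()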
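By definition, a coroutine:
- achieves concurrency within a single thread
- modifies shared data without locks
- keeps the context stacks of its multiple control flows inside the user program
- switches automatically to another coroutine when it hits an IO operation
Based on these four points, the coroutine we just built with yield doesn't fully qualify, because one feature is missing: switching automatically to another coroutine when it hits IO.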
Greenlet
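greenlet is a coroutine module implemented in C. Compared with Python's built-in yield, it lets you switch freely between arbitrary functions without first declaring them as generators.
# -*- coding:utf-8 -*-
from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)


gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

It is indeed even simpler to use than generators, but one problem still isn't solved: switching automatically when an IO operation is hit, right?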
Gevent
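Gevent is a third-party library that makes it easy to write concurrent synchronous or asynchronous code. The main pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module. Greenlets all run inside the operating-system process of the main program, but they are scheduled cooperatively.
import gevent


def func1():
    print('\033[31;1mLi Chuang is working with Haitao...\033[0m')
    gevent.sleep(2)  # yields to the other greenlets for 2 s
    print('\033[31;1mLi Chuang goes back to working with Haitao...\033[0m')


def func2():
    print('\033[32;1mLi Chuang switches to working with Hailong...\033[0m')
    gevent.sleep(1)
    print('\033[32;1mLi Chuang is done with Haitao and comes back to Hailong...\033[0m')


gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    # gevent.spawn(func3),
])

Output:
Li Chuang is working with Haitao...
Li Chuang switches to working with Hailong...
Li Chuang is done with Haitao and comes back to Hailong...
Li Chuang goes back to working with Haitao...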
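Performance difference between synchronous and asynchronous execution
import gevent


def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():
    for i in range(10):  # the same 10 tasks as asynchronous()
        task(i)


def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)


print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

The important part of the program above is gevent.spawn, which wraps the task function in a Greenlet. The list of created greenlets is stored in the array threads, which is passed to gevent.joinall; joinall blocks the current flow while it runs all the given greenlets, and execution continues only after all of them have finished. The synchronous part takes roughly 5 seconds (ten 0.5 s sleeps in a row), while the asynchronous part finishes in about 0.5 seconds because the sleeps overlap.
Switching tasks automatically when IO blocks
Note: the monkey patch must be applied; otherwise gevent cannot detect urlopen's IO waits.
from gevent import monkey; monkey.patch_all()
import gevent
from urllib.request import urlopen


def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


gevent.joinall([
    gevent.spawn(f, 'https://www.python.org/'),
    gevent.spawn(f, 'https://www.yahoo.com/'),
    gevent.spawn(f, 'https://github.com/'),
])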
Handling many sockets concurrently in a single thread with gevent
server side

from gevent import monkey
monkey.patch_all()  # patch the standard library first, so the socket module below is cooperative

import socket
import gevent


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)  # one greenlet per connection


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            if not data:  # the client closed the connection
                break
            print("recv:", data)
            conn.send(data)  # echo it back
    except Exception as ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8001)
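Demo 2

from gevent import socket
from gevent.pool import Pool


class SocketPool(object):
    """Accept TCP connections and handle each one in a greenlet from a bounded pool."""

    def __init__(self, host='127.0.0.1', port=5431, size=100):
        self.pool = Pool(size)
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))  # bind() takes a single (host, port) tuple
        self.server.listen(1024)

    def listen(self):
        while True:
            conn, addr = self.server.accept()  # wait for the next client
            self.add_handler(conn)

    def handle(self, conn):
        try:
            data = conn.recv(1024)  # recv() needs a buffer size
            print(data)
        finally:
            conn.close()

    def add_handler(self, conn):
        if self.pool.full():
            print("At maximum pool size, rejecting connection")
            conn.close()
        else:
            self.pool.spawn(self.handle, conn)

    def shutdown(self):
        self.pool.kill()


if __name__ == '__main__':
    SocketPool().listen()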
client side

import socket
import threading


def sock_conn():
    client = socket.socket()
    client.connect(("localhost", 8001))
    count = 0
    while True:
        # msg = input(">>:").strip()
        # if len(msg) == 0: continue
        client.send(("hello %s" % count).encode("utf-8"))
        data = client.recv(1024)
        print("[%s]recv from server:" % threading.get_ident(), data.decode())  # the result
        count += 1
    client.close()  # not reached: the loop above runs forever


for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()

This opens 100 concurrent socket connections.
Subprocesses in coroutines
import gevent
from gevent.subprocess import Popen, PIPE


def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)


g = gevent.spawn(cron)
# cron only starts running once the main greenlet yields, e.g. while waiting in communicate()
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
Queues in coroutines
import gevent
from gevent.queue import Queue
import random

tasks = Queue()


def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(random.randint(1, 3))
    print('Quitting time!')


def boss():
    for i in range(1, 25):
        tasks.put_nowait(i)


gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])
Pipes in coroutines
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write


def relay(b, c):
    # runs in the child process: read from one pipe, write to the other
    for i in range(10):
        msg = b.recv()
        gevent.sleep(1)
        c.send(msg + " in " + str(i))


def put_msg(a):
    for i in range(10):
        wait_write(a.fileno())  # yield to other greenlets until the pipe is writable
        a.send('hi')


def get_msg(d):
    for i in range(10):
        wait_read(d.fileno())  # yield to other greenlets until data arrives
        print(d.recv())


if __name__ == '__main__':
    a, b = Pipe()  # to the child process
    c, d = Pipe()  # from the child process
    proc = Process(target=relay, args=(b, c))  # pass the pipe ends explicitly to the child
    proc.start()

    g1 = gevent.spawn(get_msg, d)
    g2 = gevent.spawn(put_msg, a)
    gevent.joinall([g1, g2], timeout=10)
Asynchronous results in coroutines
import gevent
from gevent.event import AsyncResult

a = AsyncResult()


def setter():
    """
    After 3 seconds set the result of a.
    """
    gevent.sleep(3)
    a.set('Hello!')


def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print(a.get())


gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])
A WSGI server with coroutines (gevent's built-in module)
from gevent.pywsgi import WSGIServer  # gevent.wsgi was only an alias and is gone in newer gevent versions


def application(environ, start_response):
    status = '200 OK'
    body = b'<p>Hello World</p>'  # on Python 3 a WSGI body must be bytes
    headers = [
        ('Content-Type', 'text/html')
    ]
    start_response(status, headers)
    return [body]


WSGIServer(('', 8000), application).serve_forever()
Using coroutines in multiple processes

# encoding=utf-8
'''
Demonstrates how to use gevent with multiple processes.
1. Combining gevent with multiprocessing causes many problems, so the child
   processes are started directly with subprocess.Popen. They share no data
   through fork, run completely independently, and communicate over sockets.
2. Processes cannot be synchronized with multiprocessing.Event, because wait()
   blocks the whole thread and no other greenlet can run. gevent.event.Event()
   doesn't work either: once it is shared with a child via
   multiprocessing.Process, set() in the parent does not wake wait() in the child.
3. A child cannot ignore Ctrl+C with signal.signal(signal.SIGINT, signal.SIG_IGN),
   so if the master is not started in the background, Ctrl+C stops both the
   master and the children without a graceful exit.
4. The master and the children communicate and synchronize over gevent.socket.
   When a child sees the master close the connection (it receives zero bytes),
   it exits gracefully; in effect the master tells the child to quit.
5. Start the master in the background with "nohup gevent-multil-process.py &"
   (nohup can be omitted while testing) and stop it with "kill pid"; the master
   intercepts SIGTERM, notifies the children and waits for them to exit.
'''
import gevent
import gevent.socket as socket
from gevent.event import Event
import os
import sys
import subprocess
import signal

url = ('localhost', 8888)


class Worker(object):
    '''
    Code run in a child process. It starts a greenlet to communicate with the
    master (task assignments, the exit signal as a zero-byte read, task
    progress); the main greenlet then waits for the stop signal and ends the
    process (stop_event synchronizes the greenlets).
    '''

    def __init__(self, url):
        self.url = url
        self.stop_event = Event()
        gevent.spawn(self.communicate)
        try:
            self.stop_event.wait()
        except KeyboardInterrupt:
            pass
        print('worker(%s):will stop' % os.getpid())

    def exec_task(self, task):
        print('worker(%s):execute task:%s' % (os.getpid(), task.rstrip('\n')))
        # time.sleep(10)  # test: a blocking call here would stall every greenlet

    def communicate(self):
        print('worker(%s):started' % os.getpid())
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(self.url)
        fp = client.makefile()  # text-mode file, so readline() returns str
        while True:
            line = fp.readline()
            if not line:  # the master closed the connection: stop
                self.stop_event.set()
                break
            # run each task in its own greenlet so this communication loop never blocks
            gevent.spawn(self.exec_task, line)


class Master(object):
    '''
    Code run in the master process. It starts a separate greenlet that listens
    on a port for the children to connect to, starts the children with
    subprocess.Popen, registers a SIGTERM handler so that killing the master
    tells the children to exit, and the main greenlet waits for the stop event.
    '''

    def __init__(self, url):
        self.url = url
        self.workers = []
        self.stop_event = Event()
        gevent.spawn(self.communicate)
        gevent.sleep(0)  # let communicate() start listening before the children connect
        # the original comment suggests cpu_count() - 1 children; 3 are hard-coded here
        self.process = [subprocess.Popen((sys.executable, sys.argv[0], 'worker'))
                        for i in range(3)]
        gevent.signal_handler(signal.SIGTERM, self.stop)  # intercept kill
        gevent.signal_handler(signal.SIGINT, self.ctrlC)  # intercept Ctrl+C
        gevent.spawn(self.test)  # test task dispatch
        self.stop_event.wait()

    def communicate(self):
        print('master(%s):started' % os.getpid())
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(self.url)
        server.listen(1024)
        while True:
            worker, addr = server.accept()
            print('master(%s):new worker' % os.getpid())
            self.workers.append(worker)

    def stop(self):
        print('master stop')
        for worker in self.workers:
            worker.close()
        for p in self.process:
            p.wait()
        self.stop_event.set()

    def ctrlC(self):
        print('master stop from ctrl +c')
        for worker in self.workers:
            worker.close()
        for p in self.process:
            p.wait()
        gevent.sleep(0)
        self.stop_event.set()

    def test(self):
        import random
        while True:
            if not self.workers:
                gevent.sleep(1)
                continue
            task = str(random.randint(100, 10000))
            worker = random.choice(self.workers)
            worker.send((task + '\n').encode())  # sockets send bytes on Python 3
            gevent.sleep(1)


if len(sys.argv) == 1:
    Master(url)
else:
    Worker(url)
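Homework:
Exercise: build a simple chat room
Exercise: a simple batch host management tool
Requirements:
- Hosts are organized into groups
- Host information lives in a configuration file parsed with configparser
- Commands can be run and files sent in batches, with results returned in real time; the command format is:
  - batch_run -h h1,h2,h3 -g web_clusters,db_servers -cmd "df -h"
  - batch_scp -h h1,h2,h3 -g web_clusters,db_servers -action put -local test.py -remote /tmp/
- Hosts may have different usernames, passwords, and ports
- Use the paramiko module to execute remote commands
- Batch commands must run concurrently with multiprocessing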