Py Journey to the West Quest: Multithreading (the threading module)


Threads and Processes

What is a thread?

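A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the unit that actually does the work there. A thread is a single sequential flow of control within a process; a process can run several threads concurrently, each executing a different task.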

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

What is a process?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

What is the difference between a process and a thread?

  1. Threads share the address space of the process that created them; processes have their own address space.
  2. Threads have direct access to the data segment of their process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of their process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process do not affect child processes.
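A minimal sketch of item 1 (the names shared and worker are made up for illustration): three threads append to the same list object, and the main thread sees every change afterwards, because all threads of a process share its memory.

```python
import threading

shared = []  # one list in the process's memory, visible to every thread

def worker(n):
    # Each thread appends to the very same list object; nothing is copied
    shared.append(n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared))  # [0, 1, 2]
```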

Python GIL (Global Interpreter Lock)

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
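A rough sketch of why threads still help with I/O-bound work, assuming time.sleep() as a stand-in for real blocking I/O (fake_io is a hypothetical name): CPython releases the GIL while a thread sleeps or waits on I/O, so the five waits overlap instead of running one after another.

```python
import threading
import time

def fake_io(delay):
    # time.sleep() releases the GIL, just as waiting on a socket or a disk does
    time.sleep(delay)

start = time.perf_counter()
threads = [threading.Thread(target=fake_io, args=(0.5,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"five 0.5 s waits took {elapsed:.2f} s")  # roughly 0.5 s, not 2.5 s
```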

The threading module

1. Two ways to create a thread

Direct invocation

Example 1:

import threading
import time
 
def sayhi(num): # the function each thread will run
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
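    t1 = threading.Thread(target=sayhi,args=(1,)) # create a thread object
    t2 = threading.Thread(target=sayhi,args=(2,)) # create another thread object
 
    t1.start() # start the thread
    t2.start() # start the other thread
 
    print(t1.name) # the thread's name (getName() is deprecated since Python 3.10)
    print(t2.name)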

Subclassing Thread:

import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        super().__init__()  # initialize the Thread base class first
        self.num = num
 
    def run(self): # the code each thread runs; start() calls run() in the new thread
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

2. Join & Daemon

import threading
from time import ctime,sleep
import time

def music(func):
    for i in range(2):
        print ("Begin listening to %s. %s" %(func,ctime()))
        sleep(4)
        print("end listening %s"%ctime())

def move(func):
    for i in range(2):
        print ("Begin watching at the %s! %s" %(func,ctime()))
        sleep(5)
        print('end watching %s'%ctime())

threads = []
t1 = threading.Thread(target=music,args=('七里香',))
threads.append(t1)
t2 = threading.Thread(target=move,args=('阿甘正傳',))
threads.append(t2)

if __name__ == '__main__':

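    for t in threads:
        # t.daemon = True  # (older code: t.setDaemon(True), deprecated since Python 3.10)
        t.start()
        # t.join()
    # t1.join()
    t2.join()  # consider: what is the result with join() in each of these three positions?
    print ("all over %s" %ctime())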

setDaemon(True) (in current Python, t.daemon = True):

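Marks the thread as a daemon thread; this must be set before start() is called. By default the program waits for every non-daemon thread to finish before it exits, so a non-daemon thread that never ends keeps the program hanging indefinitely. The daemon flag is roughly the opposite of join(). When the main thread creates a child thread, the two go their separate ways and run independently; when the main thread is ready to exit, it checks whether the child has finished and, if not, waits for it. Sometimes, though, we want the children to exit together with the main thread as soon as it is done, finished or not. That is what setDaemon(True) is for.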

join():

Blocks the calling thread (typically the parent) until the thread on which join() was called has finished running.
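A small sketch of both points, with a hypothetical slow_task: join() accepts an optional timeout and simply returns when it expires (check is_alive() to see whether the thread actually finished), and the daemon flag can only be set before start(); changing it afterwards raises RuntimeError.

```python
import threading
import time

def slow_task():
    time.sleep(1)

# daemon must be decided before start(); passing it to the constructor is simplest
t = threading.Thread(target=slow_task, daemon=True)
t.start()

t.join(timeout=0.1)           # waits at most 0.1 s; join() itself always returns None
still_running = t.is_alive()
print("alive after join(timeout=0.1):", still_running)  # True: the task needs about 1 s

daemon_change_failed = False
try:
    t.daemon = False          # too late: the thread has already been started
except RuntimeError as exc:
    daemon_change_failed = True
    print("RuntimeError:", exc)

t.join()                      # no timeout: block until slow_task returns
print("alive after join():", t.is_alive())  # False
```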

Other methods

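Other functions provided by the threading module:
# threading.current_thread() (formerly currentThread()): returns the Thread object of the calling thread.
# threading.enumerate(): returns a list of the threads that are currently alive, i.e. started but not yet finished; threads that have not started or have already terminated are not included.
# threading.active_count() (formerly activeCount()): returns the number of live threads; same as len(threading.enumerate()).
# Besides these functions, the module provides the Thread class for working with threads. Thread offers the following methods:
# run(): the method representing the thread's activity.
# start(): starts the thread's activity.
# join([timeout]): waits until the thread terminates. It blocks the calling thread until the thread whose join() was called terminates, either normally or through an unhandled exception, or until the optional timeout expires.
# is_alive() (isAlive() was removed in Python 3.9): returns whether the thread is alive.
# getName(): returns the thread name (deprecated since Python 3.10; read the name attribute instead).
# setName(): sets the thread name (deprecated since Python 3.10; assign to the name attribute instead).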
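A quick sketch exercising these functions under their current snake_case names (current_thread, active_count, is_alive, and the name attribute); worker and "worker-1" are arbitrary names chosen for illustration.

```python
import threading
import time

def worker():
    # current_thread() returns the Thread object that is running this code
    print("inside:", threading.current_thread().name)
    time.sleep(0.2)

t = threading.Thread(target=worker, name="worker-1")  # the name can be given up front
t.start()

print(threading.current_thread().name)              # MainThread when run as a script
print([th.name for th in threading.enumerate()])    # normally includes 'MainThread' and 'worker-1'
count_while_running = threading.active_count()      # same as len(threading.enumerate())
print("alive:", t.is_alive())                       # True while worker sleeps
t.join()
print("alive:", t.is_alive())                       # False
```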

3. Synchronization lock (Lock)

import time
import threading

def addNum():
    global num # every thread works on this global variable
    # num-=1

    temp=num
    print('--get num:',num )
    #time.sleep(0.1)
    num =temp-1 # decrement the shared variable by 1


num = 100  # a shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: # wait for all threads to finish
    t.join()

print('final num:', num )


Notes:

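1: Why does num -= 1 appear to work? Because the operation is so fast that it usually finishes within a single time slice, before the interpreter switches threads. It is still not atomic; it is simply rarely interrupted.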

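2: With time.sleep(1) between the read and the write, the effect becomes much more obvious: each of the 100 threads is switched out before it finishes. As we said before, sleep behaves like blocking I/O, so no thread is switched back within that second. Every thread therefore reads num as 100, and the final result is almost certainly 99.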


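Several threads operate on the same shared resource at the same time and corrupt it. What can we do?

One might suggest join(), but join() holds up the entire thread, which serializes the program and defeats the purpose of multithreading. We only need to serialize the part of the computation that touches the shared data.

A synchronization lock solves exactly this problem: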

import time
import threading

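def addNum():
    global num # every thread works on this global variable
    # num-=1
    with lock: # equivalent to lock.acquire() ... lock.release(), but also releases on an exception
        temp=num
        print('--get num:',num )
        #time.sleep(0.1)
        num =temp-1 # decrement the shared variable by 1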

num = 100  # a shared variable
thread_list = []
lock=threading.Lock()

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: # wait for all threads to finish
    t.join()

print('final num:', num )

Problem solved, but...

Question: how does the synchronization lock relate to the GIL?

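Python threads run under the control of the GIL: among threads, access to the whole Python interpreter and to the C API that Python provides is mutually exclusive. This can be seen as Python's kernel-level mutual exclusion. But that exclusion is not something we control, so we also need a controllable kind of mutual exclusion: user-level mutual exclusion. Just as kernel-level exclusion protects the interpreter's shared resources, user-level exclusion protects the shared resources of the user's program.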

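The GIL ensures that, within one interpreter, only one thread executes bytecode at a time, so at any moment exactly one thread is running bytecode. The GIL therefore makes execution thread-safe at the level of individual bytecodes.
But an operation such as x += 1 takes several bytecodes, and the interpreter may switch to another thread between any two of them, which produces a data race.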
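One way to see this for yourself is the standard dis module, which lists the bytecodes of a function. A minimal sketch (increment is an illustrative name); the exact instruction names differ between CPython versions, but the read of x and the write back to x are always separate instructions:

```python
import dis

x = 0

def increment():
    global x
    x += 1  # looks like one step, but compiles to several bytecodes

ops = [ins.opname for ins in dis.get_instructions(increment)]
print(ops)
# A thread switch can happen between the load of x and the store of x,
# so another thread's update made in between is silently overwritten.
```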
 
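Since my lock also guarantees that only one thread at a time runs the protected code, could we do without the GIL? For protecting our own data, yes. So what is the GIL good for? Not for your program's data: it protects CPython's own internal state (such as object reference counts), which a user-level lock cannot do.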

4. Thread deadlock and recursive locks

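When threads share several resources and two threads each hold part of them while waiting for the other's, a deadlock occurs: every resource involved is in use, so without outside intervention both threads wait forever. Here is a deadlock example: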

import threading,time

class myThread(threading.Thread):
    def doA(self):
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        time.sleep(3)
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        lockB.release()
        lockA.release()

    def doB(self):
        lockB.acquire()
        print(self.name,"gotlockB",time.ctime())
        time.sleep(2)
        lockA.acquire()
        print(self.name,"gotlockA",time.ctime())
        lockA.release()
        lockB.release()
    def run(self):
        self.doA()
        self.doB()
if __name__=="__main__":

    lockA=threading.Lock()
    lockB=threading.Lock()
    threads=[]
    for i in range(5):
        threads.append(myThread())
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for the threads to finish (here they deadlock, so this never returns)

Solution: use a recursive lock. Replace

lockA=threading.Lock()
lockB=threading.Lock()

with a single RLock that both names refer to:

lockA=lockB=threading.RLock()

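To let the same thread request the same resource several times, Python provides a reentrant lock, threading.RLock. Internally an RLock keeps a Lock, the owning thread, and a counter recording how many times it has been acquired, so the owner can acquire it repeatedly. Only after the owning thread has released every one of its acquires can another thread obtain the resource.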
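A minimal sketch of the difference, using a non-blocking acquire so that the plain Lock case cannot hang: the same thread cannot take a Lock twice, but it can take an RLock twice as long as it releases it as many times.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

with lock:
    # A plain Lock does not know who holds it: a second blocking acquire from
    # the same thread would wait forever, so we only try without blocking.
    plain_again = lock.acquire(blocking=False)

with rlock:
    # An RLock records its owner and a counter, so the owner may re-enter.
    reentrant_again = rlock.acquire(blocking=False)
    if reentrant_again:
        rlock.release()  # every acquire needs a matching release

print(plain_again, reentrant_again)  # False True
```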

Application:

import time

import threading

class Account:
    def __init__(self, _id, balance):
        self.id = _id
        self.balance = balance
        self.lock = threading.RLock()

    def withdraw(self, amount):

        with self.lock:
            self.balance -= amount

    def deposit(self, amount):
        with self.lock:
            self.balance += amount


    def drawcash(self, amount): # holds self.lock, then withdraw() acquires it again: needs an RLock

        with self.lock:
            interest=0.05
            count=amount+amount*interest

            self.withdraw(count)


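def transfer(_from, to, amount):

    # Don't lock here: other threads calling withdraw()/deposit() directly would
    # still be unsafe, so the locking lives inside those methods instead
    _from.withdraw(amount)

    to.deposit(amount)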



alex = Account('alex',1000)
yuan = Account('yuan',1000)

t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
t1.start()

t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
t2.start()

t1.join()
t2.join()

print('>>>',alex.balance)
print('>>>',yuan.balance)

5. Condition variable synchronization (Condition)

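Some threads may continue only once a condition is satisfied. Python provides threading.Condition for condition-variable synchronization. Besides the acquire()/release() methods of an RLock or Lock, it offers wait(), notify(), and notifyAll() (notify_all() in current Python).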

lock_con=threading.Condition([Lock/RLock]): the lock argument is optional; if none is passed, the object creates an RLock() automatically.

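wait(): call it when the condition is not met; the thread releases the lock and blocks until notified (it re-acquires the lock before wait() returns);
notify(): call it after making the condition true; wakes one thread from the waiting pool;
notifyAll(): call it after making the condition true; wakes all threads in the waiting pool.
All three must be called while holding the lock.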
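A minimal, terminating producer-consumer sketch (the names items, consumed and N are illustrative). Note the while loop around wait(): the Python documentation recommends re-checking the condition after waking up, because by the time a thread re-acquires the lock the condition may no longer hold.

```python
import threading

items = []
consumed = []
cond = threading.Condition()  # no lock passed, so it creates its own RLock
N = 5

def producer():
    for i in range(N):
        with cond:             # acquire the underlying lock
            items.append(i)
            cond.notify()      # wake one waiting consumer (lock is held here)

def consumer():
    for _ in range(N):
        with cond:
            while not items:   # re-check after every wake-up
                cond.wait()    # releases the lock while waiting
            consumed.append(items.pop(0))

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
p.join()
c.join()
print(consumed)  # [0, 1, 2, 3, 4]
```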

實例

import threading,time
from random import randint
class Producer(threading.Thread):
    def run(self):
        global L
        while True:
            val=randint(0,100)
            print('Producer',self.name,":Append"+str(val),L)
            if lock_con.acquire():
                L.append(val)
                lock_con.notify()
                lock_con.release()
            time.sleep(3)
class Consumer(threading.Thread):
    def run(self):
        global L
        while True:
            lock_con.acquire()
            while len(L)==0:  # while, not if: re-check the condition after waking up
                lock_con.wait()
            print('Consumer',self.name,":Delete"+str(L[0]),L)
            del L[0]
            lock_con.release()
            time.sleep(0.25)

if __name__=="__main__":

    L=[]
    lock_con=threading.Condition()
    threads=[]
    for i in range(5):
        threads.append(Producer())
    threads.append(Consumer())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

6. Synchronization event (Event)

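Event synchronization is much like condition-variable synchronization, minus the lock, because it is meant for situations that do not access shared resources. event=threading.Event() creates an event object whose internal flag starts as False;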

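event.is_set() (isSet() in older code): returns the event's flag;
event.wait(): blocks the thread while event.is_set() == False;
event.set(): sets the flag to True; all threads in the waiting pool are woken and become ready, waiting for the OS to schedule them;
event.clear(): resets the flag to False.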
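A short sketch of these four methods under their current names (is_set() rather than isSet()); ready, waiter and log are illustrative names. It uses wait(timeout=...), which returns True if the flag was set and False if the timeout expired, so a missing set() cannot hang the program.

```python
import threading
import time

ready = threading.Event()  # the flag starts as False
log = []

def waiter():
    got_it = ready.wait(timeout=2)  # blocks until set() or until 2 s pass
    log.append(("waiter woke", got_it))

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)
log.append(("flag before set", ready.is_set()))
ready.set()    # wakes every thread blocked in wait()
t.join()
ready.clear()  # back to False; later wait() calls block again
log.append(("flag after clear", ready.is_set()))
print(log)
```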

Example 1:

import threading,time
class Boss(threading.Thread):
    def run(self):
        print("BOSS: Everyone works overtime until 22:00 tonight.")
        event.is_set() or event.set()
        time.sleep(5)
        print("BOSS: <22:00> You can go home now.")
        event.is_set() or event.set()
class Worker(threading.Thread):
    def run(self):
        event.wait()
        print("Worker: Sigh... what a hard life!")
        time.sleep(0.25)
        event.clear()
        event.wait()
        print("Worker: Oh yeah!")
if __name__=="__main__":
    event=threading.Event()
    threads=[]
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Example 2:

import threading,time
import random
def light():
    if not event.is_set():
        event.set() # green light: wait() will not block
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.is_set():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() # switch the green light back on
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if event.is_set(): # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

7. Semaphore

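A semaphore limits how many threads may run concurrently. BoundedSemaphore and Semaphore manage an internal counter that every acquire() decrements by 1 and every release() increments by 1.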

The counter never drops below 0: when it is 0, acquire() blocks the thread until another thread calls release(). (Think of spaces in a parking lot.)

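The only difference between BoundedSemaphore and Semaphore is that BoundedSemaphore checks, on release(), whether the counter would exceed its initial value, and raises a ValueError if it would.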
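A sketch that checks both properties, under illustrative names: six threads share a BoundedSemaphore(2), a separate Lock guards the bookkeeping counters, and one extra release() at the end triggers the ValueError described above.

```python
import threading
import time

limit = threading.BoundedSemaphore(2)  # at most 2 threads inside at once
counter_lock = threading.Lock()        # protects active and peak themselves
active = 0
peak = 0

def worker():
    global active, peak
    with limit:                        # acquire(): counter - 1 (blocks at 0); release() on exit
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.1)                # simulated work while holding a "parking space"
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)       # never more than 2

over_release = False
try:
    limit.release()                    # counter is back at 2; one more release is too many
except ValueError as exc:
    over_release = True
    print("ValueError:", exc)
```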

Example:

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

8. A power tool for multithreading: queue

     queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

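Methods of the queue classes

Creating a queue object
import queue
q = queue.Queue(maxsize = 10)
queue.Queue is a synchronized queue implementation (in Python 2 the module was named Queue). The queue can be unbounded or bounded; its length is set with the optional maxsize argument of the constructor. If maxsize is less than 1, the queue is unbounded.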

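Putting a value into the queue
q.put(10)
put() inserts an item at the tail of the queue. Its first argument, item, is required: the value to insert. The second, block, is optional and defaults to True (there is also an optional timeout). If the queue is full and block is True, put() suspends the calling thread until a slot becomes free. If block is False and the queue is full, put() raises queue.Full.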

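Taking a value out of the queue
q.get()
get() removes and returns an item from the head of the queue. Its optional block argument defaults to True. If the queue is empty and block is True, get() suspends the calling thread until an item is available. If the queue is empty and block is False, get() raises queue.Empty.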

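The Python queue module provides three kinds of queues, each with its own constructor:
1. FIFO queue, first in, first out:               class queue.Queue(maxsize)
2. LIFO queue, last in, first out, like a stack:  class queue.LifoQueue(maxsize)
3. Priority queue, lowest priority value first:   class queue.PriorityQueue(maxsize)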
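A minimal sketch comparing the three orders with the same three inputs (drain is a helper written for this example, not part of the queue module):

```python
import queue

fifo = queue.Queue()
lifo = queue.LifoQueue()
prio = queue.PriorityQueue()

for n in (3, 1, 2):
    fifo.put(n)
    lifo.put(n)
    prio.put(n)  # with (priority, data) tuples, the smallest priority comes out first

def drain(q):
    """Take everything out of q without blocking."""
    out = []
    while True:
        try:
            out.append(q.get_nowait())
        except queue.Empty:  # raised once nothing is left
            return out

fifo_out, lifo_out, prio_out = drain(fifo), drain(lifo), drain(prio)
print(fifo_out)  # [3, 1, 2]  first in, first out
print(lifo_out)  # [2, 1, 3]  last in, first out
print(prio_out)  # [1, 2, 3]  lowest value first
```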

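Commonly used methods (with q = queue.Queue()):
q.qsize()  returns the approximate size of the queue
q.empty()  returns True if the queue is empty, otherwise False
q.full()   returns True if the queue is full, otherwise False
q.full() is relative to maxsize
q.get([block[, timeout]])  gets an item; timeout is how long to wait
q.get_nowait()  same as q.get(False), non-blocking
q.put(item[, block[, timeout]])  puts an item into the queue; timeout is how long to wait
q.put_nowait(item)  same as q.put(item, False)
q.task_done()  called after a unit of work is finished, to signal to the queue that the task is complete
q.join()  blocks until every item put into the queue has been taken out and marked done with task_done()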
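A sketch of how task_done() and join() work together, assuming a pool of three worker threads and using None as a stop signal (a convention of this example, not something the queue module defines):

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()

def worker():
    while True:
        item = q.get()              # blocks until an item is available
        if item is None:            # stop signal used by this sketch
            q.task_done()
            break
        with results_lock:
            results.append(item * item)
        q.task_done()               # this item is fully processed

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)
q.join()                            # returns once every put() has a matching task_done()
print(sorted(results))              # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

for _ in threads:
    q.put(None)                     # one stop signal per worker
for t in threads:
    t.join()
```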

Examples

Example 1:

import threading,queue
from time import sleep
from random import randint
class Production(threading.Thread):
    def run(self):
        while True:
            r=randint(0,100)
            q.put(r)
            print("Produced bun #%s"%r)
            sleep(1)
class Proces(threading.Thread):
    def run(self):
        while True:
            re=q.get()
            print("Ate bun #%s"%re)
if __name__=="__main__":
    q=queue.Queue(10)
    threads=[Production(),Production(),Production(),Proces()]
    for t in threads:
        t.start()

Example 2:

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

Example 3:

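# One thread keeps putting random numbers into a queue (consider using the queue module)
# One thread keeps taking the odd numbers out of that queue
# Another thread keeps taking the even numbers out of that queue

import random,threading,time
from queue import Queue, Empty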
#Producer thread
class Producer(threading.Thread):
  def __init__(self, t_name, queue):
    threading.Thread.__init__(self,name=t_name)
    self.data=queue
  def run(self):
    for i in range(10):  # produce 10 random numbers; change to any count you like
      randomnum=random.randint(1,99)
      print ("%s: %s is producing %d to the queue!" % (time.ctime(), self.name, randomnum))
      self.data.put(randomnum) # put the numbers into the queue one by one
      time.sleep(1)
    print ("%s: %s finished!" %(time.ctime(), self.name))

#Consumer thread
class Consumer_even(threading.Thread):
  def __init__(self,t_name,queue):
    threading.Thread.__init__(self,name=t_name)
    self.data=queue
  def run(self):
    while 1:
      try:
        val_even = self.data.get(1,5) # get(block=True, timeout=None): 1 means block, 5 means a 5-second timeout
        if val_even%2==0:
          print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.name,val_even))
          time.sleep(2)
        else:
          self.data.put(val_even)
          time.sleep(2)
      except Empty:   # raised when nothing arrives within 5 seconds
        print ("%s: %s finished!" %(time.ctime(),self.name))
        break
class Consumer_odd(threading.Thread):
  def __init__(self,t_name,queue):
    threading.Thread.__init__(self, name=t_name)
    self.data=queue
  def run(self):
    while 1:
      try:
        val_odd = self.data.get(1,5)
        if val_odd%2!=0:
          print ("%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.name, val_odd))
          time.sleep(2)
        else:
          self.data.put(val_odd)
          time.sleep(2)
      except Empty:
        print ("%s: %s finished!" % (time.ctime(), self.name))
        break
#Main thread
def main():
  queue = Queue()
  producer = Producer('Pro.', queue)
  consumer_even = Consumer_even('Con_even.', queue)
  consumer_odd = Consumer_odd('Con_odd.',queue)
  producer.start()
  consumer_even.start()
  consumer_odd.start()
  producer.join()
  consumer_even.join()
  consumer_odd.join()
  print ('All threads terminate!')

if __name__ == '__main__':
  main()

Note: lists are not thread-safe for compound operations such as the check-then-remove below

import threading,time

li=[1,2,3,4,5]

def pri():
    while li:
        a=li[-1]
        print(a)
        time.sleep(1)
        try:
            li.remove(a)
        except ValueError:  # the other thread already removed a
            print('----',a)

t1=threading.Thread(target=pri,args=())
t1.start()
t2=threading.Thread(target=pri,args=())
t2.start()
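A sketch of one fix (li_lock and removed are illustrative names): hold a lock across the emptiness check and the removal, and use pop() so that reading and removing the last element happen as one step. Each item is then handled exactly once, with no exception path needed.

```python
import threading
import time

li = [1, 2, 3, 4, 5]
li_lock = threading.Lock()
removed = []

def pri():
    while True:
        with li_lock:           # check and remove as one atomic step
            if not li:
                return
            a = li.pop()        # read and remove the last item together
            removed.append(a)
        print(a)
        time.sleep(0.01)        # simulated work, done outside the lock

t1 = threading.Thread(target=pri)
t2 = threading.Thread(target=pri)
t1.start()
t2.start()
t1.join()
t2.join()
```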

9. Context managers in Python (the contextlib module)

A context manager's job: prepare before a block of code runs, and clean up after it has run.

1. Using a context manager:

How do we open a file and write "hello world" to it?

filename="my.txt"
mode="w"
f=open(filename,mode)
f.write("hello world")
f.close()

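If an exception occurs (for example, the disk is full), the last line, f.close(), never gets a chance to run. We can, of course, wrap the code in a try-finally block: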

writer=open(filename,mode)
try:
    writer.write("hello world")
finally:
    writer.close()

For more complex operations, try-finally gets ugly. Rewritten with a with statement:

with open(filename,mode) as writer:
    writer.write("hello world")

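as binds the object returned by open() to a new name (strictly, the value returned by its __enter__ method, which for a file is the file itself). The with statement does the job of the try-finally.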

2. Custom context managers

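The with statement works much like try-finally, providing a context mechanism. A class used in a with statement must implement two special methods, __enter__ and __exit__: the former runs before the body of the block, the latter after it. The variable after as receives the value returned by __enter__.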

class echo():
    def output(self):
        print("hello world")
    def __enter__(self):
        print("enter")
        return self  # can return anything; it is bound to the name after as
    def __exit__(self,exception_type,value,traceback):
        print("exit")
        if exception_type==ValueError:
            return True   # suppress a ValueError raised in the with block
        else:
            return False  # let any other exception propagate
 
with echo() as e:
    e.output()
    
Output:
enter
hello world
exit

The full signature of __exit__ is:

def __exit__(self, exc_type, exc_value, exc_tb):

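where exc_type is the exception type, exc_value the exception value, and exc_tb the traceback; all three are None if the block finished without an exception.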

When __exit__ returns True, the exception is suppressed and does not propagate.
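A minimal sketch of that rule, with a hypothetical Suppress class that swallows one exception type and lets everything else through (the standard library offers contextlib.suppress for this purpose):

```python
class Suppress:
    """Hypothetical context manager that swallows one exception type."""

    def __init__(self, exc_type):
        self.exc_type = exc_type
        self.seen = None

    def __enter__(self):
        return self  # bound to the name after "as"

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.seen = exc_type  # None if the block finished normally
        # True suppresses the exception; False lets it propagate
        return exc_type is not None and issubclass(exc_type, self.exc_type)

with Suppress(ValueError) as s:
    raise ValueError("swallowed")
print("still running, saw", s.seen)  # still running, saw <class 'ValueError'>

propagated = False
try:
    with Suppress(ValueError):
        raise KeyError("not swallowed")
except KeyError:
    propagated = True
print("KeyError propagated:", propagated)  # KeyError propagated: True
```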

3. The contextlib module

The contextlib module makes context managers easier to use; it is implemented with generators. Its contextmanager decorator provides context management at the function level. The usual template is:

from contextlib import contextmanager
@contextmanager
def make_context():
    print('enter')
    try:
        yield "ok"
    except RuntimeError as err:
        print('error', err)
    finally:
        print('exit')
        
with make_context() as value:
    print(value)
    
Output:
    enter
    ok
    exit

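The yield is placed inside try-finally for exception safety, so that exceptions can be handled. The variable after as receives the value produced by yield. The statements before yield act as the setup before the block runs, and those after it act like the code in __exit__.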

Using a thread lock as an example:

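import contextlib
import threading

lock = threading.Lock()

@contextlib.contextmanager
def loudLock():
    print('Locking')
    lock.acquire()
    try:
        yield
    finally:  # release the lock even if the with body raises
        print('Releasing')
        lock.release()
 
with loudLock():
    print('Lock is locked: %s' % lock.locked())
    print('Doing something that needs locking')
 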
#Output:
#Locking
#Lock is locked: True
#Doing something that needs locking
#Releasing

4. contextlib.nested: reducing nesting

Instead of:

with open(filename,mode) as reader:
    with open(filename1,mode1) as writer:
        writer.write(reader.read())

you could simplify the code with contextlib.nested (deprecated since Python 2.7 and no longer available in current Python 3):

with contextlib.nested(open(filename,mode),open(filename1,mode1)) as (reader,writer):
    writer.write(reader.read())

Since Python 2.7 it has been superseded by new syntax that puts several context managers in one with statement:

with open(filename,mode) as reader,open(filename1,mode1) as writer:
    writer.write(reader.read())

5. contextlib.closing()

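File objects support the context manager API directly, but some objects that represent open handles do not, such as the object returned by urllib.urlopen() in Python 2. Some legacy classes also provide a close() method without supporting the context manager API. To make sure such a handle is closed, wrap it with closing(), which creates a context manager that calls the object's close() method on exit.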

import contextlib
class myclass():
    def __init__(self):
        print('__init__')
    def close(self):
        print('close()')
     
with contextlib.closing(myclass()):
    print('ok')
    
Output:
__init__
ok
close()

10. A custom thread pool

Simple version:

import queue
import threading
import time

class ThreadPool(object):

    def __init__(self, max_num=20):
        self.queue = queue.Queue(max_num)
        for i in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        return self.queue.get()

    def add_thread(self):
        self.queue.put(threading.Thread)


'''
pool = ThreadPool(10)

def func(arg, p):
    print(arg)
    time.sleep(1)
    p.add_thread()


for i in range(30):
    Pool = pool.get_thread()
    t = Pool(target=func, args=(i, pool))
    t.start()
'''

Full version:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
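        Submit a task to the thread pool
        :param func: task function
        :param args: arguments for the task function
        :param callback: function called after the task succeeds or fails; it receives two arguments: 1. the task's execution status (True/False); 2. the task's return value (default None, meaning no callback is called)
        :return: None; if the pool has already been closed, the task is silently ignored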
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)  # runs in the main thread
        self.q.put(w)  # runs in the main thread

    def generate_thread(self):
        """
        Create a thread
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        Loop: fetch task functions and run them
        """
        current_thread = threading.current_thread()
        self.generate_list.append(current_thread)

        event = self.q.get()  # if q is empty, block until a task arrives, then take it out
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()  # key: the thread waits here for a new task and runs it when one arrives;
                                          # meanwhile the thread object stays in free_list
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        Stop all threads once every queued task has been executed
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        Stop the threads whether or not tasks remain
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, free_list, worker_thread):
        """
        Track the threads that are currently idle, waiting for a task
        """
        free_list.append(worker_thread)  # checked when a new task arrives:
                                         # if len(self.free_list) == 0 and len(self.generate_list) < self.max_num,
                                         # a new thread is created for the task; if len(self.free_list) != 0, an idle thread blocked in free_list picks it up (event = self.q.get())
        try:
            yield
        finally:
            free_list.remove(worker_thread)

# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    time.sleep(1)
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(2)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))

# pool.close()
# pool.terminate()
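For comparison, Python 3's standard library already ships a thread pool in concurrent.futures. A rough equivalent of the usage above, with action changed to return a value so there is a result to collect; max_workers plays the role of max_num:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def action(i):
    time.sleep(0.1)
    return i * 2

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(action, i) for i in range(30)]
    # add_done_callback is the closest counterpart of the callback argument above;
    # the callback receives the finished Future object
    futures[0].add_done_callback(lambda f: print("first result:", f.result()))
    results = [f.result() for f in futures]  # result() re-raises any task exception
# leaving the with block calls executor.shutdown(wait=True)
print(results[:5])  # [0, 2, 4, 6, 8]
```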

Going further:

import contextlib
import socket
@contextlib.contextmanager
def context_socket(host,port):
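    sk=socket.socket()
    try:
        sk.bind((host,port))
        sk.listen(5)
        yield sk
    finally:
        sk.close()  # always close the socket, even if the with body raises

with context_socket('127.0.0.1',8888) as sk:  # not "as socket": that would shadow the socket module
    print(sk)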