理解Queue隊列中join()與task_done()的關系


在網上大多關於join()與task_done()的結束原話是這樣的:

Queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
Queue.join() 實際上意味着等到隊列為空,再執行別的操作
 
但是可能很多人還是不太理解,這里以我自己的理解來闡述這兩者的關聯。

理解

如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束,在最后執行個join()是等不到結果的,會一直掛起。
可以理解為,每task_done一次 就從隊列里刪掉一個元素,這樣在最后join的時候根據隊列長度是否為零來判斷隊列是否結束,從而執行主線程。
下面看個自己寫的例子:
下面這個例子,會在join()的地方無限掛起,因為join在等隊列清空,但是由於沒有task_done,它認為隊列還沒有清空,還在一直等。
 
  1.  
    #!/usr/bin/env python
  2.  
    # -*- coding:utf-8 -*-
  3.  
    '''threading test'''
  4.  
    import threading
  5.  
    import queue
  6.  
    from time import sleep
  7.  
    #之所以為什么要用線程,因為線程可以start后繼續執行后面的主線程,可以put數據,如果不是線程直接在get阻塞。
  8.  
    class Mythread(threading.Thread):
  9.  
    def __init__(self,que):
  10.  
    threading.Thread.__init__(self)
  11.  
    self.queue = que
  12.  
    def run(self):
  13.  
    while True:
  14.  
    sleep(1)
  15.  
    if self.queue.empty(): #判斷放到get前面,這樣可以,否則隊列最后一個取完后就空了,直接break,走不到print
  16.  
    break
  17.  
    item = self.queue.get()
  18.  
    print(item,'!')
  19.  
    #self.queue.task_done()
  20.  
    return
  21.  
    que = queue.Queue()
  22.  
    tasks = [Mythread(que) for x in range(1)]
  23.  
    for x in range(10):
  24.  
     
  25.  
    que.put(x) #快速生產
  26.  
    for x in tasks:
  27.  
    t = Mythread(que) #把同一個隊列傳入2個線程
  28.  
    t.start()
  29.  
     
  30.  
    que.join()
  31.  
     
  32.  
    print('---success---')
  33.  
     
如果把self.queue.task_done()  注釋去掉,就會順利執行完主程序。
這就是“Queue.task_done()函數向任務已經完成的隊列發送一個信號”這句話的意義,能夠讓join()函數能判斷出隊列還剩多少,是否清空了。
而事實上我們看下queue的源碼可以看出確實是執行一次未完成隊列減一:
  1.  
    def task_done(self):
  2.  
    '''Indicate that a formerly enqueued task is complete.
  3.  
     
  4.  
    Used by Queue consumer threads. For each get() used to fetch a task,
  5.  
    a subsequent call to task_done() tells the queue that the processing
  6.  
    on the task is complete.
  7.  
     
  8.  
    If a join() is currently blocking, it will resume when all items
  9.  
    have been processed (meaning that a task_done() call was received
  10.  
    for every item that had been put() into the queue).
  11.  
     
  12.  
    Raises a ValueError if called more times than there were items
  13.  
    placed in the queue.
  14.  
    '''
  15.  
    with self.all_tasks_done:
  16.  
    unfinished = self.unfinished_tasks - 1
  17.  
    if unfinished <= 0:
  18.  
    if unfinished < 0:
  19.  
    raise ValueError('task_done() called too many times')
  20.  
    self.all_tasks_done.notify_all()
  21.  
    self.unfinished_tasks = unfinished
  22.  
     
  23.  
     

快速生產-快速消費

上面的演示代碼是快速生產-慢速消費的場景,我們可以直接用task_done()與join()配合,來讓empty()判斷出隊列是否已經結束。 當然,queue我們可以正確判斷是否已經清空,但是線程里的get隊列是不知道,如果沒有東西告訴它,隊列空了,因此get還會繼續阻塞,那么我們就需要在get程序中加一個判斷,如果empty()成立,break退出循環,否則get()還是會一直阻塞。

慢速生產-快速消費

但是如果生產者速度與消費者速度相當,或者生產速度小於消費速度,則靠task_done()來實現隊列減一則不靠譜,隊列會時常處於供不應求的狀態,常為empty,所以用empty來判斷則不靠譜。 那么這種情況會導致 join可以判斷出隊列結束了,但是線程里不能依靠empty()來判斷線程是否可以結束。 我們可以在消費隊列的每個線程最后塞入一個特定的“標記”,在消費的時候判斷,如果get到了這么一個“標記”,則可以判定隊列結束了,因為生產隊列都結束了,也不會再新增了。 代碼如下:

  1.  
    #!/usr/bin/env python
  2.  
    # -*- coding:utf-8 -*-
  3.  
    '''threading test'''
  4.  
    import threading
  5.  
    import queue
  6.  
    from time import sleep
  7.  
    #之所以為什么要用線程,因為線程可以start后繼續執行后面的主線程,可以put數據,如果不是線程直接在get阻塞。
  8.  
    class Mythread(threading.Thread):
  9.  
    def __init__(self,que):
  10.  
    threading.Thread.__init__(self)
  11.  
    self.queue = que
  12.  
    def run(self):
  13.  
    while True:
  14.  
    item = self.queue.get()
  15.  
    self.queue.task_done() #這里要放到判斷前,否則取最后最后一個的時候已經為空,直接break,task_done執行不了,join()判斷隊列一直沒結束
  16.  
    if item == None:
  17.  
    break
  18.  
    print(item,'!')
  19.  
    return
  20.  
    que = queue.Queue()
  21.  
    tasks = [Mythread(que) for x in range(1)]
  22.  
    #快速生產
  23.  
    for x in tasks:
  24.  
    t = Mythread(que) #把同一個隊列傳入2個線程
  25.  
    t.start()
  26.  
    for x in range(10):
  27.  
    sleep(1)
  28.  
    que.put(x)
  29.  
    for x in tasks:
  30.  
    que.put(None)
  31.  
    que.join()
  32.  
    print('---success---')

注意點:

put隊列完成的時候千萬不能用task_done(),否則會報錯:

task_done() called too many times

因為該方法僅僅表示get成功后,執行的一個標記。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM