Python:怎樣用線程將任務並行化?


如果待處理任務滿足:

  1. 可拆分,即任務可以被拆分為多個子任務,或任務是多個相同的任務的集合;
  2. 任務不是CPU密集型的,如任務涉及到較多IO操作(如文件讀取和網絡數據處理)

則使用多線程將任務並行運行,能夠提高運行效率。

假設待處理的任務為:有很多文件目錄,對於每個文件目錄,搜索匹配一個給定字符串的文件的所有行(相當於是實現grep的功能)。 則此處子任務為:給定一個目錄,搜索匹配一個給定字符串的文件的所有行。總的任務為處理所有目錄。

將子任務表示為一個函數T,如下所示:

def T(dir, pattern):
  print('searching pattern %s in dir %s' % (pattern, dir))
  ...

為每個子任務創建一個線程

要實現並行化,最簡單的方法是為每一個子任務創建一個thread,thread處理完后退出。

from threading import Thread
from time import sleep

def T(dir, pattern):
  "This is just a stub that simulate a dir operation"
  sleep(1)
  print('searching pattern %s in dir %s' % (pattern, dir))

threads = []
dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']
pattern = 'hello'

for dir in dirs:
  thread = Thread(target=T, args=(dir, pattern))   1
  thread.start()   2
  threads.append(thread)

for thread in threads:
  thread.join()   3

print('Main thread end here')
  • 1 :創建一個Thread對象,target參數指定這個thread待執行的函數,args參數指定target函數的輸入參數
  • 2 :啟動這個thread。 T(dir, pattern)將被調用
  • 3 :等待,直到這個thread結束。整個for循環表示主進程會等待所有子線程結束后再退出

程序的運行結果為:

searching pattern hello in dir a/b/csearching pattern hello in dir d/f
searching pattern hello in dir b/c
 searching pattern hello in dir a/b/d

Main thread end here

可以看出由於線程是並行運行的,部分輸出會交疊。但主進程的打印總在最后。

以上例子中對於每個dir都需要創建一個thread。如果dir的數目較多,則會創建太多的thread,影響運行效率。 較好的方式是限制總線程的數目。

限制線程數目

可以使用信號量(semaphore)來限制同時運行的最大線程數目。如下所示:

from threading import Thread, BoundedSemaphore
from time import sleep

def T(dir, pattern):
  "This is just a stub that simulate a dir operation"
  sleep(1)
  print('searching pattern %s in dir %s' % (pattern, dir))

threads = []
dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']
pattern = 'hello'

maxjobs = BoundedSemaphore(2)   1
def wrapper(dir, pattern):
  T(dir, pattern)
  maxjobs.release()   2

for dir in dirs:
  maxjobs.acquire()   3
  thread = Thread(target=wrapper, args=(dir, pattern))
  thread.start()
  threads.append(thread)

for thread in threads:
  thread.join() 

print('Main thread end here')
  • 1 :創建一個有2個資源的信號量。一個信號量代表總的可用的資源數目,這里表示同時運行的最大線程數目為2。
  • 2 :在線程結束時釋放資源。運行在子線程中。
  • 3 :在啟動一個線程前,先獲取一個資源。如果當前已經有2個線程在運行,則會阻塞,直到其中一個線程結束。 運行在主線程中。

當限制了最大運行線程數為2后,由於只有2個線程同時運行,程序的輸出更加有序,幾乎總是為:

searching pattern hello in dir a/b/c
searching pattern hello in dir a/b/d
searching pattern hello in dir b/c
searching pattern hello in dir d/f
Main thread end here

以上實現中為每個子任務創建一個線程進行處理,然后通過信號量限制同時運行的線程的數目。如果子任務很多,這種方法會創建太多的線程。更好的方法 是使用線程池。

使用線程池(Thread Pool)

即預先創建一定數目的線程,形成一個線程池。每個線程持續處理多個子任務(而不是處理一個就退出)。這樣做的好處是:創建的線程數目會比較固定。

那么,每個線程處理哪些子任務呢?一種方法為:預先將所有子任務均分給每個線程。如下所示:

from threading import Thread
from time import sleep

def T(dir, pattern):
  "This is just a stub that simulate a dir operation"
  sleep(1)
  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']
pattern = 'hello'

def wrapper(dirs, pattern):   1
  for dir in dirs:
    T(dir, pattern)

threadsPool = [   2
  Thread(target=wrapper, args=(dirs[0:2], pattern)),
  Thread(target=wrapper, args=(dirs[2:], pattern)),
]

for thread in threadsPool:   3
  thread.start()

for thread in threadsPool:
  thread.join()

print('Main thread end here')
  • 1 :這個函數能夠處理多個dir,將作為線程的target函數
  • 2 :創建一個有2個線程的線程池。並事先分配子任務給每個線程。線程1處理前兩個dir,線程2處理后兩個dir
  • 3 :啟動線程池中所有線程

程序的輸出結果為:

searching pattern hello in dir a/b/csearching pattern hello in dir b/c

searching pattern hello in dir d/f
 searching pattern hello in dir a/b/d
Main thread end here

這種方法存在以下問題:

  1. 子任務分配可能不均。導致每個線程運行時間差別可能較大,則整體運行時長可能被拖長
  2. 只能處理所有子任務都預先知道的情況,無法處理子任務實時出現的情況

如果有一種方法,能夠讓線程知道當前所有的待處理子任務,線程一旦空閑,便可以從中獲取一個任務進行處理,則以上問題都可以解決。任務隊列便是解決方案。

使用消息隊列

可以使用Queue實現一個任務隊列,用於在線程間傳遞子任務。主線程將所有待處理子任務放置在隊列中,子線程從隊列中獲取子任務去處理。 如下所有(注:以下代碼只運行於Python 2,因為Queue只存在於Python 2) :

from threading import Thread
from time import sleep
import Queue

def T(dir, pattern):
  "This is just a stub that simulate a dir operation"
  sleep(1)
  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']
pattern = 'hello'

taskQueue = Queue.Queue()   1

def wrapper():
  while True:
    try:
      dir = taskQueue.get(True, 0.1)   2
      T(dir, pattern)
    except Queue.Empty:
	continue

threadsPool = [Thread(target=wrapper) for i in range(2)]   3

for thread in threadsPool: 
  thread.start()    4

for dir in dirs:
  taskQueue.put(dir)   5

for thread in threadsPool:
  thread.join()
print('Main thread end here')
  • 1 :創建一個任務隊列
  • 2 :子線程從任務隊列中獲取一個任務。第一個參數為True,表示如果沒有任務,會等待。第二個參數表示最長等待0.1秒 如果在0.1秒后仍然沒有任務,則會拋出一個Queue.Empty的異常
  • 3 :創建有2個線程的線程池。注意target函數wrapper沒有任何參數
  • 4 :啟動所有線程
  • 5 :主線程將所有子任務放置在任務隊列中,以供子線程獲取處理。由於子線程已經被啟動,則子線程會立即獲取到任務並處理

程序的輸出為:

searching pattern hello in dir a/b/c
searching pattern hello in dir a/b/d
searching pattern hello in dir b/c
 searching pattern hello in dir d/f

從中可以看出主進程的打印結果並沒有出來,程序會一直運行,而不退出。這個問題的原因是:目前的實現中,子線程為一個無限循環, 因此其永遠不會終止。因此,必須有一種機制來結束子進程。

終止子進程

一種簡單方法為,可以在任務隊列中放置一個特殊元素,作為終止符。當子線程從任務隊列中獲取這個終止符后,便自行退出。如下所示,使用None作為終止符。

from threading import Thread
from time import sleep
import Queue

def T(dir, pattern):
  "This is just a stub that simulate a dir operation"
  sleep(1)
  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']
pattern = 'hello'

taskQueue = Queue.Queue()

def wrapper():
  while True:
    try:
      dir = taskQueue.get(True, 0.1)
      if dir is None:   1
	taskQueue.put(dir)   2
	break

      T(dir, pattern)
    except Queue.Empty:
	continue

threadsPool = [Thread(target=wrapper) for i in range(2)]

for thread in threadsPool:
  thread.start()

for dir in dirs:
  taskQueue.put(dir)

taskQueue.put(None)   3

for thread in threadsPool:
  thread.join()
print('Main thread end here')
  • 1 :如果任務為終止符(此處為None),則退出
  • 2 :將這個終止符重新放回任務隊列。因為只有一個終止符,如果不放回,則其它子線程獲取不到,也就無法終止
  • 3 :將終止符放在任務隊列。注意必須放置在末尾,否則終止符后的任務無法得到處理

修改過后,程序能夠正常運行,主進程能夠正常退出了。

searching pattern hello in dir a/b/csearching pattern hello in dir a/b/d

searching pattern hello in dir b/c
 searching pattern hello in dir d/f
Main thread end here

總結

要並行化處理子任務,最簡單的方法是為每個子任務創建一個線程去處理。這種方法的缺點是:如果子任務非常多,則需要創建的線程數目會非常多。 並且同時運行的線程數目也會較多。通過使用信號量來限制同時運行的線程數目,通過線程池來避免創建過多的線程。

與每個線程處理一個任務不同,線程池中每個線程會處理多個子任務。這帶來一個問題:每個子線程如何知道要處理哪些子任務。 一種方法是預先將所有子任務均分給每個線程,而更靈活的方法則是通過任務隊列,由子線程自行決定要處理哪些任務。

使用線程池時,線程主函數通常實現為一個無限循環,因此需要考慮如何終止線程。可以在任務隊列中放置一個終止符來告訴線程沒有更多任務, 因此其可以終止。

 

備注: 我的博客即將搬運同步至騰訊雲+社區,邀請大家一同入駐: https://cloud.tencent.com/developer/support-plan?invite_code=2qako5hpuv0g8


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM