Python多線程的原理與實現


原文鏈接:https://blog.csdn.net/daiyu__zz/article/details/81912018

1 線程基本概念

1.1 線程是什么?

線程是指進程內的一個執行單元,也是進程內的可調度實體.

與進程的區別: 
(1) 地址空間:進程內的一個執行單元;進程至少有一個線程;它們共享進程的地址空間;而進程有自己獨立的地址空間; 
(2) 資源擁有:進程是資源分配和擁有的單位,同一個進程內的線程共享進程的資源 
(3) 線程是處理器調度的基本單位,但進程不是. 
(4) 二者均可並發執行.

簡而言之,一個程序至少有一個進程,一個進程至少有一個線程.

線程的划分尺度小於進程,使得多線程程序的並發性高。 
另外,進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率。

1.2 線程和進程關系?

​ 進程就是一個應用程序在處理機上的一次執行過程,它是一個動態的概念,而線程是進程中的一部分,進程包含多個線程在運行。

​ 多線程可以共享全局變量,多進程不能。多線程中,所有子線程的進程號相同;多進程中,不同的子進程進程號不同。

​ 進程是具有一定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位. 
​ 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源. 
​ 一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以並發執行.

2 Python線程模塊

​ python主要是通過thread和threading這兩個模塊來實現多線程支持。python的thread模塊是比較底層的模塊,python的threading模塊是對thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由於GIL的存在無法使用threading充分利用CPU資源,如果想充分發揮多核CPU的計算能力需要使用multiprocessing模塊(Windows下使用會有諸多問題)。

2.1 如何創建線程

​ python3.x中已經摒棄了Python2.x中采用函數式thread模塊中的start_new_thread()函數來產生新線程方式。

​ python3.x中通過threading模塊創建新的線程有兩種方法:一種是通過threading.Thread(Target=executable Method)-即傳遞給Thread對象一個可執行方法(或對象);第二種是繼承threading.Thread定義子類並重寫run()方法。第二種方法中,唯一必須重寫的方法是run()

(1)通過threading.Thread進行創建多線程

import threading
import time
def target():
    print("the current threading %s is runing"
       %(threading.current_thread().name))
    time.sleep(1)
    print("the current threading %s is ended"%(threading.current_thread().name))

print("the current threading %s is runing"%(threading.current_thread().name))
## 屬於線程t的部分
t = threading.Thread(target=target)
t.start()
## 屬於線程t的部分
t.join() # join是阻塞當前線程(此處的當前線程時主線程) 主線程直到Thread-1結束之后才結束
print("the current threading %s is ended"%(threading.current_thread().name))

(2)通過繼承threading.Thread定義子類創建多線程

​ 使用Threading模塊創建線程,直接從threading.Thread繼承,然后重寫init方法和run方法:

import threading
import time

class myThread(threading.Thread):  # 繼承父類threading.Thread
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter

   def run(self):  # 把要執行的代碼寫到run函數里面 線程在創建后會直接運行run函數
      print("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print("Exiting " + self.name)


def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s process at: %s" % (threadName, time.ctime(time.time())))
      counter -= 1


# 創建新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 開啟線程
thread1.start()
thread2.start()

# 等待線程結束
thread1.join()
thread2.join()

print("Exiting Main Thread")

通過以上案例可以知道,thread1和thread2執行順序是亂序的。要使之有序,需要進行線程同步

3 線程間同步

​ 如果多個線程共同對某個數據修改,則可能出現不可預料的結果,為了保證數據的正確性,需要對多個線程進行同步。

​ 使用Thread對象的Lock和Rlock可以實現簡單的線程同步,這兩個對象都有acquire方法和release方法,對於那些需要每次只允許一個線程操作的數據,可以將其操作放到acquire和release方法之間。

​ 需要注意的是,Python有一個GIL(Global Interpreter Lock)機制,任何線程在運行之前必須獲取這個全局鎖才能執行,每當執行完100條字節碼,全局鎖才會釋放,切換到其他線程執行。

3.1 線程同步問題

多線程實現同步有四種方式:

鎖機制,信號量,條件判斷和同步隊列。

下面我主要關注兩種同步機制:鎖機制和同步隊列。

(1)鎖機制

threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖

import threading
import time
class myThread(threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print("Starting " + self.name)
      # 獲得鎖,成功獲得鎖定后返回True
      # 可選的timeout參數不填時將一直阻塞直到獲得鎖定
      # 否則超時后將返回False
      threadLock.acquire()
      print_time(self.name, self.counter, 5)
      # 釋放鎖
      threadLock.realease()
def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

threadLock = threading.Lock()
threads = []
# 創建新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新線程
thread1.start()
thread2.start()
# 添加線程到線程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有線程完成
for t in threads:
   t.join()

print("Exiting Main Thread")

(2) 線程同步隊列queue

python2.x中提供的Queue, Python3.x中提供的是queue

見import queue.

Python的queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(后入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。

queue模塊中的常用方法:

  • queue.qsize() 返回隊列的大小
  • queue.empty() 如果隊列為空,返回True,反之False
  • queue.full() 如果隊列滿了,返回True,反之False
  • queue.full 與 maxsize 大小對應
  • queue.get([block[, timeout]])獲取隊列,timeout等待時間
  • queue.get_nowait() 相當Queue.get(False)
  • queue.put(item) 寫入隊列,timeout等待時間
  • queue.put_nowait(item) 相當Queue.put(item, False)
  • queue.task_done() 在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號
  • queue.join() 實際上意味着等到隊列為空,再執行別的操作

案例1:

import queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q

   def run(self):
      print("Starting " + self.name)
      process_data(self.name, self.q)
      print("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
      time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 創建新線程
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# 填充隊列
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# 等待隊列清空
while not workQueue.empty():
   pass

# 通知線程是時候退出
exitFlag = 1

# 等待所有線程完成
for t in threads:
   t.join()
print("Exiting Main Thread")

案例2:

import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.start()    #執行run()

    def run(self):
        #循環,保證接着跑下一個任務
        while True:
            # 隊列為空則退出線程
            if self.queue.empty():
                break
            # 獲取一個隊列數據
            foo = self.queue.get()
            # 延時1S模擬你要做的事情
            time.sleep(1)
            # 打印
            print(self.getName() + " process " + str(foo))
            # 任務完成
            self.queue.task_done()


# 隊列
queue = queue.Queue()
# 加入100個任務隊列
for i in range(100):
    queue.put(i)
# 開10個線程
for i in range(10):
    threadName = 'Thread' + str(i)
    Worker(threadName, queue)
# 所有線程執行完畢后關閉
queue.join()

4 線程池

傳統多線程問題?

​ 傳統多線程方案會使用“即時創建, 即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那么服務器將處於不停的創建線程,銷毀線程的狀態。

​ 一個線程的運行時間可以分為3部分:線程的啟動時間、線程體的運行時間和線程的銷毀時間。在多線程處理的情景中,如果線程不能被重用,就意味着每次創建都需要經過啟動、銷毀和運行3個過程。這必然會增加系統相應的時間,降低了效率。

有沒有一種高效的解決方案呢? —— 線程池

線程池基本原理:

​ 我們把任務放進隊列中去,然后開N個線程,每個線程都去隊列中取一個任務,執行完了之后告訴系統說我執行完了,然后接着去隊列中取下一個任務,直至隊列中所有任務取空,退出線程。

使用線程池: 
​ 由於線程預先被創建並放入線程池中,同時處理完當前任務之后並不銷毀而是被安排處理下一個任務,因此能夠避免多次創建線程,從而節省線程創建和銷毀的開銷,能帶來更好的性能和系統穩定性。

multhreading-struct

線程池要設置為多少?

服務器CPU核數有限,能夠同時並發的線程數有限,並不是開得越多越好,以及線程切換是有開銷的,如果線程切換過於頻繁,反而會使性能降低

線程執行過程中,計算時間分為兩部分:

  • CPU計算,占用CPU
  • 不需要CPU計算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如 
    訪問cache、RPC調用下游service、訪問DB,等需要網絡調用的操作

那么如果計算時間占50%, 等待時間50%,那么為了利用率達到最高,可以開2個線程: 
假如工作時間是2秒, CPU計算完1秒后,線程等待IO的時候需要1秒,此時CPU空閑了,這時就可以切換到另外一個線程,讓CPU工作1秒后,線程等待IO需要1秒,此時CPU又可以切回去,第一個線程這時剛好完成了1秒的IO等待,可以讓CPU繼續工作,就這樣循環的在兩個線程之前切換操作。

那么如果計算時間占20%, 等待時間80%,那么為了利用率達到最高,可以開5個線程: 
可以想象成完成任務需要5秒,CPU占用1秒,等待時間4秒,CPU在線程等待時,可以同時再激活4個線程,這樣就把CPU和IO等待時間,最大化的重疊起來

抽象一下,計算線程數設置的公式就是: 
N核服務器,通過執行業務的單線程分析出本地計算時間為x,等待時間為y,則工作線程數(線程池線程數)設置為 N*(x+y)/x,能讓CPU的利用率最大化。 
由於有GIL的影響,python只能使用到1個核,所以這里設置N=1

import queue
import threading
import time

# 聲明線程池管理類
class WorkManager(object):
   def __init__(self, work_num=1000, thread_num=2):
      self.work_queue = queue.Queue()  # 任務隊列
      self.threads = []  # 線程池
      self.__init_work_queue(work_num)  # 初始化任務隊列,添加任務
      self.__init_thread_pool(thread_num) # 初始化線程池,創建線程

   """
      初始化線程池
   """
   def __init_thread_pool(self, thread_num):
      for i in range(thread_num):
         # 創建工作線程(線程池中的對象)
         self.threads.append(Work(self.work_queue))


   """
      初始化工作隊列
   """
   def __init_work_queue(self, jobs_num):
      for i in range(jobs_num):
         self.add_job(do_job, i)

   """
      添加一項工作入隊
   """
   def add_job(self, func, *args):
      self.work_queue.put((func, list(args)))  # 任務入隊,Queue內部實現了同步機制

   """
      等待所有線程運行完畢
   """
   def wait_allcomplete(self):
      for item in self.threads:
         if item.isAlive(): item.join()


class Work(threading.Thread):
   def __init__(self, work_queue):
      threading.Thread.__init__(self)
      self.work_queue = work_queue
      self.start()

   def run(self):
      # 死循環,從而讓創建的線程在一定條件下關閉退出
      while True:
         try:
            do, args = self.work_queue.get(block=False)  # 任務異步出隊,Queue內部實現了同步機制
            do(args)
            self.work_queue.task_done()  # 通知系統任務完成
         except:
            break

# 具體要做的任務
def do_job(args):
   time.sleep(0.1)  # 模擬處理時間
   print(threading.current_thread())
   print(list(args))


if __name__ == '__main__':
   start = time.time()
   work_manager = WorkManager(100, 10)  # 或者work_manager =  WorkManager(10000, 20)
   work_manager.wait_allcomplete()
   end = time.time()
   print("cost all time: %s" % (end - start))

進程石油系統分配資源、線程是由CPU調度、協程由用戶控制

5 協程

​ 在python GIL之下,同一時刻只能有一個線程在運行,那么對於CPU計算密集的程序來說,線程之間的切換開銷就成了拖累,而以I/O為瓶頸的程序正是協程所擅長的:

Python中的協程經歷了很長的一段發展歷程。其大概經歷了如下三個階段:

  1. 最初的生成器變形yield/send
  2. 引入@asyncio.coroutine和yield from
  3. 在最近的Python3.5版本中引入async/await關鍵字

(1)從yield說起

先看一段普通的計算斐波那契續列的代碼

def fibs(n):
   res = [0] * n
   index = 0
   a = 0
   b = 1
   while index < n:
      res[index] = b
      a, b = b, a + b
      index += 1
   return res


for fib_res in fibs(20):
   print(fib_res)

​ 如果我們僅僅是需要拿到斐波那契序列的第n位,或者僅僅是希望依此產生斐波那契序列,那么上面這種傳統方式就會比較耗費內存。

這時,yield就派上用場了。

def fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      yield b
      a, b = b, a + b
      index += 1

for fib_res in fib(20):
   print(fib_res)

​ 當一個函數中包含yield語句時,python會自動將其識別為一個生成器。這時fib(20)並不會真正調用函數體,而是以函數體生成了一個生成器對象實例。

​ yield在這里可以保留fib函數的計算現場,暫停fib的計算並將b返回。而將fib放入for…in循環中時,每次循環都會調用next(fib(20)),喚醒生成器,執行到下一個yield語句處,直到拋出StopIteration異常。此異常會被for循環捕獲,導致跳出循環。

(2) Send來了

​ 從上面的程序中可以看到,目前只有數據從fib(20)中通過yield流向外面的for循環;如果可以向fib(20)發送數據,那不是就可以在Python中實現協程了嘛。

​ 於是,Python中的生成器有了send函數,yield表達式也擁有了返回值。

​ 我們用這個特性,模擬一個慢速斐波那契數列的計算:

import time
import random

def stupid_fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      sleep_cnt = yield b
      print('let me think {0} secs'.format(sleep_cnt))
      time.sleep(sleep_cnt)
      a, b = b, a + b
      index += 1


print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib) #第一次必須要執行next()函數,讓程序控制到yield b 位置
while True:
   print(fib_res)
   try:
      fib_res = sfib.send(random.uniform(0, 0.5))
   except StopIteration:
      break

python 進行並發編程

​ 在Python 2的時代,高性能的網絡編程主要是使用Twisted、Tornado和Gevent這三個庫,但是它們的異步代碼相互之間既不兼容也不能移植。

​ asyncio是Python 3.4版本引入的標准庫,直接內置了對異步IO的支持。

​ asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現了異步IO。

​ Python的在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎。

​ Python 3.5添加了async和await這兩個關鍵字,分別用來替換asyncio.coroutineyield from

​ python3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent, vloop都實現了類似的功能。

(1)協程定義

asyncio實現Hello world代碼如下:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 異步調用asyncio.sleep(1)-->協程函數:
    r = yield from asyncio.sleep(1)  #此處為另外一個協程,不是休眠
    print("Hello again!")

# 獲取EventLoop(事件循環器):
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()

​ @asyncio.coroutine把一個generator標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執行。 hello()會首先打印出Hello world!,然后,yield from語法可以讓我們方便地調用另一個generator。由於asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接着執行下一行語句。

​ 把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現並發執行。

我們用Task封裝兩個coroutine試試:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

觀察執行過程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)  

由打印的當前線程名稱可以看出,兩個coroutine是由同一個線程並發執行的。

如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個線程並發執行。

asyncio案例實戰

我們用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁:

async_wget.py

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80) #等待打開host:80端口
    reader, writer = yield from connect #開始鏈接。如果連接成功,則返回Reader和寫writer的操作對象
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

結果信息如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

可見3個連接由一個線程通過coroutine並發完成。

(3) 使用async/await

import asyncio
import re

async def browser(host, port=80):
    # 連接host
    reader, writer = await asyncio.open_connection(host, port)
    print(host, port, '連接成功!')

    # 發起 / 主頁請求(HTTP協議)
    # 發送請求頭必須是兩個空行
    index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host)
    writer.write(index_get.encode())

    await writer.drain()  # 等待向連接寫完數據(請求發送完成)

    # 開始讀取響應的數據報頭
    while True:
        line = await reader.readline()  # 等待讀取響應數據
        if line == b'\r\n':
            break

        print(host, '<header>', line)

    # 讀取響應的數據body
    body = await reader.read()
    print(encoding)
    print(host, '<content>', body)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    print('---over---')

小結

asyncio提供了完善的異步IO支持;

異步操作需要在coroutine中通過yield from完成;

多個coroutine可以封裝成一組Task然后並發執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM