Python之路【第八篇】python實現線程池


線程池概念

什么是線程池?
諸如web服務器、數據庫服務器、文件服務器和郵件服務器等許多服務器應用都面向處理來自某些遠程來源的大量短小的任務。
構建服務器應用程序的一個過於簡單的模型是:每當一個請求到達就創建一個新的服務對象,然后在新的服務對象中為請求服務。
但當有大量請求並發訪問時,服務器不斷的創建和銷毀對象的開銷很大。
所以提高服務器效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀,這樣就引入了“池”的概念,
“池”的概念使得人們可以定制一定量的資源,然后對這些資源進行復用,而不是頻繁的創建和銷毀。

線程池是預先創建線程的一種技術
這些線程都是處於睡眠狀態,即均為啟動,不消耗CPU,而只是占用較小的內存空間。
當請求到來之后,緩沖池給這次請求分配一個空閑線程,把請求傳入此線程中運行,進行處理。
當預先創建的線程都處於運行狀態,即預制線程不夠,線程池可以自由創建一定數量的新線程,用於處理更多的請求。
當系統比較閑的時候,也可以通過移除一部分一直處於停用狀態的線程。

線程池的注意事項
雖然線程池是構建多線程應用程序的強大機制,但使用它並不是沒有風險的。在使用線程池時需注意線程池大小與性能的關系,注意並發風險、死鎖、資源不足和線程泄漏等問題。
1、線程池大小。多線程應用並非線程越多越好,需要根據系統運行的軟硬件環境以及應用本身的特點決定線程池的大小。

一般來說,如果代碼結構合理的話,線程數目與CPU 數量相適合即可。
如果線程運行時可能出現阻塞現象,可相應增加池的大小;如有必要可采用自適應算法來動態調整線程池的大小,以提高CPU 的有效利用率和系統的整體性能。
2、並發錯誤。多線程應用要特別注意並發錯誤,要從邏輯上保證程序的正確性,注意避免死鎖現象的發生。
3、線程泄漏。這是線程池應用中一個嚴重的問題,當任務執行完畢而線程沒能返回池中就會發生線程泄漏現象。

線程池要點

線程池要點:

線程池要點:
1、通過判斷等待的任務數量和線程池中的最大值,取最小值來判斷開啟多少線程來工作
比如:
任務數是3,進程池最大20  ,那么咱們只需要開啟3個線程就行了。
任務數是500,進程池是20,那么咱們只開20個線程就可以了。
取最小值

2、實現線程池正在運行,有一個查看的功能,查看一下現在線程里面活躍的線程是多少等待的是多少?

線程總共是多少,等待中多少,正在運行中多少
作用:
方便查看當前線程池狀態
能獲取到這個之后就可以當線程一直處於空閑狀態

查看狀態用:上下文管理來做,非常nice的一點

3、關閉線程

簡單線程池實現

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import Queue
import threading
import time

'''
這個簡單的例子的想法是通過:
1、利用Queue特性,在Queue里創建多個線程對象
2、那我執行代碼的時候,去queue里去拿線程!
如果線程池里有可用的,直接拿。
如果線程池里沒有可用,那就等。
3、線程執行完畢,歸還給線程池
'''

class ThreadPool(object): #創建線程池類
    def __init__(self,max_thread=20):#構造方法,設置最大的線程數為20
        self.queue = Queue.Queue(max_thread) #創建一個隊列
        for i in xrange(max_thread):#循環把線程對象加入到隊列中
            self.queue.put(threading.Thread)
            #把線程的類名放進去,執行完這個Queue

    def get_thread(self):#定義方法從隊列里獲取線程
        return self.queue.get()

    def add_thread(self):#定義方法在隊列里添加線程
        self.queue.put(threading.Thread)

pool = ThreadPool(10)

def func(arg,p):
    print arg
    time.sleep(2)
    p.add_thread() #當前線程執行完了,我在隊列里加一個線程!

for i in xrange(300):
    thread = pool.get_thread() #線程池10個線程,每一次循環拿走一個!默認queue.get(),如果隊列里沒有數據就會等待。
    t = thread(target=func,args=(i,pool))
    t.start()


'''
self.queue.put(threading.Thread) 添加的是類不是對象,在內存中如果相同的類只占一份內存空間
並且如果這里存儲的是對象的話每次都的新增都得在內存中開辟一段內存空間

還有如果是對象的話:下面的這個語句就不能這么調用了!
for i in xrange(300):
    thread = pool.get_thread()
    t = thread(target=func,args=(i,pool))
    t.start()
    通過查看源碼可以知道,在thread的構造函數中:self.__args = args  self.__target = target  都是私有字段那么調用就應該這么寫

for i in xrange(300):
    ret = pool.get_thread()
    ret._Thread__target = func
    ret._Thread__args = (i,pool)
    ret.start()
'''
simple_pool.py

復雜線程池需要知道的知識點

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'

import Queue
obj = object() #object也是一個類,我創建了一個對象obj

q = Queue.Queue()
for i in range(10):
    print id(obj)#看蘿卜號
    q.put(obj)
'''
這個隊列里有10個蘿卜(蘿卜=obj),但是這10個蘿卜只是個投影。
我們在for循環的時候put到隊列里,obj有變化嗎?是否有新開辟空間?顯然沒有
'''
knowledge_point_1.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import contextlib
import threading
import time
import random

doing = []
def number(l2):
    while True:
        print len(l2)
        time.sleep(1)

t = threading.Thread(target=number,args=(doing,))  #開啟一個線程,每一秒打印列表,當前工作中的線程數量
t.start()


#添加管理上下文的裝飾器
@contextlib.contextmanager
def show(li,iterm):
    li.append(iterm)
    yield
    '''
    yield凍結這次操作,就出去了,with就會捕捉到,然后就會執行with下的代碼塊,當with下的代碼塊
    執行完畢后就會回來繼續執行yield下面沒有執行的代碼塊!
    然后就執行完畢了
    如果with代碼塊中的非常耗時,那么doing的長度是不是一直是1,說明他沒執行完呢?我們就可以獲取到正在執行的數量,當他with執行完畢后
    執行yield的后續的代碼塊。把他移除后就為0了!
    '''
    li.remove(iterm)


def task(arg):
    with show(doing,1):#通過with管理上下文進行切換
        print len(doing)
        time.sleep(10) #等待10秒這里可以使用random模塊來操作~

for i in range(20): #開啟20個線程執行
    temp = threading.Thread(target=task,args=(i,))
    temp.start()

'''
作用:我們要記錄正在工作的的列表
比如正在工作的線程我把加入到doing這個列表中,如果工作完成的把它從doing列表中移除。
通過這個機制,就可以獲取現在正在執行的線程都有多少
'''
knowledge_point_2.py

線程池實現

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
from Queue import Queue
import contextlib
import threading

WorkerStop = object()


class ThreadPool:
    workers = 0
    threadFactory = threading.Thread
    currentThread = staticmethod(threading.currentThread)

    def __init__(self, maxthreads=20, name=None):
        self.q = Queue(0) #這里創建一個隊列,如果是0的話表示不限制,現在這個隊列里放的是任務
        self.max = maxthreads #定義最大線程數
        self.name = name
        self.waiters = []#這兩個是用來計數的
        self.working = []#這兩個是用來技術的

    def start(self):
        #self.max 最大線程數
        #q.qisze(),任務個數
        needSize = self.q.qsize()
        while self.workers < min(self.max, needSize):#min(10,20)取最小值
            #wokers默認為0  【workers = 0】
            '''
            舉例來說:
            while self.workers < min(self.max, needSize):
            這個循環,比如最大線程為20,咱們的任務個數為10,取最小值為10
            每次循環開1個線程,並且workers自增1,那么循環10次后,開了10個線程了workers = 10 ,那么workers就不小於10了
            就不開線程了,我線程開到最大了,你們這10個線程去消耗這10個任務去吧
            並且這里不阻塞,創建完線程就去執行了!
            每一個線程都去執行_worker方法去了
            '''
            self.startAWorker()

    def startAWorker(self):
        self.workers += 1
        newThread = self.threadFactory(target=self._worker, name='shuaige') #創建一個線程並去執行_worker方法
        newThread.start()

    def callInThread(self, func, *args, **kw):
        self.callInThreadWithCallback(None, func, *args, **kw)

    def callInThreadWithCallback(self, onResult, func, *args, **kw):
        o = (func, args, kw, onResult)
        self.q.put(o)


    @contextlib.contextmanager
    def _workerState(self, stateList, workerThread):
        stateList.append(workerThread)
        try:
            yield
        finally:
            stateList.remove(workerThread)

    def _worker(self):
        ct = self.currentThread()
        o = self.q.get() #去隊列里取任務,如果有任務就O就會有值,每個任務是個元組,有方法,有參數
        while o is not WorkerStop:
            with self._workerState(self.working, ct):  #上下文切換
                function, args, kwargs, onResult = o
                del o
                try:
                    result = function(*args, **kwargs)
                    success = True
                except:
                    success = False
                    if onResult is None:
                        pass
                    else:
                        pass

                del function, args, kwargs

                if onResult is not None:
                    try:
                        onResult(success, result)
                    except:
                        #context.call(ctx, log.err)
                        pass

                del onResult, result

            with self._workerState(self.waiters, ct): #當線程工作完閑暇的時候,在去取任務執行
                o = self.q.get()

    def stop(self): #定義關閉線程方法
        while self.workers: #循環workers值
            self.q.put(WorkerStop) #在隊列中增加一個信號~
            self.workers -= 1 #workers值-1 直到所有線程關閉


def show(arg):
    import time
    time.sleep(1)
    print arg


pool = ThreadPool(10)

#創建500個任務,隊列里添加了500個任務
#每個任務都是一個元組(方法名,動態參數,動態參數,默認為NoNe)
for i in range(100):
    pool.callInThread(show, i)

pool.start()  #隊列添加完成之后,開啟線程讓線程一個一個去隊列里去拿

pool.stop() #當上面的任務都執行完之后,線程中都在等待着在隊列里去數據呢!
'''
我們要關閉所有的線程,執行stop方法,首先workers這個值是當前的線程數量,我們給線程發送一個信號“WorkerStop”
在線程的工作里:        while o is not WorkerStop:   如果線程獲取到這個值就不執行了,然后這個線程while循環就停止了,等待
python的垃圾回收機制,回收。

然后在self.workers -= 1 ,那么所有的線程收到這個信號之后就會停止!!!
over~
'''

 

更多請參考:http://www.cnblogs.com/wupeiqi/articles/4839959.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM