Python中開啟線程和線程池的方法,


一.最佳線程數的獲取:

  1、通過用戶慢慢遞增來進行性能壓測,觀察QPS(即每秒的響應請求數,也即是最大吞吐能力。),響應時間

  2、根據公式計算:服務器端最佳線程數量=((線程等待時間+線程cpu時間)/線程cpu時間) * cpu數量

  3、單用戶壓測,查看CPU的消耗,然后直接乘以百分比,再進行壓測,一般這個值的附近應該就是最佳線程數量。

二、為什么要使用線程池?

  1. 多線程中,線程的數量並非越多越好

  2.節省每次開啟線程的開銷

三、如何實現線程池?

  1. threadpool模塊

  2.concurrent.futures

  3.重寫threadpool或者future的函數

  4.vthread 模塊

1、過去:

使用threadpool模塊,這是個python的第三方模塊,支持python2和python3,具體使用方式如下:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threadpool
import time

def sayhello (a):
    print("hello: "+a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start=time.time()
    task_pool=threadpool.ThreadPool(5)
    requests=threadpool.makeRequests(sayhello,seed)
    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end=time.time()
    time_m = end-start
    print("time: "+str(time_m))
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))

if __name__ == '__main__':
    main()
復制代碼

運行結果如下:

threadpool是一個比較老的模塊了,現在雖然還有一些人在用,但已經不再是主流了,關於python多線程,現在已經開始步入未來(future模塊)了

2、未來:

使用concurrent.futures模塊,這個模塊是python3中自帶的模塊,但是,python2.7以上版本也可以安裝使用,具體使用方式如下:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

from concurrent.futures import ThreadPoolExecutor
import time

def sayhello(a):
    print("hello: "+a)
    time.sleep(2)

def main():
    seed=["a","b","c"]
    start1=time.time()
    for each in seed:
        sayhello(each)
    end1=time.time()
    print("time1: "+str(end1-start1))
    start2=time.time()
    with ThreadPoolExecutor(3) as executor:
        for each in seed:
            executor.submit(sayhello,each)
    end2=time.time()
    print("time2: "+str(end2-start2))
    start3=time.time()
    with ThreadPoolExecutor(3) as executor1:
        executor1.map(sayhello,seed)
    end3=time.time()
    print("time3: "+str(end3-start3))

if __name__ == '__main__':
    main()
復制代碼

運行結果如下:

注意到一點:

concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另一種是map()函數,兩者的主要區別在於:

2.1、map可以保證輸出的順序, submit輸出的順序是亂的

2.2、如果你要提交的任務的函數是一樣的,就可以簡化成map。但是假如提交的任務函數是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接拋出錯誤)就要用到submit()

2.3、submit和map的參數是不同的,submit每次都需要提交一個目標函數和對應的參數,map只需要提交一次目標函數,目標函數的參數放在一個迭代器(列表,字典)里就可以。

3.現在?

這里要考慮一個問題,以上兩種線程池的實現都是封裝好的,任務只能在線程池初始化的時候添加一次,那么,假設我現在有這樣一個需求,需要在線程池運行時,再往里面添加新的任務(注意,是新任務,不是新線程),那么要怎么辦?

其實有兩種方式:

3.1、重寫threadpool或者future的函數:

這個方法需要閱讀源模塊的源碼,必須搞清楚源模塊線程池的實現機制才能正確的根據自己的需要重寫其中的方法。

3.2、自己構建一個線程池:

這個方法就需要對線程池的有一個清晰的了解了,附上我自己構建的一個線程池:

復制代碼
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import Queue
import hashlib
import logging
from utils.progress import PrintProgress
from utils.save import SaveToSqlite


class ThreadPool(object):
    def __init__(self, thread_num, args):

        self.args = args
        self.work_queue = Queue.Queue()
        self.save_queue = Queue.Queue()
        self.threads = []
        self.running = 0
        self.failure = 0
        self.success = 0
        self.tasks = {}
        self.thread_name = threading.current_thread().getName()
        self.__init_thread_pool(thread_num)

    # 線程池初始化
    def __init_thread_pool(self, thread_num):
        # 下載線程
        for i in range(thread_num):
            self.threads.append(WorkThread(self))
        # 打印進度信息線程
        self.threads.append(PrintProgress(self))
        # 保存線程
        self.threads.append(SaveToSqlite(self, self.args.dbfile))

    # 添加下載任務
    def add_task(self, func, url, deep):
        # 記錄任務,判斷是否已經下載過
        url_hash = hashlib.new('md5', url.encode("utf8")).hexdigest()
        if not url_hash in self.tasks:
            self.tasks[url_hash] = url
            self.work_queue.put((func, url, deep))
            logging.info("{0} add task {1}".format(self.thread_name, url.encode("utf8")))

    # 獲取下載任務
    def get_task(self):
        # 從隊列里取元素,如果block=True,則一直阻塞到有可用元素為止。
        task = self.work_queue.get(block=False)

        return task

    def task_done(self):
        # 表示隊列中的某個元素已經執行完畢。
        self.work_queue.task_done()

    # 開始任務
    def start_task(self):
        for item in self.threads:
            item.start()

        logging.debug("Work start")

    def increase_success(self):
        self.success += 1

    def increase_failure(self):
        self.failure += 1

    def increase_running(self):
        self.running += 1

    def decrease_running(self):
        self.running -= 1

    def get_running(self):
        return self.running

    # 打印執行信息
    def get_progress_info(self):
        progress_info = {}
        progress_info['work_queue_number'] = self.work_queue.qsize()
        progress_info['tasks_number'] = len(self.tasks)
        progress_info['save_queue_number'] = self.save_queue.qsize()
        progress_info['success'] = self.success
        progress_info['failure'] = self.failure

        return progress_info

    def add_save_task(self, url, html):
        self.save_queue.put((url, html))

    def get_save_task(self):
        save_task = self.save_queue.get(block=False)

        return save_task

    def wait_all_complete(self):
        for item in self.threads:
            if item.isAlive():
                # join函數的意義,只有當前執行join函數的線程結束,程序才能接着執行下去
                item.join()

# WorkThread 繼承自threading.Thread
class WorkThread(threading.Thread):
    # 這里的thread_pool就是上面的ThreadPool類
    def __init__(self, thread_pool):
        threading.Thread.__init__(self)
        self.thread_pool = thread_pool

    #定義線程功能方法,即,當thread_1,...,thread_n,調用start()之后,執行的操作。
    def run(self):
        print (threading.current_thread().getName())
        while True:
            try:
                # get_task()獲取從工作隊列里獲取當前正在下載的線程,格式為func,url,deep
                do, url, deep = self.thread_pool.get_task()
                self.thread_pool.increase_running()

                # 判斷deep,是否獲取新的鏈接
                flag_get_new_link = True
                if deep >= self.thread_pool.args.deep:
                    flag_get_new_link = False

                # 此處do為工作隊列傳過來的func,返回值為一個頁面內容和這個頁面上所有的新鏈接
                html, new_link = do(url, self.thread_pool.args, flag_get_new_link)

                if html == '':
                    self.thread_pool.increase_failure()
                else:
                    self.thread_pool.increase_success()
                    # html添加到待保存隊列
                    self.thread_pool.add_save_task(url, html)

                # 添加新任務,即,將新頁面上的不重復的鏈接加入工作隊列。
                if new_link:
                    for url in new_link:
                        self.thread_pool.add_task(do, url, deep + 1)

                self.thread_pool.decrease_running()
                # self.thread_pool.task_done()
            except Queue.Empty:
                if self.thread_pool.get_running() <= 0:
                    break
            except Exception, e:
                self.thread_pool.decrease_running()
                # print str(e)
                break


 
復制代碼
 
 
 

安裝vthread函數庫

系統命令行下執行:
pip install vthread

0x02 一句話實現簡單多線程

import vthread,requests @vthread.thread(5) #開5個線程執行同一個函數 def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") compete("http://www.baidu.com/") 

相同效果:

import vthread,requests @vthread.thread def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") for i in range(5): #線程數 compete("http://www.baidu.com/") 

0x03 線程池包裝

import vthread,requests @vthread.pool(10) #包裝10條線程池 def compete(url): r = requests.get(url) if r.status_code == 200 : print("[*]Success") else: print("[*]Fail. Retrying...") for i in range(20): #20線程 compete("http://www.baidu.com/")


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM