python 多線程處理框架


多線程處理框架

python2.7
python3.5

多線程通用任務處理型驅動框架
probe_type 探測類型rtsp或者http
task_queue 任務隊列
task_handler 任務處理函數
thread_count 線程數數目
result_queue 結果存放隊列
args,kwargs為可變參數列表,為擴展性考慮

2016-8-26
python3新增中斷操作

#!/usr/bin/env python2
# coding=utf-8
import threading
import argparse
import Queue


class MultiThreadHandler(object):
    """
    多線程通用任務處理型驅動框架
    task_queue 任務隊列
    task_handler 任務處理函數
    thread_count 線程數數目
    result_queue 結果存放隊列
    args,kwargs為可變參數列表,為擴展性考慮
    """

    def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.task_handler = task_handler
        self.thread_count = thread_count
        self.args = args
        self.kwagrs = kwargs

    def run(self, block_flag):
        thread_pool = []
        for i in range(self.thread_count):
            t = _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
            thread_pool.append(t)
        for th in thread_pool:
            th.start()
        # 阻塞等待所有線程結束
        if block_flag:
            for th in thread_pool:
                threading.Thread.join(th)


class _TaskHandler(threading.Thread):
    """
    一個任務處理器線程,task_queue任務隊列,result_queue是結果隊列,task_handler任務處理函數,args,kwargs可變控制參數
    """

    def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def run(self):
        while True:
            try:
                item = self.task_queue.get(False)
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                self.task_queue.task_done()
            except Queue.Empty, e:
                print "all task has done!"
                break
            except Exception, e:
                print "error:", e


def out(item, result_queue):        # 自行加載處理函數
    host = item
    result_queue.put(host)


if __name__ == '__main__':
    # parse the command args
    parse = argparse.ArgumentParser()
    parse.add_argument("-f", "--file", help="the target file")
    parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
    parse.add_argument("-o", "--outfile", help="the outputfile")
    # 解析命令行
    results = parse.parse_args()
    filename = results.file
    th = results.thread
    outfile = results.outfile
    task_queue = Queue.Queue()
    out_queue = Queue.Queue()
    with open(filename) as f:
        for line in f:
            line = line.rstrip()
            if line:
                task_queue.put(line)

    MultiThreadHandler(task_queue, out, out_queue, th).run(True)

    with open(outfile, "w+") as f:
        while True:
            f.write(out_queue.get() + '\n')
            if out_queue.empty():
                break
#! python3
# coding=utf-8


import queue
import argparse
import threading
import time


class MultiThreadHandler(object):
    """
    多線程通用任務處理型驅動框架
    task_queue 任務隊列
    task_handler 任務處理函數
    thread_count 線程數數目
    result_queue 結果存放隊列
    args,kwargs為可變參數列表,為擴展性考慮
    """

    def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.thread_count = thread_count
        self.args = args
        self.kwagrs = kwargs
        self.thread_pool = []

    def run(self, block_flag):
        for i in range(self.thread_count):
            t = _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
            self.thread_pool.append(t)
        for th in self.thread_pool:
            th.setDaemon(True)
            th.start()
        '''
        # 阻塞等待所有線程結束
        if block_flag:
            for th in thread_pool:
                threading.Thread.join(th)
        '''
        # 阻塞等待所有線程結束
        while self._check_stop():
            try:
                time.sleep(1)
            except KeyboardInterrupt:
                print('KeyboardInterruption')
                self.stop_all()
                break
        print('>>>all Done')

    def _check_stop(self):
        """檢查線程池中所有線程是否全部運行完"""
        finish_num = 0
        for th in self.thread_pool:
            if not th.isAlive():
                finish_num += 1

        return False if finish_num == len(self.thread_pool) else True

    def stop_all(self):
        """掉用線程體stop方法,停止所有線程"""
        for th in self.thread_pool:
            th.stop()


class _TaskHandler(threading.Thread):
    """
    一個任務處理器線程,task_queue任務隊列,task_handler任務處理函數,result_queue是結果隊列,args,kwargs可變控制參數
    可外部中斷
    """

    def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs
        self.is_stoped = True

    def run(self):
        while self.is_stoped:
            try:
                item = self.task_queue.get(False)  # block= False
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
                self.task_queue.task_done()  # 退出queue
            except queue.Empty as e:
                print("all task has done!")
                break
            except Exception as e:
                print("error:", e)
            # time.sleep(1)

    def stop(self):
        self.is_stoped = False


def out(item, result_queue):  # 加載處理函數
    result_queue.put(item)


if __name__ == '__main__':
    # parse the command args
    start = time.time()
    parse = argparse.ArgumentParser()
    parse.add_argument("-f", "--file", help="the target file")
    parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
    parse.add_argument("-o", "--outfile", help="the outputfile")
    # 解析命令行
    results = parse.parse_args()
    filename = results.file
    th = results.thread
    outfile = results.outfile
    task_queue = queue.Queue()
    out_queue = queue.Queue()
    with open(filename, "r+") as f:
        for line in f:
            line = line.rstrip()
            if line:
                task_queue.put(line)

    MultiThreadHandler(task_queue, out, out_queue, th).run(True)

    with open(outfile, "w+") as f:
        while True:
            f.write(out_queue.get() + '\n')
            if out_queue.empty():
                break
    end = time.time()
    print(end - start)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM