多線程處理框架
python2.7
python3.5
多線程通用任務處理型驅動框架
probe_type 探測類型rtsp或者http
task_queue 任務隊列
task_handler 任務處理函數
thread_count 線程數目
result_queue 結果存放隊列
args,kwargs為可變參數列表,為擴展性考慮
2016-8-26
python3新增中斷操作
#!/usr/bin/env python2
# coding=utf-8
import threading
import argparse
import Queue
class MultiThreadHandler(object):
    """
    Generic multi-threaded, task-driven processing framework.

    task_queue    queue holding the pending tasks
    task_handler  callable invoked for every task
    result_queue  queue handed to the handler for storing results
    thread_count  number of worker threads to create
    args, kwargs  extra arguments forwarded unchanged to every worker
                  (kept for extensibility)
    """

    def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
        self.task_queue, self.task_handler = task_queue, task_handler
        self.result_queue, self.thread_count = result_queue, thread_count
        # NOTE: the attribute is spelled "kwagrs" (sic); the name is kept as-is.
        self.args, self.kwagrs = args, kwargs

    def run(self, block_flag):
        """Create and start the workers; wait for them when block_flag is true."""
        workers = [
            _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
            for _ in range(self.thread_count)
        ]
        for worker in workers:
            worker.start()
        if not block_flag:
            return
        # Block until every worker thread has finished.
        for worker in workers:
            worker.join()
class _TaskHandler(threading.Thread):
    """
    Worker thread that drains task_queue.

    task_queue    queue the tasks are taken from (non-blocking get)
    task_handler  callable invoked as task_handler(item, result_queue, *args, **kwargs)
    result_queue  queue passed to the handler for storing results
    args, kwargs  extra control arguments forwarded to the handler
    """

    def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Process tasks until the queue is empty; a failing task is reported and skipped."""
        while True:
            # Only the fetch is guarded by Queue.Empty, so an Empty raised
            # inside the handler is not mistaken for "no more tasks".
            try:
                item = self.task_queue.get(False)
            except Queue.Empty:
                print("all task has done!")
                break
            try:
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except Exception as e:
                print("error: %s" % e)
            finally:
                # Always balance the get(), even when the handler failed;
                # otherwise task_queue.join() would never return.
                self.task_queue.task_done()
def out(item, result_queue):  # replace with your own task-handling function
    """Default task handler: put the task item, unmodified, on result_queue."""
    result_queue.put(item)
if __name__ == '__main__':
    # Script entry point: read tasks (one per line) from the input file, run
    # them through the thread pool, then write every result to the output file.

    # parse the command args
    parse = argparse.ArgumentParser()
    # Both paths are mandatory: without them open() would fail on None.
    parse.add_argument("-f", "--file", required=True, help="the target file")
    parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
    parse.add_argument("-o", "--outfile", required=True, help="the outputfile")
    # Parse the command line.
    results = parse.parse_args()
    filename = results.file
    th = results.thread
    outfile = results.outfile

    task_queue = Queue.Queue()
    out_queue = Queue.Queue()
    # Every non-blank line (trailing whitespace stripped) becomes one task.
    with open(filename) as f:
        for line in f:
            line = line.rstrip()
            if line:
                task_queue.put(line)

    MultiThreadHandler(task_queue, out, out_queue, th).run(True)

    with open(outfile, "w") as f:
        # Check for emptiness BEFORE get(): a blocking get() on an empty
        # result queue (no tasks, or every task failed) would hang forever.
        while not out_queue.empty():
            f.write(out_queue.get() + '\n')
#! python3
# coding=utf-8
import queue
import argparse
import threading
import time
class MultiThreadHandler(object):
    """
    Generic multi-threaded, task-driven processing framework.

    task_queue    queue holding the pending tasks
    task_handler  callable invoked for every task
    result_queue  queue handed to the handler for storing results
    thread_count  number of worker threads to create
    args, kwargs  extra arguments forwarded unchanged to every worker
                  (kept for extensibility)
    """

    def __init__(self, task_queue, task_handler, result_queue=None, thread_count=1, *args, **kwargs):
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.thread_count = thread_count
        self.args = args
        # NOTE: the attribute is spelled "kwagrs" (sic); the name is kept so
        # code that already reads it keeps working.
        self.kwagrs = kwargs
        self.thread_pool = []

    def run(self, block_flag):
        """Start the workers and wait until they finish or Ctrl-C is pressed.

        block_flag is accepted for interface compatibility only: this
        implementation always waits, whatever its value.
        """
        # Build a fresh pool on every call. A Thread can only be started once,
        # so re-using workers left over from a previous run() would raise
        # RuntimeError.
        self.thread_pool = [
            _TaskHandler(self.task_queue, self.task_handler, self.result_queue, *self.args, **self.kwagrs)
            for _ in range(self.thread_count)
        ]
        for th in self.thread_pool:
            # Daemon threads do not keep the interpreter alive after an interrupt.
            th.daemon = True
            th.start()

        # Poll instead of join() so the main thread can receive KeyboardInterrupt.
        while self._check_stop():
            try:
                time.sleep(1)
            except KeyboardInterrupt:
                print('KeyboardInterruption')
                self.stop_all()
                break
        print('>>>all Done')

    def _check_stop(self):
        """Return True while at least one pooled thread is still alive, else False."""
        # is_alive() replaces isAlive(), which was removed in Python 3.9.
        return any(th.is_alive() for th in self.thread_pool)

    def stop_all(self):
        """Call stop() on every pooled worker thread."""
        for th in self.thread_pool:
            th.stop()
class _TaskHandler(threading.Thread):
    """
    Worker thread that drains task_queue and can be stopped from outside.

    task_queue    queue the tasks are taken from (non-blocking get)
    task_handler  callable invoked as task_handler(item, result_queue, *args, **kwargs)
    result_queue  queue passed to the handler for storing results
    args, kwargs  extra control arguments forwarded to the handler
    """

    def __init__(self, task_queue, task_handler, result_queue=None, *args, **kwargs):
        super().__init__()
        self.task_queue = task_queue
        self.task_handler = task_handler
        self.result_queue = result_queue
        self.args = args
        self.kwargs = kwargs
        # NOTE: despite its name, this flag is True while the worker should
        # keep running; stop() sets it to False. Name and meaning are kept.
        self.is_stoped = True

    def run(self):
        """Process tasks until the queue is empty or stop() has been called.

        A task whose handler raises is reported and skipped; the loop goes on.
        """
        while self.is_stoped:
            # Only the fetch is guarded by queue.Empty, so an Empty raised
            # inside the handler is not mistaken for "no more tasks".
            try:
                item = self.task_queue.get(False)  # block=False
            except queue.Empty:
                print("all task has done!")
                break
            try:
                self.task_handler(item, self.result_queue, *self.args, **self.kwargs)
            except Exception as e:
                print("error:", e)
            finally:
                # Always balance the get(), even when the handler failed;
                # otherwise task_queue.join() would never return.
                self.task_queue.task_done()

    def stop(self):
        """Ask the worker to exit; the flag is checked before the next task is fetched."""
        self.is_stoped = False
def out(item, result_queue):  # task-handling function plugged into the framework
    """Store the task item, unmodified, on result_queue."""
    deliver = result_queue.put
    deliver(item)
if __name__ == '__main__':
    # Script entry point: read tasks (one per line) from the input file, run
    # them through the thread pool, write every result to the output file and
    # print the elapsed wall-clock time in seconds.
    start = time.time()

    # parse the command args
    parse = argparse.ArgumentParser()
    # Both paths are mandatory: without them open() would fail on None.
    parse.add_argument("-f", "--file", required=True, help="the target file")
    parse.add_argument("-th", "--thread", type=int, default=1, help="the thread number")
    parse.add_argument("-o", "--outfile", required=True, help="the outputfile")
    # Parse the command line.
    results = parse.parse_args()
    filename = results.file
    th = results.thread
    outfile = results.outfile

    task_queue = queue.Queue()
    out_queue = queue.Queue()
    # Read-only mode: the input is never written, so "r+" would only add a
    # needless requirement for write permission on the file.
    with open(filename, "r") as f:
        for line in f:
            line = line.rstrip()
            if line:
                task_queue.put(line)

    MultiThreadHandler(task_queue, out, out_queue, th).run(True)

    with open(outfile, "w") as f:
        # Check for emptiness BEFORE get(): a blocking get() on an empty
        # result queue (no tasks, or every task failed) would hang forever.
        while not out_queue.empty():
            f.write(out_queue.get() + '\n')

    end = time.time()
    print(end - start)