"""Producer/consumer demo: stream lines of a text file through a bounded
JoinableQueue to several worker processes that append rows to a CSV file."""
import csv
import time
import traceback
from multiprocessing import JoinableQueue, Process, cpu_count


def deal_line(line, writer, csv_file):
    """Write one CSV row derived from ``line`` and flush it immediately.

    The row is ``(line[3], line[1])``.

    Args:
        line: An indexable item with at least four elements.
        writer: A ``csv.writer`` bound to ``csv_file``.
        csv_file: The file object behind ``writer``.

    Raises:
        IndexError: If ``line`` has fewer than four elements.
    """
    # NOTE(review): producer() enqueues raw text lines, so these indexes pick
    # single characters, not delimited fields -- confirm this is intended.
    writer.writerow((line[3], line[1]))
    # Important: when several processes write to the same file, flush as soon
    # as possible, otherwise buffered rows may be lost.
    csv_file.flush()


def consumer(queue, writer, csv_file):
    """Consumer loop: take items off ``queue`` forever and process each one.

    Every item is acknowledged with ``task_done()`` even when processing it
    fails. Without that, a single bad line would leave ``queue.join()`` in
    ``main()`` blocked forever. Failures are reported on stderr and the loop
    moves on to the next item.

    Args:
        queue: A ``JoinableQueue`` (or any object with ``get``/``task_done``).
        writer: A ``csv.writer`` bound to ``csv_file``.
        csv_file: The file object behind ``writer``.
    """
    while True:
        line = queue.get()
        try:
            deal_line(line, writer, csv_file)
        except Exception:
            # Keep this worker alive: if every worker died, the producer
            # would block on the bounded queue with nobody draining it.
            traceback.print_exc()
        finally:
            queue.task_done()


def producer(queue):
    """Producer: put every line of ``test.txt`` onto ``queue``.

    Lines are enqueued exactly as read, including the trailing newline.
    """
    with open('test.txt', 'r') as f:
        for line in f:
            queue.put(line)


def main():
    """Start one producer and ``cpu_count()`` consumers, then wait for them.

    Returns once the producer has finished and every queued item has been
    acknowledged by a consumer.
    """
    # NOTE: the writer and file object are handed to the child processes as
    # Process arguments. That works when children are forked; start methods
    # that pickle the arguments (spawn/forkserver) cannot transfer them.
    # newline='' is what the csv module requires so that the writer's own
    # line terminator is not translated a second time by the file object.
    with open('t1.csv', 'w+', newline='') as csv_file:
        writer = csv.writer(csv_file)
        queue = JoinableQueue(8)  # bounded queue: at most 8 pending items

        pc = Process(target=producer, args=(queue,))
        pc.start()

        # Multiple consumers, one per CPU.
        for _ in range(cpu_count()):
            c1 = Process(target=consumer, args=(queue, writer, csv_file))
            # Consumers loop forever; daemon=True lets the program exit
            # without having to stop them explicitly.
            c1.daemon = True
            c1.start()

        pc.join()  # wait until the producer has enqueued everything
        queue.join()  # wait until every item has been processed
        # A process stuck in an endless loop can be force-stopped with
        # its terminate() method if needed.


if __name__ == '__main__':
    start = time.time()
    main()
    print("Time:", time.time() - start)