import time
from multiprocessing import Process, JoinableQueue, cpu_count
import csv
####處理一條數據的方法
def deal_line(line, writer, csv_file):
    """Write one CSV row derived from ``line`` and flush it immediately.

    Args:
        line: one item taken from the work queue. The row written is
            ``(line[3], line[1])``.
            NOTE(review): producer() enqueues raw text lines (str), so these
            indices select single characters, not fields. Confirm whether a
            split (e.g. ``line.split(...)``) was intended.
        writer: a csv writer bound to ``csv_file``.
        csv_file: the file object the writer writes into.
    """
    picked = (line[3], line[1])
    writer.writerow(picked)
    # Flush after every row. This runs inside worker processes, and rows still
    # sitting in a worker's buffer when that worker is terminated would be lost.
    csv_file.flush()
####消費者模型
def consumer(queue, writer, csv_file):
    """Worker loop: take lines from ``queue`` forever and process each one.

    Every item taken from the queue is always acknowledged with
    ``queue.task_done()``, even if processing it fails. Otherwise a single bad
    line would kill this worker and leave ``queue.join()`` in the parent
    blocked forever.

    Args:
        queue: a JoinableQueue supplying input lines.
        writer: csv writer passed through to deal_line().
        csv_file: file object passed through to deal_line().
    """
    import traceback

    while True:
        line = queue.get()
        try:
            deal_line(line, writer, csv_file)
        except Exception:
            # Report the failing item but keep the worker alive so the
            # remaining queued lines still get processed.
            traceback.print_exc()
        finally:
            queue.task_done()
####生產者模型
def producer(queue, path='test.txt', encoding=None):
    """Put every line of a text file onto ``queue``.

    Args:
        queue: any queue-like object with a ``put`` method
            (a JoinableQueue in main()).
        path: input file to read; defaults to ``'test.txt'``.
        encoding: text encoding for the file; ``None`` (the default) uses the
            platform's locale encoding, as a plain ``open()`` does.

    Lines are enqueued as-is, including their trailing newline.
    """
    with open(path, 'r', encoding=encoding) as f:
        for line in f:
            queue.put(line)
#### Start one producer process and cpu_count() consumer processes
def main():
    """Run one producer process and ``cpu_count()`` consumer processes.

    The producer feeds input lines into a bounded JoinableQueue. The consumers
    write rows to ``t1.csv`` through one csv writer created here. The function
    returns once the producer has finished and every queued line has been
    acknowledged with ``task_done()``.
    """
    # The csv module requires newline=''; without it an extra '\r' is
    # written on platforms whose line ending is '\r\n'.
    with open('t1.csv', 'w+', newline='') as csv_file:
        writer = csv.writer(csv_file)
        # Bounded so the producer cannot read arbitrarily far ahead.
        queue = JoinableQueue(8)
        producer_proc = Process(target=producer, args=(queue,))
        producer_proc.start()
        # NOTE(review): writer/csv_file are handed to the children as Process
        # args. This only works with the 'fork' start method. Under 'spawn'
        # (the default on Windows and macOS) they would have to be pickled, and
        # start() would fail. Consider returning results to the parent instead.
        for _ in range(cpu_count()):
            worker = Process(target=consumer, args=(queue, writer, csv_file))
            # Consumers loop forever; as daemons they are terminated when the
            # parent exits, so they never need to be joined explicitly.
            worker.daemon = True
            worker.start()
        producer_proc.join()  # wait until every input line has been queued
        queue.join()  # wait until every queued line has been processed
if __name__ == '__main__':
    # perf_counter() is monotonic and high-resolution, which is what elapsed
    # time measurement needs; time.time() follows the wall clock and can jump.
    start = time.perf_counter()
    main()
    print("Time:", time.perf_counter() - start)