用 Python 按行處理文件內容,並將結果輸出到另一個文件:
import os
import pymysql
import json
import threading
import re
def get_full_read_file_name(name):
    """Build the full path of the input file for ``name`` and log it.

    The path is the concatenation of the module-level settings ``read_dic``,
    ``read_file_prefix`` and ``read_file_suffix`` around ``str(name)``.

    Args:
        name: Identifier placed between the prefix and the suffix; it is
            converted with ``str``.

    Returns:
        The assembled path string.
    """
    pieces = (read_dic, read_file_prefix, str(name), read_file_suffix)
    full_path = "".join(pieces)
    print(f"read file name:{full_path}")
    return full_path
def get_full_write_file_name(name):
    """Build the full path of the output file for ``name`` and log it.

    The path is the concatenation of the module-level settings ``insert_dic``,
    ``insert_file_prefix`` and ``insert_file_suffix`` around ``str(name)``.

    Args:
        name: Identifier placed between the prefix and the suffix; it is
            converted with ``str``.

    Returns:
        The assembled path string.
    """
    pieces = (insert_dic, insert_file_prefix, str(name), insert_file_suffix)
    full_path = "".join(pieces)
    print(f"write file name:{full_path}")
    return full_path
def do_something(line, file_read, file_write):
    """Process a single input line; currently copies it to the output as-is.

    Args:
        line: The text of the line to emit.
        file_read: The input file object. Not used by this implementation.
        file_write: The output file object; ``line`` is passed to its
            ``write`` method.

    Returns:
        None.
    """
    emit = file_write.write
    emit(line)
def _copy_with_sleep(source, target, batch_size=1000):
    """Copy lines from ``source`` to ``target``, filtering and pacing the output.

    Lines containing the substring "err" are skipped. Every other line is
    handed to ``do_something``. After each ``batch_size`` handled lines the
    statement ``select SLEEP(1);`` is written to ``target`` and the counter
    restarts.

    Args:
        source: Iterable of text lines; also passed to ``do_something``.
        target: Writable text file object.
        batch_size: Number of handled lines between two sleep statements.
    """
    copied = 0
    for line in source:
        # Skipped lines do not count toward the current batch.
        if "err" in line:
            continue
        do_something(line, source, target)
        copied += 1
        if copied == batch_size:
            target.write("select SLEEP(1);\n")
            copied = 0


def task(thread_name, file_name):
    """Transform one input file into its output file.

    Returns without doing anything when the read directory or the input file
    does not exist. Otherwise the output directory is created if needed and
    the input is copied line by line (see ``_copy_with_sleep``).

    Args:
        thread_name: Label printed to identify the running worker.
        file_name: Identifier passed to ``get_full_read_file_name`` and
            ``get_full_write_file_name`` to obtain the two paths.

    Returns:
        None.
    """
    print("當前線程:" + str(thread_name))
    # If database access is needed, enable the two lines below.
    # db = pymysql.connect("localhost", "root", "root", "test", 3307)
    # cursor = db.cursor()
    if not os.path.isdir(read_dic):
        return
    # Resolve the path once: the helper prints on every call, so calling it
    # repeatedly only duplicates log output.
    read_path = get_full_read_file_name(file_name)
    if not os.path.isfile(read_path):
        return
    # Several workers may reach this point together; exist_ok avoids the
    # check-then-create race that a separate isdir()/mkdir() pair has.
    os.makedirs(insert_dic, exist_ok=True)
    # One output file is created for each input file that is read. The
    # context managers close both files even if the copy raises.
    with open(read_path, "r") as source:
        print("當前讀取文件:" + read_path)
        write_path = get_full_write_file_name(file_name)
        with open(write_path, "w") as target:
            print("當前寫入文件:" + write_path)
            _copy_with_sleep(source, target)
    # Remember to close the database connection if one was opened.
    # db.close()
if __name__ == '__main__':
    # Script entry point: start one worker thread per file index and wait
    # for all of them to finish.
    threads = []
    # Directory the input files are read from.
    read_dic = '/Users/dxm/Documents/'
    # Directory the output files are written to.
    insert_dic = '/Users/dxm/Documents/'
    # Input file name prefix.
    read_file_prefix = 'bid_did'
    # Input file name suffix.
    read_file_suffix = '.sql'
    # Output file name prefix.
    insert_file_prefix = 'bid_did_with_sleep'
    # Output file name suffix.
    insert_file_suffix = '.sql'
    try:
        # File indexes 10 through 49 each get their own worker thread.
        for file_index in range(10, 50):
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            t = threading.Thread(
                target=task,
                args=("thread-" + str(file_index), file_index),
                daemon=True,
            )
            threads.append(t)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("任務執行成功")
    # NOTE(review): exceptions raised inside a worker thread are not
    # re-raised by join(), so this handler only sees errors from creating,
    # starting or joining the threads. ConnectionError already covers
    # ConnectionResetError and ConnectionAbortedError.
    except (RuntimeError, ValueError, TypeError, BufferError, ConnectionError):
        print("線程被終止,異常")