用 Python 按行處理文件內容，並輸出到另外一個文件


下面的示例用 Python 多線程按行讀取文件內容，過濾並處理每一行後，輸出到另外一個文件（每 1000 行插入一條 SLEEP 語句）：

import os
import pymysql
import json
import threading
import re


def get_full_read_file_name(name):
    """Return the full path of the input file identified by ``name``, and log it.

    The path is the plain concatenation of the module-level globals
    ``read_dic`` + ``read_file_prefix`` + ``str(name)`` + ``read_file_suffix``
    (assigned in the ``__main__`` block). No separator is inserted, so
    ``read_dic`` must already end with one.
    """
    full_path = "".join((read_dic, read_file_prefix, str(name), read_file_suffix))
    print("read file name:" + full_path)
    return full_path


def get_full_write_file_name(name):
    """Return the full path of the output file identified by ``name``, and log it.

    The path is the plain concatenation of the module-level globals
    ``insert_dic`` + ``insert_file_prefix`` + ``str(name)`` +
    ``insert_file_suffix`` (assigned in the ``__main__`` block). No separator
    is inserted, so ``insert_dic`` must already end with one.
    """
    full_path = "".join((insert_dic, insert_file_prefix, str(name), insert_file_suffix))
    print("write file name:" + full_path)
    return full_path


def do_something(line, file_read, file_write):
    """Process one input line; the current implementation copies it unchanged.

    Args:
        line: A line read from the input file (typically still carrying its
            trailing newline).
        file_read: The input file object. It is unused here and kept so that
            custom processing can consult it.
        file_write: Writable text file object that receives the result.
    """
    # Identity transformation; replace this with real per-line logic as needed.
    output_line = line
    file_write.write(output_line)


def task(thread_name, file_name):
    """Filter one numbered input file into its output file, adding periodic sleeps.

    Each line of the input file that does not contain the substring ``"err"``
    is passed to ``do_something``, which writes it to the output file. After
    every 1000 processed lines, a ``select SLEEP(1);`` statement is appended.

    The function returns without writing anything when the input directory
    or the input file does not exist. The output directory is created if it
    is missing.

    Args:
        thread_name: Label used only for the log line.
        file_name: Value placed between prefix and suffix to build both the
            input and output file names (the ``__main__`` block passes ints).
    """
    print("當前線程:" + str(thread_name))
    # If a database connection is needed, this code can be enabled:
    # db = pymysql.connect("localhost", "root", "root", "test", 3307)
    # cursor = db.cursor()
    if not os.path.isdir(read_dic):
        return
    # Build each path once; the helpers print on every call.
    read_path = get_full_read_file_name(file_name)
    if not os.path.isfile(read_path):
        return
    # Many threads run this concurrently. exist_ok prevents a FileExistsError
    # when another thread creates the directory between a check and mkdir().
    os.makedirs(insert_dic, exist_ok=True)
    write_path = get_full_write_file_name(file_name)
    # Lines matching this pattern are skipped. Compile it once, outside the loop.
    skip_pattern = re.compile("err")
    # One output file per input file. `with` guarantees both are closed even
    # if processing raises part-way through.
    # NOTE(review): no encoding is given, so the platform default is used.
    with open(read_path, "r") as file, open(write_path, "w") as file_insert:
        print("當前讀取文件:" + read_path)
        print("當前寫入文件:" + write_path)
        # Counts processed (non-skipped) lines since the last SLEEP insertion.
        line_count = 0
        for line in file:
            if skip_pattern.search(line):
                continue
            do_something(line, file, file_insert)
            line_count += 1
            # Insert a one-second sleep after every 1000 written lines.
            if line_count == 1000:
                file_insert.write("select SLEEP(1);\n")
                line_count = 0
    # If a database connection was opened above, close it here:
    # db.close()


if __name__ == '__main__':
    # Process input files numbered 10..49, one worker thread per file.
    threads = []
    # Directory containing the input files (must end with a separator).
    read_dic = '/Users/dxm/Documents/'
    # Directory for the output files (must end with a separator).
    insert_dic = '/Users/dxm/Documents/'
    # Input file name prefix.
    read_file_prefix = 'bid_did'
    # Input file name suffix.
    read_file_suffix = '.sql'
    # Output file name prefix.
    insert_file_prefix = 'bid_did_with_sleep'
    # Output file name suffix.
    insert_file_suffix = '.sql'
    try:
        # One thread per file: bid_did10.sql .. bid_did49.sql.
        for file_number in range(10, 50):
            # daemon=True replaces Thread.setDaemon(), deprecated since Python 3.10.
            t = threading.Thread(
                target=task,
                args=("thread-" + str(file_number), file_number),
                daemon=True,
            )
            threads.append(t)

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        # NOTE: an exception raised inside task() is handled within its own
        # thread (printed by Python's default thread exception hook) and never
        # reaches the except clause below. This message therefore does not
        # guarantee that every file was processed.
        print("任務執行成功")
    except (RuntimeError, ValueError, TypeError, BufferError, ConnectionError):
        # ConnectionResetError and ConnectionAbortedError subclass
        # ConnectionError, so they are still caught here.
        print("線程被終止,異常")


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM