python遍歷文件夾下的csv文件,讀取文件內容存到數據庫


一、使用python導入的原因

1、csv文件為從文件數據庫導出的數據文件,一個一個的導入到數據庫效率就比較低下;

2、日期形式的字段會存在特殊的字符或者字段中包含了單引號就會報錯。

 

二、操作

  1、循環讀取選定文件夾下的文件

'''
    讀取文件夾下的csv文件
'''
def readAllFiles(filePath):
    fileList = os.listdir(filePath)
    for file in fileList:
        path = os.path.join(filePath, file)
        if os.path.isfile(path):
            file = open(path, 'r', encoding='utf-8')
            print(path)
            #流程記錄信息
            if path.find("workflow") != -1:
                analysisWorkflowCsv(file)
                pass
            #意見信息
            elif path.find("opinion") != -1:
                analysisOpinionCsv(file)
                pass
            #發文數據
            elif path.find("wd_24") != -1:
                analysisWd24Csv(file)
                pass
            #收文數據
            elif path.find("wd_25") != -1:
                analysisWd25Csv(file)
                pass
        else:
            readAllFiles(path)

 

2、解析文件內容,首行為標題欄需要跳過。入庫操作每滿1000條commit一次主要是python頻繁提交執行次數達到1000+就會報錯。1000條commit一次可以避免錯誤並緩解內存壓力。

'''
    解析文件
'''
def analysisWorkflowCsv(file):
    csvFile = csv.reader(file)
    # 讀取一行,下面的reader中已經沒有該行了
    head_row = next(csvFile)
    # print(head_row)
    __conn = getConnect_old()
    counter = 0
    for row in csvFile:
        workflow = {}
        workflow['UUID'] = row[0]
        workflow['subject'] = row[1]
        workflow['signdate'] = row[2]
        workflow['U_UnitName'] = row[3]
        workflow['U_UnitUser'] = row[4]
        workflow['U_UnitUserTitle'] = row[5]
        workflow['U_UnitEndTime'] = row[6]
        workflow['U_UnitAction'] = row[7]
        workflow['U_UnitToTitle'] = row[8]
        if insertWorkflows(__conn, workflow):
            counter += 1
        if counter % 1000 == 0:
            __conn.commitData()
    print("已經插入工作流數據: %d 條。"%counter)
    __conn.commitData()
    __conn.closeConn()

 

3、數據入庫

'''
    插入工作流程數據
'''
def insertWorkflows(__conn, workflow):
    __sql = '''
        INSERT INTO workflows (
            UUID, U_UnitName, U_UnitUser, U_UnitUserTitle, U_UnitEndTime, U_UnitAction, U_UnitToTitle, subject, signdate
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    __params = (
        workflow['UUID'], workflow['U_UnitName'], workflow['U_UnitUser'], workflow['U_UnitUserTitle'],
        workflow['U_UnitEndTime'], workflow['U_UnitAction'], workflow['U_UnitToTitle'], workflow['subject'],
        workflow['signdate']
        )
    # print(__sql % __params)
    return __conn.mssql_exe_sql(__sql, __params)

 

4、python操作sqlserver代碼

import pymssql

os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

'''
數據庫連接
'''
class ConnectionDatabase(object):
    # 連接mysql數據庫
    def __init__(self, ip, user_name, passwd, db, char='utf8'):
        self.ip = ip
        # self.port = port
        self.username = user_name
        self.passwd = passwd
        self.mysqldb = db
        self.char = char

        self.MsSQL_db = pymssql.connect(
            host=self.ip,
            user=self.username,
            password=self.passwd,
            database=self.mysqldb,
            charset=self.char)
  # 查詢數據(sqlserver)
    def mssql_findList(self, sql):
        cursor = self.MsSQL_db.cursor()
        MsSQL_sql = sql
        results = None
        if not cursor:
            raise (NameError,"數據庫連接失敗")
        try:
            # 執行SQL語句
            cursor.execute(MsSQL_sql)
            # 獲取所有記錄列表
            results = cursor.fetchall()
        except Exception as e:
            print(e)
            self.MsSQL_db.close()
        if results:
            return results
        else:
            return None

    # 數據增刪改查(sqlserver)
    def mssql_exe_sql(self, sql, params):
        cursor = self.MsSQL_db.cursor()
        MsSQL_sql = sql
        result = 0
        if not cursor:
            raise (NameError,"數據庫連接失敗")
        try:
            # 執行SQL語句
            cursor.execute(MsSQL_sql, params)
            result = cursor.rowcount
        except Exception as e:
            print(e)
            self.MsSQL_db.rollback()
            self.MsSQL_db.close()

        return result>0

    '''
        提交數據集
    '''
    def commitData(self):
        try:
            self.MsSQL_db.commit()
        except Exception as e:
            print(e)

    '''
        關閉數據庫連接
    '''
    def closeConn(self):
        if self.MsSQL_db:
            self.MsSQL_db.close()

 

5、執行代碼

if __name__ == "__main__":
    #文件所在的文件夾父路徑
    # testFilePath = "G:\數據解析\csv\workflowcsv"
    testFilePath = "G:\數據解析\csv\wd25csv"
    readAllFiles(testFilePath)

 

遇到的問題及解決方式:

  (1)以上代碼執行時如果有時間類型的字段需要對字符串進行轉換;

     re.sub('[^0-9 | \- | : ]', '', timestr) 

    利用正則表達式將時間字符串中的特殊字符去掉,再轉換為時間字符串,避免代碼執行時類型轉換錯誤。

  (2)數據庫插入數據的sql語句最好使用的是帶參數的執行方式,不要使用sql占位符拼接的方式,這樣可能出現單引號“'”導致sql執行失敗。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM