python-pymysql 操作數據庫
# Create the database and the express2 table
import pymysql

# Connection settings
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'  # NOTE(review): hard-coded credentials; consider env vars
DBNAME = 'test'


def mysql_conn(host, user, password):
    """Open a pymysql connection to the server without selecting a database.

    Prints a success message. On failure it prints the error and re-raises it.
    The original fell through to ``return conn`` with ``conn`` unbound and
    raised an UnboundLocalError that hid the real cause.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password)
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        raise
    print('數據庫連接成功!')
    return conn


# Create the database only if it is missing, so re-running does not fail.
sql = "CREATE DATABASE IF NOT EXISTS `" + DBNAME + "`"

# Create the table only if it is missing (same reason as above).
# NOTE(review): MySQL's `utf8` is the 3-byte utf8mb3; switch to utf8mb4 if
# 4-byte characters (e.g. emoji) can appear in the data.
sql_2 = '''CREATE TABLE IF NOT EXISTS `express2` (
    `運單號` bigint NOT NULL AUTO_INCREMENT,
    `物流單號` VARCHAR(80),
    `物流寶單號` VARCHAR(80),
    `行業` VARCHAR(80),
    `貨主` VARCHAR(80),
    `是否保價` VARCHAR(40),
    `是否催派` VARCHAR(40),
    `是否工單發起` VARCHAR(40),
    `是否預售下沉` VARCHAR(40),
    `停滯狀態` VARCHAR(80),
    `停滯時長` INT,
    `停滯要求時長` INT,
    `是否超停滯要求時長` VARCHAR(80),
    `物流停滯節點` VARCHAR(80),
    `發貨大區` VARCHAR(80),
    `發貨省` VARCHAR(80),
    `倉code` VARCHAR(80),
    `倉庫` VARCHAR(80),
    `配送公司` VARCHAR(150),
    `配送cp` VARCHAR(150),
    `配送類型` VARCHAR(80),
    `配送大區` VARCHAR(80),
    `收貨大區` VARCHAR(80),
    `收貨省` VARCHAR(80),
    `收貨市` VARCHAR(80),
    `收貨區縣` VARCHAR(80),
    `區域類型` VARCHAR(80),
    `一級分撥` VARCHAR(80),
    `二級分撥` VARCHAR(80),
    `末分撥` VARCHAR(80),
    `網點id` VARCHAR(80),
    `網點` VARCHAR(80),
    `支付時間` VARCHAR(80),
    `創建時間` VARCHAR(80),
    `接單時間` VARCHAR(80),
    `出庫時間` VARCHAR(80),
    `入交接區時間` VARCHAR(80),
    `攬收時間` VARCHAR(80),
    `一級分撥入時間` VARCHAR(80),
    `一級分撥出時間` VARCHAR(80),
    `二級分撥入時間` VARCHAR(80),
    `二級分撥出時間` VARCHAR(80),
    `末分撥入時間` VARCHAR(80),
    `末分撥出時間` VARCHAR(80),
    `網點入時間` VARCHAR(80),
    `領件時間` VARCHAR(80),
    `配送成功時間` VARCHAR(80),
    `配送應簽時間` VARCHAR(80),
    PRIMARY KEY (`運單號`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''

conn = mysql_conn(DBHOST, DBUSER, DBPASS)
cursor = conn.cursor()
try:
    cursor.execute(sql)
    cursor.execute("USE `" + DBNAME + "`")
    cursor.execute(sql_2)
finally:
    # Release the cursor before the connection it belongs to, even on error.
    cursor.close()
    conn.close()
# Write the whole spreadsheet into express2 in a single transaction
import time

import numpy as np
import pandas as pd
import pymysql

info = pd.read_excel(r'/Users/linxianli/Desktop/數據.xlsx')
print(info.shape)  # sample output: (443378, 48)

# Replace NaN with None so pymysql sends SQL NULL instead of the float nan.
info = info.astype(object).where(pd.notnull(info), None)

# Connection settings
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'  # NOTE(review): hard-coded credentials; consider env vars
DBNAME = 'test'
TABLE_NAME = 'express2'


def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to database ``db``.

    Prints a success message. On failure it prints the error and re-raises it.
    The original fell through to ``return conn`` with ``conn`` unbound.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password,
                               database=db, port=port, charset=charset)
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        raise
    print('數據庫連接成功!')
    return conn


def _insert_sql(table, columns):
    """Build a parameterized INSERT statement for ``columns`` of ``table``.

    Column names come from the spreadsheet header, so each one is wrapped in
    backticks (embedded backticks doubled). Headers with spaces or other
    special characters then still form valid identifiers.
    """
    quoted = ", ".join("`" + str(c).replace("`", "``") + "`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return ("insert into `" + table + "` (" + quoted + ") values ("
            + placeholders + ")")


def write_database():
    """Insert every row of ``info`` into ``express2`` in one transaction.

    Returns the affected-row count reported by ``executemany``. If the insert
    fails, the transaction is rolled back and the exception is re-raised (the
    original swallowed it, so a failed load looked successful). The cursor
    and connection are always closed.
    """
    conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
    cursor = conn.cursor()
    try:
        conn.begin()
        # Placeholder count comes from the column list, not data_list[0],
        # so an empty sheet no longer raises IndexError.
        insert_sql = _insert_sql(TABLE_NAME, info.columns)
        # One tuple per DataFrame row.
        data_list = [tuple(row) for row in info.values]
        res = cursor.executemany(insert_sql, data_list)
        print('執行sql受影響的行數:', res)
        conn.commit()
        return res
    except Exception:
        # Undo the partial insert, then let the caller see the failure.
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()


start_time = time.time()
write_database()
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
# Sample output:
#   數據庫連接成功!
#   執行sql受影響的行數: 443378
#   run time is 96.30252575874329
# Write to the database in parallel.
# NOTE: the original heading said "multi-threaded", but ProcessPoolExecutor
# runs each chunk in a separate process with its own connection.
import time, requests
import numpy as np
import pandas as pd
import pymysql
from concurrent.futures import ProcessPoolExecutor

# Connection settings
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'  # NOTE(review): hard-coded credentials; consider env vars
DBNAME = 'test'
TABLE_NAME = 'express2'
# Rows per worker task (and per transaction).
CHUNK_SIZE = 100000


def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to database ``db``.

    Prints a success message. On failure it prints the error and re-raises it.
    The original fell through to ``return conn`` with ``conn`` unbound.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password,
                               database=db, port=port, charset=charset)
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        raise
    print('數據庫連接成功!')
    return conn


def _insert_sql(table, columns):
    """Build a parameterized INSERT statement with backtick-quoted columns."""
    quoted = ", ".join("`" + str(c).replace("`", "``") + "`" for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return ("insert into `" + table + "` (" + quoted + ") values ("
            + placeholders + ")")


def data_handler(info):
    """Insert one DataFrame chunk into ``express2`` in its own transaction.

    Runs inside a worker process, so it opens its own connection and always
    closes it. Returns the affected-row count. On failure it rolls back and
    re-raises, so the parent process sees the error.
    """
    conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
    cursor = conn.cursor()
    try:
        conn.begin()
        insert_sql = _insert_sql(TABLE_NAME, info.columns)
        data_list = [tuple(row) for row in info.values]
        res = cursor.executemany(insert_sql, data_list)
        print('執行sql受影響的行數:', res)
        conn.commit()
        return res
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()


def run(data=None, chunk_size=CHUNK_SIZE):
    """Split ``data`` into consecutive row chunks and insert them in parallel.

    ``data`` defaults to the module-level ``info`` loaded in the ``__main__``
    section. Chunk boundaries are derived from the row count, so every row
    is written. The original hard-coded slices stopped at row 450000.
    Returns the total affected-row count.
    """
    if data is None:
        data = info
    chunks = [data.iloc[start:start + chunk_size]
              for start in range(0, len(data), chunk_size)]
    with ProcessPoolExecutor() as executor:
        # Iterating the results re-raises any exception from a worker. The
        # original never consumed them, so failed chunks went unnoticed.
        return sum(res or 0 for res in executor.map(data_handler, chunks))


# The guard keeps worker processes (spawn start method, e.g. macOS on
# Python >= 3.8) from re-reading the Excel file and re-launching the pool.
if __name__ == "__main__":
    info = pd.read_excel(r'/Users/linxianli/Desktop/數據.xlsx')
    # Replace NaN with None so pymysql sends SQL NULL.
    info = info.astype(object).where(pd.notnull(info), None)

    start_time = time.time()
    run()
    stop_time = time.time()
    print('run time is %s' % (stop_time - start_time))
# Sample output (original 4-chunk run):
#   數據庫連接成功! (x4)
#   執行sql受影響的行數: 100000 (x3)
#   執行sql受影響的行數: 143378
#   run time is 57.68310880661011
# Query the table into a DataFrame, then empty the table
import numpy as np
import pandas as pd
import pymysql

# Connection settings
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'  # NOTE(review): hard-coded credentials; consider env vars
DBNAME = 'test'


def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to database ``db``.

    Prints a success message. On failure it prints the error and re-raises it.
    The original fell through to ``return conn`` with ``conn`` unbound.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password,
                               database=db, port=port, charset=charset)
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        raise
    print('數據庫連接成功!')
    return conn


def mysql(sql):
    """Run ``sql`` on the module-level ``cursor`` and return the rows as a DataFrame.

    Column names are taken from ``cursor.description``, so a later
    ``DataFrame.to_excel`` export keeps the header row, which a plain MySQL
    export omits. Statements without a result set return an empty DataFrame
    instead of crashing on ``description`` being None.
    """
    cursor.execute(sql)
    rows = cursor.fetchall()
    if cursor.description is None:
        return pd.DataFrame()
    col = [desc[0] for desc in cursor.description]
    return pd.DataFrame(rows, columns=col)


conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
cursor = conn.cursor()
try:
    df = mysql('''
select * from express2
''')
finally:
    # Close the cursor before its connection, even if the query fails.
    cursor.close()
    conn.close()
print(df.shape)  # sample output: (443378, 48)

# Empty the table
conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
cursor = conn.cursor()
try:
    cursor.execute('''TRUNCATE TABLE `express2`;''')
    print('清空表數據成功!')
finally:
    cursor.close()
    conn.close()
# Sample output:
#   數據庫連接成功!
#   清空表數據成功!
代碼還有很多可以優化的地方,可以慢慢調整。
參考文檔:
https://blog.csdn.net/weixin_42796152/article/details/107931768
https://blog.csdn.net/blog_liuliang/article/details/78724910