python-pymysql 操作數據庫-創建-寫入-多進程寫入-讀取-清空表


python-pymysql 操作數據庫

創建數據庫-表

#導入pymysql
import pymysql


# Connection settings for the local MySQL server.
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'
# DBNAME = 'test'  # not used here: this script creates the database itself

# Obtain a connection through the pymysql API.
def mysql_conn(host, user, password):
    """Open a pymysql connection without selecting a default database.

    Args:
        host: MySQL server host name.
        user: Account name used to log in.
        password: Password for ``user``.

    Returns:
        The open pymysql connection.

    Raises:
        pymysql.Error: If the connection cannot be established. The failure
            message is printed before the exception is re-raised.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password)
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        # Re-raise: falling through to `return conn` would fail with an
        # UnboundLocalError and hide the real cause of the failure.
        raise
    print('數據庫連接成功!')
    return conn

# Create the connection first, then a cursor that issues statements on it.
conn = mysql_conn(DBHOST, DBUSER, DBPASS)
cursor = conn.cursor()

try:
    # Create the database only when it is missing, so a re-run does not fail.
    sql = "CREATE DATABASE IF NOT EXISTS test"
    cursor.execute(sql)
    cursor.execute("use test")

    # Create the table. IF NOT EXISTS keeps the script re-runnable, matching
    # the way the database itself is created above.
    sql_2 = '''CREATE TABLE IF NOT EXISTS `express2` (
  `運單號` bigint NOT NULL AUTO_INCREMENT,
  `物流單號` VARCHAR(80),
  `物流寶單號` VARCHAR(80),
  `行業` VARCHAR(80),
  `貨主` VARCHAR(80),
  `是否保價` VARCHAR(40),
  `是否催派` VARCHAR(40),
  `是否工單發起` VARCHAR(40),
  `是否預售下沉` VARCHAR(40),
  `停滯狀態` VARCHAR(80),
  `停滯時長` INT,
  `停滯要求時長` INT,
  `是否超停滯要求時長` VARCHAR(80),
  `物流停滯節點` VARCHAR(80),
  `發貨大區` VARCHAR(80),
  `發貨省` VARCHAR(80),
  `倉code` VARCHAR(80),
  `倉庫` VARCHAR(80),
  `配送公司` VARCHAR(150),
  `配送cp` VARCHAR(150),
  `配送類型` VARCHAR(80),
  `配送大區` VARCHAR(80),
  `收貨大區` VARCHAR(80),
  `收貨省` VARCHAR(80),
  `收貨市` VARCHAR(80),
  `收貨區縣` VARCHAR(80),
  `區域類型` VARCHAR(80),
  `一級分撥` VARCHAR(80),
  `二級分撥` VARCHAR(80),
  `末分撥` VARCHAR(80),
  `網點id` VARCHAR(80),
  `網點` VARCHAR(80),
  `支付時間` VARCHAR(80),
  `創建時間` VARCHAR(80),
  `接單時間` VARCHAR(80),
  `出庫時間` VARCHAR(80),
  `入交接區時間` VARCHAR(80),
  `攬收時間` VARCHAR(80),
  `一級分撥入時間` VARCHAR(80),
  `一級分撥出時間` VARCHAR(80),
  `二級分撥入時間` VARCHAR(80),
  `二級分撥出時間` VARCHAR(80),
  `末分撥入時間` VARCHAR(80),
  `末分撥出時間` VARCHAR(80),
  `網點入時間` VARCHAR(80),
  `領件時間` VARCHAR(80),
  `配送成功時間` VARCHAR(80),
  `配送應簽時間` VARCHAR(80),
  PRIMARY KEY (`運單號`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
'''
    cursor.execute(sql_2)
finally:
    # Release the cursor before the connection that owns it, and do so even
    # when one of the statements above fails.
    cursor.close()
    conn.close()

 

寫入數據庫

# 導入類
import numpy as np
import pandas as pd
import pymysql

info = pd.read_excel(r'/Users/linxianli/Desktop/數據.xlsx')
info.shape
'''
(443378, 48)
'''

# Cast to object dtype and replace every missing value (NaN) with None.
info = info.astype(object).where(info.notna(), None)

# Connection settings for the local MySQL server.
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'
DBNAME = 'test'

# Obtain a connection through the pymysql API.
def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to the database ``db``.

    Args:
        host: MySQL server host name.
        user: Account name used to log in.
        password: Password for ``user``.
        db: Name of the database to select.
        port: TCP port of the server.
        charset: Character set passed to ``pymysql.connect``.

    Returns:
        The open pymysql connection.

    Raises:
        pymysql.Error: If the connection cannot be established. The failure
            message is printed before the exception is re-raised.
    """
    try:
        conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db,
            port=port,
            charset=charset,
        )
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        # Re-raise: falling through to `return conn` would fail with an
        # UnboundLocalError and hide the real cause of the failure.
        raise
    print('數據庫連接成功!')
    return conn

def write_database():
    """Bulk-insert every row of the module-level DataFrame ``info`` into ``express2``.

    Opens its own connection, runs a single ``executemany`` inside an explicit
    transaction and commits it. If anything fails, the transaction is rolled
    back and the error is printed (it is not re-raised). The cursor and the
    connection are closed in both cases.
    """
    conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
    cursor = conn.cursor()

    try:
        # Start the transaction explicitly.
        conn.begin()

        # Column list of the INSERT. Names are backtick-quoted (with embedded
        # backticks doubled) so that any DataFrame header is a valid identifier.
        columns = ','.join(
            '`' + str(name).replace('`', '``') + '`' for name in info.columns
        )

        # One "%s" placeholder per column. Derived from the column count, not
        # from the first row, so an empty DataFrame does not raise IndexError.
        placeholders = ','.join(['%s'] * len(info.columns))

        insert_sql = "insert into express2 (" + columns + ") values (" + placeholders + ")"

        # One tuple per DataFrame row.
        data_list = [tuple(row) for row in info.values]

        res = cursor.executemany(insert_sql, data_list)
        print('執行sql受影響的行數:', res)
        conn.commit()
    except Exception as e:
        # Undo the partial write and report why it failed instead of
        # discarding the error silently.
        conn.rollback()
        print('寫入數據庫失敗,已回滾:' + str(e))
    finally:
        cursor.close()
        conn.close()

# `time` is not imported by this section's import lines, so import it here;
# without it the first statement below raises NameError.
import time

# Time one full run of the bulk insert.
start_time = time.time()
write_database()
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
'''
數據庫連接成功!
執行sql受影響的行數: 443378
run time is 96.30252575874329
'''

 

多進程寫入數據庫

import numpy as np
import pandas as pd
import time, requests
import pymysql
from concurrent.futures import ProcessPoolExecutor

info = pd.read_excel(r'/Users/linxianli/Desktop/數據.xlsx')

# Cast to object dtype and replace every missing value (NaN) with None.
info = info.astype(object).where(info.notna(), None)

# Connection settings for the local MySQL server.
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'
DBNAME = 'test'

# Obtain a connection through the pymysql API.
def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to the database ``db``.

    Args:
        host: MySQL server host name.
        user: Account name used to log in.
        password: Password for ``user``.
        db: Name of the database to select.
        port: TCP port of the server.
        charset: Character set passed to ``pymysql.connect``.

    Returns:
        The open pymysql connection.

    Raises:
        pymysql.Error: If the connection cannot be established. The failure
            message is printed before the exception is re-raised.
    """
    try:
        conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db,
            port=port,
            charset=charset,
        )
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        # Re-raise: falling through to `return conn` would fail with an
        # UnboundLocalError and hide the real cause of the failure.
        raise
    print('數據庫連接成功!')
    return conn

def data_handler(info):
    """Bulk-insert every row of the DataFrame ``info`` into ``express2``.

    Opens its own connection, runs a single ``executemany`` inside an explicit
    transaction and commits it. If anything fails, the transaction is rolled
    back and the error is printed (it is not re-raised). The cursor and the
    connection are closed in both cases.

    Args:
        info: DataFrame whose column labels are used as the column names of
            the INSERT statement and whose rows are the values to insert.
    """
    conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
    cursor = conn.cursor()

    try:
        # Start the transaction explicitly.
        conn.begin()

        # Column list of the INSERT. Names are backtick-quoted (with embedded
        # backticks doubled) so that any DataFrame header is a valid identifier.
        columns = ','.join(
            '`' + str(name).replace('`', '``') + '`' for name in info.columns
        )

        # One "%s" placeholder per column. Derived from the column count, not
        # from the first row, so an empty chunk does not raise IndexError.
        placeholders = ','.join(['%s'] * len(info.columns))

        insert_sql = "insert into express2 (" + columns + ") values (" + placeholders + ")"

        # One tuple per DataFrame row.
        data_list = [tuple(row) for row in info.values]

        res = cursor.executemany(insert_sql, data_list)
        print('執行sql受影響的行數:', res)
        conn.commit()
    except Exception as e:
        # Undo the partial write and report why it failed instead of
        # discarding the error silently.
        conn.rollback()
        print('寫入數據庫失敗,已回滾:' + str(e))
    finally:
        cursor.close()
        conn.close()

def run(chunk_size=100000):
    """Insert the module-level DataFrame ``info`` using a pool of worker processes.

    ``info`` is cut into consecutive row slices of at most ``chunk_size`` rows
    and each slice is handed to ``data_handler`` in a worker process. The
    slices are computed from ``len(info)``, so every row is covered whatever
    the size of the DataFrame.

    Args:
        chunk_size: Maximum number of rows passed to one ``data_handler`` call.
    """
    chunks = [
        info[start:start + chunk_size]
        for start in range(0, len(info), chunk_size)
    ]
    with ProcessPoolExecutor() as executor:
        # Consume the result iterator: an exception raised in a worker is only
        # re-raised when its result is retrieved, otherwise it is lost.
        list(executor.map(data_handler, chunks))

# Only start the pool when this file is executed as the main program. Worker
# processes must be able to import the main module, and without this guard a
# worker that imports it would execute the driver again.
if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('run time is %s' % (stop_time - start_time))
'''
數據庫連接成功!
數據庫連接成功!
數據庫連接成功!
數據庫連接成功!
執行sql受影響的行數: 100000
執行sql受影響的行數: 100000
執行sql受影響的行數: 100000
執行sql受影響的行數: 143378
run time is 57.68310880661011
'''

 

查詢數據庫表數據-清空表數據

# 導入類
import numpy as np
import pandas as pd
import pymysql

# Connection settings for the local MySQL server.
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = 'lxl123456'
DBNAME = 'test'

# Obtain a connection through the pymysql API.
def mysql_conn(host, user, password, db, port=3306, charset="utf8"):
    """Open a pymysql connection to the database ``db``.

    Args:
        host: MySQL server host name.
        user: Account name used to log in.
        password: Password for ``user``.
        db: Name of the database to select.
        port: TCP port of the server.
        charset: Character set passed to ``pymysql.connect``.

    Returns:
        The open pymysql connection.

    Raises:
        pymysql.Error: If the connection cannot be established. The failure
            message is printed before the exception is re-raised.
    """
    try:
        conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db,
            port=port,
            charset=charset,
        )
    except pymysql.Error as e:
        print("數據庫連接失敗:" + str(e))
        # Re-raise: falling through to `return conn` would fail with an
        # UnboundLocalError and hide the real cause of the failure.
        raise
    print('數據庫連接成功!')
    return conn

# Create the connection first, then a cursor that issues statements on it.
conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
cursor = conn.cursor()

def mysql(sql):
    """Execute ``sql`` on the module-level cursor and return the rows as a DataFrame.

    The DataFrame columns are named after the result-set columns, so an
    export such as ``DataFrame.to_excel`` carries the column names along.

    Args:
        sql: The SQL statement to execute.

    Returns:
        A DataFrame with one row per fetched record. It is empty when the
        query returns no rows, and has no columns when the statement produces
        no result set at all.
    """
    cursor.execute(sql)
    rows = cursor.fetchall()
    # `description` is None for statements that do not produce a result set;
    # treat that as "no columns" instead of failing while iterating over None.
    col = [field[0] for field in (cursor.description or ())]
    return pd.DataFrame(list(rows), columns=col)

try:
    df = mysql('''
    select * from express2
''')
finally:
    # Release the cursor before the connection that owns it, and do so even
    # when the query fails.
    cursor.close()
    conn.close()

df.shape
'''
(443378, 48)
'''

# Empty the table.
# Create the connection first, then a cursor that issues statements on it.
conn = mysql_conn(DBHOST, DBUSER, DBPASS, DBNAME)
cursor = conn.cursor()

try:
    sql = '''TRUNCATE TABLE `express2`;'''
    cursor.execute(sql)
    print('清空表數據成功!')
finally:
    # Release the cursor before the connection that owns it, and do so even
    # when the statement fails.
    cursor.close()
    conn.close()
'''
數據庫連接成功!
清空表數據成功!
'''

 

 

代碼還有很多可以優化的地方,可以慢慢調整。

 

參考文檔:

https://blog.csdn.net/weixin_42796152/article/details/107931768

https://blog.csdn.net/blog_liuliang/article/details/78724910


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM