python 實現數據庫中數據添加、查詢與更新


一、前言

  最近做web網站的測試,遇到很多需要批量造數據的功能;比如某個頁面展示數據條數需要達到10000條進行測試,此時手動構造數據肯定是不可能的,此時只能通過python腳本進行自動構造數據;本次構造數據主要涉及到在某個表里面批量添加數據、在關聯的幾個表中同步批量添加數據、批量查詢某個表中符合條件的數據、批量更新某個表中符合條件的數據等。  

 

二、數據添加

  即批量添加數據到某個表中。

 

insert_data.py

import pymysql
import random
import time
from get_userinfo import get_userinfo
from get_info import get_info
from get_tags import get_tags
from get_tuser_id import get_utag


class DatabaseAccess():
    def __init__(self):
        self.__db_host = "xxxxx"
        self.__db_port = 3307
        self.__db_user = "root"
        self.__db_password = "123456"
        self.__db_database = "xxxxxx"
    # 連接數據庫
    def isConnectionOpen(self):
        self.__db = pymysql.connect(
            host=self.__db_host,
            port=self.__db_port,
            user=self.__db_user,
            password=self.__db_password,
            database=self.__db_database,
            charset='utf8'
        )
    
    # 插入數據
    def linesinsert(self,n,user_id,tags_id,created_at):
 
        self.isConnectionOpen()
        # 創建游標
        global cursor
        conn = self.__db.cursor()
        try:
            sql1 = '''
            INSERT INTO `codeforge_new`.`cf_user_tag`(`id`, `user_id`, 
            `tag_id`, `created_at`, `updated_at`) VALUES ({}, {}, 
            {}, '{}', '{}');
            '''.format(n,user_id,tags_id,created_at,created_at)
            
            # 執行SQL   
            conn.execute(sql1,)
        except Exception as e:
            print(e)
        finally:
            # 關閉游標
            conn.close()
            self.__db.commit()
            
            self.__db.close()
    
    def get_data(self):
        
        # 生成對應數據 1000條
        for i in range(0,1001):
            created_at = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
            # print(create_at)
            # 用戶id
            tuserids = []
            tuserid_list = get_utag()
            for tuserid in tuserid_list:
                tuserids.append(tuserid[0])
            # print(tuserids)
            userid_list = get_userinfo()
            user_id = random.choice(userid_list)[0]
            if user_id not in tuserids:
                user_id=user_id
            
                # 標簽id
                tagsid_list = get_tags()
                tags_id = random.choice(tagsid_list)[0]
                self.linesinsert(i,user_id,tags_id,created_at)


if __name__ == "__main__":
    # 實例化對象
    db=DatabaseAccess()
    db.get_data()

 

二、數據批量查詢

 

select_data.py


import
pymysql import pandas as pd import numpy as np def get_tags(): # 連接數據庫,地址,端口,用戶名,密碼,數據庫名稱,數據格式 conn = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxxx',charset='utf8') cur = conn.cursor() # 表cf_users中獲取所有用戶id sql = 'select id from cf_tags where id between 204 and 298' # 將user_id列轉成列表輸出 df = pd.read_sql(sql,con=conn) # 先使用array()將DataFrame轉換一下 df1 = np.array(df) # 再將轉換后的數據用tolist()轉成列表 df2 = df1.tolist() # cur.execute(sql) # data = cur.fetchone() # print(df) # print(df1) # print(df2) return df2 conn.close()

 

三、批量更新數據

update_data.py

import
pymysql import random def update_comment(num,code_id): #連接數據庫 db = pymysql.connect(host='xxx.xxx.xxx.xxx',port=3307,user='root',passwd='123456',db='xxx',charset='utf8') #定義游標 cursor = db.cursor() try: update = "update cf_codes set score_num={} where id={}".format(num,code_id) # print(update) cursor.execute(update) print('數據更新成功') # 提交數據 db.commit() except: print('數據更新失敗') # db.rollback() cursor.close() db.close()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM