- (數據庫)CSV操作
# Read data from CSV files into DataFrames.
# Imported here so this snippet runs on its own.
import pandas as pd

# Read a CSV from the working directory. Malformed rows are skipped:
# `on_bad_lines='skip'` replaces the `error_bad_lines=False` keyword, which
# newer pandas releases no longer accept.
df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines='skip')
# Read a CSV from an absolute local path.
df = pd.read_csv("C:/Users/fuqia/Desktop/example.csv", sep=',')
- (數據庫)使用sqlite3
import pandas as pd
import altair as alt
import sqlite3
# Open (or create) the SQLite database file.
con = sqlite3.connect("pandas.db")
# Example of running a raw SQL statement to create a table:
# con.execute("CREATE TABLE t (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")

# Create `person2` only when it is missing, so the read below also works on a
# fresh database file and re-running the snippet does not fail.
con.execute(
    "CREATE TABLE IF NOT EXISTS person2("
    "id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,"
    "time TEXT NOT NULL,"
    "name varchar(100) NOT NULL"
    ")"
)
# Example of inserting one record by hand:
# con.execute("insert into person2(time, name) values('1', 'bill')")
# Commit the statements executed so far.
con.commit()

# pandas: read rows from the database into a DataFrame.
df = pd.read_sql("select id,time, name from person2", con)

# pandas: write a DataFrame into the database.
df = pd.DataFrame({'time': [11, 12, 13, 14], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
# `person2` already exists at this point, and the default if_exists='fail'
# would raise ValueError, so append to the existing table instead.
df.to_sql('person2', con, index=False, if_exists='append')

# Read data from CSV, skipping malformed rows (`on_bad_lines` replaces the
# `error_bad_lines` keyword that newer pandas releases no longer accept).
df = pd.read_csv('pandas.csv', encoding="utf-8", delimiter=",", on_bad_lines='skip')
df

# Store the CSV data in table `hf`.
# NOTE: with the default if_exists='fail' this raises ValueError when `hf`
# already exists; pass if_exists='replace' or 'append' to re-run.
df.to_sql('hf', con, index=False)

# Release the database file handle once all work is done.
con.close()
- (數據庫)mysql 讀取插入pandas數據
import pandas as pd
import pymysql
import sys
from sqlalchemy import create_engine
def read_mysql_and_insert():
    """Read table ``person2`` from MySQL and append its rows to ``add_table``.

    A PyMySQL connection is used for ``pd.read_sql`` and a SQLAlchemy engine
    for ``DataFrame.to_sql``. On a connection, engine or query error the
    message is printed and the process exits with status 1.

    Returns:
        The DataFrame read from ``person2``.
    """
    # The file only imports `create_engine`, so bind the `sqlalchemy.exc`
    # names used in the except clause below.
    import sqlalchemy.exc

    # pymysql connection, used for df read_sql
    try:
        conn = pymysql.connect(host='127.0.0.1', user='root', password='pass',
                               database='db1', charset='utf8')
    except pymysql.err.OperationalError as e:
        print('Error is ' + str(e))
        sys.exit(1)

    # From here on the connection is open; close it on every exit path,
    # including the sys.exit() calls below.
    try:
        # sqlalchemy engine, used for df to_sql
        try:
            engine = create_engine('mysql+pymysql://root:pass@127.0.0.1:3306/db1')
        except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InternalError) as e:
            print('Error is ' + str(e))
            sys.exit(1)

        try:
            # Read the table through pymysql; the result is a DataFrame.
            sql = 'select * from person2'
            df = pd.read_sql(sql, con=conn)
        except pymysql.err.ProgrammingError as e:
            print('Error is ' + str(e))
            sys.exit(1)

        print(df.head())
        # Write the DataFrame back to the database through sqlalchemy.
        df.to_sql(name='add_table', con=engine, if_exists='append', index=False)
    finally:
        conn.close()

    print('ok')
    return df


if __name__ == '__main__':
    df = read_mysql_and_insert()
- (python模塊)pymysql,sqlalchemy
import pandas as pd
import numpy as np
import altair as alt
# Imported here so this snippet runs on its own.
import pymysql
from sqlalchemy import create_engine, text

# pymysql: execute a SQL statement through a cursor.
conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx', database='db1', charset='utf8')
try:
    with conn.cursor() as cursor:
        cursor.execute("CREATE TABLE t3 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
finally:
    # Close the connection even when the statement fails.
    conn.close()

# DataFrame-related usage
# 1. pymysql + sqlalchemy: create the engine.
engine = create_engine('mysql+pymysql://root:xxx@127.0.0.1:3306/db1')

# 2. sqlalchemy: execute a SQL statement.
# `Engine.execute()` no longer exists in SQLAlchemy 2.0; run the statement on
# a connection instead. `engine.begin()` commits on success and rolls back on
# error.
with engine.begin() as connection:
    connection.execute(text("CREATE TABLE t2 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"))

# 3. sqlalchemy: read a query result into a DataFrame.
sql = ''' select * from employee; '''
df = pd.read_sql_query(sql, engine)
print(df)
- (python模塊)使用pymysql
import pymysql
# Connect with keyword arguments: PyMySQL 1.0+ no longer accepts host, user
# and password positionally.
# NOTE(review): the credential is hard-coded here; prefer loading it from the
# environment or a config file kept out of version control.
con = pymysql.connect(host='localhost',
                      user='root',
                      password='eve136712',
                      database='model_save',
                      charset='utf8',
                      use_unicode=True)
try:
    # Execute the SQL statement through the cursor's execute() method.
    with con.cursor() as cursor:
        cursor.execute("create table person(id varchar(32) primary key, name varchar(100) )")
finally:
    # Close the connection even when the statement fails.
    con.close()
- (python模塊)使用sqlalchemy
import pandas as pd
from sqlalchemy import create_engine
# Engine creation, option 1 (the pymysql driver is still required: the URL
# scheme is mysql+pymysql).
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')

# Engine creation, option 2 (also requires pymysql): build the URL from a dict.
db_info = {
    'user': 'root',
    'password': '123456',
    'host': 'localhost',
    'port': 3306,
    'database': 'test',
}
# The `encoding` keyword of create_engine() was removed in SQLAlchemy 2.0; the
# character set is already selected by `?charset=utf8` in the URL.
engine = create_engine(
    'mysql+pymysql://%(user)s:%(password)s@%(host)s:%(port)d/%(database)s?charset=utf8' % db_info
)

# sqlalchemy: read a query result into a DataFrame.
sql = ''' select * from employee; '''
df = pd.read_sql_query(sql, engine)
print(df)

df = pd.DataFrame({'id': [1, 2, 3, 4], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
# Store the DataFrame as a MySQL table, including the index column.
df.to_sql('mydf', engine, index=True)

# Store the DataFrame as a MySQL table without the index column (index=False).
# if_exists:
#   1. fail:    if the table exists, raise ValueError (this is the default)
#   2. replace: if the table exists, drop it, create a new one and insert the data
#   3. append:  if the table exists, insert the data; if it does not, create it
# Option 1: module-level helper. NOTE(review): this goes through the
# pandas.io.sql module rather than the DataFrame method; prefer option 2.
pd.io.sql.to_sql(df, 'example', con=engine, index=False, if_exists='replace')
# Option 2: DataFrame method (index is written because index defaults to True).
df.to_sql('example', con=engine, if_exists='replace')
- (python模塊)使用Django (應用)
def mysql_to_pd(case_name='XXXXXTraffic'):
    """Load the ``ReportDetail`` rows of one test case into a DataFrame.

    The rows are filtered by ``case_name`` through the Django ORM, the
    ``platform_name`` column is renamed to ``HW``, a ``time`` column holding
    ``report_create_time`` formatted as ``YYYY-MM-DD`` is added, and the frame
    is indexed by that date (parsed back to datetime) and sorted by index.

    Args:
        case_name: value matched against the ``case_name`` field.

    Returns:
        The resulting DataFrame, sorted by its date index.
    """
    wanted_fields = (
        'report_create_time',
        'platform_name',
        'throughputd',
        'throughputu',
        'case_name',
    )
    records = ReportDetail.objects.filter(case_name=case_name).values(*wanted_fields)
    frame = pd.DataFrame(list(records)).rename(columns={'platform_name': 'HW'})
    # Reduce the timestamp to its calendar day, kept as a string column.
    frame['time'] = frame['report_create_time'].dt.strftime('%Y-%m-%d')
    # Index by that day, parsed back into datetime values.
    frame = frame.set_index(pd.to_datetime(frame["time"]))
    return frame.sort_index()


data = mysql_to_pd()
- (python模塊)使用df+mysql (應用)
import pandas as pd
import numpy as np
import altair as alt
import pymysql
from sqlalchemy import create_engine
# Build the SQLAlchemy engine on top of the pymysql driver.
engine = create_engine("mysql+pymysql://root:password@127.x.x.x:3306/db5")
# Fetch every report_uuid from report_reportinfo into a DataFrame.
sql = " select report_uuid from report_reportinfo; "
df = pd.read_sql_query(sql=sql, con=engine)
# Bare expression: displays the frame when run in a notebook cell.
df
- (python)動態創建mysql表【自定義模塊】
# 此代碼實現將dict寫入mysql指定表中。如果指定表不存在,則根據dict中key的字段創建表,然後將dict寫入表中
import pymysql
from scrapy.conf import settings
class DataToMysql:
    """Write dicts into a MySQL table, creating the table on first use.

    Each key of the dict is a column name and each value is the value stored
    in that column. When the target table does not exist it is created with
    one ``text`` column per key and the insert is retried.
    """

    # MySQL server error code raised when the target table does not exist.
    _ER_NO_SUCH_TABLE = 1146
    # Column type used for every column of an auto-created table.
    _COLUMN_TYPE = 'text'

    def __init__(self, host, user, passwd, db, port=3306):
        """Open the connection and create a cursor.

        :param host: MySQL server host
        :param user: user name
        :param passwd: password
        :param db: database name
        :param port: server port; defaults to 3306, the value that was
            previously hard-coded regardless of this argument
        :raises pymysql.Error: when the connection cannot be established
        """
        try:
            self.conn = pymysql.connect(host=host, user=user, password=passwd, database=db,
                                        port=port, charset='utf8')  # connect to the database
            self.cursor = self.conn.cursor()
        except pymysql.Error:
            print("數據庫連接信息報錯")
            raise

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted identifier.

        Dotted names are quoted part by part; backticks inside a part are
        doubled, and a part that is already wrapped in backticks is not
        quoted twice.
        """
        quoted_parts = []
        for part in str(name).split('.'):
            part = part.strip()
            if len(part) >= 2 and part[0] == '`' and part[-1] == '`':
                part = part[1:-1].replace('``', '`')
            quoted_parts.append('`' + part.replace('`', '``') + '`')
        return '.'.join(quoted_parts)

    def write(self, table_name, info_dict):
        """
        Build and run an INSERT for info_dict, creating the table if needed.

        Values are passed to the driver as query parameters rather than being
        concatenated into the SQL text.

        :param table_name: name of the table to write to
        :param info_dict: dict mapping column name -> value to insert
        :return: None
        :raises pymysql.Error: for any database error other than a missing table
        """
        table = self._quote_identifier(table_name)
        columns = [self._quote_identifier(key) for key in info_dict]
        values = tuple(info_dict.values())
        placeholders = ', '.join(['%s'] * len(values))
        # Identifiers are spliced in as text. Double any '%' in them so the
        # driver's %-interpolation only sees the value placeholders.
        insert_sql = "INSERT INTO {} ({}) VALUES ({})".format(
            table.replace('%', '%%'), ', '.join(columns).replace('%', '%%'), placeholders)
        try:
            self.cursor.execute(insert_sql, values)
        except pymysql.Error as e:
            # The numeric error code is the first element of e.args.
            if not e.args or e.args[0] != self._ER_NO_SUCH_TABLE:
                raise
            # The table is missing: create it from the dict keys, then retry.
            column_defs = ', '.join(
                '{} {}'.format(column, self._COLUMN_TYPE) for column in columns)
            self.cursor.execute("CREATE TABLE {} ({})".format(table, column_defs))
            self.cursor.execute(insert_sql, values)
        self.conn.commit()  # commit the current transaction
if __name__ == '__main__':
    # Demo: write one dict into table `te`. The port is passed explicitly
    # because DataToMysql.__init__ declares it as a parameter; omitting it
    # raised TypeError for a missing positional argument.
    mysql = DataToMysql('localhost', 'root', '******', 'pythonDB', 3306)
    di = {"A": "A", "B": "B"}
    mysql.write('te', di)
- (python)動態創建mysql表中的字段【自定義模塊】【good】
def check_column_exist(mysql_conn, column_name):
    """Return True when `report_parse_soap_result` has the given column.

    The column looked up is the part of ``column_name`` before the first
    ``':'`` (the whole string when it contains no ``':'``).

    The name is matched exactly with ``WHERE Field = %s``. The previous
    ``LIKE`` match treated ``_`` and ``%`` in the name as wildcards, so it
    could report a different column as existing. The name is passed as a
    query parameter instead of being formatted into the SQL text.

    :param mysql_conn: open database connection; its cursor's ``execute()``
        is expected to return the number of matching rows, as PyMySQL does
    :param column_name: column key, optionally followed by ``':'`` and a suffix
    :return: bool
    """
    field = column_name.partition(':')[0]
    with mysql_conn.cursor() as cursor:
        matched = cursor.execute(
            "SHOW COLUMNS FROM `report_parse_soap_result` WHERE Field = %s", (field,))
    return bool(matched)
def add_column(mysql_conn, column_name):
    """Add a nullable INT column to `report_parse_soap_result`.

    The column added is the part of ``column_name`` before the first ``':'``
    (the whole string when it contains no ``':'``). Backticks in the name are
    doubled so the name cannot break out of the quoted identifier.

    This is best-effort: any error is printed and swallowed, as before.

    :param mysql_conn: open database connection
    :param column_name: column key, optionally followed by ``':'`` and a suffix
    :return: None
    """
    try:
        field = column_name.partition(':')[0]
        quoted = '`' + field.replace('`', '``') + '`'
        mysql_sql = "ALTER TABLE `report_parse_soap_result` " \
                    "ADD COLUMN " + quoted + " INT NULL ;"
        with mysql_conn.cursor() as cursor:
            cursor.execute(mysql_sql)
    except Exception as e:
        print('add_column:', e)
def save_one_soap_result(mysql_conn, report_uuid, soap_result):
    """Store one result value in the row of `report_parse_soap_result`
    identified by ``report_uuid``.

    ``soap_result`` must hold exactly one ``{column_name: value}`` pair. The
    column written is the part of ``column_name`` before the first ``':'``.
    When that column does not exist yet it is added first and the value is
    then written. Previously the value was dropped whenever the column had to
    be created.

    The value and the uuid are passed as query parameters. The UPDATE is
    best-effort: any error is printed and swallowed, as before.

    :param mysql_conn: open database connection
    :param report_uuid: value matched against ``fk_report_uuid_id``
    :param soap_result: single-item dict ``{column_name: value}``
    :return: None
    :raises ValueError: when ``soap_result`` does not hold exactly one item
    """
    (column_name, value), = soap_result.items()
    if not check_column_exist(mysql_conn, column_name):
        add_column(mysql_conn, column_name)
    try:
        field = column_name.partition(':')[0]
        # Double backticks to keep the name inside the quoted identifier, and
        # double '%' so the driver's %-interpolation only sees the two value
        # placeholders.
        quoted = '`' + field.replace('`', '``').replace('%', '%%') + '`'
        sql = "UPDATE report_parse_soap_result SET " + quoted + " = %s " \
              "where fk_report_uuid_id= %s"
        with mysql_conn.cursor() as cursor:
            cursor.execute(sql, (value, report_uuid))
        mysql_conn.commit()
    except Exception as e:
        print('save_one_soap', e)
def save_soap_result_list(mysql_conn, report_uuid, soap_result_list):
    """Insert a row for ``report_uuid`` and store every result in it.

    A row with ``fk_report_uuid_id = report_uuid`` is inserted into
    `report_parse_soap_result`. An insert error is printed and swallowed, and
    processing continues. Each item of ``soap_result_list`` is then handed to
    ``save_one_soap_result``.

    The uuid is passed as a query parameter instead of being formatted into
    the SQL text.

    :param mysql_conn: open database connection
    :param report_uuid: value stored in ``fk_report_uuid_id``
    :param soap_result_list: iterable of single-item dicts ``{column_name: value}``
    :return: None
    """
    try:
        with mysql_conn.cursor() as cursor:
            cursor.execute(
                "INSERT INTO report_parse_soap_result (`fk_report_uuid_id`) VALUES (%s);",
                (report_uuid,))
        mysql_conn.commit()
    except Exception as e:
        print(e)
    for result in soap_result_list:
        save_one_soap_result(mysql_conn, report_uuid, result)
# Usage
# NOTE(review): `test` is also the name of a standard-library package; confirm
# that this import resolves to the project's own module.
from test import save_soap_result_list
host = '10.101.35.249'
user = 'xxx'
password = 'xxx'
db = 'db'
# `password` / `database` are the current keyword names; `passwd` / `db` are
# deprecated aliases.
conn = pymysql.connect(host=host, user=user, password=password, database=db, port=3306, charset='utf8')
try:
    ojb = RobotXML('output.xml')
    report_uuid = ojb.report_reportinfo_dic['report_uuid']
    soap_result_list = ojb.report_soap_parser_result
    save_soap_result_list(conn, report_uuid, soap_result_list)
finally:
    # Close the connection even when parsing or saving fails.
    conn.close()