pymysql庫連接
pandas讀
# pymysql 是在 python3.x 版本中用於連接 MySql 服務器的一個庫 # 1.創建一個連接對象 # host mysql 服務器地址 # port 數字類型 端口 # user 用戶名 # passwd 密碼 # db 數據庫名稱 # charset 連接編碼,需要顯示指明編碼方式 # 2.獲取游標 # 游標是什么? # 簡述:游標(cursor)是一個游動的標識,可以理解成一條sql取出對應n條結果資源的接口/句柄,就是游標,沿着游標可以一次取出一行 # 3.執行增刪改查的操作 # 4.處理數據 # 5.關閉cursor # 6.關閉connection import pymysql import pandas as pd # 創建一個連接對象,打開數據庫連接 conn=pymysql.connect(host='localhost',port=3306,user='root',passwd='密碼',db='數據庫名',charset='utf8') # 獲取游標 cursor=conn.cursor() # 創建一個字符串語句,字符串支持換行 sql = """ select * from football.player """ cursor.execute(sql) data1=cursor.fetchall() df1=pd.DataFrame(list(data1),columns=['id','name','club_id','country_id','position_id','history_club_id','active']) print(df1) cursor.close() conn.close()
2.其他方法
方法 | 描述 |
---|---|
begin() | 開啟事務 |
commit() | 提交事務 |
cursor(cursor=None) | 創建一個游標用來執行語句 |
ping(reconnect=True) | 檢查連接是否存活,會重新發起連接 |
rollback() | 回滾事務 |
close() | 關閉連接 |
select_db(db) | 選擇數據庫 |
show_warnings() | 查看warning信息 |
3.獲取cursor對象
1.游標對象,用於執行查詢和獲取結果
2.核心方法
方法名 | 說明 |
---|---|
execute() | 用於執行一個數據庫的查詢命令 |
fetchone() | 獲取結果集中的下一行 |
fetchmany(size) | 獲取結果集中的下(size)行 |
fetchall() | 獲取結果集中剩下的所有行 |
rowcount | 最近一次execute返回數據/影響的行數 |
close() | 關閉游標 |
4.舉個栗子
# 執行查詢功能 with connection.cursor() as cursor: sql = 'select * from home_user' cursor.execute(sql) results = cursor.fetchall() connection.commit() for results in results: uid = results[0] name = results[1] password = results[2] print('==========用戶信息===============') print('用戶id: {id} \n用戶名: {name}\n密碼: {pwd}'.format(id=uid, name=name, pwd=password))
5.詳細
方法 | 描述 |
---|---|
close() | 關閉游標。 |
execute(query, args=None) | 執行單條語句,傳入需要執行的語句,是string類型;同時可以給查詢傳入參數,參數可以是tuple、list或dict。執行完成后,會返回執行語句的影響行數,如果有的話。 |
executemany(query, args) | 執行多條INSERT語句,傳入需要執行的語句;同時可以給查詢傳入參數,參數是一個mappings序列。執行完成后,會返回執行語句的影響行數,如果有的話。 |
fetchone() | 獲取下一行數據。 |
fetchall() | 獲取所有數據。 |
fetchmany(size=None) | 獲取幾行數據。 |
read_next() | 獲取下一行數據。 |
callproc() | 用來調用存儲過程。 |
mogrify() | 參數化查詢,防止SQL注入。 |
scroll(num,mode) | 移動游標位置。 |
6.基本操作
查詢數據
# 1.分頁查詢操作 def find_by_page(page, size): with pymysql.connect(host=HOST, port=PORT, user=USER, passwd=PASSWORD, charset=CHARSET, db=DB_NAME) as cursor: sql = "SELECT * FROM t_addr LIMIT{},{}".format((page - 1) * size, size) cursor.execute(sql) user = cursor.fetchall()
事務操作
conn = pymysql.connect( host='10.10.0.109', port=3306, user='mha', password='123456', database='sbtest', charset='utf8' ) cursor = conn.cursor() # 插入sql; sql_insert = "insert into t_user (userid,username) values (10,'user10')" # 更新sql; sql_update = "update t_user set username = 'name91' where userid=9" # 刪除sql; sql_delete = "delete from t_user where userid < 3" # 把一個事務放到一個try塊里,如果出現異常就回滾; try: # 開啟事務; conn.begin() cursor.execute(sql_insert) print(cursor.rowcount) cursor.execute(sql_update) print(cursor.rowcount) cursor.execute(sql_delete) print(cursor.rowcount) # 提交事務; conn.commit() except Exception as e: # 若有異常就回滾; conn.rollback() cursor.close() conn.close()
批量插入
# 測試事務 批量添加 def test_batch_insert(): conn = pymysql.connect(host=HOST, port=PORT, user=USER, passwd=PASSWORD, charset=CHARSET, db=DB_NAME) cursor = conn.cursor() try: sql = 'INSERT INTO t_addr(PROVICE, CITY, COUNTY, DEATIL, USERID) VALUES (%s,%s,%s,%s,%s)' li = [] for i in range(50): li.append(('湖北省', '武漢市', '高新區' + str(i), '智慧園2135', 6)) # 開啟事物 conn.begin() cursor.executemany(sql, li) # 提交 conn.commit() except Exception as e: conn.rollback() print(e) # 報錯事務回滾 finally: # 關閉連接 cursor.close() conn.close()
通過跳板機SSH連接連接MySQL
import pymysql import pandas as pd from sshtunnel import SSHTunnelForwarder with SSHTunnelForwarder( # 配置SSH連接 ssh_address_or_host=('跳板機對應的地址',int('端口號')), # 指定SSH登陸的條件及的address ssh_password='跳轉機密碼', # 跳轉機的密碼 ssh_username='跳轉機用戶名', # 跳轉機的用戶名 local_bind_address=('127.0.0.1',int('2222')), # 映射到本機的地址和端口,此處必須是127.0.0.1 remote_bind_address=('mysql服務器地址',int('端口號'))) as server: # MySQL服務器的配置,有數據的服務器 conn=pymysql.connect(host='localhost',port=3306,user='root',passwd='密碼',db='數據庫名',charset='utf8') cursor=conn.cursor() # 獲取游標