通過sshtunner連接數據庫:
代碼如下:
import pymysql
from sshtunnel import SSHTunnelForwarder
with SSHTunnelForwarder(
('123.88.12.1',22), # 指定ssh登錄的跳轉機的address,端口號
ssh_username='user', # 遠程服務器的用戶名,注意不是DB的用戶名和密碼
ssh_password='pwd123',# 遠程服務器的密碼
remote_bind_address=('123.88.199.198',3306) #注意端口號不要加引號
local_bind_address=('127.0.0.1',13306) #注意端口號不要加引號
) as server: # mysql服務器的address,端口號
conn = pymysql.connect(host='127.0.0.1', # 注意此處為上述代碼中的local_bind_address 中的地址和端口號
port=server.local_bind_port,
user='dbuser', # 數據庫用戶名
passwd='dbpwd123', # 數據庫密碼
charset='utf8',
db='dbname',# 數據庫名稱
autocommit=True)# 如果修改數據庫自動提交
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute("SELECT * FROM table where name='xiaoming';")
result=cursor.fetchall()
print(result)
cursor.close()#關閉游標
conn.close()#關閉連接
如果密碼中包含特殊字符,比如反斜杠,注意一定要對反斜杠進行轉義
代碼理解:
首先通過 sshtunnel.SSHTunnelForwarder 進行端口映射,將遠程服務器的3306端口映射到本地的13306端口,再連接本地的端口,那么后續對本地13306端口的操作其實都可以視為對線上服務器3306端口的操作,理論上來說,PuTTY也是進行了相同的操作,以此達到內網穿透的目的
下邊這個是別人寫好的,直接可以使用的情況;如果不這樣寫的話,脫離with as的作用於,連接就關閉了,這樣導致連接不上DB
import pymysql
from sshtunnel import SSHTunnelForwarder
#通過SSH,連接遠程服務器的數據庫
class DataBaseHandle :
''' 定義一個 MySQL 操作類'''
def __init__ ( self , host='127.0.01' , username='xxx' , password='xxx'
, database='xxx' , port=10022 ) :
'''初始化數據庫信息並創建數據庫連接'''
self.w_server = SSHTunnelForwarder (
# 中間服務器地址
("xxx.64.47.xxx" , 22) ,#遠程服務器的地址和端口號
ssh_username="xxx" ,
ssh_pkey="~/.ssh/id_rsa" ,
# ssh_private_key_password="~/.ssh/id_rsa",
# 目標的地址與端口,因為目標地址就是中間地址所以寫127.0.0.1或者localhost
remote_bind_address=('192.163.11.11' , 3306) ,#DB的服務器的地址和端口號
# 本地的地址與端口
local_bind_address=('127.0.0.1' , 10022) #本地的映射的地址和端口號
)
# 啟動ssh實例,后續的MySQL網絡連接都將在這個環境下運行。
self.w_server.start ()
# 后面開始對MySQL的數據進行初始化
self.host = host
self.username = username
self.password = password
self.database = database
self.port = port
self.db = pymysql.connect ( host=self.host ,
user=self.username ,
password=self.password ,
database=self.database ,
port=self.port ,
charset='utf8' )
# 這里 注釋連接的方法,是為了 實例化對象時,就創建連接。不許要單獨處理連接了。
#
# def connDataBase(self):
# ''' 數據庫連接 '''
#
# self.db = pymysql.connect(self.host,self.username,self.password,self.port,self.database)
#
# # self.cursor = self.db.cursor()
#
# return self.db
def insertDB ( self , sql ) :
''' 插入數據庫操作 '''
self.cursor = self.db.cursor ()
try :
# 執行sql
self.cursor.execute ( sql )
# tt = self.cursor.execute(sql) # 返回 插入數據 條數 可以根據 返回值 判定處理結果
# print(tt)
self.db.commit ()
except :
# 發生錯誤時回滾
self.db.rollback ()
finally :
self.cursor.close ()
def deleteDB ( self , sql ) :
''' 操作數據庫數據刪除 '''
self.cursor = self.db.cursor ()
try :
# 執行sql
self.cursor.execute ( sql )
# tt = self.cursor.execute(sql) # 返回 刪除數據 條數 可以根據 返回值 判定處理結果
# print(tt)
self.db.commit ()
except :
# 發生錯誤時回滾
self.db.rollback ()
finally :
self.cursor.close ()
def updateDb ( self , sql ) :
''' 更新數據庫操作 '''
self.cursor = self.db.cursor ()
try :
# 執行sql
self.cursor.execute ( sql )
# tt = self.cursor.execute(sql) # 返回 更新數據 條數 可以根據 返回值 判定處理結果
# print(tt)
self.db.commit ()
except :
# 發生錯誤時回滾
self.db.rollback ()
finally :
self.cursor.close ()
def selectDb ( self , sql ) :
''' 數據庫查詢 '''
self.cursor = self.db.cursor ()
try :
self.cursor.execute ( sql ) # 返回 查詢數據 條數 可以根據 返回值 判定處理結果
data = self.cursor.fetchall () # 返回所有記錄列表
print ( data )
# 結果遍歷
for row in data :
sid = row[0]
name = row[1]
# 遍歷打印結果
print ( 'sid = %s, name = %s' % (sid , name) )
except :
print ( 'Error: unable to fecth data' )
finally :
self.cursor.close ()
def closeDb ( self ) :
''' 數據庫連接關閉 '''
self.db.close ()
self.w_server.close ()
if __name__ == '__main__' :
DbHandle = DataBaseHandle ()
DbHandle.selectDb ( 'SELECT VERSION()' )
DbHandle.closeDb ()
https://www.cnblogs.com/sidianok/p/12571798.html 相關參考
https://blog.csdn.net/zgbzbl/article/details/107081320 相關參考