首先需要一個模塊sshtunnel,如果沒有直接pip install sshtunnel
其實連個連接方式非常像:
pymysql連接方式:
import pymysql from sshtunnel import SSHTunnelForwarder ssh_host = "" # 堡壘機ip地址或主機名 ssh_port = 22 # 堡壘機連接mysql服務器的端口號,一般都是22,必須是數字 ssh_user = "" # 這是你在堡壘機上的用戶名 ssh_password = "" # 這是你在堡壘機上的用戶密碼 mysql_host = "localhost" # 這是你mysql服務器的主機名或ip地址 mysql_port = 3306 # 這是你mysql服務器上的端口,3306,mysql就是3306,必須是數字 mysql_user = "" # 這是你mysql數據庫上的用戶名 mysql_password = "" # 這是你mysql數據庫的密碼 mysql_db = "" # mysql服務器上的數據庫名 with SSHTunnelForwarder( (ssh_host, ssh_port), ssh_username=ssh_user, ssh_password=ssh_password, remote_bind_address=(mysql_host, mysql_port)) as server: conn = pymysql.connect(host=mysql_host, port=server.local_bind_port, user=mysql_user, passwd=mysql_password, db=mysql_db) cursor = conn.cursor() cursor.execute("select * from user") row_1 = cursor.fetchone() print(row_1, type(row_1)) # 獲取前n行數據 # row_2 = cursor.fetchmany(3) # 獲取所有數據 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
如果給pymysql加一個事務處理,可以看這里https://www.cnblogs.com/woider/p/5926744.html
sqlalchemy連接方式:
from sshtunnel import SSHTunnelForwarder from sqlalchemy import Column, String, Integer, create_engine, event from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.exc import DisconnectionError ssh_host = "" # 堡壘機ip地址或主機名 ssh_port = 22 # 堡壘機連接mysql服務器的端口號,一般都是22,必須是數字 ssh_user = "" # 這是你在堡壘機上的用戶名 ssh_password = "" # 這是你在堡壘機上的用戶密碼 mysql_host = "" # 這是你mysql服務器的主機名或ip地址 mysql_port = 3306 # 這是你mysql服務器上的端口,3306,mysql就是3306,必須是數字 mysql_user = "" # 這是你mysql數據庫上的用戶名 mysql_password = "" # 這是你mysql數據庫的密碼 mysql_db = "" # mysql服務器上的數據庫名 Base = declarative_base() class Phones(Base): __tablename__ = 'phones' id = Column(Integer, primary_key=True) name = Column(String(32)) def checkout_listener(dbapi_con, con_record, con_proxy): try: try: dbapi_con.ping(False) except TypeError: dbapi_con.ping() except dbapi_con.OperationalError as exc: if exc.args[0] in (2006, 2013, 2014, 2045, 2055): raise DisconnectionError() else: raise with SSHTunnelForwarder( (ssh_host, ssh_port), ssh_username=ssh_user, ssh_password=ssh_password, remote_bind_address=(mysql_host, mysql_port) ) as server: # server.start() # ssh通道服務啟動,用了with語句會自己啟動 local_port = str(server.local_bind_port) engine = create_engine( 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(mysql_user, mysql_password, '127.0.0.1', local_port,mysql_db), pool_size=100, pool_recycle=3600) event.listen(engine, 'checkout', checkout_listener) # 防止報連接池相關的錯誤 Base.metadata.create_all(engine) # 檢測文件中所有繼承了Base類的類,在mysqld中建立所有的表,類就是表 Session = sessionmaker(bind=engine) session = Session()