sqlalchemy和pymysql通過ssh連接遠程mysql服務器


 首先需要一個模塊sshtunnel,如果沒有直接pip install sshtunnel

其實連個連接方式非常像:

pymysql連接方式:

import pymysql
from sshtunnel import SSHTunnelForwarder

ssh_host = ""  # 堡壘機ip地址或主機名
ssh_port = 22  # 堡壘機連接mysql服務器的端口號,一般都是22,必須是數字
ssh_user = ""  # 這是你在堡壘機上的用戶名
ssh_password = ""  # 這是你在堡壘機上的用戶密碼
mysql_host = "localhost"  # 這是你mysql服務器的主機名或ip地址
mysql_port = 3306  # 這是你mysql服務器上的端口,3306,mysql就是3306,必須是數字
mysql_user = ""  # 這是你mysql數據庫上的用戶名
mysql_password = ""  # 這是你mysql數據庫的密碼
mysql_db = ""  # mysql服務器上的數據庫名


with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_password=ssh_password,
        remote_bind_address=(mysql_host, mysql_port)) as server:
    conn = pymysql.connect(host=mysql_host,
                           port=server.local_bind_port,
                           user=mysql_user,
                           passwd=mysql_password,
                           db=mysql_db)

    cursor = conn.cursor()
    cursor.execute("select * from user")
    row_1 = cursor.fetchone()
    print(row_1, type(row_1))
    # 獲取前n行數據
    # row_2 = cursor.fetchmany(3)
    # 獲取所有數據
    # row_3 = cursor.fetchall()
    conn.commit()
    cursor.close()
    conn.close()

如果給pymysql加一個事務處理,可以看這里https://www.cnblogs.com/woider/p/5926744.html

sqlalchemy連接方式:

from sshtunnel import SSHTunnelForwarder
from sqlalchemy import Column, String, Integer, create_engine, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import DisconnectionError

ssh_host = ""  # 堡壘機ip地址或主機名
ssh_port = 22  # 堡壘機連接mysql服務器的端口號,一般都是22,必須是數字
ssh_user = ""  # 這是你在堡壘機上的用戶名
ssh_password = ""  # 這是你在堡壘機上的用戶密碼
mysql_host = ""  # 這是你mysql服務器的主機名或ip地址
mysql_port = 3306  # 這是你mysql服務器上的端口,3306,mysql就是3306,必須是數字
mysql_user = ""  # 這是你mysql數據庫上的用戶名
mysql_password = ""  # 這是你mysql數據庫的密碼
mysql_db = "" # mysql服務器上的數據庫名

Base = declarative_base()

class Phones(Base):
    __tablename__ = 'phones'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))


def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise


with SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_user,
        ssh_password=ssh_password,
        remote_bind_address=(mysql_host, mysql_port)
) as server:
    # server.start()  # ssh通道服務啟動,用了with語句會自己啟動
    local_port = str(server.local_bind_port)
    engine = create_engine(
        'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(mysql_user, mysql_password, '127.0.0.1', local_port,mysql_db),
        pool_size=100,
        pool_recycle=3600)

    event.listen(engine, 'checkout', checkout_listener)  # 防止報連接池相關的錯誤
    Base.metadata.create_all(engine)  # 檢測文件中所有繼承了Base類的類,在mysqld中建立所有的表,類就是表
    Session = sessionmaker(bind=engine)
    session = Session()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM