接口自動化測試之數據清洗/隔離/備份/恢復
在得到QQ:1301559180 得代碼貢獻之后,想到了通過ssh連接上服務器,然后進行數據庫備份,數據庫恢復, 主要使用了
paramiko庫
最終效果
-
測試開始前會進行數據備份,並在下面2個路徑存儲對應的數據庫備份(目前備份的數據庫是寫死的為,測試連接的mysql數據庫).sql文件
數據庫服務器: /root/backup_sql/
本地(運行測試腳本的機器)當前目錄下的
backup_sqls文件命名方式為: 數據庫名+時間


如何使用
-
通過在
config.yaml中配置對應得數據庫信息,服務器必要得賬號密碼等信息(支持私鑰文件登錄,但個人未嘗試),大致文件格式如下# 數據庫校驗- mysql database: host: localhost port: 3306 user: root # 不用''會被解析成int類型數據 password: '123456' db_name: test charset: utf8mb4 # 數據庫所在的服務器配置 ssh_server: port: 22 username: root password: '123456' # 私有密鑰文件路徑 private_key_file: # 如果使用的docker容器部署mysql服務,需要傳入mysql的容器id/name mysql_container: mysql8 # 數據庫備份文件導出的本地路徑, 需要保證存在該文件夾 sql_data_file: backup_sqls/ -
然后在
test/conftest.py做如下操作#!/usr/bin/env/python3 # -*- coding:utf-8 -*- """ @project: apiAutoTest @author: zy7y @file: conftest.py @ide: PyCharm @time: 2020/12/8 @desc: """ import pytest from tools.data_clearing import DataClearing from tools.db import DB from tools.read_file import ReadFile @pytest.fixture(scope="session") def data_clearing(): """數據清洗""" DataClearing.server_init() # 1. 備份數據庫 DataClearing.backup_mysql() yield # 2. 恢復數據庫 DataClearing.recovery_mysql() DataClearing.close_client() # 若不需要數據清洗功能,請把get_db()入參拿掉 @pytest.fixture(scope="session") def get_db(data_clearing): """關於其作用域請移步查看官方文檔""" try: db = DB() yield db finally: db.close() @pytest.fixture(params=ReadFile.read_testcase()) def cases(request): """用例數據,測試方法參數入參該方法名 cases即可,實現同樣的參數化 目前來看相較於@pytest.mark.parametrize 更簡潔。 """ return request.param
實現代碼
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2021/1/19 11:44
@Author : zy7y
@ProjectName : apiAutoTest
@File : data_clearing.py
@Software : PyCharm
@Github : https://github.com/zy7y
@Blog : https://www.cnblogs.com/zy7y
"""
import os
from datetime import datetime
import paramiko
from tools.read_file import ReadFile
from tools import logger
class ServerTools:
def __init__(self, host: str, port: int = 22, username: str = "root", password: str = None,
private_key_file: str = None):
# 進行SSH連接
self.trans = paramiko.Transport((host, port))
self.host = host
if password is None:
self.trans.connect(username=username, pkey=paramiko.RSAKey.from_private_key_file(private_key_file))
else:
self.trans.connect(username=username, password=password)
# 將sshclient的對象的transport指定為以上的trans
self.ssh = paramiko.SSHClient()
logger.success("SSH客戶端創建成功.")
self.ssh._transport = self.trans
# 創建SFTP客戶端
self.ftp_client = paramiko.SFTPClient.from_transport(self.trans)
logger.success("SFTP客戶端創建成功.")
def execute_cmd(self, cmd: str):
"""
:param cmd: 服務器下對應的命令, 可以是list,或者str
"""
stdin, stdout, stderr = self.ssh.exec_command(cmd)
error = stderr.read().decode()
logger.info(f"輸入命令: {cmd} -> 輸出結果: {stdout.read().decode()}")
logger.error(f"異常信息: {error}")
return error
def files_action(self, post: bool, local_path: str = os.getcwd(), remote_path: str = "/root"):
"""
:param post: 動作 為 True 就是上傳, False就是下載
:param local_path: 本地的文件路徑, 默認當前腳本所在的工作目錄
:param remote_path: 服務器上的文件路徑,默認在/root目錄下
"""
if post: # 上傳文件
self.ftp_client.put(localpath=local_path, remotepath=f"{remote_path}{os.path.split(local_path)[1]}")
logger.info(f"文件上傳成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}")
else: # 下載文件
file_path = local_path + os.path.split(remote_path)[1]
self.ftp_client.get(remotepath=remote_path, localpath=file_path)
logger.info(f"文件下載成功: {self.host}:{remote_path} -> {file_path}")
def ssh_close(self):
"""關閉連接"""
self.trans.close()
logger.info("已關閉SSH連接...")
class DataClearing:
settings = ReadFile.read_config('$.database')
server_settings = settings.get('ssh_server')
server = None
# 導出的sql文件名稱及后綴
file_name = f"{settings.get('db_name')}_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.sql"
@classmethod
def server_init(cls, settings=settings, server_settings=server_settings):
cls.server = ServerTools(host=settings.get('host'), port=server_settings.get('port'),
username=server_settings.get('username'),
password=server_settings.get('password'),
private_key_file=server_settings.get('private_key_file'))
# 新建backup_sql文件夾在服務器上,存放導出的sql文件
cls.server.execute_cmd("mkdir backup_sql")
@classmethod
def backup_mysql(cls):
"""
備份數據庫, 會分別備份在數據庫所在服務器的/root/backup_sql/目錄下, 與當前項目文件目錄下的 backup_sqls
每次備份生成一個數據庫名_當前年_月_日T_時_分_秒, 支持linux 服務器上安裝的mysql服務(本人未調試),以及linux中docker部署的mysql備份
"""
if cls.server_settings.get('mysql_container') is None:
cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}"
else:
# 將mysql服務的容器中的指定數據庫導出, 參考文章 https://www.cnblogs.com/wangsongbai/p/12666368.html
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}"
cls.server.execute_cmd(cmd)
cls.server.files_action(0, f"{cls.server_settings.get('sql_data_file')}", f"/root/backup_sql/{cls.file_name}")
@classmethod
def recovery_mysql(cls, sql_file: str = file_name, database: str = settings.get('db_name')):
"""
恢復數據庫, 從服務器位置(/root/backup_sql/) 或者本地(../backup_sqls)上傳, 傳入的需要是.sql文件
:param sql_file: .sql數據庫備份文件, 默認就是導出的sql文件名稱, 默認文件名稱是導出的sql文件
:param database: 恢復的數據庫名稱,默認是備份數據庫(config.yaml中的db_name)
"""
result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}")
if "No such file or directory" in result:
# 本地上傳
cls.server.files_action(1, f"../backup_sqls/{sql_file}", "/root/backup_sql/")
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}"
cls.server.execute_cmd(cmd)
@classmethod
def close_client(cls):
cls.server.ssh_close()
源碼地址
gitee: https://gitee.com/zy7y/apiAutoTest
github: https://github.com/zy7y/apiAutoTest
參考資料
https://www.cnblogs.com/wangsongbai/p/12666368.html
https://www.liujiangblog.com/blog/15/
https://blog.csdn.net/leorx01/article/details/71141643
http://docs.paramiko.org/en/stable/api/client.html
https://www.bilibili.com/video/BV1cQ4y1P7dg?p=4
# 詳細的參考資料可以看這里
https://www.cnblogs.com/zy7y/p/14295902.html
最后
感謝遇見,歡迎討論
