Python接口自动化测试框架


Python接口自动化测试框架 

在自动化的测试体系中,包含了UI自动化测试和接口自动化测试,UI自动化实现的前提是软件版本进入稳定期,UI界面稳定、变动少,相比较之下接口自动化,接口受外界因素的影响较少,维护成本低,能够在最短时间发现问题。

 

一、浅谈接口测试

1、什么是接口测试:

API测试又称为接口测试,主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点,是对系统接口功能进行测试的一种手段,也是集成测试的一部分,通过直接控制被测应用的接口(API)来确定是否在功能、可靠性、性能和安全方面达到预期的软件测试活动。

2、如何测试接口:

  • 检查接口返回的数据是否与预期结果一致。
  • 检查接口的容错性,假如传递数据的类型错误时是否可以处理。
  • 接口参数的边界值。例如,传递的参数足够大或为负数时,接口是否可以正常处理。
  • 接口的性能,http请求接口大多与后端代码逻辑、执行的SQL语句性能、算法等相关。
  • 接口的安全性,外部调用的接口尤为重要。

3、接口测试的意义、目的:

接口测试的核心意义、目的在于:以保证系统的正确和稳定为核心,以持续集成为手段,提高测试效率,提升用户体验,降低产品研发成本。

 

二、定义框架目录分层

接口自动化框架的没有统一标准,可以根据实际需要自定义,以满足功能测试目标要求为目的。一个基本的接口自动化测试框架需要满足的功能:测试用例管理、各种配置信息管理、数据库操作、日志打印、报告输出、邮件发送

"""
API_Autotest/
|-- API_Case/                                     #测试用例
|   |  |-- PreviewRelease_01_RegisterLogin.py     #预发布环境注册登录接口case
|   |  |-- PreviewRelease_02_SubmitCredit.py      #预发布环境主流程接口case
|   |  |-- PreviewRelease_03_MobilePhonePWD.py    #预发布环境.......接口case
|   |  |-- Test_01_RegisterLogin.py               #测试环境注册登录接口case
|   |  |-- Test_02_SubmitCredit.py                #测试环境主流程接口case
|   |  |-- Test_03_LoanRepayment.py               #测试环境......接口case
|   |  |
|-- data/                                            ###配置信息
|   |-- conf_dict.py                                 #账号密码等配置信息
|   |-- custom_variable.py                           #向接口请求的请求的参数变量、keys、请求头
|   |-- export_url.py                                #接口url
|   |-- request_dict.py                              #向接口请求的请求参数
|   
|-- logic/                                           ###主要逻辑
|   |-- export_logic.py                              #接口实现主逻辑
|   |-- database.py                                  #数据库操作类
|   |-- log_print.py                                 #日志打印类
|   |-- public_class.py                              #公用函数类
|   |-- send_email.py                                #发送邮件类
|               
|-- log/                                            #日志
|   |-- Case--201804182020_log                      #根据运行日期保存操作日志
|   |-- Case--201804182022_log 
|
|-- report/                                         #测试报告
|   |-- report--201804181729.html                   #根据运行日期保存测试报告日志
|   |-- report--201804181731.html  
|   
|-- readme.md                                       #readme
|
|-- manage.py                                       #接口case运行管理类
"""

 

三、知识技能储备

萝卜青菜各有所爱,每个人心中的接口自动化测试框架也各不相同,想实现一个基础功能完备的接口测试框架需要的Python知识如下:

1、Python基础知识

列举了一些需要掌握的基础知识

2、主要依赖的库

下面介绍的库都是数据库操作、日志打印、报告输出、邮件发送功能实现所依赖的库:

a、数据库操作

数据库操作主要pymysql库,下面为代码示例:

import pymysql
import datetime,time
import os

from data.conf_dict import ConfData
from logic.log_print import LOG
from logic.public_class import Public

class ConnectDatabase():
    """
    连接数据库类
    """
    def __init__(self):
        self.Log = LOG()
        self.conf_data = ConfData()
        # 连接数据库110
        self.connection_110 = pymysql.connect(host=self.conf_data.get_conf_data("database_110", "host"),
                                              port=self.conf_data.get_conf_data("database_110", "port"),
                                              user=self.conf_data.get_conf_data("database_110", "user"),
                                              password=self.conf_data.get_conf_data("database_110", "password"),
                                              db=self.conf_data.get_conf_data("database_110", "db"),
                                              charset='utf8',
                                              # 以字典形式展示所查询数据
                                              cursorclass=pymysql.cursors.DictCursor)



        # 保存错误日志的文件名称
        self.log_name = Public.get_new_file(path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "log"))

    def select_sql(self, **kwargs):
        """
        根据传入参数执行数据库查询操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、table:表名、condition:where条件
        :return:
        """

        database_name = kwargs.get("database")
        field_name = kwargs.get("field", "*")
        table_name = kwargs.get("table")
        where_condition = kwargs.get("condition")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    sql = "SELECT %s FROM %s WHERE %s;"
                    data = (field_name, table_name, where_condition)
                    cursor.execute(sql % data)
                    self.connection_110.commit()
                    result = cursor.fetchone()
                    return result
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "select_error:%s" % e)

    def select_sql_all(self, *args, **kwargs):
        """
        根据传入参数执行数据库查询操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、table:表名、condition:where条件
        :return:
        """

        database_name = kwargs.get("database")
        field_name = kwargs.get("field", "*")
        table_name = kwargs.get("table")
        where_condition = kwargs.get("condition")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    sql = "SELECT %s FROM %s WHERE %s;"
                    data = (field_name, table_name, where_condition)
                    print((sql % data))
                    cursor.execute(sql % data)
                    self.connection_110.commit()
                    result = cursor.fetchall()
                    return result
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "select_error:%s" % e)


    def update_sql(self, *args, **kwargs):
        """
        根据传入参数执行数据库更新操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、table:表名、set:更新的字段和值、condition:where条件
        :return:
        """
        database_name = kwargs.get("database")
        table_name = kwargs.get("table")
        set_value = kwargs.get("set")
        where_condition = kwargs.get("condition")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    sql = "UPDATE %s SET %s WHERE %s;"
                    data = (table_name, set_value, where_condition)
                    # print(sql%data)
                    cursor.execute(sql % data)
                    self.connection_110.commit()
                    return cursor.rowcount
                    cursor.close()
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "update_error:%s" % e)
                self.connection_110.rollback()


    def delete_sql(self, *args, **kwargs):
        """
        根据传入参数执行数据库删除操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、table:表名、condition:where条件
        :return:
        """
        database_name = kwargs.get("database")
        table_name = kwargs.get("table")
        where_condition = kwargs.get("condition")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    sql = "DELETE from %s where %s;"
                    data = (table_name,  where_condition)
                    cursor.execute(sql % data)
                    self.connection_110.commit()
                    return cursor.rowcount
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "delete_error:%s" % e)
                self.connection_110.rollback()

    def insert_sql(self, *args, **kwargs):
        """
        根据传入参数执行数据库插入操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、sql:需要插入的sql语句
        :return:
        """
        database_name = kwargs.get("database")
        insert_sql = kwargs.get("sql")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    cursor.execute(insert_sql)
                    self.connection_110.commit()
                    return cursor.rowcount
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "insert_error:%s" % e)
                self.connection_110.rollback()

    def mysql_function(self, *args, **kwargs):
        """
        根据传入参数执行数据库函数操作
        :param args:
        :param kwargs:   database:(database_110)选择数据库、function_name:函数名称,data_id:数据ID,phone:电话
        :return:
        """
        database_name = kwargs.get("database")
        function_name = kwargs.get("function_name")
        data_id = kwargs.get("data_id")
        phone = kwargs.get("phone")
        product_id = kwargs.get("product_id")
        if database_name == "database_110":
            try:
                with self.connection_110.cursor() as cursor:
                    cursor.callproc(function_name,args=(data_id,phone,product_id,))
                    self.connection_110.commit()
            except Exception as e:
                self.Log.log_warning("database", self.log_name, "mysql_function:%s" % e)
                self.connection_110.rollback()



if __name__ == "__main__":
    b = ConnectDatabase()
    data_id = Public.create_short_id()
示例代码--删减版

PS:pymsql库操作mysql数据库增、删、查、改、调用函数

 b、日志打印

日志打印依赖logging库,下面为代码示例:

import logging
import os
import sys
import datetime


log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")
sys.path.append(log_path)


class LOG(object):
    """
    日志打印类
    """
    def __init__(self):
        self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M")

    def log_info(self, *args):
        """
        根据传入参数打印普通日志
        :param arg[0]log的功能模块名,arg[1] 保存log的文件名,arg[2]要打印的日志内容
        :return 返回logger 对象
        """
        # 创建一个logger对象
        logger = logging.getLogger(args[0])
        logger.setLevel(logging.DEBUG)

        # 创建一个向屏幕输入的handler对象
        ch = logging.StreamHandler(sys.stdout)
        ch.setLevel(logging.DEBUG)

        # 创建一个像文件输入的handler对象
        log_file = os.path.join(log_path, "%s--%s_log" % (args[1], self.now_time))
        fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        # 设置log输入格式
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # logger,添加handler对象
        logger.addHandler(ch)
        logger.addHandler(fh)
        logger.info(args[2])
        #  在记录日志之后移除句柄, 不然会重复打印日志
        logger.removeHandler(fh)
        logger.removeHandler(ch)
        return logger

    @staticmethod
    def log_warning(*args):
        """
        根据传入参数错误日志
        :param arg[0]log的功能模块名,arg[1]文件名  arg[2] 需要打印的内容
        :return 返回logger 对象
        """
        # 创建一个logger对象
        logger = logging.getLogger(args[0])
        logger.setLevel(logging.DEBUG)

        # 创建一个向屏幕输入的handler对象
        ch = logging.StreamHandler(sys.stdout)
        ch.setLevel(logging.DEBUG)

        # 创建一个像文件输入的handler对象
        log_file = os.path.join(log_path, args[1])
        fh = logging.FileHandler(log_file, mode="a+", encoding="utf-8")
        fh.setLevel(logging.DEBUG)
        # 设置log输入格式
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)
        # logger,添加handler对象
        logger.addHandler(ch)
        logger.addHandler(fh)
        logger.warning(args[2])
        #  在记录日志之后移除句柄, 不然会重复打印日志
        logger.removeHandler(fh)
        logger.removeHandler(ch)
        print("aaa")
        return logger

    # @staticmethod
    def log_debug(self,message):
        """
        根据传入参数错误日志
        :param arg[0]log的功能模块名,arg[1]文件名  arg[2] 需要打印的内容
        :return 返回logger 对象
        """
        # 创建Logger
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

        # 创建Handler

        # 终端Handler
        consoleHandler = logging.StreamHandler(sys.stdout)
        consoleHandler.setLevel(logging.DEBUG)

        # 文件Handler
        fileHandler = logging.FileHandler('ing.log', mode='w', encoding='UTF-8')
        fileHandler.setLevel(logging.NOTSET)

        # Formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        consoleHandler.setFormatter(formatter)
        fileHandler.setFormatter(formatter)

        # 添加到Logger中
        logger.addHandler(consoleHandler)
        logger.addHandler(fileHandler)

        return logger.debug(message)

# if __name__ == "__main__":
#     A=LOG()
#     A.log_debug('123')
示例代码--删减版

 PS:logging库详细介绍

c、报告输入

报告输出依赖库主要为requests、unittest、下面为代码示例:

import requests


from data.export_url import ExportERL
from data.request_dict import RequestData
from logic.log_print import LOG
from logic.database import ConnectDatabase
from data.custom_variable import GlobalVariable


class ExportLogic(object):

    """
    ExportLogic 接口主逻辑类
    """

    def __init__(self):
        self.Request = RequestData()
        self.Url = ExportERL()
        self.ConnectDatabase = ConnectDatabase()
        self.Log = LOG()
        self.Custom_variable = GlobalVariable()

    @staticmethod
    def request_post(*args, **kwargs):
        """
        传入请求参数,返回json请求结果
        :param args:
        :param kwargs: json:请求数据, url:请求路径, header:请求头
        :return:
        """
        request_data = kwargs.get("json")
        request_url = kwargs.get("url")
        request_header = kwargs.get("header")
        response_json = requests.post(url=request_url, data=request_data, headers=request_header).json()
        return response_json

    @staticmethod
    def request_post_json(*args, **kwargs):
        """
        传入请求参数,返回json请求结果
        :param args:
        :param kwargs: json:请求数据, url:请求路径, header:请求头
        :return:
        """
        request_data = kwargs.get("json")
        request_url = kwargs.get("url")
        request_header = kwargs.get("header")
        response_json = requests.post(url=request_url, json=request_data, headers=request_header)
        return response_json

    @staticmethod
    def request_post_file(*args, **kwargs):
        """
        上传接口独有
        :param args:
        :param kwargs: json:请求数据, url:请求路径, header:请求头, file:文件路径
        :return:
        """
        request_data = kwargs.get("json")
        request_url = kwargs.get("url")
        request_header = kwargs.get("header")
        request_file = kwargs.get("files")
        response_json = requests.post(url=request_url, data=request_data, files=request_file,
                                      headers=request_header).json()
        return response_json


if __name__ == "__main__":
    b = ExportLogic()
requests库演示代码--删减版
import unittest
import os
import HTMLTestReportCN
import datetime

from logic.export_logic import ExportLogic
from data.custom_variable import GlobalVariable
from data.export_url import ExportERL
from logic.public_class import Public
from data.request_dict import RequestData
from logic.log_print import LOG
from logic.database import ConnectDatabase
from data.conf_dict import ConfData
from logic.send_email import SendEmail
import random,requests

class LZExportCase(unittest.TestCase):
    """
    新浪有借有还接口测试用例
    """

    # 全距
    PHONE = Public().create_phone()
    #提交五项资料的Header
    HEADER = GlobalVariable().get_header('lz_Header')
    #修改手机号的Header
    HEADER1 = GlobalVariable().get_header('lz_Header')
    BASEID = ''
    PERIODNUM = ''
    IMGID =''
    Repayid = ''
    Periodnum_ = ''


    def setUp(self):
        self.Export_logic = ExportLogic()
        self.Export_url = ExportERL()
        self.Public = Public()
        self.Request_data = RequestData()
        self.Custom_variable = GlobalVariable()
        self.Log = LOG()
        self.ConnectDatabase = ConnectDatabase()
        self.Conf_data = ConfData()
        self.password = GlobalVariable().get_variable("password")
        self.phone = Public().create_phone()
        self.newphone = GlobalVariable().get_variable("YJ_newphone")
        # self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report")
        self.log_path = os.path.dirname(os.path.dirname(__file__))
        self.path = os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static"),
                                 "logo.jpg")

    def tearDownC(self):
        pass
        # os.remove(os.path.join(self.report_path, "report.html"))
        # os.remove(os.path.join(os.path.join(self.path, "log"), "Export_log"))


    def test_085_FileLoad(self):
        """
            Case--上传文件(saveType正常)
        """
        url = self.Export_url.get_export_url("LZ_test_url", "File")
        request_json = self.Request_data.get_request_data('File')
        file_ = open(self.path, 'rb')

        file = {
            "file": ('logo.jpg', file_, 'image/jpeg')
           }
        request_json["saveType"] = '1'
        request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s request data:%s" % (self._testMethodName, request_json))
        response_json = self.Export_logic.request_post_file(url=url, json=request_json, files=file, header=self.HEADER)
        LZExportCase.IMGID = response_json['data']['id']
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s response data:%s" % (self._testMethodName, response_json))
        self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \
        and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code'])
        file_.close()

    def test_086_HeadImg(self):
        """
            Case--修改头像(headimgId正常)
        """
        url = self.Export_url.get_export_url("LZ_test_url", "HeadImg")
        request_json = self.Request_data.get_request_data('HeadImg')
        request_json["headimgId"] = LZExportCase.IMGID
        request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s request data:%s" % (self._testMethodName, request_json))
        response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s response data:%s" % (self._testMethodName, response_json))
        self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \
        and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code'])

    def test_087_HeadImg(self):
        """
            Case--修改头像(headimgId错误)
        """
        url = self.Export_url.get_export_url("LZ_test_url", "HeadImg")
        request_json = self.Request_data.get_request_data('HeadImg')
        request_json["headimgId"] = '3213adcx213vcv'
        request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s request data:%s" % (self._testMethodName, request_json))
        response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s response data:%s" % (self._testMethodName, response_json))
        self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['msg'], response_json['msg']) \
        and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['code'], response_json['code'])

    def test_088_HeadImg(self):
        """
            Case--修改头像(headimgId为空)
        """
        url = self.Export_url.get_export_url("LZ_test_url", "HeadImg")
        request_json = self.Request_data.get_request_data('HeadImg')
        request_json["headimgId"] = ''
        request_json = self.Public.sign_md5(keys="lz_test_keys", json=request_json)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s request data:%s" % (self._testMethodName, request_json))
        response_json = self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
        self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),
                          "%s response data:%s" % (self._testMethodName, response_json))
        self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['msg'], response_json['msg']) \
        and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['code'], response_json['code'])


if __name__ == '__main__':
    Send_mail = SendEmail()
    now_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
    module_name = os.path.basename(__file__).split(".")[0]
    module = __import__(module_name)
    path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "report")
    logo_path = os.path.join(os.path.join(path, "static"),"logo.jpg")
    fp = open(os.path.join(path, "report--%s.html" % now_time), "wb")
    runner = HTMLTestReportCN.HTMLTestRunner(stream=fp)
    all_suite = unittest.defaultTestLoader.loadTestsFromModule(module)
    runner.run(all_suite)
    fp.close()
    SendEmail.send_email(Send_mail)
unittest库演示代码--删减版

PS:requests库unittest库详细介绍

d、邮件发送

邮件发送依赖两个python内置库smtplib、email:

import os
import smtplib
import email.mime.multipart
import email.mime.text
from email import encoders
import datetime

from data.conf_dict import ConfData
from logic.public_class import Public



class SendEmail(object):
    def __init__(self):
        self.conf_data = ConfData()
        self.sender = self.conf_data.get_conf_data("email", "sender")
        self.receiver = self.conf_data.get_conf_data("email", "receiver")
        self.SMTP_server = self.conf_data.get_conf_data("email", "SMTP_server")
        self.username = self.conf_data.get_conf_data("email", "username")
        self.password = self.conf_data.get_conf_data("email", "password")
        self.content = self.conf_data.get_conf_data("email", "content")
        self.now_time = datetime.datetime.now().strftime("%Y%m%d%H%M")
        self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report")
        self.log_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")

    def create_msg(self,receiver):
        """
        此函数主要构建收发邮件联系人、邮件正文、邮件附件、邮件标题
        :return:
        """
        # 构建邮件正文
        msg = email.mime.multipart.MIMEMultipart()
        msg["from"] = self.sender
        msg["to"] = receiver
        msg['subject'] = "API自动化测试报告"
        txt = email.mime.text.MIMEText(self.content)
        msg.attach(txt)

        # 构建邮件附件之一:自动化测试报告
        report_name = Public.get_new_file(path=self.report_path)
        report_path = os.path.join(self.report_path, report_name)
        report = email.mime.text.MIMEText(open(report_path, "rb")
                                          .read(), 'html', 'utf-8')
        report["Content-Type"] = 'application/octet-stream'
        report.add_header('Content-Disposition', 'attachment', filename=('gbk', '', report_name))
        encoders.encode_base64(report)
        msg.attach(report)

        # 构建邮件附件之一:测试日志
        log_name = Public.get_new_file(path=self.log_path)
        log_path = os.path.join(self.log_path, log_name)
        info_log = email.mime.text.MIMEText(open(log_path, 'rb').read(), 'base64', 'utf-8')
        info_log["Content-Type"] = 'application/octet-stream'
        info_log.add_header('Content-Disposition', 'attachment', filename=('gbk', '', log_name))
        encoders.encode_base64(info_log)
        msg.attach(info_log)
        return msg

    def send_email(self,*args):
        """
        创建实例,发送邮件
        :return:
        """
        receiver = self.conf_data.get_conf_data("email", "%s_receiver" % args[0][0]) if args else self.receiver
        msg = self.create_msg(receiver)
        smtp = smtplib.SMTP_SSL(self.SMTP_server, 465)  # 在Linux端使用ssL方式连接邮箱服务器
        # smtp.connect(self.SMTP_server, 465)           # 在windows端使用connect方式连接邮箱服务器
        smtp.login(self.username, self.password)
        smtp.sendmail(self.sender, receiver.split(","), msg.as_string())
        smtp.quit()


if __name__ == "__main__":
    s = SendEmail()
    s.send_email()
示例代码

PS:smtplib、email库详细介绍


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM