:以qq郵箱為例
使用的場景:
(1) 目前用在了重置用戶密碼
(2) 項目需要發送測試報告到用戶郵箱
(3) 之前見過一個用戶使用郵件的形式維護linux服務器(服務器端需要接收郵件進行解讀)
寫代碼前要准備的:
(1) 需要准備一個QQ號,因為要以這個QQ號發送和接受郵件。
(2) 需要這個QQ號郵箱開啟SMTP/POP3服務
# 點開會驗證,驗證完會給個授權碼,之后就是通過這個授權碼代替用戶密碼
提醒:直接修改if __main__里的內容即可
發送郵件:
import os import random import smtplib import time from email.header import Header from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart class MyEmail(object): def __init__(self, sender, receiver, username, password, smtp_server='smtp.qq.com'): self.sender = sender self.receiver = receiver self.username = username self.password = password # 郵箱開啟POP3和SMTP服務的授權碼 self.smtp_server = smtp_server # 連接服務器 self.connect_server() # 表示一封郵件,需要郵件主題 def create_email(self, mail_title): # 創建一個帶附件的實例 message = MIMEMultipart() message['From'] = self.sender message['To'] = self.receiver message['Subject'] = Header(mail_title, 'utf-8') self.message = message # 附件內容,如文本文件,圖片文件等 def email_appendix(self, file_path): att1 = MIMEText(open(file_path, 'rb').read(), 'base64', 'utf-8') # 指定頭部信息 att1["Content-Type"] = 'application/octet-stream' # 內容為二進制流 att1["Content-Disposition"] = 'attachment; filename="%s"' % (os.path.basename(file_path)) self.message.attach(att1) def email_text(self, content, content_type='plain'): # 郵件正文內容 # plain正常文本內容,html可以發送html格式內容 self.message.attach(MIMEText(content, content_type, 'utf-8')) def connect_server(self): # 連接郵件服務器 smtpObj = smtplib.SMTP_SSL() # 注意:如果遇到發送失敗的情況(提示遠程主機拒接連接),這里要使用SMTP_SSL方法 smtpObj.connect(self.smtp_server) try: # 連接qq郵箱服務器 smtpObj.login(self.username, self.password) # 給qq郵箱發送用戶名和授權碼,進行驗證,如果賬號沒有授權會返回smtplib.SMTPAuthenticationError except smtplib.SMTPAuthenticationError: print('請檢查用戶名和授權碼是否添加正確!') return else: self.smtpObj = smtpObj def send_mail(self): self.smtpObj.sendmail(self.sender, self.receiver, self.message.as_string()) # 發送一封郵件 print("郵件發送成功!!!") def __del__(self): self.smtpObj.close() # self.smtpObj.quit() t = ['烏鴉坐飛機', '邪惡在山頂', '雙龍探珠', '螳螂拳', '蛇足拳', '水蓮飄', ' 無相招', '佛朗明哥招', '飛天拳', ' 貓腳落地', ' 熊掌出擊', ' 貓甩水', ' 貓轉身'] if __name__ == '__main__': obj = MyEmail(sender='xxx@qq.com', receiver='xxx@qq.com', username='xx', password='xxx') for i in range(10): # 創建郵件,及設置標題 obj.create_email('來自zezhou的轟炸消息') # 添加郵件內容 obj.email_text(random.choice(t)) # 添加附件,如圖片或者文件啥的,需要文件的路徑 # obj.email_appendix('suolong.jpg') # 發送郵件 obj.send_mail() time.sleep(0.5) else: obj.create_email('boom~') obj.email_text('boom boom boom') obj.send_mail()
效果:
發送郵件需要注意的:
# 如果想要發送html格式的內容
# 發送的html格式的內容,只會留body標簽中的內容(且不包含script標簽)
接受郵件:
import poplib import base64 import time from email.parser import Parser # 用來解析郵件主題 from email.header import decode_header # 用來解析郵件來源 from email.utils import parseaddr class AcceptEmail(object): def __init__(self, user_email, password, pop3_server='pop.qq.com'): self.user_email = user_email self.password = password self.pop3_server = pop3_server self.connect_email_server() def connect_email_server(self): self.server = poplib.POP3(self.pop3_server) # 打印POP3服務器的歡迎文字,驗證是否正確連接到了郵件服務器 # print('連接成功 -- ', self.server.getwelcome().decode('utf8')) # +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0) # 開始進行身份驗證 self.server.user(self.user_email) self.server.pass_(self.password) def __del__(self): # 關閉與服務器的連接,釋放資源 self.server.close() def get_email_count(self): # 返回郵件總數目和占用服務器的空間大小(字節數), 通過stat()方法即可 email_num, email_size = self.server.stat() # print("消息的數量: {0}, 消息的總大小: {1}".format(email_num, email_size)) return email_num def receive_email_info(self, now_count=None): # 返回郵件總數目和占用服務器的空間大小(字節數), 通過stat()方法即可 email_num, email_size = self.server.stat() # print("消息的數量: {0}, 消息的總大小: {1}".format(email_num, email_size)) self.email_count = email_num self.email_sumsize = email_size # 使用list()返回所有郵件的編號,默認為字節類型的串 rsp, msg_list, rsp_siz = self.server.list() # print(msg_list, '郵件數量',len(msg_list)) # print("服務器的響應: {0},\n消息列表: {1},\n返回消息的大小: {2}".format(rsp, msg_list, rsp_siz)) # print('郵件總數: {}'.format(len(msg_list))) self.response_status = rsp self.response_size = rsp_siz # 下面獲取最新的一封郵件,某個郵件下標(1開始算) # total_mail_numbers = len(msg_list) # 動態取消息 total_mail_numbers = now_count rsp, msglines, msgsiz = self.server.retr(total_mail_numbers) # rsp, msglines, msgsiz = self.server.retr(1) # print("服務器的響應: {0},\n原始郵件內容: {1},\n該封郵件所占字節大小: {2}".format(rsp, msglines, msgsiz)) # 從郵件原內容中解析 msg_content = b'\r\n'.join(msglines).decode('gbk') msg = Parser().parsestr(text=msg_content) self.msg = msg # print('解碼后的郵件信息:\n{}'.format(msg)) def recv(self, now_count=None): self.receive_email_info(now_count) self.parser() def get_email_title(self): subject = self.msg['Subject'] value, charset = decode_header(subject)[0] if charset: value = value.decode(charset) # print('郵件主題: {0}'.format(value)) self.email_title = value def get_sender_info(self): hdr, addr = parseaddr(self.msg['From']) # name 發送人郵箱名稱, addr 發送人郵箱地址 name, charset = decode_header(hdr)[0] if charset: name = name.decode(charset) self.sender_qq_name = name self.sender_qq_email = addr # print('發送人郵箱名稱: {0},發送人郵箱地址: {1}'.format(name, addr)) def get_email_content(self): content = self.msg.get_payload() # 文本信息 content_charset = content[0].get_content_charset() # 獲取編碼格式 text = content[0].as_string().split('base64')[-1] text_content = base64.b64decode(text).decode(content_charset) # base64解碼 self.email_content = text_content # print('郵件內容: {0}'.format(text_content)) # 添加了HTML代碼的信息 content_charset = content[1].get_content_charset() text = content[1].as_string().split('base64')[-1] # html_content = base64.b64decode(text).decode(content_charset) # print('文本信息: {0}\n添加了HTML代碼的信息: {1}'.format(text_content, html_content)) def parser(self): self.get_email_title() self.get_sender_info() self.get_email_content() def get_new_mail(dic, second=5): t = AcceptEmail(**dic) now_count = t.get_email_count() print('開啟的時候的郵件數量為:%s' % now_count) # 每次需要重新連接郵箱服務器,才能獲取到最新的消息 # 默認每隔5秒看一次是否有新內容 while True: obj = AcceptEmail(**dic) count = obj.get_email_count() if count > now_count: new_mail_count = count - now_count print('有新的郵件數量:%s' % new_mail_count) for i in range(1, new_mail_count + 1): obj = AcceptEmail(**dic) now_count += 1 obj.recv(now_count) yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email, "email_content": obj.email_content} # print('-' * 30) # print("郵件主題:%s\n發件人:%s\n發件人郵箱:%s\n郵件內容:%s" % ( # obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content)) # print('-' * 30) # else: # print('沒有任何新消息.') time.sleep(second) if __name__ == '__main__': dic = { 'user_email': 'xxx@qq.com', 'password': 'xxx', } print('正在監聽郵件服務器端是否有新消息---') try: iterator = get_new_mail(dic) except TypeError: print('監聽的內容有誤,有圖片數據等,無法解析而報錯,不是純文本內容') else: for dic in iterator: # 如果需要過濾某個用戶的郵件內容,加個if判斷是否是該郵箱即可 # if dic.get("sender_email") == 'xxx': print('-' * 30) print("郵件主題:%s\n發件人:%s\n發件人郵箱:%s\n郵件內容:%s" % ( dic["title"], dic["sender"], dic["sender_email"], dic["email_content"])) print('-' * 30)
過程中遇到的問題:
1.認證問題
解決方法:
在自己的QQ郵箱開啟POP3和SMTP服務,得到授權碼,填寫正確的信息,重新進行認證。