python連接數據庫實現自動發郵件
1.運行環境
redhat6 + python3.6 + crontab + Oracle客戶端
2.用到的模塊
3.操作步驟
(1)安裝python3.6參考步驟https://www.cnblogs.com/zhur/p/12044171.html
(2)安裝python模塊,pip安裝
4.安裝Oracle客戶端https://oracle.github.io/odpi/doc/installation.html#linux
5.配置環境變量 ~/.bash_profile
(1)LD_LIBRARY_PATH=[oracle客戶端安裝目錄]:$LD_LIBRARY_PATH
(2)export NLS_LANG=AMERICAN_AMERICA.ZHS16GBK(數據庫中的)
6.編寫配置文件和執行腳本
(1)配置文件:
##數據庫相關參數:主機 、數據庫用戶、數據庫密碼、數據庫sid、數據庫字符集nls_lan
ora_host = '192.168.168.168'
ora_user = '**********'
ora_pwd = '**********'
ora_sid = '****'
##nls_lan = 'AMERICAN_AMERICA.ZHS16GBK'
##數據庫共享庫目錄
##lib_path = '/opt/oracle/instantclient_11_2'
##執行腳本(py,sql)目錄
exe_path = '/root/python/prod/cstest/bin'
##數據庫導出sql語句文件,多個文件用逗號隔開;如果sql文件在執行腳本目錄下,可以使用相對路徑,否則使用絕對路徑;
##注意:標點符號都是英文的,sql文件中的時間變量必須是 &date 或 &yyyymmdd
sql_files = '/root/python/prod/cstest/bin/測試.sql'
## sql語句中的導數日期 yesterday :表示昨天 ,目前只支持這一種情況,以后可以酌情增加其它日期
exp_date = 'yesterday'
##導出excel目錄
excel_path = "/root/python/prod/cstest/data"
##導出excel文件sheet名,sheet名稱與上面參數sql_files中的每個sql文件中sql條數一一對應,如果沒有設置對應名稱,則sheet名稱默認
excel_sheet_names = [['測試']]
##導出exel文件是否壓縮 1:壓縮 0:不壓縮 刪除此參數
##is_zip = 1
##壓縮后文件名前綴 ,壓縮文件名=前綴+導數日期
zipfile_name = '測試'
##是否發送郵件 1: 發送 0:不發送
is_mail = 1
##郵箱發送服務器
mail_server = 'xxxxxxxxx.@com'
##郵箱發送服務器端口
mail_port = '**'
##發送郵件
mail_from = 'xxxxxxxx@qq.com'
##發送郵件密碼
mail_pass = '********'
##接收郵件
mail_to = 'xxxxxxxx@qq.com'
#抄送郵件
mail_cc = 'xxxxxxxxx@qq.com'
##郵件標題
mail_subject = '測試'
##郵件內容
mail_body = """
<p>您好:</p>
<p> 數據已提取,詳見附件,請查收; </p>
<p></p>
<p></p>
<p>-------------------------------------------------------------------------------------</p>
<p> 可視化運維:朱瑞
<p> 熱線電話:18834195657 </p>
<p> 運維郵箱:xxxxxxxxx@qq.com</p>
"""
(2)執行腳本
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import cx_Oracle as cx
from openpyxl import Workbook
import datetime
import os
import zipfile
from config_ora2excel import *
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email import encoders
from email.header import Header
from email.mime.base import MIMEBase
#os.environ['LD_LIBRARY_PATH'] = lib_path
#os.environ['NLS_LANG'] = nls_lan
#os.environ.setdefault("LD_LIBRARY_PATH", lib_path)
#os.environ.setdefault("NLS_LANG", nls_lan)
def get_yesterday():
today = datetime.date.today()
oneday = datetime.timedelta(days=1)
yesterday = today - oneday
return yesterday
def get_data_date():
# 獲取數據日期
##data_date = input('請輸入數據日期,格式yyyymmdd,默認為昨天:')
data_date = exp_date
if data_date.strip().lower() == 'yesterday':
yesterday = get_yesterday()
data_date = yesterday.strftime("%Y%m%d")
##print(data_date)
return data_date
def get_weeks():
#獲取周次
data_date = get_data_date()
y = int(data_date[:4])
m = int(data_date[4:6])
d = int(data_date[6:])
weeks = datetime.date(y,m,d).isocalendar()[1]
return weeks
def ora_to_excel(data_date):
try:
ret = 0
#print(os.environ['LD_LIBRARY_PATH'])
#print(os.environ['NLS_LANG'])
# 連接數據庫
# connect = cx.connect(user+'/'+pwd+'@'+host+'/'+sid)
connect = cx.connect('{}/{}@{}/{}'.format(ora_user, ora_pwd, ora_host, ora_sid))
print('連接數據庫成功!!')
# 獲取數據日期
data_date = data_date
print(data_date)
##切換到腳本執行目錄
os.chdir(exe_path)
# 獲取sql執行並導出到excel,可能是多條sql,需要分別執行
fi = 0
for f_sql in sql_files.split(','):
start = datetime.datetime.now()
##print(type(f_sql))
cursor = connect.cursor()
print(f_sql)
with open(f_sql, 'r', encoding='utf-8') as f:
sqls = f.read()
j = 0
wb = Workbook()
for sql in sqls.split(';'):
sql = sql.strip()
if not sql:
break
##替換變量 sql中的變量必須是 &date 或 &yyyymmdd
# sql = sql.lower()
sql = sql.replace('&date', data_date)
sql = sql.replace('&yyyymmdd', data_date)
##sql = sql.decode('utf-8')
print('-----------------------------------------開始執行sql:\n', sql)
cursor.execute(sql)
print('-----------------------------------------執行sql成功\n')
print('-----------------------------------------開始導出excel\n')
sht = wb.create_sheet(title='Mysheet'+str(j),index=j)
## 如果變量設置了sheet名稱,則修改
if len(excel_sheet_names) >= fi + 1:
if len(excel_sheet_names[fi]) >= j + 1 and excel_sheet_names[fi][j].strip():
sht.title = excel_sheet_names[fi][j]
print(sht.title)
# 處理表頭
i = 1
for col in cursor.description:
sht.cell(1, i).value = col[0]
i += 1
##print('11')
# 寫數據
##offset = 2
while True:
results = cursor.fetchmany(10000)
if not results:
break
resLen = len(results)
for res in results:
sht.append(res)
##在開頭插入一列
###print('1111')
sht.insert_cols(1)
###print('22')
###print(sht.max_row)
# 給第一列寫入行標
for k in range(2, sht.max_row+1):
sht.cell(k,1).value = k - 1
# 獲取導出文件目錄及文件名
exl_file = f_sql.split(os.sep)[-1].split('.')[0] + '_' + data_date + '.xlsx'
if excel_path.strip():
exl_file = excel_path + os.sep + exl_file
print(exl_file)
j += 1
wb.save(exl_file)
##wb.close()
cursor.close()
print('-----------------------------------------導出excel成功!\n')
print('導出excel 花費時間:', datetime.datetime.now() - start, '\n\n')
fi += 1
ret = 1
except Exception as e:
print('Error:', e)
##raise e
finally:
connect.close()
return ret
def get_zip_file(input_path, wd, results):
"""
在目錄下查找包含關鍵字的文件
input_path: 目錄
wd: 關鍵字
results:查找結果列表
"""
try:
files = os.listdir(input_path)
for filename in files:
fp = os.path.join(input_path, filename)
if os.path.isfile(fp) and wd in filename:
# 使用相對路徑
results.append('.' + os.sep + filename)
except Exception as e:
print(e)
raise e
def zip_files(input_path, output_zip_name, data_date):
"""
壓縮文件
input_path:目錄
output_zip_name:zip文件名
data_date :數據日期
"""
try:
ret = None
start = datetime.datetime.now()
filelists = []
wd = data_date + '.xlsx'
get_zip_file(input_path, wd, filelists)
if len(filelists) > 0:
print('-----------------------------------------開始壓縮文件')
f = zipfile.ZipFile(input_path + os.sep + output_zip_name, 'w', zipfile.ZIP_DEFLATED)
# 切換到打包目錄
os.chdir(input_path)
for file in filelists:
f.write(file)
print(input_path + os.sep + output_zip_name)
print('-----------------------------------------壓縮文件完成')
print('壓縮文件 花費時間:', datetime.datetime.now() - start, '\n\n')
f.close()
print('壓縮文件名:',input_path + os.sep + output_zip_name)
ret = output_zip_name
return ret
except Exception as e:
print('Error:', e)
raise e
finally:
return ret
def send_mail(zipfile):
# 構造一個MIMEMultipart對象代表郵件本身
msg = MIMEMultipart()
msg['From'] = mail_from
msg['To'] = mail_to
msg['Cc'] = mail_cc
weeks = str(get_weeks())
mail_sub = mail_subject.replace('{weeks}',weeks)
msg['Subject'] = Header(mail_sub,'utf-8').encode()
msg.attach(MIMEText(mail_body,'html','utf-8'))
# 切換到zip文件目錄
os.chdir(excel_path)
#二進制模式讀取zip文件
with open(zipfile,'rb') as f:
zipf = zipfile.split('.')[0]
# 設置附件的MIME和文件名,這里是zip類型:
mime = MIMEBase('zip', 'zip', filename=zipfile)
# 加上必要的頭信息:
#mime.add_header('Content-Disposition', 'attachment', file_name=Header(zipfile,'utf-8').encode())
#mime.add_header('Content-Disposition', 'attachment', filename=('gbk', '', zipfile))
#mime.add_header('Content-Disposition', 'attachment', filename=('utf-8', '', zipfile)) ##這種方式發送,郵件客戶端收到郵件,中文附件名還是可能有亂碼
mime.add_header('Content-Disposition', 'attachment', filename= Header(zipfile,'gbk').encode() ) ## ##中文附件名稱outlook,網頁,foxmail均顯示正常
#mime.add_header('Content-ID', '<0>')
#mime.add_header('X-Attachment-Id', '0')
# 把附件的內容讀進來:
mime.set_payload(f.read())
# 用Base64編碼
encoders.encode_base64(mime)
#添加到MIMEMultipart
msg.attach(mime)
try:
print('-----------------------------------------發送郵件開始')
s = smtplib.SMTP()
s.connect(mail_server,mail_port)
s.login(mail_from,mail_pass)
##sendmail的第2個參數郵件地址是個list,所以將字符串用split轉為list
s.sendmail(mail_from,mail_to.split(',')+mail_cc.split(','),msg.as_string())
s.quit()
print('-----------------------------------------發送郵件結束')
except smtplip.SMTPException as e:
print('發送失敗:',e)
return
def main():
try:
print('--Program begin',datetime.datetime.now(),'--------------------------------------')
print(os.environ['LD_LIBRARY_PATH'])
print(os.environ['NLS_LANG'])
data_date = get_data_date()
oret = ora_to_excel(data_date)
##oret = 1
weeks = str(get_weeks())
zfile_name = zipfile_name.replace('{weeks}',weeks)
output_zip_name = zfile_name + '_' + data_date + '.zip'
if oret == 1:
zfile = zip_files(excel_path, output_zip_name, data_date)
###zfile = output_zip_name
if zfile:
if is_mail == 1:
send_mail(zfile)
print('--Program End', datetime.datetime.now(), '--------------------------------------')
except Exception as e:
print(e)
## finally:
## end = input('請輸入回車鍵結束!')
if __name__ == '__main__':
main()
7.測試看是否發送成功 用行腳本就ok
8.配置定時任crontab -e
9.遇到的問題
(1).執行腳本時無法調用模塊,用pip 安裝python模塊報錯
解決方法用國內鏡像安裝
(2)定時任務crontab 遇到的問題 配置好定時任務無法執行,查看沒報錯,執行完卻沒有發送成功,
解決方法,前面加入環境變量,先識別環境變量在執行py,
或將~/bash_profile 和 執行腳本寫入shell腳本,設置定時任務執行shell腳本也可以
10.最后我的crontab 的配置
配置完成,成功發送,哈哈哈哈哈