It's been a long time since I last updated this blog, haha.
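Today the company gave me this requirement: fetch the previous day's order numbers from Taobao, use each order number to call another API for the order details, and then display the results on a web page!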
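The technical points involved are:
- Simulated login
- Simulated download
- Parsing the Excel file data stream
- Reading the Excel file and extracting the order numbers
- Finally, calling the API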
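I'll go through them one by one. When I first got the requirement it felt vague, because I had never done any of these things before. Once I calmed down and thought it through, it turned out not to be hard at all. In fact it was quite simple.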
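Simulated login
I. Analyzing the page's request headers
The login page this time is https://huoche.alitrip.com/hello.htm
1. I logged in once and inspected the request. It carries only three fields: a hidden token, the username, and the password.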

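It is clear at a glance: this is just an admin backend page, so things are relatively simple, haha. Next I only need to keep the cookies and send the token, username and password to log in.
A tip: with Python's requests module you don't have to manage cookies yourself. Create a session object and it stores the cookies the server sets and sends them back on later requests, so there is no need to copy cookies over one by one. That saves a lot of code and time.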
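To see what the session does for you, here is a minimal standard-library sketch of the same idea. In it, `http.cookiejar.CookieJar` plus `urllib.request.HTTPCookieProcessor` play the role that `requests.Session` plays in this post. The local server, its `/login` and `/me` paths, and the `sid` cookie are all hypothetical and exist only for the demo. Nothing here talks to alitrip.

```python
import http.cookiejar
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class DemoHandler(BaseHTTPRequestHandler):
    """Toy local server: /login sets a cookie, any other path echoes the Cookie header."""

    def do_GET(self):
        self.send_response(200)
        if self.path == "/login":
            # Pretend the login succeeded and hand out a session cookie
            self.send_header("Set-Cookie", "sid=abc123; Path=/")
            body = b"logged in"
        else:
            # Echo back whatever cookies the client sent
            body = (self.headers.get("Cookie") or "").encode()
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence the default per-request logging


def fetch_with_shared_cookies(base_url):
    """Hit /login, then /me with the same opener; the jar resends the cookie by itself."""
    jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(
        urllib.request.ProxyHandler({}),  # ignore proxy env vars so the local demo works
        urllib.request.HTTPCookieProcessor(jar),
    )
    opener.open(base_url + "/login").read()
    return opener.open(base_url + "/me").read().decode()


def start_demo_server():
    """Start DemoHandler on a free local port in a background thread."""
    server = HTTPServer(("127.0.0.1", 0), DemoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server, "http://127.0.0.1:%d" % server.server_address[1]


if __name__ == "__main__":
    server, url = start_demo_server()
    print(fetch_with_shared_cookies(url))  # the cookie that /login set
    server.shutdown()
    server.server_close()
```

The second request carries the cookie even though the code never touches it. That is exactly the bookkeeping a requests session saves you from.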
2. The code is as follows:
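```python
import base64
import re

import requests


def re_search(regex, subject):
    """Return the first capture group of regex in subject, or '' if nothing matches."""
    match = re.search(regex, str(subject))
    return match.group(1) if match else ''


class TbTomas(object):
    def __init__(self):
        # A Session object keeps cookies across requests automatically
        self.session_obj = requests.session()

    def login_thomas(self, thomas_username, thomas_password):
        hello_url = 'https://huoche.alitrip.com/hello.htm'
        # Fetch the login page
        hello_response = self.session_obj.get(hello_url)
        # Pull the hidden token out of the page source
        token = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
        h_u_s = base64.b64encode(token.encode('utf-8'))
        headers = {
            'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
            'Referer': 'https://huoche.alitrip.com/hello.htm',
            'Accept-Language': 'zh-CN',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'Keep-Alive',
            'Cache-Control': 'no-cache',
            # Host and Content-Length are left out: requests fills them in correctly,
            # whereas a hard-coded Content-Length breaks as soon as the body size changes
        }
        post_data = {
            # The token is base64-encoded a second time here, as in the original code
            'h_u_s': base64.b64encode(h_u_s).decode('ascii'),
            'h_u_n': thomas_username,
            'h_u_p': base64.b64encode(thomas_password.encode('utf-8')).decode('ascii'),
        }
        index_url = 'https://huoche.alitrip.com/index.htm'
        index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
        return index_response
```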
Finally, submit the POST request and you can check whether the login succeeded. Simple, right? Haha!
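The regex in `login_thomas` only matches if the attributes appear in exactly that order and with that spacing. Below is a minimal sketch that reads hidden fields with the standard library's `html.parser` instead, then builds the same three form fields. It assumes the login page keeps its token in a hidden `<input name="h_u_s">`. The double base64 encoding of the token is copied from the code above and has not been verified against the site. `HiddenInputParser`, `b64` and `build_login_form` are hypothetical helper names.

```python
import base64
from html.parser import HTMLParser


class HiddenInputParser(HTMLParser):
    """Collect name -> value for every <input type="hidden"> on a page."""

    def __init__(self):
        super().__init__()
        self.hidden = {}

    def handle_starttag(self, tag, attrs):
        # Also called for self-closing tags like <input ... />
        attrs = dict(attrs)
        if tag == "input" and attrs.get("type") == "hidden" and attrs.get("name"):
            self.hidden[attrs["name"]] = attrs.get("value") or ""


def b64(text):
    """Base64-encode a str and return the result as a str."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def build_login_form(login_page_html, username, password):
    """Build the h_u_s / h_u_n / h_u_p form, mirroring the encoding used above."""
    parser = HiddenInputParser()
    parser.feed(login_page_html)
    token = parser.hidden.get("h_u_s", "")
    return {
        "h_u_s": b64(b64(token)),  # encoded twice, as in login_thomas
        "h_u_n": username,
        "h_u_p": b64(password),
    }
```

Because the parser works on attributes rather than raw text, it keeps working if the page reorders the attributes or writes `<input ... />`.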
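Data download
Downloading works the same way as logging in. The download is also just a POST request to the site, and the response is the Excel file. Python has a module called xlrd for reading Excel files, which is very convenient (note that xlrd 2.0 and later only read the legacy .xls format).
1. The original page asks you to enter a date and downloads that day's data. My manager's task was to download the previous day's data, so a few lines of code are needed to compute yesterday's date.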

The code is as follows:
```python
import datetime

today = datetime.datetime.now()
yesterday = today - datetime.timedelta(days=1)
trade_date = yesterday.strftime('%Y-%m-%d')  # formatted as YYYY-MM-DD
```
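Because `timedelta` handles month and year boundaries, the same arithmetic extends to backfilling a range of days. The full source's `date_time_handle` does this with its command-line arguments. Here is a minimal sketch; `previous_day` and `date_range` are hypothetical helper names. Passing `today` explicitly just makes the function testable.

```python
import datetime


def previous_day(today=None):
    """Return the day before `today` (default: the real today) as 'YYYY-MM-DD'."""
    today = today or datetime.date.today()
    return (today - datetime.timedelta(days=1)).strftime("%Y-%m-%d")


def date_range(start, end):
    """Inclusive list of 'YYYY-MM-DD' strings from start to end; empty if start > end."""
    start_day = datetime.datetime.strptime(start, "%Y-%m-%d").date()
    end_day = datetime.datetime.strptime(end, "%Y-%m-%d").date()
    days = (end_day - start_day).days
    return [(start_day + datetime.timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days + 1)]
```

For example, `previous_day(datetime.date(2024, 3, 1))` gives `'2024-02-29'`, and a start date after the end date yields an empty list instead of looping forever.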
2. Look at the URL of the file-download request and the data it submits. One screenshot makes everything clear.

The screenshot shows the URL the request is sent to, the request method, the request headers, and the returned data.
3. To simulate the download request, you only need to submit the date, and the file comes back. Next, read the file and pull out what we need.
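```python
import xlrd

# This runs inside a TbTomas method, after login_thomas() has succeeded
download_url = 'https://huoche.alitrip.com/orderlistexp.do'
post_data = {'orderExportDate': trade_date}
sheet_content = None
for _ in range(3):  # try the download up to three times
    try:
        # The response body is the Excel file itself
        download_response = self.session_obj.post(download_url, data=post_data)
        # Open the workbook straight from memory; no temporary file is needed
        xls_content = xlrd.open_workbook(file_contents=download_response.content)
        sheet_content = xls_content.sheets()[0]
        break
    except Exception:
        continue
```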
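The `for _ in range(3)` loop above retries silently and gives up without saying why. Below is a minimal sketch of a reusable retry helper. It assumes you want a hook that runs between attempts (the full source logs in again there) and a short backoff. `retry` and its parameters are hypothetical and not part of requests or xlrd.

```python
import time


def retry(action, attempts=3, delay=0.0, on_failure=None):
    """Call action() until it succeeds or the attempts run out.

    on_failure(exc) runs after each failed try (e.g. to log in again);
    the last exception is re-raised so the caller still sees why it failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as exc:  # broad on purpose, like the original loop
            last_exc = exc
            if on_failure is not None:
                on_failure(exc)
            if attempt < attempts and delay:
                time.sleep(delay * attempt)  # linear backoff between tries
    raise last_exc
```

As a usage example, `retry(lambda: self.session_obj.post(download_url, data=post_data), on_failure=lambda e: self.login_thomas(user, pwd))` would log in again after each failure. Here `user` and `pwd` stand for the credentials. Unlike the bare loop, the final error is not swallowed.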
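4. This part is straightforward. Just like reading a file, loop over the rows one at a time and append each order number to a list.
```python
order_item = []
for line_num in range(sheet_content.nrows):
    line_item = sheet_content.row_values(line_num)
    # The third column (index 2) holds the order number (order_no)
    if line_item[2]:
        order_item.append(line_item[2])
# The first entry is the column header, so drop it to keep only order numbers
order_item = order_item[1:]
```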
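Here is a minimal sketch of the same extraction as a pure function, so it can be tested without a real workbook. It assumes the header is the first row and the order number sits in the third column, as in the loop above. It also removes duplicates, as the full source does. It handles one xlrd detail as well: numeric cells come back as `float`. If the sheet stores order numbers as numbers rather than text, long ones may already have lost digits to float precision, so text cells are the safe case. `extract_order_numbers` is a hypothetical name.

```python
def extract_order_numbers(rows, column=2):
    """Return unique order numbers from spreadsheet rows, skipping the header row.

    rows is any iterable of row lists, e.g. sheet.row_values(i) for each i.
    Whole-number floats (how xlrd returns numeric cells) become digit strings.
    """
    seen = set()
    orders = []
    for index, row in enumerate(rows):
        if index == 0 or len(row) <= column:
            continue  # header row, or a row too short to have the column
        value = row[column]
        if isinstance(value, float) and value.is_integer():
            value = str(int(value))  # 12345.0 -> '12345'
        value = str(value).strip()
        if value and value not in seen:
            seen.add(value)
            orders.append(value)
    return orders
```

With a real sheet: `extract_order_numbers(sheet_content.row_values(i) for i in range(sheet_content.nrows))`. The `seen` set keeps duplicate checks fast even for large exports, while `orders` preserves the sheet order.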
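With the order numbers in hand, the next step is to fetch the order details. My manager told me a colleague had already written code for this, so I only needed to call that interface. I won't show someone else's code here, but the principle is simple.
Use the requests module to send a GET request to the URL with the order number as a parameter, and the data comes back. Displaying it on a web page differs from company to company: store it in the database and take whatever you need from there.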
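The colleague's interface isn't shown, so here is a minimal sketch of the calling side only. It assumes a function `fetch_detail(order_no)` (hypothetical) that wraps that GET request. The sketch fetches several orders concurrently with `concurrent.futures.ThreadPoolExecutor`. It collects successes and failures in the main thread, which avoids the shared counters that the full source updates from worker threads.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(order_numbers, fetch_detail, max_workers=3):
    """Fetch details for every order concurrently.

    fetch_detail(order_no) stands in for the real API call; it should return
    the detail (anything truthy), or return None / raise on failure.
    Returns (details_by_order, failed_orders).
    """
    def safe_fetch(order_no):
        # Turn exceptions into a None result so one bad order can't stop the rest
        try:
            return order_no, fetch_detail(order_no)
        except Exception:
            return order_no, None

    details, failed = {}, []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so `failed` keeps the sheet order
        for order_no, detail in pool.map(safe_fetch, order_numbers):
            if detail:
                details[order_no] = detail
            else:
                failed.append(order_no)
    return details, failed
```

With requests, `fetch_detail` might look like `lambda no: session.get(DETAIL_URL, params={'order_no': no}, timeout=10).json()`. `DETAIL_URL` and the `order_no` parameter name are placeholders for whatever the real interface expects.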
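That's it for this post. If you learned something, please give it a like, haha.
Finally, here is the full source code. I won't tell you the username and password, ahaha.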
```python
#!/usr/bin/env python3
# coding: utf-8
import os
import sys

import django

# Add the directory that contains manage.py to the import path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Tell Django which settings module to use
os.environ['DJANGO_SETTINGS_MODULE'] = 'business.settings'
# Initialise Django before any model is imported
django.setup()

import base64
import datetime
import logging
import re
import threading
from concurrent.futures import ThreadPoolExecutor

import requests
import xlrd
from django.core.mail import EmailMultiAlternatives

from business import settings
from train import utils
from train.busi import insert_order, insert_ticket, insert_epay, insert_linkman
from train.depends.platform import Platform
from train.models import TbTomasOrder, TbTomasEpay, TbTomasLinkman

logger = logging.getLogger('django')


def re_search(regex, subject):
    """Return the first capture group of regex in subject, or '' if nothing matches."""
    match = re.search(regex, str(subject))
    return match.group(1) if match else ''


class TbTomas(object):
    def __init__(self, thread_num=3):
        # The session keeps the login cookies for every later request
        self.session_obj = requests.session()
        self.fail_order = []
        self.succ_number = 0
        self.fail_number = 0
        # The counters are updated from worker threads, so guard them with a lock
        self.lock = threading.Lock()
        self.thread_num = thread_num
        self.start_date = ""
        self.end_date = ""
        self.trade_date = utils.now()

    def login_thomas(self, thomas_username, thomas_password):
        hello_url = 'https://huoche.alitrip.com/hello.htm'
        hello_response = self.session_obj.get(hello_url)
        token = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
        h_u_s = base64.b64encode(token.encode('utf-8'))
        headers = {
            'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
            'Referer': 'https://huoche.alitrip.com/hello.htm',
            'Accept-Language': 'zh-CN',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'Keep-Alive',
            'Cache-Control': 'no-cache',
            # Host and Content-Length are computed by requests; don't hard-code them
        }
        post_data = {
            'h_u_s': base64.b64encode(h_u_s).decode('ascii'),  # token encoded twice, as before
            'h_u_n': thomas_username,
            'h_u_p': base64.b64encode(thomas_password.encode('utf-8')).decode('ascii'),
        }
        index_url = 'https://huoche.alitrip.com/index.htm'
        self.session_obj.post(index_url, headers=headers, data=post_data)
        logger.info("Logged in, waiting to download the file...")

    def download_file(self, thomas_username, thomas_password, args):
        for _ in range(3):
            try:
                self.login_thomas(thomas_username, thomas_password)
                break
            except Exception as e:
                logger.error(e)
        # Work out which dates to download
        all_time = self.date_time_handle(args)
        if not all_time:
            logger.error("Invalid date format!")
            return
        download_url = 'https://huoche.alitrip.com/orderlistexp.do'
        for trade_date in all_time:
            try:
                self.trade_date = trade_date
                post_data = {'orderExportDate': trade_date}
                sheet_content = None
                for _ in range(3):
                    try:
                        # The response body is the Excel file
                        download_response = self.session_obj.post(download_url, data=post_data)
                        # Open the workbook from memory
                        xls_content = xlrd.open_workbook(file_contents=download_response.content)
                        sheet_content = xls_content.sheets()[0]
                        logger.info("File downloaded, extracting order numbers")
                        break
                    except Exception:
                        logger.error("Download timed out, logging in again before retrying...")
                        self.login_thomas(thomas_username, thomas_password)
                if sheet_content is None:
                    logger.error("Failed to download the file for %s, skipping this date", trade_date)
                    continue
                order_item = []
                for line_num in range(sheet_content.nrows):
                    line_item = sheet_content.row_values(line_num)
                    # The third column (index 2) holds the order number; skip duplicates
                    if line_item[2] and line_item[2] not in order_item:
                        order_item.append(line_item[2])
                # Drop the header entry, keeping only order numbers
                order_item = order_item[1:]
                logger.info("Writing to the database")
                # Fetch order details and insert them using several worker threads
                with ThreadPoolExecutor(max_workers=self.thread_num) as pool:
                    list(pool.map(self.create_order_info, order_item))
                logger.info("Import finished for %s", self.trade_date)
                content = self.add_content(len(order_item), self.succ_number, self.fail_number, self.fail_order)
                self.send_mail(content=content)
                # Reset the counters before the next date
                self.succ_number, self.fail_number = 0, 0
                self.fail_order = []
            except Exception as e:
                logger.error(e)

    def date_time_handle(self, args):
        """Turn the command-line arguments into the list of dates to download."""
        if not args:
            # No arguments: download yesterday's data
            yesterday = datetime.date.today() - datetime.timedelta(days=1)
            return [yesterday.strftime('%Y-%m-%d')]
        if len(args) > 3:
            logger.error("Invalid arguments, please run again")
            return []
        try:
            self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
            if len(args) == 1:
                # Only a start date: run up to today
                self.end_date = datetime.date.today()
            else:
                self.end_date = datetime.datetime.strptime(args[1], "%Y-%m-%d").date()
            if len(args) == 3:
                self.thread_num = int(args[2])
        except ValueError:
            logger.error("Invalid arguments, please run again")
            return []
        all_time = []
        day = self.start_date
        # Walk forward one day at a time; a start after the end yields nothing
        while day <= self.end_date:
            all_time.append(day.strftime('%Y-%m-%d'))
            day += datetime.timedelta(days=1)
        return all_time

    def mark_failed(self, order):
        with self.lock:
            self.fail_order.append(order)
            self.fail_number += 1

    def create_order_info(self, order):
        try:
            order_info = Platform().get_order(order)
        except Exception as e:
            logger.error(e)
            order_info = None
        if not order_info:
            self.mark_failed(order)
            logger.error('Failed to fetch order [%s]', order)
            return
        try:
            # Insert into the order table
            if TbTomasOrder.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasOrder', order)
                self.mark_failed(order)
                return
            insert_order(order_info, order, self.trade_date)
            with self.lock:
                self.succ_number += 1
            # Insert into the ticket table
            insert_ticket(order_info, order, self.trade_date)
            # Insert the contact person
            if TbTomasLinkman.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasLinkman', order)
            else:
                insert_linkman(order_info, order, self.trade_date)
            # Insert into the epay table
            if TbTomasEpay.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasEpay', order)
            else:
                insert_epay(order_info, order, self.trade_date)
        except Exception as e:
            logger.error(e)
            with self.lock:
                self.fail_number += 1

    def add_content(self, total, succ_number, fail_number, fail_order):
        content = '''
        <h3>Thomas imported-orders report</h3>
        <div class="col-xs-12">
        <table border="1" cellpadding="3" cellspacing="1">
        <tr><td>Date</td><td>Total orders</td><td>Succeeded</td><td>Failed</td><td>Failed order numbers</td></tr>
        <tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>
        </table>
        </div>
        ''' % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M"), total, succ_number, fail_number,
               ', '.join(str(o) for o in fail_order))
        return content

    def send_mail(self, content):
        subject = 'Thomas data scraping email %s' % self.trade_date
        logger.info('Preparing to send email... %s', subject)
        mail_address = settings.mail_address_thomas
        to_addr = []
        if isinstance(mail_address, list):
            to_addr += mail_address
        elif isinstance(mail_address, str):
            to_addr.append(mail_address)
        logger.debug(to_addr)
        from_email = settings.DEFAULT_FROM_EMAIL
        msg = EmailMultiAlternatives(subject, 'result', from_email, to_addr)
        msg.attach_alternative(content, "text/html")
        if msg.send():
            logger.info('%s sent successfully', subject)
        else:
            logger.error('Failed to send %s', subject)

    def run(self, username, passwd, args):
        # Log in to the Thomas backend and download; retry up to three times
        for _ in range(3):
            try:
                self.download_file(username, passwd, args)
                break
            except Exception as e:
                logger.error(e)


def main():
    # The credentials are stored base64-encoded in settings
    username = base64.b64decode(settings.THOMAS_USERNAME).decode('utf-8')
    passwd = base64.b64decode(settings.THOMAS_PASSWORD).decode('utf-8')
    args = sys.argv[1:]
    TbTomas().run(username, passwd, args)


if __name__ == "__main__":
    main()
```

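The report template in `add_content` interpolates values straight into HTML, so an order number containing `<` or `&` would break the markup. Below is a minimal sketch of the same table with every value passed through `html.escape`. `build_report` is a hypothetical name, and the column layout is copied from the template above.

```python
import html


def build_report(date_label, total, succeeded, failed, failed_orders):
    """Render the import summary as an HTML table, escaping every value."""
    headers = ("Date", "Total orders", "Succeeded", "Failed", "Failed order numbers")
    cells = (date_label, total, succeeded, failed, ", ".join(map(str, failed_orders)) or "-")
    header_row = "".join("<td>%s</td>" % h for h in headers)
    value_row = "".join("<td>%s</td>" % html.escape(str(c)) for c in cells)
    return (
        "<h3>Thomas imported-orders report</h3>\n"
        '<table border="1" cellpadding="3" cellspacing="1">\n'
        "<tr>%s</tr>\n<tr>%s</tr>\n</table>" % (header_row, value_row)
    )
```

Joining the failed orders into a comma-separated string also avoids printing a raw Python list repr in the email.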