本人的程序是在 Mac 上寫的,Windows 上可能略有不同,差別主要在 PhantomJS 的路徑上。首先要下載 PhantomJS,然後創建一個到 /usr/bin/phantomjs 的軟鏈。之所以用 selenium 和 PhantomJS,是因為公司是做電商的,很多頁面都是 ajax 異步渲染出來的,使用 urllib 或者 requests 無法渲染異步頁面;而 PhantomJS 是一個沒有界面的瀏覽器,使用 WebKit 瀏覽器內核(號稱 Safari 也是用這個內核),可以完全模擬渲染以及點擊、拖動等動作。下面是我的主要代碼,涉及到用戶密碼的地方我都去掉了。代碼就放在下面,其實也不難看懂,流程依次是登錄、打開商品頁面、下單,最後完成交易,可以根據自己的需求添加。selenium 好像是有自帶的頁面檢查機制的(應該就是 WebDriverWait),我找了一下沒找到,就自己寫了一個,主要就是代碼裡的那個 while wait_browser 循環。
# -*- coding: utf-8 -*- from selenium import webdriver import time import re from monitor_business import business import requests import random from user_config import username_info from selenium.common.exceptions import NoSuchElementException class autocreateorder(): def __init__(self,_username,_loginurl): self.username=_username self.loginurl=_loginurl def login(self): try: self.browser = webdriver.PhantomJS(desired_capabilities={'phantomjs.page.settings.resourceTimeout': '15000'}) except Exception: print "PhantomJS init fail" self.browser.get(self.loginurl) try: #print "正在登錄" browserlogin = self.browser.find_element_by_class_name("j_login-form") browserusername = self.browser.find_element_by_id("username-input") browserusername.click() browserusername.clear() browserusername.send_keys(self.username) browserpassword = self.browser.find_element_by_id("password-input") browserpassword.click() browserpassword.clear() browserpassword.send_keys(u'xxxxx') browserlogin.submit() #print "登錄成功" except NoSuchElementException: print "PhantomJS login fail" return "fail" def productaddcart(self,_producturlurl,timeout,jscmd=None): if _producturlurl: self.browser.get(_producturlurl) print "open product url" time.sleep(timeout) self.browser.execute_script(jscmd) n=0 wait_browser=True while wait_browser: try: self.browser.find_element_by_class_name("joy-ui-dialog-alert-window") wait_browser=False #print "stop product url check" except NoSuchElementException: if n == 11: print "add cart timeout" self.browser.quit() return "showproductfail" else: #print "wait time to check product url" time.sleep(1) n=n+1 else: print "please config product url" def showcart(self): self.browser.get("http://shop.m.xxxxx.net/shopCart.html") #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/screenshot1.png") print "show cart" n=0 wait_browser=True while wait_browser: try: self.browser.find_element_by_class_name("order-submit") wait_browser=False except NoSuchElementException: if n == 11: 
print "show cart timeout" self.browser.quit() return "showcartfail" else: time.sleep(1) n=n+1 def createorder(self): jscmd="$('.j_Submit').trigger('click');" self.browser.execute_script(jscmd) #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/screenshot1.png") print "click cart make order" n=0 wait_browser=True while wait_browser: try: self.browser.find_element_by_class_name("j_Submit") wait_browser=False except NoSuchElementException: if n == 11: print "confirm order" self.browser.quit() return "createorderfail" else: time.sleep(1) n=n+1 #browser.get_screenshot_as_file("/Users/zhangsongbin/Desktop/確認訂單.png") time.sleep(5) jscmd="$('.j_Submit').trigger('click');" self.browser.execute_script(jscmd) def checkcommission(self): wait_browser=True n=0 while wait_browser: check_order_number=re.findall('<input type="hidden" value="(\d+)" name="orderNumber" class="j_OrderNumber">',self.browser.page_source) if check_order_number: wait_browser=False order_number=check_order_number[0] self.browser.quit() requests.get("http://shop.m.xxxxx.net/Fake_Pay?orderNumber="+str(order_number)) time.sleep(30) check_commission=business.monitor_commission(order_number)#訂單的狀態檢查。 if check_commission == "order_fail": print str(order_number)+":commission missing" else: print str(order_number)+":commission ok" elif n == 11: print "pay url timeout" self.browser.quit() return "payfail" else: time.sleep(1) n=n+1 def makeorder(): stratint=0 endint=len(username_info) inputusername=username_info[random.randrange(stratint,endint)]#我在其他文件中有個用戶名的集合,這里是隨機調用一個用戶出來登錄 order=autocreateorder(inputusername,"http://login.xxxxx.net/?service=http%3A%2F%2Fm.xxxxx.net&redirect_uri=%2Fm%2F") order.login() order.productaddcart("http://shop.m.xxxxx.net/shop/sku/18613.html",10,jscmd="$('.j_AddCart').trigger('click');") order.showcart() order.createorder() order.checkcommission() def makemutilorder(): stratint=0 endint=len(username_info) inputusername=username_info[random.randrange(stratint,endint)] 
order=autocreateorder(inputusername,"http://login.xxxxx.net/?service=http%3A%2F%2Fm.xxxxx.net&redirect_uri=%2Fm%2F") order.login() productlist=["http://shop.m.xxxxx.net/shop/sku/18613.html","http://shop.m.xxxxx.net/shop/sku/146991.html","http://shop.m.xxxxx.net/shop/sku/149104.html"] for url in productlist: productpage=order.productaddcart(url,15,jscmd="$('.j_AddCart').trigger('click');") if productpage == "showproductfail": print "showproductfail" order.showcart() order.createorder() order.checkcommission()
下面再放一個調用上面這段代碼的程序,可以用來做壓測。前提是找台好點的 Linux 服務器,因為這個不是單純抓 html 的爬蟲,而是涉及到完全真實的渲染(當然了,如果沒有異步的玩意,除了測試人員,誰願意用這個啊,直接壓測請求接口不是更好?)。
# -*- coding: utf-8 -*- import time import threading import multiprocessing from webkit_ghost_test import makeorder,makemutilorder def performancetest(_groups,_interval,_members): _groups=int(_groups) while _groups > 0: for i in range(_members):#一組下單多少次 make = multiprocessing.Process(target=makemutilorder) make.start() #for n in tsk: #n.join() #print "組間下單:倒敘"+str(_groups)+"次" _groups=_groups-1 time.sleep(_interval) if __name__ == '__main__': url="http://m.xxxxx.net" ordertimes=float(10) #規定時間內下單次數,后面要有小數 times=float(1) #以秒為單位規定測試時長,后面要有小數 groups=float(1) #組內單位是同時下單,組間單位是間隔下單 members=int(ordertimes/groups)#組內下單數量 interval=times/groups #組間間隔時間 print groups,interval,members time_wait = threading.Event() performancetest(groups,interval,members)