Original work. Do not repost.
Factors to consider when sending requests in large batches:
1. Server capacity (network bandwidth / hardware configuration);
2. Client I/O, client bandwidth, and hardware configuration.
Approach:
1. Any approach is relative to its constraints;
2. In my case there is only one client machine, so a distributed setup is not an option, and the server's capacity is also very limited (as learned through repeated testing);
3. JMeter is not used here, although JMeter could do this job as well.
Note: unless otherwise stated, the code below is based on Windows 7 64-bit / CentOS 6.5 64-bit, Python 3.6+.
Python supports many ways of sending requests in bulk; here I only cover the ones I have used:
1. Using grequests:
grequests can fire off a very large batch of requests in one go, but it is said to modify the underlying socket communication, which may be unstable or unsafe. It is also inconvenient when you need to check each request's outgoing parameters against its response, because it sends in bulk and collects the responses in bulk. Example code:
```python
import datetime
import json
import os

import grequests


class voiceSearchInterface:
    @classmethod
    def formatUrlAndHeader(cls, des, singer, songName):
        # logic that builds the url and header (omitted)
        return url, h

    @classmethod
    def exeRequests(cls):
        errorLog = open("errorLog.txt", "w+")
        startTime = datetime.datetime.now()
        rightCount = 0
        errCount = 0
        descr = ["播放", "搜索", "搜", "聽", "我要聽", "我想聽", "來一首",
                 "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
        orgPath = os.path.join(os.path.dirname(os.getcwd()),
                               "test", "SongsAndSingers", "singersAndSongs3.txt")
        i = 0
        urlsAndHs = []
        with open(orgPath, "rb") as f:
            for line in f.readlines():
                temp = line.decode().split("\t")
                orgSinger = temp[0]
                orgSong = temp[1].replace("\n", "")
                for k in descr:
                    urlsAndHs.append(cls.formatUrlAndHeader(k, orgSinger, orgSong))
        # build every request up front, then send them 20 at a time
        rs = (grequests.get(u[0], headers=u[1], stream=False) for u in urlsAndHs)
        rsText = grequests.imap(rs, size=20)
        executingLog = open("Log.txt", "w+")
        for r in rsText:
            i += 1
            try:
                searchResult = json.loads(r.text)
                searchItem = searchResult["data"]["searchitem"]
                tt = searchItem.split("Split")
                searchSinger = tt[1]
                searchSong = tt[-1]
                resultSinger = searchResult["data"]["sounds"][0]["singer"]
                resultSong = searchResult["data"]["sounds"][0]["title"]
                if searchSinger == resultSinger and searchSong == resultSong:
                    rightCount += 1
                else:
                    errCount += 1
                    print(searchSinger, "\t", resultSinger, "\t", searchSong, "\t", resultSong)
            except Exception:
                errCount += 1
                errorLog.write((r.text + "\n").encode('latin-1').decode('unicode_escape'))
            print(i)
            executingLog.seek(0)
            executingLog.write(str(int(i / 14)))  # 14 = len(descr): source lines finished
        errorLog.close()
        executingLog.close()
        endTime = datetime.datetime.now()
        print("Elapsed: %ds, correct: %d, errors: %d, total: %d, pass rate: %.2f%%"
              % ((endTime - startTime).seconds, rightCount, errCount, i,
                 rightCount / i * 100))


voiceSearchInterface.exeRequests()
```
Caution: grequests can be a trap. Because it modifies the underlying socket communication, it may cause system-level problems. I have not run into any myself yet, but consider this a friendly warning.
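If you want bulk sending but still need to pair every response with the exact request that produced it, the standard library's thread pool is one alternative to grequests. Below is a minimal sketch of that pattern; the `fetch` function, the `example.test` URLs, and the header values are all hypothetical stand-ins (no real network call is made), but the future-to-request mapping is the part that carries over to real `requests.get` calls:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(url, headers):
    # stand-in for requests.get(url, headers=headers).text -- no real network here
    return "response for " + url


# hypothetical request list mirroring urlsAndHs above
urls_and_headers = [("http://example.test/%d" % n, {"X-N": str(n)}) for n in range(5)]

results = {}
with ThreadPoolExecutor(max_workers=20) as pool:  # ~20 in flight, like imap(size=20)
    futures = {pool.submit(fetch, u, h): (u, h) for u, h in urls_and_headers}
    for fut in as_completed(futures):
        u, h = futures[fut]  # the exact request that produced this response
        results[u] = fut.result()

print(len(results))  # 5
```

The dict keyed by future is what restores the request/response correlation that bulk send / bulk receive loses.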
2. Using multiprocessing + the requests library:
Python's multiprocessing and requests libraries are both superb tools. Straight to the code:
```python
# _*_coding=utf-8_*_
import json
import linecache
import multiprocessing
import os

import requests


def formatUrlAndHeader(des, singer, songName):
    # logic that builds the url and header (omitted)
    return url, h


# Each process reads its own source file and writes its progress to a file as it
# goes, so a power failure or abnormally terminated run can be resumed.
def worker(fileName):
    descr = ["播放", "搜索", "搜", "聽", "我要聽", "我想聽", "來一首",
             "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
    Logprefix = os.path.split(fileName)[1].replace(".txt", "")
    resultLogPath = os.path.join(os.getcwd(), "log", Logprefix + ".log")
    logbreakPoint = os.path.join(os.getcwd(), "log", Logprefix + ".txt")
    with open(logbreakPoint, "r") as b:
        startLine = int(b.read())
    with open(resultLogPath, "a+", encoding="utf-8") as logF:
        with open(fileName, "r", encoding="utf-8") as f:
            lines = f.readlines()
        LineNum = startLine
        for j in range(len(lines) - startLine + 1):
            LineContent = linecache.getline(fileName, LineNum)
            for i in descr:
                line = LineContent.split("\t")
                singer = line[0]
                song = line[1].replace("\n", "")
                uAndH = formatUrlAndHeader(i, singer, song)
                try:
                    r = requests.get(url=uAndH[0], headers=uAndH[1])
                    with open(logbreakPoint, "w") as w:
                        w.write(str(LineNum))  # checkpoint the current line
                    print("searching:%s, line: %d\n" % (fileName, LineNum))
                    result = json.loads(r.text)
                    resultSinger = result["data"]["sounds"][0]["singer"]
                    resultSong = result["data"]["sounds"][0]["title"]
                    if not (resultSinger == singer and resultSong == song):
                        logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n"
                                   % (i, singer, song,
                                      r.text.encode('latin-1').decode('unicode_escape')))
                except Exception as e:
                    logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n"
                               % (i, singer, song,
                                  str(e).encode('latin-1').decode('unicode_escape')))
            LineNum += 1


if __name__ == '__main__':
    orgPath = os.path.join(os.getcwd(), "data")
    for name in os.listdir(orgPath):
        f = os.path.join(orgPath, name)
        if os.path.isfile(f):
            p = multiprocessing.Process(target=worker, args=(f,))
            p.start()
```
The program spawns one process per data-source file. Each process reads its own file, calls formatUrlAndHeader to obtain the url and header, sends the requests one by one, and saves its current position to a checkpoint file. The advantage of this approach is that for every single request you can compare the outgoing parameters against the response data that comes back.
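The checkpoint/resume pattern above can be isolated into two tiny helpers. This is a sketch under my own naming (`load_checkpoint`, `save_checkpoint`, and the temp-file path are illustrative, not from the original code), but the behavior matches the `logbreakPoint` file in the worker: overwrite the file with the latest processed line, and read it back on startup:

```python
import os
import tempfile


def load_checkpoint(path):
    # resume from the saved line number; start at line 1 if no checkpoint exists yet
    if os.path.exists(path):
        with open(path) as f:
            return int(f.read() or "1")
    return 1


def save_checkpoint(path, line_num):
    # overwrite with the latest processed line, like logbreakPoint above
    with open(path, "w") as f:
        f.write(str(line_num))


ckpt = os.path.join(tempfile.mkdtemp(), "progress.txt")
print(load_checkpoint(ckpt))  # 1 -- fresh start
save_checkpoint(ckpt, 42)
print(load_checkpoint(ckpt))  # 42 -- a restarted run would resume here
```

Writing the checkpoint after each request (rather than once at the end) is what makes the run survive a power failure at the cost of one small disk write per request.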
3. Using asyncio and aiohttp
asyncio entered the standard library in Python 3.4 and is the recommended approach; aiohttp has to be installed separately. The code:
```python
# _*_coding=utf-8_*_
import asyncio
import linecache
import os
import threading
import time

import aiohttp


def formatUrlAndHeader(des, singer, songName):
    # logic that builds the url and header (omitted)
    return url, h


async def fetch_async(uandh):
    u, h = uandh[0], uandh[1]
    with aiohttp.Timeout(301):
        async with aiohttp.request('GET', url=u, headers=h) as r:
            data = await r.text()
            return data


loop = asyncio.get_event_loop()
descr = ["播放", "搜索", "搜", "聽", "我要聽", "我想聽", "來一首",
         "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
orgPath = os.path.join(os.path.dirname(os.getcwd()),
                       "test", "SongsAndSingers", "singersAndSongs3.txt")


def runRequests(startNum):
    start = time.time()
    urlsAndHs = []
    for i in range(20):  # read 20 consecutive source lines per thread
        line = linecache.getline(orgPath, startNum + i).split("\t")
        orgSinger = line[0]
        orgSong = line[1].replace("\n", "")
        for k in descr:
            urlsAndHs.append(formatUrlAndHeader(k, orgSinger, orgSong))
    linecache.clearcache()
    tasks = [fetch_async(uandh) for uandh in urlsAndHs]
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    for i in done:
        print(i.result().encode('latin-1').decode('unicode_escape'))
    end = time.time()
    print(end - start)


for i in range(1, 50, 20):
    t = threading.Thread(target=runRequests, args=(i,))
    t.start()
    t.join()  # each thread must finish before the next batch starts
```
One source data file, multiple threads. Each thread reads 20 consecutive lines of the file starting from the line number it is given, then sends the resulting requests as one batch; each thread must wait for the previous one to finish before starting. This is again batch send, batch receive, so you cannot compare an individual request's parameters against its own response.
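The batch-send limitation above is avoidable in asyncio itself: `asyncio.gather` returns results in the same order as its inputs, and a `Semaphore` caps the number of requests in flight. Here is a minimal sketch of that pattern; the `fetch` coroutine and the `example.test` URLs are hypothetical stand-ins (an `asyncio.sleep` simulates the aiohttp GET, so no network is needed), but the semaphore and ordered-gather structure transfer directly to real aiohttp calls:

```python
import asyncio


async def fetch(url, sem):
    # stand-in for an aiohttp GET; asyncio.sleep simulates network latency
    async with sem:
        await asyncio.sleep(0.01)
        return url, "response for " + url


async def main(urls):
    sem = asyncio.Semaphore(20)  # at most 20 requests in flight, like the batches above
    # gather preserves input order, so each result stays paired with its url
    return await asyncio.gather(*(fetch(u, sem) for u in urls))


urls = ["http://example.test/%d" % n for n in range(50)]
loop = asyncio.new_event_loop()
pairs = loop.run_until_complete(main(urls))
loop.close()
print(len(pairs))  # 50
```

Because each result tuple carries its own url, the per-request comparison that the thread-batched version loses is preserved here while all 50 requests still overlap in time.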
Any of the three approaches above meets my testing requirements. In practice I found:
1. For a single request, the PHP interface's response speed is strongly affected by the pagesize parameter; the exact cause is unknown;
2. IO-intensive operations are extremely CPU-hungry on the server. All three approaches send only about 20 requests at a time, yet the server's CPU (8 cores) is already fully saturated!
