Sending Requests in Large Batches with Python


Original work; please do not repost.

Factors to consider when sending requests in large batches:

1. Server capacity (network bandwidth / hardware configuration);

2. Client I/O conditions, client bandwidth, and hardware configuration.

 

Approach:

1. Any approach is relative to the circumstances; there is no one-size-fits-all solution.

2. In my case there is only one client machine, so a distributed setup is out of the question, and the server's capacity is also very limited (as I learned through repeated tuning).

3. JMeter is not used here, although JMeter could also get this done.

 

Note: unless stated otherwise, the code below targets Windows 7 64-bit / CentOS 6.5 64-bit and Python 3.6+.

Python offers many ways to send requests in bulk; here I only cover the ones I have used myself:

1. Using grequests:

grequests can fire off an extremely large batch of requests in one go, but I have heard that it patches the underlying socket communication, which may be unstable or unsafe. It is also inconvenient if you need to check each request's sent parameters against its response, because requests are sent as a batch and the responses are collected as a batch. Sample code:

 

import grequests
import time
from collections import OrderedDict
import hashlib
import os
import xlrd
import json
import datetime

class voiceSearchInterface:
    @classmethod
    def formatUrlAndHeader(cls, des, singer, songName):
        # logic that builds the request url and header (omitted)
        return url, h
    
    @classmethod
    def exeRequests(cls):
        errorLog = open("errorLog.txt","w+")
        startTime = datetime.datetime.now()
        rightCount = 0
        errCount = 0
        descr = ["播放", "搜索", "", "", "我要聽", "我想聽", "來一首", "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
        orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")
        f = open(orgPath,"rb")
        i = 0
        urlsAndHs = []
        for line in f.readlines():
            temp = line.decode().split("\t")
            orgSinger = temp[0]
            orgSong = temp[1].replace("\n","")
            for k in descr:
                urlAndH = cls.formatUrlAndHeader(k, orgSinger, orgSong)
                urlsAndHs.append(urlAndH)
        f.close()
        rs = (grequests.get(u[0], headers = u[1], stream = False) for u in urlsAndHs)
        rsText = grequests.imap(rs, size=20)
        executingLog = open("Log.txt", "w+")  # progress log, overwritten on every response
        for r in rsText:
            i+=1
            try:   
                searchResult = json.loads(r.text)
                searchItem = searchResult["data"]["searchitem"]
                tt =  searchItem.split("Split")
                searchSinger = tt[1]
                searchSong = tt[-1]
                resultSinger = searchResult["data"]["sounds"][0]["singer"]
                resultSong = searchResult["data"]["sounds"][0]["title"]
                if(searchSinger==resultSinger and searchSong==resultSong):
                    
                    rightCount += 1
                else:
                    
                    errCount += 1
                    print(searchSinger, "\t",resultSinger, "\t",searchSong,"\t", resultSong)
            except Exception:
                errCount += 1
                errorLog.write((r.text+"\n").encode('latin-1').decode('unicode_escape'))
            print(i)
            # overwrite the progress file with the latest source-line count (14 descriptions per line)
            executingLog.seek(0)
            executingLog.truncate()
            executingLog.write(str(int(i/14)))
            executingLog.flush()
        errorLog.close()
        executingLog.close()
        endTime = datetime.datetime.now()
        print("耗時: %d秒, 正確數: %d, 異常數: %d, 總數: %d, 通過率: %.2f%%" % ((endTime-startTime).seconds, rightCount, errCount,  i, (rightCount)/i*100))

voiceSearchInterface.exeRequests()

Note: grequests may have pitfalls. Because it patches the underlying socket communication, it could in theory cause system-level problems. I have not run into any yet, but consider this a friendly warning.
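If you do need to match each response back to the request that produced it, one possible workaround (a minimal sketch only; the URLs, headers and the on_error handler are placeholders, not taken from the script above) is grequests.map, which returns responses in the same order as the request list:

import grequests

# Placeholder request list; in the real script this would come from formatUrlAndHeader.
urlsAndHs = [
    ("http://example.com/search?q=1", {"User-Agent": "test"}),
    ("http://example.com/search?q=2", {"User-Agent": "test"}),
]

reqs = [grequests.get(u, headers=h, stream=False) for u, h in urlsAndHs]

def on_error(request, exception):
    # Called for requests that fail; the failed slot stays None in the result list.
    print("failed:", request.url, exception)

# Unlike imap, grequests.map keeps the result order aligned with reqs,
# so each response can be zipped back to the request that produced it.
responses = grequests.map(reqs, size=20, exception_handler=on_error)
for (u, h), r in zip(urlsAndHs, responses):
    if r is not None:
        print(u, r.status_code)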

 

2. Using multiprocessing + the requests library:

Python's multiprocessing module and the requests library are both excellent tools. Straight to the code:

#_*_coding=utf-8_*_
import multiprocessing
import time
from collections import OrderedDict
import hashlib
import linecache
import os
import requests
import json

def formatUrlAndHeader(des, singer, songName):
    # logic that builds the request url and header (omitted)
    return url, h

# Each process reads its own file and writes its current progress to a breakpoint file,
# to guard against power loss or other abnormal termination of the program.
def worker(fileName):  
    descr = ["播放", "搜索", "", "", "我要聽", "我想聽", "來一首", "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
    Logprefix = os.path.split(fileName)[1].replace(".txt", "")
    resultLogPath = os.path.join(os.getcwd(), "log", Logprefix+".log")
    logbreakPoint = os.path.join(os.getcwd(), "log", Logprefix+".txt")
    with open(logbreakPoint, "r") as b:
        startLine = int(b.read())
    with open(resultLogPath, "a+", encoding="utf-8") as logF:
        with open(fileName, "r", encoding="utf-8") as f:
            lines = f.readlines()
            LineNum = startLine
            for j in range(len(lines)-startLine+1):
                LineContent = linecache.getline(fileName, LineNum)
                for i in descr:
                    line = LineContent.split("\t")
                    singer = line[0]
                    song = line[1].replace("\n","")
                    uAndH = formatUrlAndHeader(i, singer, song)
                    try:
                        r = requests.get(url=uAndH[0], headers = uAndH[1])
                        with open(logbreakPoint, "w") as w:
                            w.write(str(LineNum))
                        print("searching:%s, line: %d\n" % (fileName, LineNum))
                        result = json.loads(r.text)
                        resultSinger = result["data"]["sounds"][0]["singer"]
                        resultSong = result["data"]["sounds"][0]["title"]
                        if not (resultSinger==singer and resultSong==song):
                            logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song, r.text.encode('latin-1').decode('unicode_escape')))
                    except Exception as e:
                        logF.write("Error: search des: %s, singer:%s, song:%s;return: %s\n" %(i,singer,song,str(e).encode('latin-1').decode('unicode_escape')))
                LineNum += 1

if __name__=='__main__':
    orgPath = os.path.join(os.getcwd(), "data")
    files = os.listdir(orgPath)
    for i in files:
        f =os.path.join(orgPath,i)
        if os.path.isfile(f):
            p = multiprocessing.Process(target=worker, args=(f,))
            p.start()

The program spawns one process per data-source file. Each process reads its own source file, calls formatUrlAndHeader to build the url and header, sends the requests one by one, and writes its current progress to a dedicated file. The advantage of this approach is that for every request you can compare the parameters sent with the response data that comes back.
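If the data directory contains many files, spawning one process per file can overload the client machine. Below is a minimal sketch (assuming the same worker function defined above; the pool size is an assumption to be tuned) that caps the number of concurrent processes with multiprocessing.Pool:

import multiprocessing
import os

if __name__ == '__main__':
    orgPath = os.path.join(os.getcwd(), "data")
    files = [os.path.join(orgPath, i) for i in os.listdir(orgPath)]
    files = [f for f in files if os.path.isfile(f)]
    # Cap the worker count at the CPU count (falling back to 4) instead of one process per file.
    with multiprocessing.Pool(processes=os.cpu_count() or 4) as pool:
        pool.map(worker, files)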

 

3. Using the asynchronous libraries asyncio and aiohttp

asyncio was introduced in Python 3.4 and has been part of the standard library since then; it is the recommended approach. aiohttp must be installed separately. Code:

#_*_coding=utf-8_*_
import aiohttp
import time
from collections import OrderedDict
import hashlib
import asyncio
import os
import linecache
import threading

def formatUrlAndHeader(des, singer, songName):
    # logic that builds the request url and header (omitted)
    return url, h
    

async def fetch_async(uandh):
    u, h = uandh[0], uandh[1]
    # aiohttp.Timeout is the older aiohttp 1.x/2.x API (removed in 3.x, see the note below)
    with aiohttp.Timeout(301):
        async with aiohttp.request('GET', url=u, headers=h) as r:
            data = await r.text()
            return data
    
 
loop = asyncio.get_event_loop()
descr = ["播放", "搜索", "", "", "我要聽", "我想聽", "來一首", "來一個", "來一段", "來一曲", "來首", "來個", "來段", "來曲"]
orgPath = os.path.join(os.path.dirname(os.getcwd()), "test","SongsAndSingers","singersAndSongs3.txt")

def runRequests(startNum):
    start = time.time()
    urlsAndHs = []
    for i in range(20):
        line = linecache.getline(orgPath, startNum+i).split("\t")
        orgSinger = line[0]
        orgSong = line[1].replace("\n","")
        for k in descr:
            urlAndH = formatUrlAndHeader(k, orgSinger,orgSong)
            urlsAndHs.append(urlAndH)
    linecache.clearcache()
    tasks = [fetch_async(uandh) for uandh in urlsAndHs]
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    for i in done:
        print(i.result().encode('latin-1').decode('unicode_escape'))
    end = time.time()
    print(end-start)   

for i in range(1,50,20):
    t = threading.Thread(target=runRequests, args=(i,))
    t.start()
    t.join()

One source data file, multiple threads. Each thread reads 20 consecutive lines of the file starting from the line number it is given, then sends the corresponding requests as a batch; the next thread does not start until the previous one has finished. This approach also sends in batch and collects in batch, so you cannot compare each individual request's parameters against its own response.
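Note that aiohttp.Timeout as used above belongs to the older aiohttp 1.x/2.x API and no longer exists in aiohttp 3.x. Below is a minimal sketch of the same idea against aiohttp 3.x (the URLs and headers are placeholders), using a shared ClientSession plus asyncio.gather, which returns results in task order so that each response can be paired with the request that produced it:

import asyncio
import aiohttp

async def fetch(session, url, headers):
    async with session.get(url, headers=headers) as resp:
        return await resp.text()

async def run(urlsAndHs):
    # ClientTimeout replaces the old aiohttp.Timeout context manager in aiohttp 3.x.
    timeout = aiohttp.ClientTimeout(total=301)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [fetch(session, u, h) for u, h in urlsAndHs]
        # gather preserves order: results[i] corresponds to urlsAndHs[i].
        return await asyncio.gather(*tasks, return_exceptions=True)

# Placeholder request list; in the real script it would come from formatUrlAndHeader.
urlsAndHs = [("http://example.com/search?q=%d" % i, {}) for i in range(20)]
results = asyncio.get_event_loop().run_until_complete(run(urlsAndHs))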

 

Any of the three approaches above meets my testing needs. In practice I found:

1. For a single request, the PHP interface's pagesize parameter has a big impact on response speed; the exact reason is unknown.

2. I/O-intensive work is very CPU-hungry on the server. With all three approaches, sending only about 20 requests at a time was already enough to max out the server's CPU (8 cores)! (A sketch of capping concurrency explicitly follows below.)
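Since about 20 concurrent requests were already enough to saturate the server, it can help to cap the number of in-flight requests explicitly rather than relying on batch size. A minimal sketch (placeholder URLs; the concurrency limit is an assumption to be tuned against your own server) using asyncio.Semaphore together with aiohttp:

import asyncio
import aiohttp

CONCURRENCY = 10  # assumed limit; tune to what the server can actually handle

async def fetch_limited(session, sem, url):
    async with sem:                      # at most CONCURRENCY requests in flight at once
        async with session.get(url) as resp:
            return await resp.text()

async def run(urls):
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, sem, u) for u in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

urls = ["http://example.com/search?q=%d" % i for i in range(100)]  # placeholder URLs
results = asyncio.get_event_loop().run_until_complete(run(urls))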

