requests模塊高級操作
- 代理相關的操作
- 驗證碼的識別
- cookie相關操作
- 模擬登錄
一. 代理操作
- 什么是代理?
- 就是代理服務器
- 提供代理的網站:
- 快代理
- 西祠代理
- goubanjia
- 代理的匿名度
- 透明代理: 對方服務器可以知道你使用了代理,並且也知道你的真實ip
- 匿名代理: 對方服務器可以知道你使用了代理,但不知道你的真實ip
- 高匿代理: 對方服務器不知道你使用了代理, 更不知道那你的真實ip
- 代理的類型:
- http: 該類型的代理ip只可以發起http協議頭對應的請求
- https: 該類型的代理ip只可以發起https協議頭對應的請求
- requests的get方法和post方法常用的參數
- url
- headers
- data/params
- proxies
示例:用代理訪問
import random
import requests
https = [
{'https':'223.19.212.30:8380'},
{'https':'221.19.212.30:8380'}
]
http = [
{'http':'223.19.212.30:8380'},
{'http':'221.19.212.30:8380'}
]
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
url = 'https://www.baidu.com/s?wd=ip'
if url.split(':')[0] == 'https':
page_text = requests.get(url=url,headers=headers,proxies=random.choice(https)).text
else:
page_text = requests.get(url=url,headers=headers,proxies=random.choice(http)).text
with open('./ip.html','w',encoding='utf-8') as fp:
fp.write(page_text)
二. 驗證碼識別
常用的驗證碼的打碼平台有雲打碼, 打碼兔, 超級鷹,以雲打碼為例
雲打碼使用流程
-
注冊種類(兩種):
- 普通用戶
- 登錄普通用戶,查詢剩余題分,如果不夠需要充值
- 開發者用戶
- 創建一個軟件:我的軟件-》創建一個新軟件(軟件名稱,秘鑰不可以修改),使用軟件的id和秘鑰
- 下載示例代碼:開發文檔-》點此下載:雲打碼接口DLL-》PythonHTTP示例下載
- 普通用戶
下載demo之后,可以看到api的使用示例:
import http.client, mimetypes, urllib, json, time, requests
######################################################################
class YDMHttp:
apiurl = 'http://api.yundama.com/api.php'
username = ''
password = ''
appid = ''
appkey = ''
def __init__(self, username, password, appid, appkey):
self.username = username
self.password = password
self.appid = str(appid)
self.appkey = appkey
def request(self, fields, files=[]):
response = self.post_url(self.apiurl, fields, files)
response = json.loads(response)
return response
def balance(self):
data = {'method': 'balance', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response = self.request(data)
if (response):
if (response['ret'] and response['ret'] < 0):
return response['ret']
else:
return response['balance']
else:
return -9001
def login(self):
data = {'method': 'login', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey}
response = self.request(data)
if (response):
if (response['ret'] and response['ret'] < 0):
return response['ret']
else:
return response['uid']
else:
return -9001
def upload(self, filename, codetype, timeout):
data = {'method': 'upload', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'codetype': str(codetype), 'timeout': str(timeout)}
file = {'file': filename}
response = self.request(data, file)
if (response):
if (response['ret'] and response['ret'] < 0):
return response['ret']
else:
return response['cid']
else:
return -9001
def result(self, cid):
data = {'method': 'result', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'cid': str(cid)}
response = self.request(data)
return response and response['text'] or ''
def decode(self, filename, codetype, timeout):
cid = self.upload(filename, codetype, timeout)
if (cid > 0):
for i in range(0, timeout):
result = self.result(cid)
if (result != ''):
return cid, result
else:
time.sleep(1)
return -3003, ''
else:
return cid, ''
def report(self, cid):
data = {'method': 'report', 'username': self.username, 'password': self.password, 'appid': self.appid, 'appkey': self.appkey, 'cid': str(cid), 'flag': '0'}
response = self.request(data)
if (response):
return response['ret']
else:
return -9001
def post_url(self, url, fields, files=[]):
for key in files:
files[key] = open(files[key], 'rb');
res = requests.post(url, files=files, data=fields)
return res.text
將使用操作封裝為函數
#將示例代碼中的可執行程序封裝成函數
def transformCodeImg(imgPath,imgType):
# 普通用戶名
username = 'bobo328410948'
# 密碼
password = 'bobo328410948'
# 軟件ID,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appid = 6003
# 軟件密鑰,開發者分成必要參數。登錄開發者后台【我的軟件】獲得!
appkey = '1f4b564483ae5c907a1d34f8e2f2776c'
# 圖片文件
filename = imgPath
# 驗證碼類型,# 例:1004表示4位字母數字,不同類型收費不同。請准確填寫,否則影響識別率。在此查詢所有類型 http://www.yundama.com/price.html
codetype = imgType
# 超時時間,秒
timeout = 30
result = None
# 檢查
if (username == 'username'):
print('請設置好相關參數再測試')
else:
# 初始化
yundama = YDMHttp(username, password, appid, appkey)
# 登陸雲打碼
uid = yundama.login();
print('uid: %s' % uid)
# 查詢余額
balance = yundama.balance();
print('balance: %s' % balance)
# 開始識別,圖片路徑,驗證碼類型ID,超時時間(秒),識別結果
cid, result = yundama.decode(filename, codetype, timeout);
return result
三. Cookie相關操作
cookie可以是服務端記錄客戶端的相關狀態, 有些網站get請求數據時,需要攜帶cookie
#需求:爬取雪球網中的新聞標題和對應的內容簡介
url = 'https://xueqiu.com/v4/statuses/public_timeline_by_category.json?since_id=-1&max_id=-1&count=10&category=-1'
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'
}
json_obj = requests.get(url=url,headers=headers).json()
print(json_obj)
四.基於multiprocessing.dummy線程池的數據爬取
- 需求:爬取梨視頻的視頻信息,並計算其爬取數據的耗時
普通爬取
import time
import requests
import random
from lxml import etree
import re
from fake_useragent import UserAgent
#安裝fake-useragent庫:pip install fake-useragent
url = 'http://www.pearvideo.com/category_1'
#隨機產生UA,如果報錯則可以添加如下參數:
#ua = UserAgent(verify_ssl=False,use_cache_server=False).random
#禁用服務器緩存:
#ua = UserAgent(use_cache_server=False)
#不緩存數據:
#ua = UserAgent(cache=False)
#忽略ssl驗證:
#ua = UserAgent(verify_ssl=False)
ua = UserAgent().random
headers = {
'User-Agent':ua
}
#獲取首頁頁面數據
page_text = requests.get(url=url,headers=headers).text
#對獲取的首頁頁面數據中的相關視頻詳情鏈接進行解析
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')
detail_urls = []
for li in li_list:
detail_url = 'http://www.pearvideo.com/'+li.xpath('./div/a/@href')[0]
title = li.xpath('.//div[@class="vervideo-title"]/text()')[0]
detail_urls.append(detail_url)
for url in detail_urls:
page_text = requests.get(url=url,headers=headers).text
vedio_url = re.findall('srcUrl="(.*?)"',page_text,re.S)[0]
data = requests.get(url=vedio_url,headers=headers).content
fileName = str(random.randint(1,10000))+'.mp4' #隨機生成視頻文件名稱
with open(fileName,'wb') as fp:
fp.write(data)
print(fileName+' is over')
基於線程池的爬取
import requests
import random
from lxml import etree
import re
from fake_useragent import UserAgent
#安裝fake-useragent庫:pip install fake-useragent
#導入線程池模塊
from multiprocessing.dummy import Pool
#實例化線程池對象
pool = Pool()
url = 'http://www.pearvideo.com/category_1'
#隨機產生UA
ua = UserAgent().random
headers = {
'User-Agent':ua
}
#獲取首頁頁面數據
page_text = requests.get(url=url,headers=headers).text
#對獲取的首頁頁面數據中的相關視頻詳情鏈接進行解析
tree = etree.HTML(page_text)
li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')
detail_urls = []#存儲二級頁面的url
for li in li_list:
detail_url = 'http://www.pearvideo.com/'+li.xpath('./div/a/@href')[0]
title = li.xpath('.//div[@class="vervideo-title"]/text()')[0]
detail_urls.append(detail_url)
vedio_urls = []#存儲視頻的url
for url in detail_urls:
page_text = requests.get(url=url,headers=headers).text
vedio_url = re.findall('srcUrl="(.*?)"',page_text,re.S)[0]
vedio_urls.append(vedio_url)
#使用線程池進行視頻數據下載
func_request = lambda link:requests.get(url=link,headers=headers).content
video_data_list = pool.map(func_request,vedio_urls)
#使用線程池進行視頻數據保存
func_saveData = lambda data:save(data)
pool.map(func_saveData,video_data_list)
def save(data):
fileName = str(random.randint(1,10000))+'.mp4'
with open(fileName,'wb') as fp:
fp.write(data)
print(fileName+'已存儲')
pool.close()
pool.join()
模擬古詩文登錄
- 爬取古詩文網,模擬登陸
import requests
from lxml import etree
s = requests.Session()
headers = {
"User-Agent":'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
}
# 獲取驗證碼圖片並且讓打碼平台
url = 'https://so.gushiwen.org/user/login.aspx'
page_text = s.get(url=url,headers=headers).text
tree = etree.HTML(page_text)
img_src = 'https://so.gushiwen.org' + tree.xpath('//*[@id="imgCode"]/@src')[0]
img_data = s.get(url=img_src,headers=headers).content
with open('./gushiwen.jpg','wb') as f:
f.write(img_data)
result = transformCodeImg('./gushiwen.jpg',1004)
__VIEWSTATE = tree.xpath('//*[@id="__VIEWSTATE"]/@value')[0]
__VIEWSTATEGENERATOR = tree.xpath('//*[@id="__VIEWSTATEGENERATOR"]/@value')[0]
# 模擬登陸
post_url = 'https://so.gushiwen.org/user/login.aspx?from=http%3a%2f%2fso.gushiwen.org%2fuser%2fcollect.aspx'
data = {
"__VIEWSTATE": __VIEWSTATE,
"__VIEWSTATEGENERATOR": __VIEWSTATEGENERATOR,
"from": "http://so.gushiwen.org/user/collect.aspx",
"email":"hey2380@163.com",
"pwd": "123456",
"code": result,
"denglu": "登錄",
}
response = s.post(url=post_url,headers=headers,data=data)
page_text = response.text
print(response.status_code)
# print(page_text)
print(result)
page_text = response.text
with open('./gushi.html','w',encoding='utf-8') as f:
f.write(page_text)