目錄
使用的主要庫及工具
requests
aiohttp
lxml
Beautiful Soup
pyquery
asyncio
fake_useragent
pymongo
MongoDB
python3.7
一.內容簡介
- 分析網頁代碼,爬取網頁信息(爬取10頁信息);
- 體驗下利用不同的解析庫來獲取代理(
IP:Port
和類型); - 對獲取的代理進行測試篩選;
- 將篩選成功的代理存入MongoDB。
二. 過程
(一) 分析http://www.xicidaili.com/nn/1網頁代碼
1.頁面分析
要爬取的網站頁面第一頁如下,IP地址、端口以及類型是我們的抓取目標。
進入第二頁,觀察url
的變化:
可以發現,url
從http://www.xicidaili.com/nn/1變成了http://www.xicidaili.com/nn/2,后面幾頁以此類推,從而可以得出鏈接http://www.xicidaili.com/nn/后面所跟的數字就代表了是第幾頁。
接下來在頁面中打開開發者工具,進入Network
,定位到接收的頁面內容,如下:
可以發現,我們將要爬取的代理信息都存在於一對<tr><\tr>
標簽中,進一步繼續分析,還可以發現這些標簽的class
要么是"odd"
,要么就是""
,而我們想要的信息中,ip
地址位於tr
標簽下的第二個td
標簽,端口位於第三個td
標簽,類型位於第六個td
標簽。
接下來我們就可以開始使用解析庫來嘗試爬取了,分別用lxml
、Beautiful Soup
以及pyquery
三個常用的解析庫來進行解析。
2. 抓取頁首
使用requests
庫(后來因為使用代理及測試代理的原因,改用成了異步協程請求庫aiohttp
,詳見:問題:ip地址被封禁),先試下直接獲取:
import requests
response = requests.get("http://www.xicidaili.com/nn/1")
print(response.text)
結果如下:
<html>
<head><title>503 Service Temporarily Unavailable</title></head>
<body bgcolor="white">
<center><h1>503 Service Temporarily Unavailable</h1></center>
<hr><center>nginx/1.1.19</center>
</body>
</html>
返回了狀態碼503,表示服務不可用,因此還要做些處理,嘗試加入請求頭:
import requests
header = {'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"}
response = requests.get("http://www.xicidaili.com/nn/1", headers = header)
print(response.text)
輸出結果如下:
<!DOCTYPE html>
<html>
<head>
<title>國內高匿免費HTTP代理IP__第1頁國內高匿</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="國內高匿免費HTTP代理" />
<meta name="Keywords" content="國內高匿,免費高匿代理,免費匿名代理,隱藏IP" />
<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=0">
<meta name="applicable-device"content="pc,mobile">
......
這樣就可以正常獲取頁面信息了。接下來選擇使用不同的解析庫進行頁面解析。
(二) 使用不同解析庫爬取信息
1. 使用lxml
庫進行解析
import requests
def get_page():
try:
header = {'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"}
response = requests.get("http://www.xicidaili.com/nn/1", headers = header)
get_detail(response.text)
except Exception as e:
print("發生錯誤: ", e)
# 使用lxml爬取
from lxml import etree
def get_detail(html):
html = etree.HTML(html)
# 爬取ip地址信息
print(html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()'))
if __name__ == "__main__":
get_page()
首先嘗試獲取第一頁的所有ip地址信息,使用XPath規則'//tr[@class="odd"]/td[2]/text()'
進行提取,結果如下:
['121.40.66.129', '117.88.177.132', '117.88.176.203', '218.21.230.156', '121.31.101.41', '60.205.188.24', '221.206.100.133', '27.154.34.146', '58.254.220.116', '39.91.8.31', '221.218.102.146', '223.10.21.0', '58.56.149.198', '219.132.205.105', '221.237.37.97', '183.163.24.15', '171.80.196.14', '118.114.96.251', '114.239.91.166', '111.222.141.127', '121.237.148.133', '123.168.67.126', '118.181.226.166', '121.237.148.190', '124.200.36.118', '58.58.213.55', '49.235.253.240', '183.147.11.34', '121.40.162.239', '121.237.148.139', '121.237.148.118', '117.88.5.174', '117.88.5.234', '117.87.180.144', '119.254.94.93', '60.2.44.182', '175.155.239.23', '121.237.148.156', '118.78.196.186', '123.118.108.201', '117.88.4.71', '113.12.202.50', '117.88.177.34', '117.88.4.35', '222.128.9.235', '121.237.148.131', '121.237.149.243', '121.237.148.8', '182.61.179.157', '175.148.68.133']
結果沒有錯誤,同理可以得到端口及類型:
from lxml import etree
def get_detail(html):
html = etree.HTML(html)
# 爬取ip地址信息
print(html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')[:10])
# 爬取端口信息
print(html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')[:10])
# 爬取類型信息
print(html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')[:10])
# 統計一頁有多少條數據
print(len(html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')))
結果如下,輸出結果顯示一頁一共有100條數據:
['121.237.149.117', '121.237.148.87', '59.44.78.30', '124.93.201.59', '1.83.117.56', '117.88.176.132', '121.40.66.129', '222.95.144.201', '117.88.177.132', '121.237.149.132']
['3000', '3000', '42335', '59618', '8118', '3000', '808', '3000', '3000', '3000']
['HTTP', 'HTTP', 'HTTP', 'HTTPS', 'HTTP', 'HTTP', 'HTTP', 'HTTP', 'HTTP', 'HTTP']
100
2. 使用Beautiful Soup
進行解析
頁面的表格結構如下:
<table id="ip_list">
<tr>
<th class="country">國家</th>
<th>IP地址</th>
<th>端口</th>
<th>服務器地址</th>
<th class="country">是否匿名</th>
<th>類型</th>
<th class="country">速度</th>
<th class="country">連接時間</th>
<th width="8%">存活時間</th>
<th width="20%">驗證時間</th>
</tr>
<tr class="odd">
<td class="country"><img src="//fs.xicidaili.com/images/flag/cn.png" alt="Cn" /></td>
<td>222.128.9.235</td>
<td>59593</td>
<td>
<a href="/2018-09-26/beijing">北京</a>
</td>
<td class="country">高匿</td>
<td>HTTPS</td>
<td class="country">
<div title="0.032秒" class="bar">
<div class="bar_inner fast" style="width:87%">
</div>
</div>
</td>
<td class="country">
<div title="0.006秒" class="bar">
<div class="bar_inner fast" style="width:97%">
</div>
</div>
</td>
<td>533天</td>
<td>20-03-13 15:21</td>
</tr>
...
首先選擇table
下的所有tr
標簽:
from bs4 import BeautifulSoup
def get_detail(html):
soup = BeautifulSoup(html, 'lxml')
c1 = soup.select('#ip_list tr')
print(c1[1])
結果如下:
<tr class="odd">
<td class="country"><img alt="Cn" src="//fs.xicidaili.com/images/flag/cn.png"/></td>
<td>222.128.9.235</td>
<td>59593</td>
<td>
<a href="/2018-09-26/beijing">北京</a>
</td>
<td class="country">高匿</td>
<td>HTTPS</td>
<td class="country">
<div class="bar" title="0.032秒">
<div class="bar_inner fast" style="width:87%">
</div>
</div>
</td>
<td class="country">
<div class="bar" title="0.006秒">
<div class="bar_inner fast" style="width:97%">
</div>
</div>
</td>
<td>533天</td>
<td>20-03-13 15:21</td>
</tr>
接下來就是將每一個tr
標簽中的第二個(ip
)、第三個(端口)以及第六個(類型)td
標簽選取出來:
from bs4 import BeautifulSoup
def get_detail(html):
soup = BeautifulSoup(html, 'lxml')
c1 = soup.select('#ip_list tr')
ls = []
for index, tr in enumerate(c1):
if index != 0:
td = tr.select('td')
ls.append({'proxies': td[1].string + ":" + td[2].string,
'types': td[5].string})
print(ls)
print(len(ls))
結果如下:
[{'proxies': '222.128.9.235:59593', 'types': 'HTTPS'}, {'proxies': '115.219.105.60:8010', 'types': 'HTTP'}, {'proxies': '117.88.177.204:3000', 'types': 'HTTP'}, {'proxies': '222.95.144.235:3000', 'types': 'HTTP'}, {'proxies': '59.42.88.110:8118', 'types': 'HTTPS'}, {'proxies': '118.181.226.166:44640', 'types': 'HTTP'}, {'proxies': '121.237.149.124:3000', 'types': 'HTTP'}, {'proxies': '218.86.200.26:8118', 'types': 'HTTPS'}, {'proxies': '106.6.138.18:8118', 'types': 'HTTP'}......]
100
每頁100條數據,結果無誤。
3.使用pyquery
進行解析
pyquery
解析辦法和Beautiful Soup
類似,首先將表格的第一行刪除,然后選取表格中的tr
標簽:
from pyquery import PyQuery as pq
def get_detail(html):
doc = pq(html)
doc('tr:first-child').remove() # 刪除第一行
items = doc('#ip_list tr')
print(items)
從輸出結果可以看出items
中每一項的格式:
...
<tr class="">
<td class="country"><img src="//fs.xicidaili.com/images/flag/cn.png" alt="Cn"/></td>
<td>124.205.143.210</td>
<td>34874</td>
<td>
<a href="/2018-10-05/beijing">北京</a>
</td>
<td class="country">高匿</td>
<td>HTTPS</td>
<td class="country">
<div title="0.024秒" class="bar">
<div class="bar_inner fast" style="width:93%">
</div>
</div>
</td>
<td class="country">
<div title="0.004秒" class="bar">
<div class="bar_inner fast" style="width:99%">
</div>
</div>
</td>
<td>523天</td>
<td>20-03-12 02:20</td>
</tr>
...
接下來通過生成器將每一項取出來,選取第二個td
標簽(ip
地址),第三個td
標簽(端口號)以及第六個td
標簽(類型),用列表字典的格式進行存儲。
from pyquery import PyQuery as pq
def get_detail(html):
doc = pq(html)
doc('tr:first-child').remove() # 刪除第一行
items = doc('#ip_list tr')
ls = []
for i in items.items():
tmp1 = i('td:nth-child(2)') # 選取ip地址
tmp2 = i('td:nth-child(3)') # 選取端口
tmp3 = i('td:nth-child(6)') # 選取類型
ls.append({'proxies': tmp1.text() + ":" + tmp2.text(),
'types': tmp3.text()})
print(ls)
print(len(ls))
輸出結果如下:
[{'proxies': '222.128.9.235:59593', 'types': 'HTTPS'}, {'proxies': '115.219.105.60:8010', 'types': 'HTTP'}, {'proxies': '117.88.177.204:3000', 'types': 'HTTP'}, {'proxies': '222.95.144.235:3000', 'types': 'HTTP'}, {'proxies': '59.42.88.110:8118', 'types': 'HTTPS'}, {'proxies': '118.181.226.166:44640', 'types': 'HTTP'}, {'proxies': '121.237.149.124:3000', 'types': 'HTTP'}, {'proxies': '218.86.200.26:8118', 'types': 'HTTPS'}......
100
每頁結果100條數據,無誤。
(三)選擇百度網站來對抓取得到的代理進行測試
爬取得到的免費代理中有許多都是不好用或不穩定的,還不能直接就進行存儲,因此需要先選擇一個站點來測試我們抓取到的代理是否能夠請求成功,我選擇了http://www.baidu.com來作為測試,只有能成功請求的代理才會講其加入到數據庫中去,請求失敗次數超過3次就丟棄。
對於檢測代理這種一般需要十幾秒甚至更長時間的事情來說,使用requests一個個排隊檢測顯然不合理,因此需要選擇異步請求庫aiohttp
,關於異步協程的介紹,可以參考Python中異步協程的使用方法介紹,關於aiohttp
的介紹,可以參考aiohttp中文文檔。
主要的兩個關鍵字就是await
和async
,簡單地說就是在線程可能出現等待的地方A加一個await
修飾,然后線程到這個地方的時候就不會在這干等着了,而是跑去執行其他任務B,等到等待的對象A有反應后,又立刻回來繼續A下面的其他任務,這時任務B就暫時擱置了。但await
后面的對象必須是coroutine
對象,或者是一個可以返回 coroutine
對象的生成器,或者是一個包含 __await
方法的對象返回的一個迭代器(這就是為什么不能直接在requests
前面加await
的原因)。而我們函數加上async
修飾后,函數返回的對象就變成了coroutine
對象了,於是就可以“無腦”地來添加await
與async
組合,當然,如果添加await
的地方不是那種需要等待請求響應或等待數據上傳下載之類的會讓線程進入阻塞狀態的地方的話,是不會起到什么效果的,當然也不會有什么錯誤。
檢測代理的函數如下:
# 測試代理
async def test_proxy(self, dic):
## 根據類型構造不同的代理及url
if dic["types"] == "HTTP":
test_url = "http://www.baidu.com/"
prop = "http://" + dic["proxies"]
else:
test_url = "https://www.baidu.com/"
prop = "https://" + dic["proxies"]
ua = UserAgent()
header = {'User-Agent': ua.random}
# 異步協程請求
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
if resp.status == 200:
self.success_test_count += 1
print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
return
except Exception as e:
print(prop, "==測試失敗,放棄==", e)
break
(四)選擇存儲的數據庫
考慮到以后還會對代理池進行進一步的維護,因此選擇使用MongoDB來進行存儲,插入數據時還可以很方便地避免重復,數據庫存儲函數如下:
# 寫入MongoDB數據庫
async def insert_to_mongo(self, dic):
db = self.client.Myproxies
collection = db.proxies
collection.update_one(dic,{'$set': dic}, upsert=True) # 設置upsert=True,避免重復插入
print("\033[5;32;40m插入記錄:" + json.dumps(dic), "\033[;;m")
(五)完整代碼
1.爬取階段使用代理的版本
最后,完整的代碼如下(這是在爬取代理的階段就使用代理進行請求的版本,因此我本機的ip
被封了,所以不得不這么做,過程會比較慢,后面繼續貼出爬取數據階段不用代理,測試時在使用代理的版本),在三種解析庫中選擇了最開始說到的lxml
進行解析:
import json
import time
import random
from fake_useragent import UserAgent
import asyncio
import aiohttp
# 避免出現RuntimeError錯誤
import nest_asyncio
nest_asyncio.apply()
from lxml import etree
import pymongo
class Get_prox:
def __init__(self):
# 初始化,連接MongoDB
self.client = pymongo.MongoClient('mongodb://localhost:27017/')
self.success_get_count = 0
self.success_test_count = 0
# 使用代理時,獲取頁面
async def get_page(self, session, url):
## 一個隨機生成請求頭的庫
ua = UserAgent()
header = {'User-Agent': ua.random}
# 從本地文件獲取代理池
proxies_pool = self.get_proxies()
while True:
try:
# 由於我一開始操作不慎ip被封禁了,因此在一開始抓取ip時我不得不使用了自己從
# 其他網站抓來的一批代理(如問題描述中所述),一共有5999條代理,每次隨機選取一條
p = 'http://' + random.choice(proxies_pool)
async with session.get(url, headers = header, proxy = p, timeout = 10) as response:
await asyncio.sleep(2)
if response.status == 200:
self.success_get_count += 1
print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
return await response.text()
else:
print("\033[5;31;m", response.status, "\033[;;m")
continue
except Exception as e:
print("請求失敗orz", e)
# 任務
async def get(self, url):
async with aiohttp.ClientSession() as session:
html = await self.get_page(session, url)
await self.get_detail(html)
# 測試代理
async def test_proxy(self, dic):
## 根據類型構造不同的代理及url
if dic["types"] == "HTTP":
test_url = "http://www.baidu.com/"
prop = "http://" + dic["proxies"]
else:
test_url = "https://www.baidu.com/"
prop = "https://" + dic["proxies"]
ua = UserAgent()
header = {'User-Agent': ua.random}
# 異步協程請求
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
if resp.status == 200:
self.success_test_count += 1
print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
return
except Exception as e:
print(prop, "==測試失敗,放棄==", e)
break
# 獲取代理池
def get_proxies(self):
with open("proxies.txt", "r") as f:
ls = json.loads(f.read())
return ls
# 使用lxml爬取
async def get_detail(self, html):
html = etree.HTML(html)
dic = {}
ip = html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')
port = html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')
types = html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')
for i in range(len(ip)):
dic['proxies'] = ip[i] + ":" + port[i]
dic['types'] = types[i]
await self.test_proxy(dic)
# 寫入MongoDB數據庫
async def insert_to_mongo(self, dic):
db = self.client.Myproxies
collection = db.proxies
collection.update_one(dic,{'$set': dic}, upsert=True) # 設置upsert=True,避免重復插入
print("\033[5;32;40m插入記錄:" + json.dumps(dic), "\033[;;m")
# 主線程
if __name__ == "__main__":
urls = []
start = time.time()
# 抓取前10頁數據
for i in range(1, 11):
urls.append("http://www.xicidaili.com/nn/" + str(i))
c = Get_prox()
# 創建10個未來任務對象
tasks = [asyncio.ensure_future(c.get(url)) for url in urls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
total = (end - start)/60.0
print("完成,總耗時:", total, "分鍾!")
執行過程會打印許多日志,部分日志如下:
不管是請求過程還是測試過程,代理ip
的請求成功率都很低,完全執行完需要等一段時間,完成后顯示耗時47分鍾。
翻一下日志,看到最后一條成功插入的數據顯示是第8次。。。。。
到數據庫中看一下,這是我重復運行了多次后數據庫中的數據,也僅插入了50條:
2.爬取階段不使用代理的版本
繼續貼出爬取數據階段不使用代理的版本,即使用requests
爬取,測試時再用aiohttp
,省去了第一階段篩選代理過程的等待時間。
import json
import time
import requests
from fake_useragent import UserAgent
import asyncio
import aiohttp
# 避免出現RuntimeError錯誤
import nest_asyncio
nest_asyncio.apply()
from lxml import etree
import pymongo
class Get_prox:
def __init__(self):
# 初始化,連接MongoDB
self.client = pymongo.MongoClient('mongodb://localhost:27017/')
self.success_get_count = 0
self.success_test_count = 0
# 不使用代理時,獲取頁面
def get_page(self, url):
## 一個隨機生成請求頭的庫
ua = UserAgent()
header = {'User-Agent': ua.random}
while True:
try:
response = requests.get(url, headers = header, timeout = 10)
time.sleep(1.5)
if response.status_code == 200:
self.success_get_count += 1
print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
return response.text
else:
print("\033[5;31;m", response.status_code, "\033[;;m")
continue
except Exception as e:
print("請求失敗orz", e)
# 任務
def get(self, urls):
htmls = []
# 先將抓取的頁面都存入列表中
for url in urls:
htmls.append(self.get_page(url))
# 測試代理使用異步
tasks = [asyncio.ensure_future(self.get_detail(html)) for html in htmls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 測試代理
async def test_proxy(self, dic):
## 根據類型構造不同的代理及url
if dic["types"] == "HTTP":
test_url = "http://www.baidu.com/"
prop = "http://" + dic["proxies"]
else:
test_url = "https://www.baidu.com/"
prop = "https://" + dic["proxies"]
ua = UserAgent()
header = {'User-Agent': ua.random}
# 異步協程請求
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
if resp.status == 200:
self.success_test_count += 1
print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
return
except Exception as e:
print(prop, "==測試失敗,放棄==", e)
break
# 使用lxml爬取
async def get_detail(self, html):
html = etree.HTML(html)
dic = {}
ip = html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')
port = html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')
types = html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')
for i in range(len(ip)):
dic['proxies'] = ip[i] + ":" + port[i]
dic['types'] = types[i]
await self.test_proxy(dic)
# 寫入MongoDB數據庫
async def insert_to_mongo(self, dic):
db = self.client.Myproxies
collection = db.proxies
collection.update_one(dic,{'$set': dic}, upsert=True) # 設置upsert=True,避免重復插入
print("\033[5;32;40m插入記錄:" + json.dumps(dic) + "\033[;;m")
# 主線程
if __name__ == "__main__":
urls = []
start = time.time()
# 抓取前10頁數據
for i in range(1, 11):
urls.append("http://www.xicidaili.com/nn/" + str(i))
c = Get_prox()
c.get(urls)
end = time.time()
total = (end - start)/60.0
print("完成,總耗時:", total, "分鍾!")
經其他小伙伴測得結果截圖如下:
爬取階段的10次請求都非常順利。
最后總耗時19分鍾,可見前面爬取階段不去篩選免費代理確實能省下許多時間!
四.問題及解決
(一)ip
地址被封禁
由於一開始在使用lxml
解析庫探索解析規則時,為了方便沒有去設置休眠時間,后來由於疏忽,把抓取頁數調大后忘記了設置休眠時間,結果在抓取了幾次后,發現日志中的輸出內容變成如下信息:
{"proxies": "121.237.148.195:3000", "types": "HTTP"}
{"proxies": "121.234.31.44:8118", "types": "HTTPS"}
{"proxies": "117.88.4.63:3000", "types": "HTTP"}
{"proxies": "222.95.144.58:3000", "types": "HTTP"}
發生錯誤: 'NoneType' object has no attribute 'xpath'
發生錯誤: 'NoneType' object has no attribute 'xpath'
發生錯誤: 'NoneType' object has no attribute 'xpath'
發生錯誤: 'NoneType' object has no attribute 'xpath'
發生錯誤: 'NoneType' object has no attribute 'xpath'
發生錯誤: 'NoneType' object has no attribute 'xpath'
......
終止程序后將response獲取的狀態碼打印出現,結果如下:
503
503
503
503
503
...
同時通過瀏覽器我也無法進入到該網站了,由此可以得出因爬取次數過多,我的IP已經被網頁封禁了。
- 解決辦法
一開始我是選擇直接在其他免費的代理ip獲取網站選幾個ip過來,但發現免費的代理ip中有非常大的比例都是不能用的,使用網上現有項目構建代理ip池在環境和依賴的配置上比較耗時間,所以我直接去了66免費代理網,使用了該網站的免費ip提取功能,提取了6000個代理ip:
點擊提取后直接得到一個包含6000條代理信息的頁面,然后就可以寫個簡單的程序直接將此頁面上生成的6000條(實際抓取5999條)代理信息抓取到本地文件中:
response1 = requests.get("http://www.66ip.cn/mo.php?sxb=&tqsl=6000&port=&export=&ktip=&sxa=&submit=%CC%E1++%C8%A1&textarea=")
html = response1.text
print(response1.status_code == 200)
pattern = re.compile("br />(.*?)<", re.S)
items = re.findall(pattern, html)
for i in range(len(items)):
items[i] = items[i].strip()
print(len(items))
with open("proxies.txt", "w") as f:
f.write(json.dumps(items))
接着在爬蟲程序中讀取這個文件作為一個代理池:
# 獲取代理池
def get_proxies(self):
with open("proxies.txt", "r") as f:
ls = json.loads(f.read())
return ls
然后每次請求都隨機從代理池中選擇一個代理:
def get_page(ls):
url = []
ua = UserAgent()
with open("proxies.txt", "r") as f:
ls = json.loads(f.read())
for i in range(1, page+1):
url.append("http://www.xicidaili.com/nn/" + str(i))
count = 1
errcount = 1
for u in url:
while True:
try:
header = {'User-Agent': ua.random}
handler = {'http': 'http://' + random.choice(ls)}
response = requests.get(u, headers = header, proxies = handler, timeout = 10)
time.sleep(1)
get_detail(response.text)
if response.status_code == 200:
print("選取ip:", handler, "請求成功---------------------------第%d次"%count)
count += 1
else:
continue
break
except:
print("選取ip:", handler, ", 第%d請求發生錯誤"%errcount)
errcount += 1
但是這樣做有一個問題,就是線程在調度任務時每次只能負責一個,而代理中有許多ip都是不好用 的,導致每一次嘗試都要花費好幾秒的時間,而且大部分情況下請求都有出錯。
為了解決這個問題,就不能再選擇用單線程單步調度的方式來抓取頁面了,因此我選擇使用了異步請求庫aiohttp
。
參考了文章Python中異步協程的使用方法介紹以及aiohttp中文文檔后,我學着創建一個帶有10個任務(抓取10個頁面的任務)的協程對象來實現異步協程調度,使得每當線程遇到請求時,無需等待請求任務,而可以調度下一個任務,當10個請求都成功時,就能進入下一個調用的函數了,這樣總的時間消耗就可以縮小約10倍了,實現方法如下(未列出所有函數):
# 使用代理時,獲取頁面
async def get_page(self, session, url):
## 一個隨機生成請求頭的庫
ua = UserAgent()
header = {'User-Agent': ua.random}
# 從本地文件獲取代理池
proxies_pool = self.get_proxies()
while True:
try:
# 由於我一開始操作不慎ip被封禁了,因此在一開始抓取ip時我不得不使用了自己從
# 其他網站抓來的一批代理(如問題描述中所述),一共有5999條代理,每次隨機選取一條
p = 'http://' + random.choice(proxies_pool)
async with session.get(url, headers = header, proxy = p, timeout = 10) as response:
await asyncio.sleep(2)
if response.status == 200:
self.success_get_count += 1
print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
return await response.text()
else:
print("\033[5;31;m", response.status, "\033[;;m")
continue
except Exception as e:
print("請求失敗orz", e)
# 任務
async def get(self, url):
async with aiohttp.ClientSession() as session:
html = await self.get_page(session, url)
await self.get_detail(html)
# 主線程
if __name__ == "__main__":
urls = []
start = time.time()
# 抓取前10頁數據
for i in range(1, 11):
urls.append("http://www.xicidaili.com/nn/" + str(i))
c = Get_prox()
# 創建10個未來任務對象
tasks = [asyncio.ensure_future(c.get(url)) for url in urls]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
total = (end - start)/60.0
print("完成,總耗時:", total, "分鍾!")
抓取過程部分打印日志如下,可見,代理的請求成功機率是很低的,接下來就是耐心等待了:
(二)異步操作出現錯誤RuntimeError
錯誤
一開始使用異步協程運行程序時,控制台輸出了如下的錯誤日志:
RuntimeError: asyncio.run() cannot be called from a running event loop
上網查了解決方案,在程序開頭加入:
import nest_asyncio
nest_asyncio.apply()
之后就不會報錯了,具體原因不明。
五.可進一步改善的地方
- 如果不是因為
ip
被封禁,爬取代理階段直接用requests就好了,注意設置好休眠時間。然后其實爬取代理的時候可以同時對多個不同的代理網站進行爬取,這樣就可以把異步請求的機制也結合進來了。比如創建多個任務,每個任務分別針對不同的網站使用requests進行請求,然后將這些任務加入到異步協程的事件循環中去。 - 我的做法僅是將代理存入本地的數據庫,是靜態的,網絡上還有許多的代理池項目都是可以動態維護的,有
web
界面、api
接口等,但實現方法也更加復雜,后續可進一步深入學習。