新手上路,記錄下第一次使用異步協程爬取代理數據為自己構建一個低級版的本地代理池的過程


目錄

使用的主要庫及工具

  1. requests
  2. aiohttp
  3. lxml
  4. Beautiful Soup
  5. pyquery
  6. asyncio
  7. fake_useragent
  8. pymongo
  9. MongoDB
  10. python3.7

一.內容簡介

  1. 分析網頁代碼,爬取網頁信息(爬取10頁信息);
  2. 體驗下利用不同的解析庫來獲取代理(IP:Port和類型);
  3. 對獲取的代理進行測試篩選;
  4. 將篩選成功的代理存入MongoDB。

二. 過程

(一) 分析http://www.xicidaili.com/nn/1網頁代碼

1.頁面分析

要爬取的網站頁面第一頁如下,IP地址、端口以及類型是我們的抓取目標。

進入第二頁,觀察url的變化:

可以發現,urlhttp://www.xicidaili.com/nn/1變成了http://www.xicidaili.com/nn/2,后面幾頁以此類推,從而可以得出鏈接http://www.xicidaili.com/nn/后面所跟的數字就代表了是第幾頁。

接下來在頁面中打開開發者工具,進入Network,定位到接收的頁面內容,如下:

可以發現,我們將要爬取的代理信息都存在於一對<tr><\tr>標簽中,進一步繼續分析,還可以發現這些標簽的class要么是"odd",要么就是"",而我們想要的信息中,ip地址位於tr標簽下的第二個td標簽,端口位於第三個td標簽,類型位於第六個td標簽。

接下來我們就可以開始使用解析庫來嘗試爬取了,分別用lxmlBeautiful Soup以及pyquery三個常用的解析庫來進行解析。

2. 抓取頁首

使用requests庫(后來因為使用代理及測試代理的原因,改用成了異步協程請求庫aiohttp,詳見:問題:ip地址被封禁),先試下直接獲取:

import requests
response = requests.get("http://www.xicidaili.com/nn/1")
print(response.text)

結果如下:

<html>
<head><title>503 Service Temporarily Unavailable</title></head>
<body bgcolor="white">
<center><h1>503 Service Temporarily Unavailable</h1></center>
<hr><center>nginx/1.1.19</center>
</body>
</html>

返回了狀態碼503,表示服務不可用,因此還要做些處理,嘗試加入請求頭:

import requests

header = {'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"}
response = requests.get("http://www.xicidaili.com/nn/1", headers = header)
print(response.text)

輸出結果如下:

<!DOCTYPE html>
<html>
<head>
  <title>國內高匿免費HTTP代理IP__第1頁國內高匿</title>
  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
  <meta name="Description" content="國內高匿免費HTTP代理" />
  <meta name="Keywords" content="國內高匿,免費高匿代理,免費匿名代理,隱藏IP" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=0">
    <meta name="applicable-device"content="pc,mobile">
    ......

這樣就可以正常獲取頁面信息了。接下來選擇使用不同的解析庫進行頁面解析。

(二) 使用不同解析庫爬取信息

1. 使用lxml庫進行解析

import requests

def get_page():
    try:
        header = {'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"}
        response = requests.get("http://www.xicidaili.com/nn/1", headers = header)
        get_detail(response.text)
    except Exception as e:
        print("發生錯誤: ", e)
        
# 使用lxml爬取
from lxml import etree

def get_detail(html):
    html = etree.HTML(html)
    # 爬取ip地址信息
    print(html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()'))
    
if __name__ == "__main__":
    get_page()

首先嘗試獲取第一頁的所有ip地址信息,使用XPath規則'//tr[@class="odd"]/td[2]/text()'進行提取,結果如下:

['121.40.66.129', '117.88.177.132', '117.88.176.203', '218.21.230.156', '121.31.101.41', '60.205.188.24', '221.206.100.133', '27.154.34.146', '58.254.220.116', '39.91.8.31', '221.218.102.146', '223.10.21.0', '58.56.149.198', '219.132.205.105', '221.237.37.97', '183.163.24.15', '171.80.196.14', '118.114.96.251', '114.239.91.166', '111.222.141.127', '121.237.148.133', '123.168.67.126', '118.181.226.166', '121.237.148.190', '124.200.36.118', '58.58.213.55', '49.235.253.240', '183.147.11.34', '121.40.162.239', '121.237.148.139', '121.237.148.118', '117.88.5.174', '117.88.5.234', '117.87.180.144', '119.254.94.93', '60.2.44.182', '175.155.239.23', '121.237.148.156', '118.78.196.186', '123.118.108.201', '117.88.4.71', '113.12.202.50', '117.88.177.34', '117.88.4.35', '222.128.9.235', '121.237.148.131', '121.237.149.243', '121.237.148.8', '182.61.179.157', '175.148.68.133']

結果沒有錯誤,同理可以得到端口及類型:

from lxml import etree

def get_detail(html):
    html = etree.HTML(html)
    # 爬取ip地址信息
    print(html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')[:10])
    # 爬取端口信息
    print(html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')[:10])
    # 爬取類型信息
    print(html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')[:10])
    # 統計一頁有多少條數據
    print(len(html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')))

結果如下,輸出結果顯示一頁一共有100條數據:

['121.237.149.117', '121.237.148.87', '59.44.78.30', '124.93.201.59', '1.83.117.56', '117.88.176.132', '121.40.66.129', '222.95.144.201', '117.88.177.132', '121.237.149.132']
['3000', '3000', '42335', '59618', '8118', '3000', '808', '3000', '3000', '3000']
['HTTP', 'HTTP', 'HTTP', 'HTTPS', 'HTTP', 'HTTP', 'HTTP', 'HTTP', 'HTTP', 'HTTP']
100

2. 使用Beautiful Soup進行解析

頁面的表格結構如下:

<table id="ip_list">
    <tr>
      <th class="country">國家</th>
      <th>IP地址</th>
      <th>端口</th>
      <th>服務器地址</th>
      <th class="country">是否匿名</th>
      <th>類型</th>
      <th class="country">速度</th>
      <th class="country">連接時間</th>
      <th width="8%">存活時間</th>
      
      <th width="20%">驗證時間</th>
    </tr>
  
    <tr class="odd">
      <td class="country"><img src="//fs.xicidaili.com/images/flag/cn.png" alt="Cn" /></td>
      <td>222.128.9.235</td>
      <td>59593</td>
      <td>
        <a href="/2018-09-26/beijing">北京</a>
      </td>
      <td class="country">高匿</td>
      <td>HTTPS</td>
      <td class="country">
        <div title="0.032秒" class="bar">
          <div class="bar_inner fast" style="width:87%">
            
          </div>
        </div>
      </td>
      <td class="country">
        <div title="0.006秒" class="bar">
          <div class="bar_inner fast" style="width:97%">
            
          </div>
        </div>
      </td>
      
      <td>533天</td>
      <td>20-03-13 15:21</td>
    </tr>
...

首先選擇table下的所有tr標簽:

from bs4 import BeautifulSoup

def get_detail(html):
    soup = BeautifulSoup(html, 'lxml')
    c1 = soup.select('#ip_list tr')
    print(c1[1])

結果如下:

<tr class="odd">
<td class="country"><img alt="Cn" src="//fs.xicidaili.com/images/flag/cn.png"/></td>
<td>222.128.9.235</td>
<td>59593</td>
<td>
<a href="/2018-09-26/beijing">北京</a>
</td>
<td class="country">高匿</td>
<td>HTTPS</td>
<td class="country">
<div class="bar" title="0.032秒">
<div class="bar_inner fast" style="width:87%">
</div>
</div>
</td>
<td class="country">
<div class="bar" title="0.006秒">
<div class="bar_inner fast" style="width:97%">
</div>
</div>
</td>
<td>533天</td>
<td>20-03-13 15:21</td>
</tr>

接下來就是將每一個tr標簽中的第二個(ip)、第三個(端口)以及第六個(類型)td標簽選取出來:

from bs4 import BeautifulSoup

def get_detail(html):
    soup = BeautifulSoup(html, 'lxml')
    c1 = soup.select('#ip_list tr')
    ls = []
    for index, tr in enumerate(c1):
        if index != 0:
            td = tr.select('td')
            ls.append({'proxies': td[1].string + ":" + td[2].string, 
                        'types': td[5].string})
    print(ls)
    print(len(ls))

結果如下:

[{'proxies': '222.128.9.235:59593', 'types': 'HTTPS'}, {'proxies': '115.219.105.60:8010', 'types': 'HTTP'}, {'proxies': '117.88.177.204:3000', 'types': 'HTTP'}, {'proxies': '222.95.144.235:3000', 'types': 'HTTP'}, {'proxies': '59.42.88.110:8118', 'types': 'HTTPS'}, {'proxies': '118.181.226.166:44640', 'types': 'HTTP'}, {'proxies': '121.237.149.124:3000', 'types': 'HTTP'}, {'proxies': '218.86.200.26:8118', 'types': 'HTTPS'}, {'proxies': '106.6.138.18:8118', 'types': 'HTTP'}......]
100

每頁100條數據,結果無誤。

3.使用pyquery進行解析

pyquery解析辦法和Beautiful Soup類似,首先將表格的第一行刪除,然后選取表格中的tr標簽:

from pyquery import PyQuery as pq
    
def get_detail(html):
    doc = pq(html)
    doc('tr:first-child').remove()  # 刪除第一行
    items = doc('#ip_list tr')
    print(items)

從輸出結果可以看出items中每一項的格式:

    ...
    <tr class="">
      <td class="country"><img src="//fs.xicidaili.com/images/flag/cn.png" alt="Cn"/></td>
      <td>124.205.143.210</td>
      <td>34874</td>
      <td>
        <a href="/2018-10-05/beijing">北京</a>
      </td>
      <td class="country">高匿</td>
      <td>HTTPS</td>
      <td class="country">
        <div title="0.024秒" class="bar">
          <div class="bar_inner fast" style="width:93%">
            
          </div>
        </div>
      </td>
      <td class="country">
        <div title="0.004秒" class="bar">
          <div class="bar_inner fast" style="width:99%">
            
          </div>
        </div>
      </td>
      
      <td>523天</td>
      <td>20-03-12 02:20</td>
    </tr>
    ...

接下來通過生成器將每一項取出來,選取第二個td標簽(ip地址),第三個td標簽(端口號)以及第六個td標簽(類型),用列表字典的格式進行存儲。

from pyquery import PyQuery as pq
    
def get_detail(html):
    doc = pq(html)
    doc('tr:first-child').remove()  # 刪除第一行
    items = doc('#ip_list tr')    
    ls = []
    for i in items.items():
        tmp1 = i('td:nth-child(2)') # 選取ip地址
        tmp2 = i('td:nth-child(3)') # 選取端口
        tmp3 = i('td:nth-child(6)') # 選取類型
        ls.append({'proxies': tmp1.text() + ":" + tmp2.text(),
                    'types': tmp3.text()})
    print(ls)
    print(len(ls))

輸出結果如下:

[{'proxies': '222.128.9.235:59593', 'types': 'HTTPS'}, {'proxies': '115.219.105.60:8010', 'types': 'HTTP'}, {'proxies': '117.88.177.204:3000', 'types': 'HTTP'}, {'proxies': '222.95.144.235:3000', 'types': 'HTTP'}, {'proxies': '59.42.88.110:8118', 'types': 'HTTPS'}, {'proxies': '118.181.226.166:44640', 'types': 'HTTP'}, {'proxies': '121.237.149.124:3000', 'types': 'HTTP'}, {'proxies': '218.86.200.26:8118', 'types': 'HTTPS'}......
100

每頁結果100條數據,無誤。

(三)選擇百度網站來對抓取得到的代理進行測試

爬取得到的免費代理中有許多都是不好用或不穩定的,還不能直接就進行存儲,因此需要先選擇一個站點來測試我們抓取到的代理是否能夠請求成功,我選擇了http://www.baidu.com來作為測試,只有能成功請求的代理才會講其加入到數據庫中去,請求失敗次數超過3次就丟棄。

對於檢測代理這種一般需要十幾秒甚至更長時間的事情來說,使用requests一個個排隊檢測顯然不合理,因此需要選擇異步請求庫aiohttp,關於異步協程的介紹,可以參考Python中異步協程的使用方法介紹,關於aiohttp的介紹,可以參考aiohttp中文文檔

主要的兩個關鍵字就是awaitasync,簡單地說就是在線程可能出現等待的地方A加一個await修飾,然后線程到這個地方的時候就不會在這干等着了,而是跑去執行其他任務B,等到等待的對象A有反應后,又立刻回來繼續A下面的其他任務,這時任務B就暫時擱置了。但await后面的對象必須是coroutine對象,或者是一個可以返回 coroutine 對象的生成器,或者是一個包含 __await 方法的對象返回的一個迭代器(這就是為什么不能直接在requests前面加await的原因)。而我們函數加上async修飾后,函數返回的對象就變成了coroutine對象了,於是就可以“無腦”地來添加awaitasync組合,當然,如果添加await的地方不是那種需要等待請求響應或等待數據上傳下載之類的會讓線程進入阻塞狀態的地方的話,是不會起到什么效果的,當然也不會有什么錯誤。

檢測代理的函數如下:

    # 測試代理  
    async def test_proxy(self, dic):
        ## 根據類型構造不同的代理及url
        if dic["types"] == "HTTP":
            test_url = "http://www.baidu.com/"
            prop = "http://" + dic["proxies"]
        else:
            test_url = "https://www.baidu.com/"
            prop = "https://" + dic["proxies"]
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        # 異步協程請求
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
                        if resp.status == 200:
                            self.success_test_count += 1
                            print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
                            await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
                            return
                except Exception as e:
                    print(prop, "==測試失敗,放棄==", e)
                    break

(四)選擇存儲的數據庫

考慮到以后還會對代理池進行進一步的維護,因此選擇使用MongoDB來進行存儲,插入數據時還可以很方便地避免重復,數據庫存儲函數如下:

    # 寫入MongoDB數據庫   
    async def insert_to_mongo(self, dic):
        db = self.client.Myproxies
        collection = db.proxies
        collection.update_one(dic,{'$set': dic}, upsert=True)   # 設置upsert=True,避免重復插入
        print("\033[5;32;40m插入記錄:" + json.dumps(dic), "\033[;;m")

(五)完整代碼

1.爬取階段使用代理的版本

最后,完整的代碼如下(這是在爬取代理的階段就使用代理進行請求的版本,因此我本機的ip被封了,所以不得不這么做,過程會比較慢,后面繼續貼出爬取數據階段不用代理,測試時在使用代理的版本),在三種解析庫中選擇了最開始說到的lxml進行解析:

import json
import time
import random
from fake_useragent import UserAgent
import asyncio
import aiohttp
# 避免出現RuntimeError錯誤
import nest_asyncio
nest_asyncio.apply()
from lxml import etree
import pymongo

class Get_prox:
    def __init__(self):
        # 初始化,連接MongoDB
        self.client = pymongo.MongoClient('mongodb://localhost:27017/')
        self.success_get_count = 0
        self.success_test_count = 0
    
    # 使用代理時,獲取頁面
    async def get_page(self, session, url):
        ## 一個隨機生成請求頭的庫        
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        # 從本地文件獲取代理池
        proxies_pool = self.get_proxies()
        while True:
            try:
                # 由於我一開始操作不慎ip被封禁了,因此在一開始抓取ip時我不得不使用了自己從
                # 其他網站抓來的一批代理(如問題描述中所述),一共有5999條代理,每次隨機選取一條
                p = 'http://' + random.choice(proxies_pool)
                async with session.get(url, headers = header, proxy = p, timeout = 10) as response:
                    await asyncio.sleep(2)
                    if response.status == 200:
                        self.success_get_count += 1
                        print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
                        return await response.text()
                    else:
                        print("\033[5;31;m", response.status, "\033[;;m")
                        continue
            except Exception as e:
                print("請求失敗orz", e)    
        
    # 任務
    async def get(self, url):
        async with aiohttp.ClientSession() as session:
            html = await self.get_page(session, url)
            await self.get_detail(html)
    
    # 測試代理  
    async def test_proxy(self, dic):
        ## 根據類型構造不同的代理及url
        if dic["types"] == "HTTP":
            test_url = "http://www.baidu.com/"
            prop = "http://" + dic["proxies"]
        else:
            test_url = "https://www.baidu.com/"
            prop = "https://" + dic["proxies"]
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        # 異步協程請求
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
                        if resp.status == 200:
                            self.success_test_count += 1
                            print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
                            await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
                            return
                except Exception as e:
                    print(prop, "==測試失敗,放棄==", e)
                    break
    
    # 獲取代理池
    def get_proxies(self):
        with open("proxies.txt", "r") as f:
            ls = json.loads(f.read())
        return ls
    
    # 使用lxml爬取 
    async def get_detail(self, html):
        html = etree.HTML(html)
        dic = {}
        ip = html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')
        port = html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')
        types = html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')
        for i in range(len(ip)):
            dic['proxies'] = ip[i] + ":" + port[i]
            dic['types'] = types[i]
            await self.test_proxy(dic)
        
    # 寫入MongoDB數據庫   
    async def insert_to_mongo(self, dic):
        db = self.client.Myproxies
        collection = db.proxies
        collection.update_one(dic,{'$set': dic}, upsert=True)   # 設置upsert=True,避免重復插入
        print("\033[5;32;40m插入記錄:" + json.dumps(dic), "\033[;;m")

    
# 主線程
if __name__ == "__main__":
    urls = []
    start = time.time()
    # 抓取前10頁數據
    for i in range(1, 11):
        urls.append("http://www.xicidaili.com/nn/" + str(i))
    c = Get_prox()
    # 創建10個未來任務對象
    tasks = [asyncio.ensure_future(c.get(url)) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    total = (end - start)/60.0
    print("完成,總耗時:", total, "分鍾!")

執行過程會打印許多日志,部分日志如下:

不管是請求過程還是測試過程,代理ip的請求成功率都很低,完全執行完需要等一段時間,完成后顯示耗時47分鍾。

翻一下日志,看到最后一條成功插入的數據顯示是第8次。。。。。

到數據庫中看一下,這是我重復運行了多次后數據庫中的數據,也僅插入了50條:

2.爬取階段不使用代理的版本

繼續貼出爬取數據階段不使用代理的版本,即使用requests爬取,測試時再用aiohttp,省去了第一階段篩選代理過程的等待時間。

import json
import time
import requests
from fake_useragent import UserAgent
import asyncio
import aiohttp
# 避免出現RuntimeError錯誤
import nest_asyncio
nest_asyncio.apply()
from lxml import etree
import pymongo

class Get_prox:
    def __init__(self):
        # 初始化,連接MongoDB
        self.client = pymongo.MongoClient('mongodb://localhost:27017/')
        self.success_get_count = 0
        self.success_test_count = 0            
                
    # 不使用代理時,獲取頁面
    def get_page(self, url):
        ## 一個隨機生成請求頭的庫        
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        while True:
            try:
                response = requests.get(url, headers = header, timeout = 10)
                time.sleep(1.5)
                if response.status_code == 200:
                    self.success_get_count += 1
                    print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
                    return response.text
                else:
                    print("\033[5;31;m", response.status_code, "\033[;;m")
                    continue
            except Exception as e:
                print("請求失敗orz", e)
        
    # 任務
    def get(self, urls):
        htmls = []
        # 先將抓取的頁面都存入列表中
        for url in urls:
            htmls.append(self.get_page(url))
        # 測試代理使用異步
        tasks = [asyncio.ensure_future(self.get_detail(html)) for html in htmls]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    
    # 測試代理  
    async def test_proxy(self, dic):
        ## 根據類型構造不同的代理及url
        if dic["types"] == "HTTP":
            test_url = "http://www.baidu.com/"
            prop = "http://" + dic["proxies"]
        else:
            test_url = "https://www.baidu.com/"
            prop = "https://" + dic["proxies"]
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        # 異步協程請求
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(test_url, headers = header, proxy = prop, timeout = 15, verify_ssl=False) as resp:
                        if resp.status == 200:
                            self.success_test_count += 1
                            print(prop, "\033[5;36;40m===========>測試成功,寫入數據庫!=========%d次\033[;;m"%self.success_test_count)
                            await self.insert_to_mongo(dic) ## 調用寫入mongodb數據庫的函數
                            return
                except Exception as e:
                    print(prop, "==測試失敗,放棄==", e)
                    break
    
    # 使用lxml爬取 
    async def get_detail(self, html):
        html = etree.HTML(html)
        dic = {}
        ip = html.xpath('//tr[@class="odd" or @class=""]/td[2]/text()')
        port = html.xpath('//tr[@class="odd" or @class=""]/td[3]/text()')
        types = html.xpath('//tr[@class="odd" or @class=""]/td[6]/text()')
        for i in range(len(ip)):
            dic['proxies'] = ip[i] + ":" + port[i]
            dic['types'] = types[i]
            await self.test_proxy(dic)
        
    # 寫入MongoDB數據庫   
    async def insert_to_mongo(self, dic):
        db = self.client.Myproxies
        collection = db.proxies
        collection.update_one(dic,{'$set': dic}, upsert=True)   # 設置upsert=True,避免重復插入
        print("\033[5;32;40m插入記錄:" + json.dumps(dic) + "\033[;;m")

    
# 主線程
if __name__ == "__main__":
    urls = []
    start = time.time()
    # 抓取前10頁數據
    for i in range(1, 11):
        urls.append("http://www.xicidaili.com/nn/" + str(i))
    c = Get_prox()
    c.get(urls)
    end = time.time()
    total = (end - start)/60.0
    print("完成,總耗時:", total, "分鍾!")

經其他小伙伴測得結果截圖如下:

爬取階段的10次請求都非常順利。

最后總耗時19分鍾,可見前面爬取階段不去篩選免費代理確實能省下許多時間!

四.問題及解決

(一)ip地址被封禁

由於一開始在使用lxml解析庫探索解析規則時,為了方便沒有去設置休眠時間,后來由於疏忽,把抓取頁數調大后忘記了設置休眠時間,結果在抓取了幾次后,發現日志中的輸出內容變成如下信息:

{"proxies": "121.237.148.195:3000", "types": "HTTP"}
{"proxies": "121.234.31.44:8118", "types": "HTTPS"}
{"proxies": "117.88.4.63:3000", "types": "HTTP"}
{"proxies": "222.95.144.58:3000", "types": "HTTP"}
發生錯誤:  'NoneType' object has no attribute 'xpath'
發生錯誤:  'NoneType' object has no attribute 'xpath'
發生錯誤:  'NoneType' object has no attribute 'xpath'
發生錯誤:  'NoneType' object has no attribute 'xpath'
發生錯誤:  'NoneType' object has no attribute 'xpath'
發生錯誤:  'NoneType' object has no attribute 'xpath'
......

終止程序后將response獲取的狀態碼打印出現,結果如下:

503
503
503
503
503
...


同時通過瀏覽器我也無法進入到該網站了,由此可以得出因爬取次數過多,我的IP已經被網頁封禁了。

  • 解決辦法

一開始我是選擇直接在其他免費的代理ip獲取網站選幾個ip過來,但發現免費的代理ip中有非常大的比例都是不能用的,使用網上現有項目構建代理ip池在環境和依賴的配置上比較耗時間,所以我直接去了66免費代理網,使用了該網站的免費ip提取功能,提取了6000個代理ip:


點擊提取后直接得到一個包含6000條代理信息的頁面,然后就可以寫個簡單的程序直接將此頁面上生成的6000條(實際抓取5999條)代理信息抓取到本地文件中:

response1 = requests.get("http://www.66ip.cn/mo.php?sxb=&tqsl=6000&port=&export=&ktip=&sxa=&submit=%CC%E1++%C8%A1&textarea=")
html = response1.text
print(response1.status_code == 200)
pattern = re.compile("br />(.*?)<", re.S)

items = re.findall(pattern, html)
for i in range(len(items)):
    items[i] = items[i].strip()
print(len(items))
with open("proxies.txt", "w") as f:
    f.write(json.dumps(items))

接着在爬蟲程序中讀取這個文件作為一個代理池:

# 獲取代理池
    def get_proxies(self):
        with open("proxies.txt", "r") as f:
            ls = json.loads(f.read())
        return ls

然后每次請求都隨機從代理池中選擇一個代理:

def get_page(ls):
    url = []
    ua = UserAgent()
    with open("proxies.txt", "r") as f:
        ls = json.loads(f.read())
    for i in range(1, page+1):
        url.append("http://www.xicidaili.com/nn/" + str(i))
    count = 1
    errcount = 1
    for u in url:
        while True:
            try:
                header = {'User-Agent': ua.random}
                handler = {'http': 'http://' + random.choice(ls)}
                response = requests.get(u, headers = header, proxies = handler, timeout = 10)
                time.sleep(1)
                get_detail(response.text)
                if response.status_code == 200:
                    print("選取ip:", handler, "請求成功---------------------------第%d次"%count)
                    count += 1
                else:
                    continue
                break
            except:
                print("選取ip:", handler, ", 第%d請求發生錯誤"%errcount)
                errcount += 1

但是這樣做有一個問題,就是線程在調度任務時每次只能負責一個,而代理中有許多ip都是不好用 的,導致每一次嘗試都要花費好幾秒的時間,而且大部分情況下請求都有出錯。

為了解決這個問題,就不能再選擇用單線程單步調度的方式來抓取頁面了,因此我選擇使用了異步請求庫aiohttp

參考了文章Python中異步協程的使用方法介紹以及aiohttp中文文檔后,我學着創建一個帶有10個任務(抓取10個頁面的任務)的協程對象來實現異步協程調度,使得每當線程遇到請求時,無需等待請求任務,而可以調度下一個任務,當10個請求都成功時,就能進入下一個調用的函數了,這樣總的時間消耗就可以縮小約10倍了,實現方法如下(未列出所有函數):

    # 使用代理時,獲取頁面
    async def get_page(self, session, url):
        ## 一個隨機生成請求頭的庫        
        ua = UserAgent()
        header = {'User-Agent': ua.random}
        # 從本地文件獲取代理池
        proxies_pool = self.get_proxies()
        while True:
            try:
                # 由於我一開始操作不慎ip被封禁了,因此在一開始抓取ip時我不得不使用了自己從
                # 其他網站抓來的一批代理(如問題描述中所述),一共有5999條代理,每次隨機選取一條
                p = 'http://' + random.choice(proxies_pool)
                async with session.get(url, headers = header, proxy = p, timeout = 10) as response:
                    await asyncio.sleep(2)
                    if response.status == 200:
                        self.success_get_count += 1
                        print("\033[5;36;40m----------------------請求成功-------------------%d次\033[;;m"%self.success_get_count)
                        return await response.text()
                    else:
                        print("\033[5;31;m", response.status, "\033[;;m")
                        continue
            except Exception as e:
                print("請求失敗orz", e)    
        
    # 任務
    async def get(self, url):
        async with aiohttp.ClientSession() as session:
            html = await self.get_page(session, url)
            await self.get_detail(html)
# 主線程
if __name__ == "__main__":
    urls = []
    start = time.time()
    # 抓取前10頁數據
    for i in range(1, 11):
        urls.append("http://www.xicidaili.com/nn/" + str(i))
    c = Get_prox()
    # 創建10個未來任務對象
    tasks = [asyncio.ensure_future(c.get(url)) for url in urls]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    total = (end - start)/60.0
    print("完成,總耗時:", total, "分鍾!")

抓取過程部分打印日志如下,可見,代理的請求成功機率是很低的,接下來就是耐心等待了:

(二)異步操作出現錯誤RuntimeError錯誤

一開始使用異步協程運行程序時,控制台輸出了如下的錯誤日志:

RuntimeError: asyncio.run() cannot be called from a running event loop

上網查了解決方案,在程序開頭加入:

import nest_asyncio
nest_asyncio.apply()

之后就不會報錯了,具體原因不明。

五.可進一步改善的地方

  • 如果不是因為ip被封禁,爬取代理階段直接用requests就好了,注意設置好休眠時間。然后其實爬取代理的時候可以同時對多個不同的代理網站進行爬取,這樣就可以把異步請求的機制也結合進來了。比如創建多個任務,每個任務分別針對不同的網站使用requests進行請求,然后將這些任務加入到異步協程的事件循環中去。
  • 我的做法僅是將代理存入本地的數據庫,是靜態的,網絡上還有許多的代理池項目都是可以動態維護的,有web界面、api接口等,但實現方法也更加復雜,后續可進一步深入學習。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM