What to do when your IP keeps getting banned?


Proxy Pool Design
    Getter: our crawler interface for scraping free IPs; for later extensibility, it needs to support freely plugging new crawlers into the getter;
    Database: we use MongoDB to store the valid proxies; the previous article covered an extensible MongoDB wrapper, which we reuse here directly;
    Scheduler: checks whether the crawled proxies work and adds the valid ones to the database, runs scheduled tasks to re-check the proxies already in the pool, and controls when the crawlers start;
    Api: to make it easier to fetch a fresh proxy, we use Flask as the external interface.
    The structure of the proxy pool:
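Judging from the file paths referenced throughout the post, the code is organised roughly like this:

Api/
    api.py          # Flask interface
Db/
    db.py           # MongodbClient wrapper
Schedule/
    tester.py       # ProxyTester
    adder.py        # PoolAdder
    schedule.py     # Schedule
Spider/
    get_proxy.py    # ProxyGetter and its metaclass
config.py
run.py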

Getter

Search Baidu for "free ip" and you will find plenty of sites offering free IPs. Here we use ihuan proxy, 66 proxy, kuaidaili, and xicidaili.

Straight to the code:

# coding=utf-8
# __author__ = 'yk'
import requests
from lxml import etree
from requests.exceptions import ConnectionError

def parse_url(url):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0'}
    try:
        resp = requests.get(url, headers=headers)
        if resp.status_code == 200:
            return resp.text
        return None
    except ConnectionError:
        print('Error.')
        return None

def proxy_xici():
    url = 'http://www.xicidaili.com/'
    resp = parse_url(url)
    html = etree.HTML(resp)
    ips = html.xpath('//*[@id="ip_list"]/tr/td[2]/text()')
    ports = html.xpath('//*[@id="ip_list"]/tr/td[3]/text()')
    for ip, port in zip(ips, ports):
        proxy = ip + ':' + port
        yield proxy

We use XPath to parse out the proxies.
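The other three sites follow the same pattern: fetch the page with parse_url, pull the IPs and ports out with XPath, and yield ip:port strings. As one illustration, a sketch of proxy_ip66 could look like the following; the URL and XPath expressions here are assumptions and need to be verified against the live page:

def proxy_ip66():
    # Sketch only: the URL and XPath are assumptions, check them against the actual page
    url = 'http://www.66ip.cn/'
    resp = parse_url(url)
    if resp is None:
        return
    html = etree.HTML(resp)
    # Hypothetical table layout: first column is the IP, second column the port
    ips = html.xpath('//table//tr/td[1]/text()')
    ports = html.xpath('//table//tr/td[2]/text()')
    for ip, port in zip(ips, ports):
        yield ip.strip() + ':' + port.strip()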

Extensibility

We are scraping four sites now, and there will surely be more later. To make extension easier, we write a metaclass. The metaclass controls how the proxy-getter class is created and adds two attributes to it: one holding every site-crawler method defined in the class, the other the total number of crawlers. That makes them easy to use from the scheduler:

# Spider/get_proxy.py
class ProxyMetaclass(type):
    """Metaclass that adds two attributes, __CrawlFunc__ and __CrawlFuncCount__,
    to ProxyGetter: the list of crawler functions and how many there are."""
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k in attrs.keys():
            if k.startswith('proxy_'):
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)

class ProxyGetter(object, metaclass=ProxyMetaclass):
    def proxy_ip66(self):
        pass

    def proxy_xici(self):
        pass

    def proxy_kuai(self):
        pass

    def proxy_ihuan(self):
        pass

class ProxyGetter(object, metaclass=ProxyMetaclass):
    def get_raw_proxies(self, callback):
        proxies = []
        for proxy in eval("self.{}()".format(callback)):
            print('Getting', proxy, 'from', callback)
            proxies.append(proxy)
        return proxies
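In the repo the proxy_ methods and get_raw_proxies presumably live in one and the same ProxyGetter class (the post just shows them separately; the PoolAdder below relies on that), so an instance ends up with both attributes the metaclass builds. A quick interactive check might look like this; the list order follows the order the methods are defined in the class body:

>>> getter = ProxyGetter()
>>> getter.__CrawlFuncCount__
4
>>> getter.__CrawlFunc__
['proxy_ip66', 'proxy_xici', 'proxy_kuai', 'proxy_ihuan']
>>> getter.get_raw_proxies('proxy_xici')  # ends up calling self.proxy_xici()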

If you are not familiar with Python's built-in eval, look it up; here is a quick illustration:

>>> m = 5
>>> n = 3
>>> eval('m') + eval('n')
8
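eval("self.proxy_xici()") simply evaluates that string as Python and calls the method. An equivalent way to call a method by name, without evaluating arbitrary strings, is getattr; a small sketch of get_raw_proxies rewritten that way (same behaviour, just a different lookup):

def get_raw_proxies(self, callback):
    proxies = []
    # getattr(self, 'proxy_xici') returns the bound method; the trailing () calls it
    for proxy in getattr(self, callback)():
        print('Getting', proxy, 'from', callback)
        proxies.append(proxy)
    return proxies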

Database

For the database we reuse the wrapper class from the previous article, but to avoid storing the same IP twice we need to check for duplicates.

# Db/db.py
...
def put(self, proxy):
    """Put a proxy into the database."""
    num = self.proxy_num() + 1
    if self.db[self.table].find_one({'proxy': proxy}):
        # The proxy is already stored: remove the old record, then re-insert it
        self.delete(proxy)
        self.db[self.table].insert({'proxy': proxy, 'num': num})
    else:
        self.db[self.table].insert({'proxy': proxy, 'num': num})
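The MongodbClient wrapper itself comes from the previous article and is not repeated here. For orientation, the methods the rest of the code relies on (proxy_num(), get_nums as a property, get(count), delete(proxy)) could be sketched roughly like this on top of pymongo; the database and collection names are assumptions, and the real implementation is in the repo:

# A rough sketch of the wrapper used below (assumptions: pymongo, db/collection names)
import pymongo

class MongodbClient(object):
    def __init__(self, table='proxies'):
        self.client = pymongo.MongoClient('localhost', 27017)
        self.db = self.client['proxypool']   # database name is an assumption
        self.table = table

    def proxy_num(self):
        # How many proxies are currently stored
        return self.db[self.table].count_documents({})

    @property
    def get_nums(self):
        # The scheduler reads this without parentheses (conn.get_nums), so expose it as a property
        return self.proxy_num()

    def get(self, count):
        # "Take from the left": return the `count` oldest proxies, ordered by their insert number
        cursor = self.db[self.table].find().sort('num', pymongo.ASCENDING).limit(count)
        return [doc['proxy'] for doc in cursor]

    def delete(self, proxy):
        self.db[self.table].delete_one({'proxy': proxy})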

# Schedule/tester.py
import asyncio
import aiohttp
from Db.db import MongodbClient
# Configuration values live in config.py
from config import TEST_URL

class ProxyTester(object):
    test_url = TEST_URL

    def __init__(self):
        self._raw_proxies = None

    def set_raw_proxies(self, proxies):
        # Called from outside to hand over the proxies that need testing
        self._raw_proxies = proxies
        self._conn = MongodbClient()

    async def test_single_proxy(self, proxy):
        """Test one proxy; if it works, put it into the pool of usable proxies."""
        try:
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode('utf-8')
                    real_proxy = 'http://' + proxy
                    print('Testing', proxy)
                    async with session.get(self.test_url, proxy=real_proxy, timeout=10) as response:
                        if response.status == 200:
                            # The request succeeded, store the proxy in the database
                            self._conn.put(proxy)
                            print('Valid proxy', proxy)
                except Exception as e:
                    print(e)
        except Exception as e:
            print(e)

    def test(self):
        """Test all proxies asynchronously."""
        print('Tester is working...')
        try:
            loop = asyncio.get_event_loop()
            tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print('Async Error')

For how to use aiohttp, you can refer to this article, or consult the Chinese documentation directly.

An aiohttp GET request looks roughly like this:

async with aiohttp.ClientSession() as session: 
    async with session.get(url, headers=headers, proxy=proxy, timeout=1) as r: 
        content, text = await r.read(), await r.text(encoding=None, errors='ignore')

The test() method kicks off the testing and uses the asyncio library. asyncio's programming model is built around an event loop: we grab a reference to the EventLoop with loop = asyncio.get_event_loop(), iterate over all the proxies that need testing, wrap each one in the test coroutine, and hand the coroutines to the EventLoop to run.

With that, the asynchronous tester class is done.
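As a minimal, standalone illustration of that event-loop model, unrelated to the proxy pool itself (it uses the same older asyncio style as the post; on newer Python versions you would wrap the coroutines with asyncio.ensure_future or use asyncio.run):

import asyncio

async def check(n):
    # A stand-in for test_single_proxy: pretend to do some I/O, then report
    await asyncio.sleep(0.1)
    print('checked', n)

loop = asyncio.get_event_loop()
tasks = [check(n) for n in range(3)]            # build one coroutine per item
loop.run_until_complete(asyncio.wait(tasks))    # run them all on the event loop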

# Schedule/adder.py
from Db.db import MongodbClient
from Spider.get_proxy import ProxyGetter
from .tester import ProxyTester

class PoolAdder(object):
    """Start the crawlers and add proxies to the database."""
    def __init__(self, threshold):
        self._threshold = threshold
        self._conn = MongodbClient()
        self._tester = ProxyTester()
        self._crawler = ProxyGetter()

    def is_over_threshold(self):
        """Check whether the number of proxies in the database has reached the threshold."""
        return True if self._conn.get_nums >= self._threshold else False

    def add_to_pool(self):
        """Replenish the proxies."""
        print('PoolAdder is working...')
        proxy_count = 0
        while not self.is_over_threshold():
            # Iterate over all crawlers via the two attributes the metaclass gives ProxyGetter:
            # __CrawlFuncCount__ is the number of crawlers, __CrawlFunc__ the crawler methods
            for callback_label in range(self._crawler.__CrawlFuncCount__):
                callback = self._crawler.__CrawlFunc__[callback_label]
                # Call the ProxyGetter method to crawl proxies
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # Test the crawled proxies
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print('Proxy is enough, waiting to be used...')
                    break
            if proxy_count == 0:
                print('The proxy source is exhausted.')

# Schedule/schedule.py
import time
from multiprocessing import Process
from ProxyPool.db import MongodbClient
from .tester import ProxyTester
from .adder import PoolAdder
from config import VALID_CHECK_CYCLE, POOL_LEN_CHECK_CYCLE, POOL_LOWER_THRESHOLD, POOL_UPPER_THRESHOLD

class Schedule(object):
    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """Take half of the proxies from the database and check them."""
        conn = MongodbClient()
        tester = ProxyTester()
        while True:
            print('Refreshing ip...')
            # Ask the database for half of the proxies, starting from the left
            count = int(0.5 * conn.get_nums)
            if count == 0:
                print('Waiting for adding...')
                time.sleep(cycle)
                continue
            raw_proxies = conn.get(count)
            tester.set_raw_proxies(raw_proxies)
            tester.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                   upper_threshold=POOL_UPPER_THRESHOLD,
                   cycle=POOL_LEN_CHECK_CYCLE):
        """If the number of proxies drops below the lower threshold, add more."""
        conn = MongodbClient()
        adder = PoolAdder(upper_threshold)
        while True:
            if conn.get_nums < lower_threshold:
                adder.add_to_pool()
            time.sleep(cycle)

    def run(self):
        print('Ip Processing running...')
        valid_process = Process(target=Schedule.valid_proxy)
        check_process = Process(target=Schedule.check_pool)
        valid_process.start()
        check_process.start()

A few variables in the code need to go into the configuration file:

# config.py
# Lower and upper thresholds for the pool size
POOL_LOWER_THRESHOLD = 10
POOL_UPPER_THRESHOLD = 40
# Cycles (in seconds) for the two scheduler processes
VALID_CHECK_CYCLE = 600
POOL_LEN_CHECK_CYCLE = 20

# run.py
from Api.api import app
from Schedule.schedule import Schedule

def main():
    # The two periodic processes of the Schedule class make up the whole scheduler
    s = Schedule()
    s.run()
    app.run()

if __name__ == '__main__':
    main()
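run.py imports app from Api/api.py, which the post never shows. A minimal sketch of what that Flask module might look like, reusing the hypothetical get() method from the database sketch above (the real module is in the repo):

# Api/api.py -- a minimal sketch, not necessarily the repo's actual code
from flask import Flask
from Db.db import MongodbClient

app = Flask(__name__)

@app.route('/get')
def get_proxy():
    # Return a single proxy as plain text, which is what the crawler example below expects
    conn = MongodbClient()
    proxies = conn.get(1)
    return proxies[0] if proxies else 'no proxy available'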

Project repository

github

Download it to your machine and switch into the project directory.

Install the dependencies: pip install -r requirements.txt

Start it: python run.py

Example

We have now implemented the proxy pool; what follows is just a basic crawler that uses it.

First, start the proxy pool:

python run.py

If no errors appear and the log output looks normal, the proxy pool is up and running. At that point, open a browser and visit http://127.0.0.1:5000/get to see a proxy.

Finally, call it from a crawler:

import requests

def get_proxy():
    # Ask the pool's API for one proxy, returned as plain text like "1.2.3.4:8080"
    resp = requests.get('http://127.0.0.1:5000/get')
    return resp.text

def get_resp(url):
    proxy = get_proxy()
    proxies = {'http': 'http://{}'.format(proxy)}
    resp = requests.get(url, proxies=proxies)
    if resp.status_code == 200:
        print('success')

 



