Building a Proxy IP Pool with Python


Build your own proxy pool: fetch proxy IPs from various free proxy sites, check whether each one actually works (test against a stable URL, ideally the very site you plan to crawl), save the working ones to a database, and pull one out whenever you need it.
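
Before building the full pool, here is a minimal sketch of the core idea using only requests (the proxy address is made up, and the test URL should be whatever site you actually intend to crawl):

import requests

TEST_URL = 'http://www.baidu.com'  # replace with the site you plan to crawl

def is_usable(proxy):
    """Return True if the proxy can reach TEST_URL within 10 seconds."""
    try:
        resp = requests.get(TEST_URL,
                            proxies={'http': 'http://' + proxy},
                            timeout=10)
        return resp.status_code == 200
    except requests.RequestException:
        return False

print(is_usable('123.123.123.123:8080'))  # hypothetical proxy address

The rest of the article automates exactly this: fetching candidates, scoring them by this kind of test, and serving the good ones over an API.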

Code download: https://pan.baidu.com/s/19qFHwYHYR6SLXCMAxry9pQ (extraction code: gxeb)

1. Fetching IPs

Libraries used: requests, pyquery

The crawler below pulls from several free proxy sites: 66ip.cn, xicidaili.com, ip3366.net and kuaidaili.com.

Create crawler.py:

import re

from Proxy_pool.utils import get_page
from pyquery import PyQuery as pq

class ProxyMetaclass(type):
    """Metaclass that records every crawl_* method defined on the class."""
    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)

class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
        proxies = []
        # call the crawl_* method whose name was collected by the metaclass
        for proxy in getattr(self, callback)():
            print('Got proxy', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """
        Crawl www.66ip.cn
        :param page_count: number of pages to fetch
        :return: proxies as 'ip:port' strings
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_xicidaili(self):
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            headers = {
                'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie':'_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host':'www.xicidaili.com',
                'Referer':'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests':'1',
            }
            html = get_page(start_url, options=headers)
            if html:
                find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address,port in zip(re_ip_address, re_port):
                        address_port = address+':'+port
                        yield address_port.replace(' ','')

    def crawl_ip3366(self):
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile('<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                for s in range(1, len(trs)):
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address,port in zip(re_ip_address, re_port):
                        address_port = address+':'+port
                        yield address_port.replace(' ','')

    def crawl_kuaidaili(self):
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile('<td data-title="IP">(.*?)</td>') 
                re_ip_address = ip_address.findall(html)
                port = re.compile('<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address,port in zip(re_ip_address, re_port):
                    address_port = address+':'+port
                    yield address_port.replace(' ','')

You can add more proxy sources of your own; see the sketch below.
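
Thanks to ProxyMetaclass, any method whose name contains crawl_ is registered automatically and called by get_proxies. A minimal sketch of adding another source (the site URL and page structure here are hypothetical, so adapt the selectors to the real page):

    # Hypothetical example: drop a method like this into the Crawler class
    # and the metaclass will pick it up automatically.
    def crawl_example_site(self, page_count=2):
        start_url = 'http://www.example-proxy-site.com/free/{}'  # placeholder URL
        for page in range(1, page_count + 1):
            html = get_page(start_url.format(page))
            if html:
                doc = pq(html)
                # adjust the selector to match the real table layout
                for tr in doc('table tr:gt(0)').items():
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    if ip and port:
                        yield ':'.join([ip, port])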

Create utils.py:

import requests
from requests.exceptions import ConnectionError

base_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36',
    'Accept-Encoding': 'gzip, deflate, sdch',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
}


def get_page(url, options=None):
    """
    Fetch a page and return its HTML.
    :param url: page URL
    :param options: extra headers merged on top of base_headers
    :return: HTML text on HTTP 200, otherwise None
    """
    headers = dict(base_headers, **(options or {}))
    print('Fetching', url)
    try:
        response = requests.get(url, headers=headers)
        print('Fetched', url, response.status_code)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        print('Failed to fetch', url)
        return None

get_page fetches a page and, on success, returns the full HTML so that later code can extract whatever it needs. It is wrapped as a single helper that every crawl_* method above calls. A quick usage sketch follows.
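
For example (the extra Referer header is just an illustration of how options is merged on top of base_headers):

from Proxy_pool.utils import get_page

html = get_page('http://www.ip3366.net/?stype=1&page=1',
                options={'Referer': 'http://www.ip3366.net/'})
if html:
    print('fetched', len(html), 'characters')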

Create getter.py:

from Proxy_pool.crawler import Crawler
from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *
import sys

class Getter():
    def __init__(self):
        self.mysql = MySqlClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """
        Check whether the pool has reached its size limit.
        """
        return self.mysql.count() >= POOL_UPPER_THRESHOLD
    
    def run(self):
        print('Getter started')
        if not self.is_over_threshold():
            for callback_label in range(self.crawler.__CrawlFuncCount__):
                callback = self.crawler.__CrawlFunc__[callback_label]
                # fetch proxies from this source
                all_ip = self.crawler.get_proxies(callback)
                sys.stdout.flush()
                for ip in all_ip:
                    self.mysql.add(ip)

Result: proxy IPs are being fetched, but we do not yet know whether they work, and nothing has been persisted yet (screenshot omitted).

2. Saving the Fetched IPs

First deploy MySQL locally or on a server (look up the installation steps for your platform).

Create the database test:
CREATE DATABASE test;
Switch to it:
USE test;
Create the table PROXY:
CREATE TABLE PROXY (IP VARCHAR(255), SCORE VARCHAR(255));
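
If you prefer to set this up from Python, a small sketch with pymysql does the same thing (it assumes the host and credentials configured in setting.py below):

# Optional: create the same schema from Python instead of the MySQL shell.
import pymysql

conn = pymysql.connect(host='192.168.98.128', port=3306,
                       user='root', password='RZXrzx1218')
with conn.cursor() as cursor:
    cursor.execute("CREATE DATABASE IF NOT EXISTS test")
    cursor.execute("USE test")
    cursor.execute("CREATE TABLE IF NOT EXISTS PROXY "
                   "(IP VARCHAR(255), SCORE VARCHAR(255))")
conn.commit()
conn.close()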

Create setting.py:

Define the configuration in one place: database settings and constants such as the maximum score.

# Database host
HOST = '192.168.98.128'  # IP of the machine running MySQL
# MySQL port
MYSQL_PORT = 3306
# MySQL username and password
MYSQL_USERNAME = 'root'
MYSQL_PASSWORD = 'RZXrzx1218'
# Database name
SQL_NAME = 'test'
# MAX_SCORE, MIN_SCORE, INITIAL_SCORE: maximum, minimum and initial proxy scores
MAX_SCORE = 30
MIN_SCORE = 0
INITIAL_SCORE = 10

VALID_STATUS_CODES = [200, 302]

# Upper limit on the number of proxies in the pool
POOL_UPPER_THRESHOLD = 1000

# Tester cycle (seconds)
TESTER_CYCLE = 20
# Getter cycle (seconds)
GETTER_CYCLE = 300

# Test URL; ideally the site you actually plan to crawl
TEST_URL = 'http://www.baidu.com'

# API settings
API_HOST = '0.0.0.0'
API_PORT = 5555

# Module switches
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

# Maximum batch size per test round
BATCH_TEST_SIZE = 30
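
One note before db.py: it imports PoolEmptyError from Proxy_pool/error.py, a file the article does not show. A minimal version (my assumption; the downloadable code may define it differently) is enough:

# Proxy_pool/error.py
class PoolEmptyError(Exception):
    """Raised when no proxy is available in the pool."""
    def __str__(self):
        return 'the proxy pool is empty'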

Create db.py:

Define a class that wraps all database operations: adding proxies, setting and adjusting scores, fetching random proxies, and so on.

import pymysql
from Proxy_pool.error import PoolEmptyError
from Proxy_pool.setting import *
from random import choice
import re


class MySqlClient(object):
    # Initialise the connection
    def __init__(self, host=HOST, port=MYSQL_PORT, username=MYSQL_USERNAME, password=MYSQL_PASSWORD, sqlname=SQL_NAME):
        self.db = pymysql.connect(host=host, user=username, password=password, port=port, db=sqlname)
        self.cursor = self.db.cursor()

    # Add a proxy IP with the initial score
    def add(self, ip, score=INITIAL_SCORE):
        if not re.match(r'\d+\.\d+\.\d+\.\d+:\d+', ip):
            print('Proxy not well formed, discarding', ip)
            return
        if not self.exists(ip):
            sql_add = "INSERT INTO PROXY (IP,SCORE) VALUES ('%s', %s)" % (ip, score)
            self.cursor.execute(sql_add)
            self.db.commit()
            return True

    # Decrease a proxy's score; remove it once it reaches the minimum
    def decrease(self, ip):
        sql_get = "SELECT * FROM PROXY WHERE IP='%s'" % (ip)
        self.cursor.execute(sql_get)
        score = int(self.cursor.fetchone()[1])  # SCORE column is stored as a string
        if score > MIN_SCORE:
            print('Proxy', ip, 'current score', score, 'decreasing by 1')
            sql_change = "UPDATE PROXY SET SCORE = %s WHERE IP = '%s'" % (score - 1, ip)
        else:
            print('Proxy', ip, 'current score', score, 'removing')
            sql_change = "DELETE FROM PROXY WHERE IP = '%s'" % (ip)
        self.cursor.execute(sql_change)
        self.db.commit()

    # Set a proxy's score to the maximum
    def max(self, ip):
        print('Proxy', ip, 'is usable, setting score to', MAX_SCORE)
        sql_max = "UPDATE PROXY SET SCORE = %s WHERE IP = '%s'" % (MAX_SCORE, ip)
        self.cursor.execute(sql_max)
        self.db.commit()
        
    # Get a random usable proxy
    def random(self):
        # Prefer a random proxy with the maximum score
        sql_max = "SELECT * FROM PROXY WHERE SCORE=%s" % (MAX_SCORE)
        if self.cursor.execute(sql_max):
            results = self.cursor.fetchall()
            return choice(results)[0]
        # Otherwise pick a random one from the whole score range
        else:
            sql_all = "SELECT * FROM PROXY WHERE SCORE BETWEEN %s AND %s" % (MIN_SCORE, MAX_SCORE)
            if self.cursor.execute(sql_all):
                results = self.cursor.fetchall()
                return choice(results)[0]
            else:
                raise PoolEmptyError

    # Check whether a proxy already exists
    def exists(self, ip):
        sql_exists = "SELECT 1 FROM PROXY WHERE IP='%s' LIMIT 1" % ip
        return self.cursor.execute(sql_exists)

    # Number of proxies in the pool
    def count(self):
        sql_count = "SELECT * FROM PROXY"
        return self.cursor.execute(sql_count)

    # Get all proxies
    def all(self):
        self.count()
        return self.cursor.fetchall()

    # Get a slice of proxies [start, stop)
    def batch(self, start, stop):
        sql_batch = "SELECT * FROM PROXY LIMIT %s, %s" % (start, stop - start)
        self.cursor.execute(sql_batch)
        return self.cursor.fetchall()
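
A quick usage sketch of MySqlClient, assuming the MySQL instance from above is running (the proxy string is made up):

from Proxy_pool.db import MySqlClient

mysql = MySqlClient()
mysql.add('123.123.123.123:8080')  # hypothetical proxy, stored with INITIAL_SCORE
print(mysql.count())               # how many proxies are in the pool
print(mysql.random())              # a random proxy, preferring MAX_SCORE ones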

Result: the fetched proxies are now stored in the PROXY table (screenshot omitted).

3. Testing the IPs

Create tester.py:

import asyncio
import aiohttp
import time
import sys
from aiohttp import ClientError
from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *


class Tester(object):
    def __init__(self):
        self.mysql = MySqlClient()
    
    async def test_single_ip(self, ip):
        """
        Test a single proxy.
        :param ip: proxy as 'ip:port'
        :return:
        """
        conn = aiohttp.TCPConnector(verify_ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(ip, bytes):
                    ip = ip.decode('utf-8')
                real_ip = 'http://' + ip
                print('Testing', ip)
                async with session.get(TEST_URL, proxy=real_ip, timeout=15, allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.mysql.max(ip)
                        print('Proxy usable', ip)
                    else:
                        self.mysql.decrease(ip)
                        print('Invalid response status', response.status, 'IP', ip)
            except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
                self.mysql.decrease(ip)
                print('Proxy request failed', ip)
    
    def run(self):
        """
        Main testing loop: test the pool in batches.
        """
        print('Tester started')
        try:
            count = self.mysql.count()
            print('Currently', count, 'proxies in the pool')
            for i in range(0, count, BATCH_TEST_SIZE):
                start = i
                stop = min(i + BATCH_TEST_SIZE, count)
                print('Testing proxies', start + 1, '-', stop)
                test_ip_group = self.mysql.batch(start, stop)
                loop = asyncio.get_event_loop()
                tasks = [self.test_single_ip(ip_tuple[0]) for ip_tuple in test_ip_group]
                loop.run_until_complete(asyncio.wait(tasks))
                sys.stdout.flush()
                time.sleep(5)
        except Exception as e:
            print('Tester error', e.args)

Result: usable proxies have their score raised to MAX_SCORE; failing ones are decremented and eventually removed (screenshot omitted).

4. Defining the API

Create api.py:

from flask import Flask, g
from Proxy_pool.db import MySqlClient

__all__ = ['app']

app = Flask(__name__)

def get_conn():
    if not hasattr(g, 'mysql'):
        g.mysql = MySqlClient()
    return g.mysql

@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
    """
    Get a random proxy.
    :return: random proxy string
    """
    conn = get_conn()
    return conn.random()

@app.route('/count')
def get_counts():
    """
    Get the size of the pool.
    :return: number of proxies
    """
    conn = get_conn()
    return str(conn.count())

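Once api.py is running (the Scheduler below starts it on API_HOST:API_PORT), a crawler can grab a proxy with a plain HTTP request. A usage sketch, assuming the default port from setting.py and a locally running API:

import requests

PROXY_POOL_URL = 'http://127.0.0.1:5555/random'

def get_random_proxy():
    """Ask the pool API for one random proxy string like 'ip:port'."""
    resp = requests.get(PROXY_POOL_URL)
    if resp.status_code == 200:
        return resp.text
    return None

proxy = get_random_proxy()
if proxy:
    # route an actual request through the proxy
    proxies = {'http': 'http://' + proxy}
    print(requests.get('http://www.baidu.com', proxies=proxies, timeout=10).status_code)
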
Result: /random returns a random proxy and /count returns the current pool size (screenshot omitted).

5. The Scheduler

The scheduler ties the getter, storage and tester modules together and runs them as separate processes.

Create scheduler.py:

import time
from multiprocessing import Process
from Proxy_pool.api import app
from Proxy_pool.getter import Getter
from Proxy_pool.tester import Tester
from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *


class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
        """
        Test the proxies on a fixed cycle.
        """
        tester = Tester()
        while True:
            print('Starting tester')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """
        Fetch new proxies on a fixed cycle.
        """
        getter = Getter()
        while True:
            print('Starting proxy fetch')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """
        Start the API server.
        """
        app.run(API_HOST, API_PORT)

    def run(self):
        print('Proxy pool starting')
        
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()

run() checks each module's switch; if the module is enabled, it creates a new Process with the corresponding target and calls start(), so the three loops run in parallel without interfering with one another. The same pattern is illustrated in isolation below.
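
A generic illustration of that pattern (not part of the project code): each long-running job gets its own process, so a crash or a block in one does not stall the others, and join() keeps the parent alive.

from multiprocessing import Process
import time

def worker(name, cycle):
    # stand-in for schedule_getter / schedule_tester: loop forever on a timer
    while True:
        print(name, 'running')
        time.sleep(cycle)

if __name__ == '__main__':
    jobs = [Process(target=worker, args=('getter', 5)),
            Process(target=worker, args=('tester', 2))]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()  # block the parent so Ctrl+C stops everything together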

6. Running It

Create run.py:

from Proxy_pool.scheduler import Scheduler
import sys
import io

# force UTF-8 output so the log prints cleanly on consoles with other encodings
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

def main():
    try:
        s = Scheduler()
        s.run()
    except Exception:
        # restart the scheduler if it crashes
        main()

if __name__ == '__main__':
    main()

 

