自己構建代理池,從各種代理服務網站中獲取代理 IP,並檢測其可用性(使用一個穩定的網址來檢測,最好是自己將要爬取的網站),再保存到數據庫中,需要使用的時候再調用
代碼地址:鏈接:https://pan.baidu.com/s/19qFHwYHYR6SLXCMAxry9pQ 提取碼:gxeb
1.獲取IP
使用的庫:requests、pyquery
幾家免費的代理服務網站:
創建crawler.py文件
import json
import re

from Proxy_pool.utils import get_page
from pyquery import PyQuery as pq


class ProxyMetaclass(type):
    """Metaclass that auto-registers proxy sources.

    Every method whose name contains 'crawl_' is recorded in the class
    attribute __CrawlFunc__ (list of names) and counted in
    __CrawlFuncCount__, so Getter can invoke all crawlers generically.
    """

    def __new__(cls, name, bases, attrs):
        count = 0
        attrs['__CrawlFunc__'] = []
        for k, v in attrs.items():
            # Register any method whose name contains 'crawl_'.
            if 'crawl_' in k:
                attrs['__CrawlFunc__'].append(k)
                count += 1
        attrs['__CrawlFuncCount__'] = count
        return type.__new__(cls, name, bases, attrs)


class Crawler(object, metaclass=ProxyMetaclass):
    """Collection of crawlers for free proxy listing sites."""

    def get_proxies(self, callback):
        """Run one crawl_* method by name and collect the proxies it yields.

        :param callback: name of a crawl_* method of this class
        :return: list of 'ip:port' strings
        """
        proxies = []
        # getattr dispatch replaces the original eval("self.{}()") call,
        # which executed an arbitrary string and was needlessly unsafe.
        for proxy in getattr(self, callback)():
            print('成功獲取到代理', proxy)
            proxies.append(proxy)
        return proxies

    def crawl_daili66(self, page_count=4):
        """Crawl proxies from www.66ip.cn.

        :param page_count: number of listing pages to fetch
        :return: generator of 'ip:port' strings
        """
        start_url = 'http://www.66ip.cn/{}.html'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print('Crawling', url)
            html = get_page(url)
            if html:
                doc = pq(html)
                # tr:gt(0) skips the table header row.
                trs = doc('.containerbox table tr:gt(0)').items()
                for tr in trs:
                    ip = tr.find('td:nth-child(1)').text()
                    port = tr.find('td:nth-child(2)').text()
                    yield ':'.join([ip, port])

    def crawl_xicidaili(self):
        """Crawl proxies from www.xicidaili.com (pages 1-2).

        :return: generator of 'ip:port' strings
        """
        for i in range(1, 3):
            start_url = 'http://www.xicidaili.com/nn/{}'.format(i)
            # The site requires a session cookie and referer to answer.
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Cookie': '_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJWRjYzc5MmM1MTBiMDMzYTUzNTZjNzA4NjBhNWRjZjliBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMUp6S2tXT3g5a0FCT01ndzlmWWZqRVJNek1WanRuUDBCbTJUN21GMTBKd3M9BjsARg%3D%3D--2a69429cb2115c6a0cc9a86e0ebe2800c0d471b3',
                'Host': 'www.xicidaili.com',
                'Referer': 'http://www.xicidaili.com/nn/3',
                'Upgrade-Insecure-Requests': '1',
            }
            html = get_page(start_url, options=headers)
            if html:
                # Raw strings for all regexes (avoids invalid-escape warnings).
                find_trs = re.compile(r'<tr class.*?>(.*?)</tr>', re.S)
                trs = find_trs.findall(html)
                for tr in trs:
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(tr)
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(tr)
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_ip3366(self):
        """Crawl proxies from www.ip3366.net (pages 1-3).

        :return: generator of 'ip:port' strings
        """
        for i in range(1, 4):
            start_url = 'http://www.ip3366.net/?stype=1&page={}'.format(i)
            html = get_page(start_url)
            if html:
                find_tr = re.compile(r'<tr>(.*?)</tr>', re.S)
                trs = find_tr.findall(html)
                # Start at 1 to skip the header row.
                for s in range(1, len(trs)):
                    find_ip = re.compile(r'<td>(\d+\.\d+\.\d+\.\d+)</td>')
                    re_ip_address = find_ip.findall(trs[s])
                    find_port = re.compile(r'<td>(\d+)</td>')
                    re_port = find_port.findall(trs[s])
                    for address, port in zip(re_ip_address, re_port):
                        address_port = address + ':' + port
                        yield address_port.replace(' ', '')

    def crawl_kuaidaili(self):
        """Crawl proxies from www.kuaidaili.com (pages 1-3).

        :return: generator of 'ip:port' strings
        """
        for i in range(1, 4):
            start_url = 'http://www.kuaidaili.com/free/inha/{}/'.format(i)
            html = get_page(start_url)
            if html:
                ip_address = re.compile(r'<td data-title="IP">(.*?)</td>')
                re_ip_address = ip_address.findall(html)
                port = re.compile(r'<td data-title="PORT">(.*?)</td>')
                re_port = port.findall(html)
                for address, port in zip(re_ip_address, re_port):
                    address_port = address + ':' + port
                    yield address_port.replace(' ', '')
可以自己添加要獲取的ip代理
創建utils.py文件
import requests
from requests.exceptions import ConnectionError, Timeout

# Default headers sent with every crawl request.
base_headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36',
    'Accept-Encoding': 'gzip, deflate, sdch',
    'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7'
}


def get_page(url, options=None):
    """Fetch a page and return its HTML.

    :param url: page URL
    :param options: extra headers merged over base_headers (optional)
    :return: response text on HTTP 200, otherwise None
    """
    # options=None avoids the mutable-default-argument pitfall of the
    # original `options={}` signature.
    headers = dict(base_headers, **(options or {}))
    print('正在抓取', url)
    try:
        # A timeout keeps a dead host from hanging the whole crawler.
        response = requests.get(url, headers=headers, timeout=10)
        print('抓取成功', url, response.status_code)
        if response.status_code == 200:
            return response.text
    except (ConnectionError, Timeout):
        print('抓取失敗', url)
        return None
抓取網頁內容的方法,訪問鏈接成功后返回整個網頁 HTML 內容,便於后續對網頁具體內容的提取。封裝成一個方法,讓上面的 crawler 在抓取各個網站時調用
創建getter.py文件
import sys

from Proxy_pool.crawler import Crawler
from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *


class Getter():
    """Run every registered crawler and store the proxies it finds in MySQL."""

    def __init__(self):
        self.mysql = MySqlClient()
        self.crawler = Crawler()

    def is_over_threshold(self):
        """Return True when the pool has reached POOL_UPPER_THRESHOLD proxies."""
        # Return the comparison directly instead of if/True/False.
        return self.mysql.count() >= POOL_UPPER_THRESHOLD

    def run(self):
        """Crawl all sources (unless the pool is full) and persist each proxy."""
        print('獲取器開始執行')
        if not self.is_over_threshold():
            # __CrawlFunc__ is populated by ProxyMetaclass with every
            # crawl_* method name; iterate it directly instead of by index.
            for callback in self.crawler.__CrawlFunc__:
                all_ip = self.crawler.get_proxies(callback)
                sys.stdout.flush()
                for ip in all_ip:
                    self.mysql.add(ip)
結果如下:
已經獲取到ip代理,但不知道是否可用,也沒有保存
2.保存獲取到的IP
先在本地或服務器部署mysql,具體部署方法請自行查找。
創建數據庫test
CREATE database test;
進入數據庫test
use test;
創建表PROXY
-- Fixed: VARCHAE was a typo (statement would not run). SCORE is stored as
-- INT because db.py compares and decrements it numerically (score > 0,
-- score - 1); a VARCHAR column would come back as a string from pymysql.
create table PROXY (IP VARCHAR(255), SCORE INT);
創建setting.py文件
先在一個文件中定義一些配置信息,如數據庫的設置、一些不變量如滿分的數值等
# 數據庫地址 HOST = '192.168.98.128' # 填寫要連接數據庫的ip # MySql端口 MYSQL_PORT = 3306 # MySQl用戶名、密碼 MYSQL_USERNAME = 'root' MYSQL_PASSWORD = 'RZXrzx1218' # 數據庫名 SQL_NAME = 'test' # MAX_SCORE、MIN_SCORE、INITIAL_SCORE 分別代表最大分數、最小分數、初始分數 # 代理等級 MAX_SCORE = 30 MIN_SCORE = 0 INITIAL_SCORE = 10 VALID_STATUS_CODES = [200, 302] # 代理池數量界限 POOL_UPPER_THRESHOLD = 1000 # 檢查周期 TESTER_CYCLE = 20 # 獲取周期 GETTER_CYCLE = 300 # 測試API,建議抓哪個網站測哪個 TEST_URL = 'http://www.baidu.com' # API配置 API_HOST = '0.0.0.0' API_PORT = 5555 # 開關 TESTER_ENABLED = True GETTER_ENABLED = True API_ENABLED = True # 最大批測試量 BATCH_TEST_SIZE = 30
創建db.py文件
定義一個類來操作數據庫的有序集合,內含一些方法來實現分數的設置、代理的獲取等
import re
from random import choice

import pymysql

from Proxy_pool.error import PoolEmptyError
from Proxy_pool.setting import *


class MySqlClient(object):
    """Operations on the PROXY table: add, score, fetch and count proxies.

    All statements use parameterized execution (cursor.execute(sql, args))
    instead of %-string interpolation; this prevents SQL injection and fixes
    the original DELETE, whose unquoted string value was a syntax error.
    """

    def __init__(self, host=HOST, port=MYSQL_PORT, username=MYSQL_USERNAME,
                 password=MYSQL_PASSWORD, sqlname=SQL_NAME):
        """Open the connection once; a single shared cursor serves all queries."""
        self.db = pymysql.connect(host=host, user=username, password=password,
                                  port=port, db=sqlname)
        self.cursor = self.db.cursor()

    def add(self, ip, score=INITIAL_SCORE):
        """Insert a proxy with the initial score.

        Malformed addresses are discarded; duplicates are silently skipped.
        :param ip: 'ip:port' string
        :param score: starting score (defaults to INITIAL_SCORE)
        :return: True when a row was inserted, otherwise None
        """
        # Validate before touching the database.
        if not re.match(r'\d+\.\d+\.\d+\.\d+:\d+', ip):
            print('代理不符合規范', ip, '丟棄')
            return
        if not self.exists(ip):
            self.cursor.execute(
                "INSERT INTO PROXY (IP,SCORE) VALUES (%s, %s)", (ip, score))
            self.db.commit()
            return True

    def decrease(self, ip):
        """Decrease a proxy's score by 1; remove it at or below MIN_SCORE."""
        self.cursor.execute("SELECT * FROM PROXY WHERE IP=%s", (ip,))
        row = self.cursor.fetchone()
        # Guard against a missing row — the original indexed fetchone()
        # unconditionally and crashed when the IP was absent.
        score = row[1] if row else None
        if score and score > MIN_SCORE:
            print('代理', ip, '當前分數', score, '減 1')
            self.cursor.execute(
                "UPDATE PROXY SET SCORE = %s WHERE IP = %s", (score - 1, ip))
        else:
            print('代理', ip, '當前分數', score, '移除')
            self.cursor.execute("DELETE FROM PROXY WHERE IP = %s", (ip,))
        self.db.commit()

    def max(self, ip):
        """Set a proxy's score to MAX_SCORE (it passed the usability test)."""
        print('代理', ip, '可用,設置為', MAX_SCORE)
        self.cursor.execute(
            "UPDATE PROXY SET SCORE = %s WHERE IP = %s", (MAX_SCORE, ip))
        self.db.commit()

    def random(self):
        """Return a random usable proxy IP.

        Prefers full-score proxies; falls back to any proxy in scoring range.
        :raises PoolEmptyError: when no proxy is available
        """
        if self.cursor.execute("SELECT * FROM PROXY WHERE SCORE=%s",
                               (MAX_SCORE,)):
            results = self.cursor.fetchall()
            return choice(results)[0]
        if self.cursor.execute(
                "SELECT * FROM PROXY WHERE SCORE BETWEEN %s AND %s",
                (MIN_SCORE, MAX_SCORE)):
            results = self.cursor.fetchall()
            return choice(results)[0]
        raise PoolEmptyError

    def exists(self, ip):
        """Return a truthy row count when the IP is already stored."""
        return self.cursor.execute(
            "SELECT 1 FROM PROXY WHERE IP=%s limit 1", (ip,))

    def count(self):
        """Return the number of stored proxies (execute() returns the row count)."""
        return self.cursor.execute("SELECT * FROM PROXY")

    def all(self):
        """Return every (IP, SCORE) row."""
        self.count()
        return self.cursor.fetchall()

    def batch(self, start, stop):
        """Return rows [start, stop) for batched testing."""
        self.cursor.execute(
            "SELECT * FROM PROXY LIMIT %s, %s", (start, stop - start))
        return self.cursor.fetchall()
結果:
3.檢測IP
創建tester.py文件
import asyncio
import sys
import time

import aiohttp
from aiohttp import ClientError

from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *


class Tester(object):
    """Test stored proxies against TEST_URL and adjust their scores."""

    def __init__(self):
        self.mysql = MySqlClient()

    async def test_single_ip(self, ip):
        """Test one proxy.

        Raises its score to MAX_SCORE when the response status is valid,
        otherwise decreases it.
        :param ip: 'ip:port' string (bytes accepted and decoded)
        """
        # ssl=False replaces the verify_ssl=False parameter, which was
        # deprecated and later removed from aiohttp.
        conn = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            try:
                if isinstance(ip, bytes):
                    ip = ip.decode('utf-8')
                real_ip = 'http://' + ip
                print('正在測試', ip)
                async with session.get(TEST_URL, proxy=real_ip, timeout=15,
                                       allow_redirects=False) as response:
                    if response.status in VALID_STATUS_CODES:
                        self.mysql.max(ip)
                        print('代理可用', ip)
                    else:
                        self.mysql.decrease(ip)
                        print('請求響應碼不合法 ', response.status, 'IP', ip)
            # ClientConnectorError is a subclass of ClientError, so listing
            # it separately (as the original did) was redundant.
            except (ClientError, asyncio.TimeoutError, AttributeError):
                self.mysql.decrease(ip)
                print('代理請求失敗', ip)

    def run(self):
        """Main test loop: check all proxies in batches of BATCH_TEST_SIZE."""
        print('測試器開始運行')
        try:
            count = self.mysql.count()
            print('當前剩余', count, '個代理')
            # Create one event loop for the whole run instead of calling
            # get_event_loop() per batch (deprecated outside a running loop).
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                for i in range(0, count, BATCH_TEST_SIZE):
                    start = i
                    stop = min(i + BATCH_TEST_SIZE, count)
                    print('正在測試第', start + 1, '-', stop, '個代理')
                    test_ip_group = self.mysql.batch(start, stop)
                    tasks = [self.test_single_ip(ip_tuple[0])
                             for ip_tuple in test_ip_group]
                    # gather() accepts bare coroutines; passing them to
                    # asyncio.wait() was deprecated and removed in 3.11+.
                    loop.run_until_complete(asyncio.gather(*tasks))
                    sys.stdout.flush()
                    time.sleep(5)
            finally:
                loop.close()
        except Exception as e:
            print('測試器發生錯誤', e.args)
結果:
4.定義接口
創建api.py
from flask import Flask, g

from Proxy_pool.db import MySqlClient

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    """Return the app-context MySqlClient, creating it on first use."""
    # Cache one client on flask.g so each app context reuses a connection.
    if not hasattr(g, 'mysql'):
        g.mysql = MySqlClient()
    return g.mysql


@app.route('/')
def index():
    """Landing page."""
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """Hand out one random usable proxy."""
    return get_conn().random()


@app.route('/count')
def get_counts():
    """Report the total number of proxies in the pool."""
    return str(get_conn().count())
結果:
5.調度模塊
調用定義的獲取、存儲、檢測三個模塊,將這三個模塊通過多進程的形式運行起來
創建scheduler.py
import time
from multiprocessing import Process

from Proxy_pool.api import app
from Proxy_pool.getter import Getter
from Proxy_pool.tester import Tester
from Proxy_pool.db import MySqlClient
from Proxy_pool.setting import *


class Scheduler():
    """Run the tester, the getter and the API server as separate processes."""

    def schedule_tester(self, cycle=TESTER_CYCLE):
        """Re-test all stored proxies every `cycle` seconds."""
        tester = Tester()
        while True:
            print('測試器開始運行')
            tester.run()
            time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
        """Crawl fresh proxies every `cycle` seconds."""
        getter = Getter()
        while True:
            print('開始抓取代理')
            getter.run()
            time.sleep(cycle)

    def schedule_api(self):
        """Serve the HTTP API."""
        app.run(API_HOST, API_PORT)

    def run(self):
        """Start every enabled module in its own process (tester, getter, API)."""
        print('代理池開始運行')
        jobs = (
            (TESTER_ENABLED, self.schedule_tester),
            (GETTER_ENABLED, self.schedule_getter),
            (API_ENABLED, self.schedule_api),
        )
        for enabled, target in jobs:
            if enabled:
                Process(target=target).start()
分別判斷了三個模塊的開關,如果開啟的話,就新建一個 Process 進程,設置好啟動目標,然后調用 start() 方法運行,這樣三個進程就可以並行執行,互不干擾
6.啟動
創建run.py文件
import io
import sys

from Proxy_pool.scheduler import Scheduler

# Force UTF-8 stdout so the Chinese log messages print correctly everywhere.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')


def main():
    """Start the scheduler and restart it on unexpected errors.

    The original used a bare `except:` plus recursion, which swallowed
    KeyboardInterrupt/SystemExit and would eventually overflow the stack.
    A loop with `except Exception` restarts safely and still lets Ctrl-C
    terminate the program.
    """
    while True:
        try:
            s = Scheduler()
            s.run()
            break
        except Exception as e:
            print('調度器異常退出,正在重啟', e.args)


if __name__ == '__main__':
    main()