網站采取的反爬蟲措施有:彈出驗證碼,需要登錄。檢測某個IP在單位時間內的請求次數,超過規定的某個值,服務器拒絕服務,返回一些錯誤信息,這是封IP。
既然服務器封IP,可采用某種方式偽裝IP,讓服務器不能識別由本機發起的請求,這樣來避免封IP。這時就需要使用到代理。
一、 代理的設置
代理有免費代理和付費代理。免費代理多數情況下不好用,付費代理比較靠譜。付費代理不用多,穩定可用即可。
西刺免費代理:http://www.xicidaili.com
現在獲取西刺網站上的免費代理IP做一個測試。IP獲取代碼如下所示:
1 import json 2 import pandas as pd 3 from selenium import webdriver 4 from selenium.webdriver.common.by import By 5 from selenium.webdriver.support.ui import WebDriverWait 6 from selenium.webdriver.support import expected_conditions as EC 7 8 URL = 'https://www.xicidaili.com/' 9 _FILENAME = "xicidailiip.json" 10 11 class GetXiciDailiIp(): 12 def __init__(self): 13 self.url = URL 14 self.browser = webdriver.Chrome() 15 self.wait = WebDriverWait(self.browser, 20) 16 self.http_ip_port = {} 17 18 def __del__(self): 19 self.browser.close() 20 21 def get_pagesource(self): 22 """ 23 獲取網頁源代碼 24 :return: 網頁源代碼 25 """ 26 self.browser.get(self.url) 27 self.wait.until(EC.presence_of_element_located((By.ID, "ip_list"))) 28 page_source = self.browser.page_source 29 return page_source 30 31 def parse_ip(self, page_source): 32 """ 33 使用pandas解析網頁中的IP地址 34 :param page_source: 網頁源代碼 35 :return: self.http_ip_port, 包含協議類型,IP地址及端口 36 """ 37 df1 = pd.read_html(page_source) # df1 是列表,df1[0] 才是 DataFrame 38 df2 = df1[0][[5, 1, 2]].dropna() # 選取 協議類型、IP地址、端口列后,去掉所有的NA值行 39 proto = list(df2[5]) # 獲取協議類型列,轉化成列表 40 ip = list(df2[1]) 41 port = list(df2[2]) 42 N = len(proto) 43 s = ['HTTP', 'HTTPS', 'socks4/5'] 44 for i in range(N): 45 if proto[i] in s: 46 ip_port = ip[i] + ":" + port[i] 47 if proto[i] in self.http_ip_port: 48 if ip_port not in self.http_ip_port[proto[i]]: 49 self.http_ip_port[proto[i]].append(ip_port) 50 else: 51 self.http_ip_port[proto[i]] = [ip_port] 52 else: 53 continue 54 return self.http_ip_port 55 56 def ipport_to_file(self, http_ip_port): 57 with open(_FILENAME, 'w') as f: 58 json.dump(http_ip_port, f) 59 60 def crack(self): 61 page_source = self.get_pagesource() 62 http_ip_port = self.parse_ip(page_source) 63 self.ipport_to_file(http_ip_port) 64 #return http_ip_port 65 66 67 if __name__ == "__main__": 68 crack = GetXiciDailiIp() 69 crack.crack()
1、 使用urllib代理設置
先使用最基礎的urllib,來了解下代理的設置方法,代碼如下所示:
1 import json 2 from urllib.error import URLError 3 from urllib.request import ProxyHandler, build_opener 4 import b2_get_xicidaili_ip as B2 5 6 ipportfile = B2._FILENAME # 保存的 IP 及 PORT 文件名稱 7 with open(ipportfile, 'r') as f: 8 ips_ports = json.load(f) 9 10 N = 0 11 while N < 20: 12 if ips_ports.get('HTTPS', None) and ips_ports.get('HTTP', None): 13 proxy_handler = ProxyHandler({ 14 'http': 'http://' + ips_ports['HTTP'][N], # http://171.83.165.125:9999 15 'https': 'https://' + ips_ports['HTTPS'][N], 16 }) 17 opener = build_opener(proxy_handler) 18 try: 19 response = opener.open('http://httpbin.org/get', timeout=30) 20 if response.status == 200: 21 print(response.read().decode('utf-8')) 22 break 23 else: 24 N += 1 25 continue 26 except URLError as e: 27 N += 1 28 print(e.reason)
運行結果如下所示:
{ "args": {}, "headers": { "Accept-Encoding": "identity", "Cache-Control": "max-age=259200", "Host": "httpbin.org", "User-Agent": "Python-urllib/3.6" }, "origin": "171.83.165.125, 171.83.165.125", "url": "https://httpbin.org/get" }
這里使用 ProxyHandler 設置代理,參數是字典類型,鍵名為協議類型,鍵值是代理IP及端口。在代理前面需要加上協議,即http或https。當請求的連接是 http 協議時,ProxyHandler 會調用 http 代理。當請求鏈接是 https 協議時,會調用 https 代理。這里生效的代理是 http://171.83.165.125:9999。
創建完 ProxyHandler 對象后,接下來利用 build_opener() 方法傳入該對象來創建一個 Opener,這樣相當於此 Opener 已經設置好代理。下面直接調用 Opnener 對象的 open() 方法,就可以訪問想要的鏈接。
運行輸出結果是一個 JSON,有一個字段是 origin,標明客戶端的 IP。經驗證,此IP確實為代理的IP,並不是真實的IP。這樣就成功設置好代理,並隱藏真實的IP。
如果是需要認證的代理,可用下面這樣的方法設置:
'http': 'http://' + "username:password@" + ips_ports['HTTP'][N],
其它不做修改。這是在代理前面加入代理認證的用戶名密碼即可。其中username是用戶名,password是密碼,例如 username是michael,密碼是 python,那么代理就是 michael:python@171.83.165.125:9999。
如果代理是 SOCKS5類型,可用下面方式設置代理:
1 import json, socks, socket 2 from urllib import request 3 from urllib.error import URLError 4 import b2_get_xicidaili_ip as B2 5 6 ipportfile = B2._FILENAME 7 with open(ipportfile, 'r') as f: 8 ips_ports = json.load(f) 9 10 N = 0 11 while N < 20: 12 if ips_ports.get('socks4/5', None): 13 ip, port = ips_ports['socks4/5'][N].split(":") 14 socks.set_default_proxy(socks.SOCKS5, ip, int(port)) 15 socket.socket = socks.socksocket 16 try: 17 response = request.urlopen('http://httpbin.org/get', timeout=30) 18 if response.status == 200: 19 print(response.read().decode('utf-8')) 20 break 21 else: 22 N += 1 23 continue 24 except URLError as e: 25 N += 1 26 print(e.reason)
這段代碼的運行,需要安裝 socks 模塊,可用下面命令進行安裝:
pip3 install PySocks
免費代理不好用,請求多次都不能成功。在真正需要代理的場景,還是搞個付費代理靠譜。請求成功的話,輸出與前面的一樣。
2、 requests代理設置
requests的代理設置很簡單,只要傳入 proxies 參數即可。設置方式如下:
1 import json, requests 2 import b2_get_xicidaili_ip as B2 3 4 ipportfile = B2._FILENAME # 保存的 IP 及 PORT 文件名稱 5 with open(ipportfile, 'r') as f: 6 ips_ports = json.load(f) 7 8 N = 0 9 while N < 20: 10 if ips_ports.get('HTTPS', None) and ips_ports.get('HTTP', None): 11 proxies = { 12 'http': 'http://' + ips_ports['HTTP'][N], 13 'https': 'https://' + ips_ports['HTTPS'][N], 14 } 15 try: 16 response = requests.get('https://www.baidu.com', proxies=proxies, timeout=30) 17 if response.status_code == 200: 18 print(response.text) 19 break 20 else: 21 N += 1 22 continue 23 except requests.exceptions.ConnectionError as e: 24 N += 1 25 print('Error', e.args)
請求成功后,輸出網頁的源代碼。requests的代理設置比 urllib簡單很多,只要構造代理字典,然后通過 proxies參數即可,不需要重新構建 Opener。如果代理需要認證,同樣在代理前加上用戶名密碼即可,寫法如下所示:
'http': 'http://' + "username:password" + ":" ips_ports['HTTP'][N],
username和password即是用戶名和密碼。如要使用SOCKS5代理,可使用如下方式來設置:
proxies = { 'http': 'socks5://' + ips_ports['HTTP'][N], 'https': 'socks5://' + ips_ports['HTTPS'][N], }
這里需要額外安裝一個模塊,叫作 requests[socks],安裝命令如下所示:
pip3 install 'requests[socks]'
還可以使用 socks 模塊設置代理,設置方法如下所示:
1 import requests, socks, socket 2
3 socks.set_default_proxy(socks.SOCKS5, '127.0.0.1', 8000) # IP 和端口可以改為代理網站上的IP和端口
4 socket.socket = socks.socksocket 5 try: 6 response = requests.get('http://httpbin.org/get') 7 print(response.text) 8 except requests.exceptions.ConnectionError as e: 9 print('Error', e.args)
用這種方法設置SOCKS5代理,運行結果是一樣的。此方法是全局設置。可以在不同情況下選用不同的方法。
3、 Selenium使用代理
Selenium設置代理有兩種方式:一是使用 Chrome ,有界面瀏覽器;另一種是使用PhantomJS的無界面瀏覽器。
3.1、 Chrome使用代理
對於Chrome,使用 Selenium設置代理方法很簡單,設置方法如下:
1 from selenium import webdriver 2 proxy = '171.83.165.139:9999'
3 chrome_options = webdriver.ChromeOptions() 4 chrome_options.add_argument('--proxy-server=http://' + proxy) 5 browser = webdriver.Chrome(chrome_options=chrome_options) 6 browser.get('http://httpbin.org/get')
這里使用 ChromeOptions() 設置代理,在創建 Chrome 對象時用 chrome_options 參數傳遞即可。運行代碼彈出Chrome瀏覽器,成功訪問目標網站則在頁面上顯示下面的信息:
{ "args": {}, "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Cache-Control": "max-age=259200", "Host": "httpbin.org", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36" }, "origin": "171.83.165.139, 171.83.165.139", "url": "https://httpbin.org/get" }
代理設置成功,origin就是代理IP的地址。
認證代理設置過程省略。
3.2、 PhantomJS使用代理
PhantomJS代理設置方法可借助 service_args 參數,也就是命令行參數。代理設置方法如下:
1 from selenium import webdriver 2 service_args = [ 3 '--proxy=171.83.165.139:9999', 4 '--proxy-type=http'
5 ] 6 browser = webdriver.PhantomJS(service_args=service_args) 7 browser.get('http://httpbin.org/get') 8 print(browser.page_source)
這里使用 serivce_args 參數,將命令行的一些參數定義為列表,在初始化時候傳遞給 PhantomJS對象即可。輸出如下所示:
<html><head></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">{ "args": {}, "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,en,*", "Cache-Control": "max-age=259200", "Host": "httpbin.org", "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/538.1 (KHTML, like Gecko) PhantomJS/2.1.1 Safari/538.1" }, "origin": "171.83.165.139, 171.83.165.139", "url": "https://httpbin.org/get" } </pre></body></html>
輸出結果中的 origin 就是代理的IP地址。如果是認證代理,只需要在 service_args中加入 --proxy-auth選項即可,只需將參數改為下面這樣:
service_args = [
'--proxy=171.83.165.139:9999',
'--proxy-type=http',
'--proxy-auth=username:password'
]
二、 代理池的維護
在爬蟲的時候,有些代理IP是不可用的,可能因某個IP多次訪問同一個網站,造成該IP被封。這里需要提前做篩選,剔除掉不可用的代理,保留可用的代理。可搭建代理池來解決。
在開始之前,需要安裝 Redis數據庫並啟動服務,還需要安裝 aiohttp、requests、redis-py、pyquery、Flask庫。
1、 代理池目標
實現高效易用的代理池。基本模塊分為4塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。
存儲模塊:負責存儲抓取下來的代理。首先要保證代理不重復, 要標識代理的可用情況,還要動態實時處理每個代理,所以一種比較高效和方便的存儲方式就是使用Redis 的Sorted Set ,即有序集合。
獲取模塊:定時在各大代理網站抓取代理。代理可以是免費公開代理也可以是付費代理,形式都是IP 加端口,為此盡量從不同來源獲取,盡量抓取高匿代理,抓取成功之后將可用代理保存到數據庫中。
檢測模塊: 定時檢測數據庫中的代理。需要設置一個檢測鏈接,最好是爬取哪個網站就檢測哪個網站,這樣更加有針對性,如果要做一個通用型的代理,那可以設置百度等鏈接來檢測。另外,需要標識每一個代理的狀態,如設置分數標識, 100 分代表可用,分數越少代表越不可用。檢測一次,如果代理可用,我們可以將分數標識立即設置為100滿分,也可以在原基礎上加1分;如果代理不可用,可以將分數標識減1分,當分數戚到一定闊值后,代理就直接從數據庫移除。通過這樣的標識分數,我們就可以辨別代理的可用情況,選用的時候會更有針對性。
接口模塊: 需要用API 來提供對外服務的接口。其實我們可以直接連接數據庫來取對應的數據,但是這樣就需要知道數據庫的連接信息,並且要配置連接,而比較安全和方便的方式就是提供一個Web API 接口,通過訪問接口即可拿到可用代理。另外,由於可用代理可能有多個,那么可以設置一個隨機返回某個可用代理的接口,這樣就能保證每個可用代理都可以取到,實現負載均衡。
2、 代理池的原理
代理池大致可分為4個模塊:存儲模塊、獲取模塊、檢測模塊、接口模塊。
存儲模塊:使用Redis的有序集合,用來做代理的去重和狀態標識,中心模塊和基礎模塊,將其他模塊串聯起來。
獲取模塊:定時從代理網站獲取代理,將獲取的代理傳遞給存儲模塊,並保存到數據庫。
檢測模塊:定時通過存儲模塊獲取所有代理,並對代理進行檢測,根據不同的檢測結果對代理設置不同的標識。
接口模塊:通過WebAPI提供服務接口,接口通過連接數據庫並通過Web 形式返回可用的代理。
3、 代理池的實現
經過上述分析,下面用代碼實現這4個模塊。
3.1、 存儲模塊
使用Redis有序集合,集合的每一個元素不重復,對於代理池來說,集合的元素就變成了一個個代理,就是IP加端口形式,如1.1.1.1:8000,集合的元素就是這種形式。此外,有序集合的每一個元素有一個分數字段,分數可以重復,可以是浮點數類型,也可以是整數類型。該集合會根據每一個元素的分數對集合進行排序,數值小的排前面,數據值大的排后面,這樣可實現集合元素的排序。
對於代理,這個分數可以作為判斷一個代理是否可用的標志,100分最高,代表最可用,0分最低,代表最不可用。如果要獲取可用代理,可從代理池中隨機獲取分數最高的代理,這樣可保證每個可用代理都會被調用到。
分數是判斷代理穩定性的重要標准,設置分數規則如下:
(1)、分數100為可用,檢測器定時循環檢測每個代理可用情況,一旦檢測到有可用代理就設置為100,檢測到不可用就將分數減1,分數減至0后代理移除。
(2)、新獲取的代理分數為10,如果測試可行,分數立即設置為100,不可行則分數減1,分數減至0后代理移除。
檢測到代理可用就將分數立即設置為100,這樣保證所有可用代理有更大機會被獲取到。立即設置為100而不是每次加1,是因為代理是從各大免費網站獲取的,一個代理並不穩定,5次請求,可能有3次都會失敗。所以請求成功后就設置為100,避免過多的去測試請求,這樣分數最高可用的機會也最大。
先定義一個RedisClient類來操作數據庫的有序集合,定義一些方法實現分數的設置、代理的獲取等。代碼如下所示:
1 MAX_SCORE = 100
2 MIN_SCORE = 0 3 INITIAL_SCORE = 10
4 REDIS_HOST = '192.168.64.50'
5 REDIS_PORT = 6379
6 REDIS_PASSWORD = None 7 REDIS_KEY = 'proxies'
8
9 import redis 10 from random import choice 11 import re 12
13 class RedisClient(object): 14 def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD): 15 """
16 初始化,參數是Reids的連接信息 17 :param host: Redis 地址 18 :param port: Redis 端口 19 :param password: Redis 密碼 20 """
21 self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True) 22
23 def add(self, proxy, score=INITIAL_SCORE): 24 """
25 添加代理,設置分數為最高,默認分數為INITIAL_SCORE 26 :param proxy: 代理 27 :param score: 分數 28 :return: 添加結果 29 """
30 if not re.match('\d+\.\d+\.\d+\.\d+:\d+', proxy): 31 print('代理不符合規范', proxy, '不添加') 32 return
33 if not self.db.zscore(REDIS_KEY, proxy): 34 return self.db.zadd(REDIS_KEY, {proxy: score}) 35
36 def random(self): 37 """
38 隨機獲取有效代理,首先嘗試獲取最高分數代理,如果最高分數不存在,則按照排名獲取,否則異常 39 :return: 隨機代理 40 """
41 result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE) # 選取分數最高的代理
42 if len(result): 43 return choice(result) 44 else: 45 result = self.db.zrevrange(REDIS_KEY, 0, 100) # 索引從0到100的代理
46 if len(result): 47 return choice(result) 48 else: 49 raise PoolEmptyError 50
51 def decrease(self, proxy): 52 """
53 代理無效時,代理分數值減一分,分數小於最小值,則代理刪除 54 :param proxy: 代理 55 :return: 修改后的代理分數 56 """
57 score = self.db.zscore(REDIS_KEY, proxy) 58 if score and score > MIN_SCORE: 59 print("代理", proxy, "當前分數", score, "減1") 60 return self.db.zincrby(REDIS_KEY, proxy, -1) 61 else: 62 print("代理", proxy, "當前分數", score, '移除') 63 return self.db.zrem(REDIS_KEY, proxy) 64
65 def exists(self, proxy): 66 """
67 判斷代理是否在集合中 68 :param proxy: 代理 69 :return: 是否存在 70 """
71 return not self.db.zscore(REDIS_KEY, proxy) == None 72
73 def max(self, proxy): 74 """
75 將代理設置為MAX_SCORE,代理有效時的設置 76 :param proxy: 代理 77 :return: 設置結束 78 """
79 print("代理", proxy, "可用,設置為", MAX_SCORE) 80 return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE}) 81
82 def count(self): 83 """
84 獲取當前集合的元素個數 85 :return:數量 86 """
87 return self.db.zcard(REDIS_KEY) # zcount(name, min, max)是在一個區間中的個數
88
89 def all(self): 90 """
91 獲取全部代理,檢測使用 92 :return: 全部代理列表 93 """
94 return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE) 95
96 def batch(self, start, stop): 97 """
98 批量獲取,獲取指定索引范圍內的代理,不是分數范圍 99 :param start: 開始索引 100 :param stop: 結束索引 101 :return: 代理列表 102 """
103 return self.db.zrevrange(REDIS_KEY, start, stop - 1)
這里首先定義一些常量,如MAX_SCORE、MIN_SCORE、INITIAL_SCORE代表最大分數、最小分數、初始分數。還有一些Redis的連接信息。REDIS_KEY是有序集合的鍵名,用來獲取代理存儲所使用的有序集合。
有了這些方法,在后面的模塊中調用這個類來連接和操作數據庫。要獲取隨機可用代理,調用 random()方法即可。
3.2、 獲取模塊
這里定義一個Crawler類從各大網站抓取代理,代碼如下:
1 import json, re 2 from .utils import get_page 3 from pyquery import PyQuery as pq 4
5 class ProxyMetaclass(type): 6 def __new__(cls, name, bases, attrs): 7 count = 0 8 attrs['__CrawlFunc__'] = [] 9 for k, v in attrs.items(): 10 if 'crawl_' in k: 11 attrs['__CrawlFunc__'].append(k) 12 count += 1
13 attrs['__CrawlFuncCount__'] = count 14 return type.__new__(cls, name, bases, attrs) 15
16 class Crawler(object, metaclass=ProxyMetaclass): 17 def get_proxies(self, callback): 18 proxies = [] 19 for proxy in eval("self.{}()".format(callback)): 20 print("成功獲取到代理", proxy) 21 proxies.append(proxy) 22 return proxies 23
24 def crawl_daili66(self, page_count=4): 25 """
26 獲取代理66 27 :param page_count: 頁碼 28 :return: 代理 29 """
30 start_url = 'http://www.66ip.cn/{}.html'
31 urls = [start_url.format(page) for page in range(1, page_count+1)] 32 for url in urls: 33 print('Crawling',url) 34 html = get_page(url) 35 if html: 36 doc = pq(html) 37 trs = doc('.containerbox table tr:gt(0)').items() 38 for tr in trs: 39 ip = tr.find('td:nth-child(1)').text() 40 port = tr.find('td:nth-child(2)').text() 41 yield ':'.join([ip, port]) 42
43 def crawl_xicidaili(self): 44 for i in range(1, 3): 45 start_url = 'http://www.xicidaili.com/nn/{}'.format(i) 46 headers = { 47 'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 48 'Cookie':'_free_proxy_session=BAh7B0kiD3Nlc3Npb25faWQGOgZFVEkiJTMxMTIyMjkwNDYzYjhlODY3MDY4NzI0NmViMzE1ZDFmBjsAVEkiEF9jc3JmX3Rva2VuBjsARkkiMStES2RNNXNIL0ZnTkdpaUNhUitRVDB5a29PbGloVW44Qzc0WWNrQ2Q1T3c9BjsARg%3D%3D--7d5fcaeb32843a5d36f977d5f5d6c68541017953; Hm_lvt_0cf76c77469e965d2957f0553e6ecf59=1555326928,1555382306,1555465785,1555569224; Hm_lpvt_0cf76c77469e965d2957f0553e6ecf59=1555569269', 49 'Host':'www.xicidaili.com', 50 'Referer':'http://www.xicidaili.com/nn/3', 51 'Upgrade-Insecure-Requests':'1', 52 } 53 html = get_page(start_url, options=headers) 54 if html: 55 find_trs = re.compile('<tr class.*?>(.*?)</tr>', re.S) 56 trs = find_trs.findall(html) 57 find_ip = re.compile('<td>(\d+\.\d+\.\d+\.\d+)</td>') 58 find_port = re.compile('<td>(\d+)</td>') 59 for tr in trs: 60 re_ip_address = find_ip.findall(tr) 61 re_port = find_port.findall(tr) 62 for address, port in zip(re_ip_address, re_port): 63 address_port = address + ":" + port 64 yield address_port.replace(' ', '')
這里將獲取代理的方法都定義為 crawl 開頭,這樣以后還有代理網站時,可添加 crawl 開頭的方法即可。代碼中獲取了代理66和西刺兩個網站的免費代理,在方法中使用生成器,通過 yield 返回一個個代理。使用get_page()方法先獲取網頁,然后用pyquery和re解析,解析出ip和端口形式的代理后返回。
在 Crawler 類中定義的 get_proxies() 方法,將所有以 crawl 開頭的方法都調用一遍,獲取每個方法返回的代理並組合成列表形式返回。這里用元類來實現所有以 crawl 開頭的方法調用,先定義一個類 ProxyMetaclass,Crawl類將設置為元類,元類中實現了 __new__()方法,這個方法有固定幾個參數,第四個參數 attrs 包含了類的一些屬性,通過遍歷 attrs 這個參數即可獲取類的所有方法信息,就同遍歷字典一樣,鍵名對應方法的名,接着判斷方法的開關是否 crawl,是則將其加入到 __CrawlFunc__屬性中。這樣就將所有以 crawl 開頭的方法定義成一個屬性,動態獲取到所有以 crawl 開頭的方法列表。
如果以后有新的代理網站要抓取,只需要在類 Crawler 類中添加一個以 crawl 開頭的方法即可。依照其它幾個方法將其定義成生成器,抓取其網站的代理,然后通過 yield返回代理即可。這樣方便擴展,也不用關心其他部分的實現邏輯。
代理的添加也比較靈活,免費代理和付費代理都可以添加,付費代理的提取方式也是類似的。
這里使用到的 utils中 get_page() 方法代碼如下:
1 import requests 2 from requests.exceptions import ConnectionError 3
4 base_headers = { 5 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 6 'Accept-Encoding': 'gzip, deflate, br', 7 'Accept-Language': 'zh-CN,zh;q=0.9'
8 } 9
10 def get_page(url, options={}): 11 """
12 抓取代理網站的IP 13 :param url: 代理網站網址 14 :param options: 請求頭部信息 15 :return: 獲取成功返回網頁源代碼,失敗返回None 16 """
17 headers = dict(base_headers, **options) # base_headers 與 options 的鍵重復,則更新base_headers中的值
18 try: 19 response = requests.get(url, headers) 20 print("抓取成功", url, response.status_code) 21 if response.status_code == 200: 22 return response.text 23 except ConnectionError: 24 print("抓取失敗", url) 25 return None
接下來定義一個 Getter 類,用來動態地調用所有以 crawl 開頭的方法,獲取抓取到的代理,將其加入到數據庫存儲起來:
1 import sys 2 from .db import RedisClient 3 from .crawler import Crawler 4
5 POOL_UPPER_THRESHOLD = 10000
6 class Getter(): 7 def __init__(self): 8 self.redis = RedisClient() 9 self.crawler = Crawler() 10
11 def is_over_threshold(self): 12 """
13 判斷是否到達代理池限制 14 :return: 15 """
16 if self.redis.count() >= POOL_UPPER_THRESHOLD: 17 return True 18 else: 19 return False 20
21 def run(self): 22 print('獲取器開始執行') 23 if not self.is_over_threshold(): 24 for callback_label in range(self.crawler.__CrawlFuncCount__): 25 callback = self.crawler.__CrawlFunc__[callback_label] 26 # 獲取代理
27 proxies = self.crawler.get_proxies(callback) 28 sys.stdout.flush() 29 for proxy in proxies: 30 self.redis.add(proxy)
Getter類是獲取器類,變量 POOL_UPPER_THRESHOLD 表示的是代理池的最大數量,數量可以靈活配置。is_over_threshold() 方法用於判斷代理池是否到達容量閾值,在這個方法中調用了 RedisClient類的 count() 方法來獲取代理的數量進行判斷,如果數量到達閾值,就返回True,否則返回 False,不想加這個限制的話,可將些方法都返回 True。
接下來定義的 run() 方法,首先判斷代理池是否到達閾值,然后調用 Crawler類的 __CrawFunc__ 屬性,獲取到所有以 crawl 開頭的方法列表,依次通過 get_proxies() 方法調用,得到各個方法抓取到的代理,然后利用 RedisClient 的 add() 方法加入數據庫,這樣就完成了獲取模塊的工作。
3.3、 檢測模塊
對所有代理進行多輪檢測,代理檢測可用,分數就設置為 100,代理不可用,分數減1,這樣可以實時改變每個代理的可用情況。要獲取有效代理,只需要獲取分數最高的代理即可。
由於代理數量多,為了提高代理請求檢測效率,這里使用異步請求庫 aiohttp 進行檢測。requests是同步請求庫,發出請求后需要得到網頁加載完成后才能繼續往下執行,這個過程會阻塞等待響應,如果服務器響應很慢的話,在requests請求這個等待過程,可以調度其他請求或者進行網頁解析。
異步請求庫就解決了這個問題,類似 JavaScript 中的回調,即在請求發出后,程序可繼續執行做其它的事情,當響應到達時,程序再去處理這個響應。於是程序沒有被阻塞,可充分利用時間和資源,大大提高效率。
測試模塊的代碼實現如下:
1 import asyncio 2 import aiohttp 3 import time, sys 4
5 try: 6 from aiohttp import ClientError 7 except: 8 from aiohttp import ClientProxyConnectionError as ProxyConnetionError 9 from .db import RedisClient 10
11 VALID_STATUS_CODES = [200, 302] 12 TEST_URL = 'http://www.baidu.com'
13 BATCH_TEST_SIZE = 100
14 class Tester(object): 15 def __init__(self): 16 self.redis = RedisClient() 17
18 async def test_single_proxy(self, proxy): 19 """
20 測試單個代理 21 :param proxy: 代理IP 22 :return: 23 """
24 conn = aiohttp.TCPConnector(verify_ssl=False) 25 async with aiohttp.ClientSession(connector=conn) as session: 26 try: 27 if isinstance(proxy, bytes): 28 proxy = proxy.decode('utf-8') 29 real_proxy = 'http://' + proxy 30 print('正在測試', proxy) 31 async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response: 32 if response.status in VALID_STATUS_CODES: 33 self.redis.max(proxy) 34 print('代理可用', proxy) 35 else: 36 self.redis.decrease(proxy) 37 print('請求響應不合法', response.status, 'IP', proxy) 38 except (ClientError, aiohttp.ClientConnectionError, asyncio.TimeoutError, AttributeError): 39 self.redis.decrease(proxy) 40 print('代理請求失敗', proxy) 41
42 def run(self): 43 """
44 測試主函數 45 :return: 46 """
47 print('測試器開始運行') 48 try: 49 count = self.redis.count() 50 print('當前剩余', count, '個代理') 51 for i in range(0, count, BATCH_TEST_SIZE): 52 start = i 53 stop = min(i+BATCH_TEST_SIZE, count) 54 print('正在測試', start + 1, '-' ,stop, '個代理') 55 test_proxies = self.redis.batch(start, stop) 56 loop = asyncio.get_event_loop() 57 tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] 58 loop.run_until_complete(asyncio.wait(tasks)) 59 sys.stdout.flush() 60 time.sleep(5) 61 except Exception as e: 62 print('測試器發生錯誤', e.args)
在這個模塊中,類 Tester中 __init__()方法建立了一個 RedisClient對象,該對象可在類中其它地方使用。接下來定義一個test_single_proxy() 方法,這個方法檢測單個代理的可用情況,參數是被檢測的代理。在定義這個方法時加了關鍵字 async,表示是異步的。在方法內部創建了 aiohttp的 ClientSession 對象,該對象類似於 requests 的 Session 對象,可直接調用該對象的get()方法訪問頁面。這里代理的設置通過 proxy 參數傳遞給 get() 方法,在請求方法前面也要加上 async 關鍵字來標明是異步請求,這是 aiohttp 使用時的常見寫法。
測試連接由 TEST_URL 確定,可針對不同的網站設置該值。同一個代理IP在不同的網站請求,可能得到的返回結果是不一樣的。這個TEST_URL 設置為一個穩定的網站(如百度)時,代理池比較通用。
連接狀態碼由 VALID_STATUS_CODES變量定義,以列表的形式存在,包含正常狀態碼,某些目標網站可能會出現其他狀態碼,可進行相應設置。通過判斷響應狀態碼是否在 VALID_STATUS_CODES 列表里,來判斷代理是否可用,可用就調用 RedisClient 的 max()方法將代理分數設為 100,否則調用 decrease() 方法將代理分數減1,如果出現異常,同樣將代理分數減1。
另外一個常量 BATCH_TEST_SIZE 是批量測試的最大值,這里設置一批測試最多 100 個,可避免代理池過大時一次性測試全部代理導致內存開銷過大。
在 run() 方法里獲取所有代理列表,使用 aiohttp 分配任務,啟動運行,這樣就進行了異步檢測。aiohttp官方網站示例:http://aiohttp.readthedocs.io/。
3.4、 接口模塊
要獲取存儲在數據庫中的代理,可使用 RedisClient類連接Redis,然后調用 random() 方法,這樣做雖然效率高,但是會有些問題。比如別人使用這個代理池,需要讓他知道Redis連接用戶名和密碼,這樣不安全。如果代理池在遠程服務器上運行,但遠程服務器的Redis 只允許本地連接,那么就不能遠程直連Redis來獲取代理。如果爬蟲的主機沒有連接 Redis 模塊,或者爬蟲不是由 Python語言編寫的,就無法使用 RedisClient 來獲取代理。如果 RedisClient 類或者數據結構有更新,爬蟲端必須同步這些更新。這樣是非常麻煩的。
考慮到上面這些因素,可將代理池作為一個獨立的服務運行,增加一個接口,以 Web API 的形式展示可用代理。這樣獲取代理只需要請求接口即可,上面的問題也可以避免。下面使用輕量級的庫 Flask 來實現這個接口模塊。代碼如下所示:
1 from flask import Flask, g 2 from .db import RedisClient 3 __all__ = ['app'] 4 app = Flask(__name__) 5 def get_conn(): 6 if not hasattr(g, 'redis'): 7 g.redis = RedisClient() 8 return g.redis 9
10 @app.route('/') 11 def index(): 12 return '<h2>Welecome to Proxy Pool System</h2>'
13
14 @app.route('/random') 15 def get_proxy(): 16 """
17 獲取隨機可用代理 18 :return: 隨機代理 19 """
20 conn = get_conn() 21 return conn.random() 22
23 @app.route('/count') 24 def get_counts(): 25 """
26 獲取代理池總量 27 :return: 代理池總量 28 """
29 conn = get_conn() 30 return str(conn.count()) 31
32 if __name__ == '__main__': 33 app.run()
這里聲明了一個 Flask 對象,定義了3個接口,分別是首頁、隨機代理面、獲取數量頁。運行后,Flask 會啟動一個Web服務,只需要訪問對應接口即可獲取到可用代理。
3.5、 調度模塊
調度模塊就是調用前面定義的3個模塊,將這3個模塊通過多線程形式運行起來,代碼如下:
1 import time 2 from multiprocessing import Process 3 from .api import app 4 from .getter import Getter 5 from .tester import Tester 6
7 TESTER_CYCLE = 20
8 GETTER_CYCLE = 30
9 TESTER_ENABLED = True 10 GETTER_ENABLED = True 11 API_ENABLED = True 12
13 class Scheduler(): 14 def schedule_tester(self, cycle=TESTER_CYCLE): 15 """
16 定時測試代理 17 :return: 18 """
19 tester = Tester() # 初始化測試實例
20 while True: 21 print('測試器開始運行') 22 tester.run() # 開始測試
23 time.sleep(cycle) # 休眠一段時間后進行下一次測試
24
25 def schedule_getter(self, cycle=GETTER_CYCLE): 26 """
27 定時獲取代理 28 :param cycle: 時間間隔 29 :return: 30 """
31 getter = Getter() # 初始化獲取實例
32 while True: 33 print('開始抓取代理') 34 getter.run() # 抓取代理
35 time.sleep(cycle) # 休眠一段時間后下等下一次抓取
36
37 def schedule_api(self): 38 """
39 開啟API 40 :return: 41 """
42 app.run(API_HOST, API_PORT) # 調用 flask 在頁面上顯示一個隨機 IP
43
44 def run(self): 45 print('代理池開始運行') 46
47 if TESTER_ENABLED: 48 tester_process = Process(target=self.schedule_tester) # 設置啟動目標
49 tester_process.start() # 啟動
50
51 if GETTER_ENABLED: 52 getter_process = Process(target=self.schedule_getter) 53 getter_process.start() 54
55 if API_ENABLED: 56 api_process = Process(target=self.schedule_api) 57 api_process.start()
在這個調度模塊代碼中,3個常量TESTER_ENABLED、GETTER_ENABLED、API_ENABLED是布爾型,表示測試模塊、獲取模塊、接口模塊的開關。默認都是True,表示模塊開啟。
啟動入口是 run() 方法,這個方法分別判斷3個模塊的開關。如果開關開啟,啟動時程序就新建一個Process進程,設置好啟動目標,然后調用start() 方法運行。3個進程可以並行執行,互不干擾。
只需要調用 Scheduler 的 run() 方法即可啟動整個代理池。
3.6、 運行
下面將代碼整合下,讓代理運行起來。運行后的代碼池在控制台有輸出,從輸出可以看出可用代理設置為100,不可用代理分數減1。由於當前配置運行在 5555 端口,在瀏覽器地址欄打開 http://127.0.0.1:5555,可看到代理首頁,訪問 http://127.0.0.1:5555/random可獲取隨機可用代理。只要訪問此接口即可獲取一個隨機可用代理。下面的代碼是獲取代理的總開關,運行這段代碼就開始獲取代理。
1 from proxypool.scheduler import Scheduler 2 import sys 3 import io 4
5 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') 6
7 def main(): 8 try: 9 s = Scheduler() # 初始化
10 s.run() # 調用 run() 方法開始運行
11 except: 12 main() 13 if __name__ == '__main__': 14 main()
獲取到的代理保存在Redis數據庫中,可以通過訪問 http://127.0.0.1:5555/random 來獲取隨機可用代理。訪問 http://127.0.0.1:5555是首頁頁面。獲取一個隨機代理的代碼如下所示:
1 import os, sys, requests 2 from bs4 import BeautifulSoup as bs 3 4 dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 5 sys.path.insert(0, dir) 6 7 PROXY_POOL_URL = 'http://127.0.0.1:5555/random' 8 TEST_URL = 'http://docs.jinkan.org/docs/flask/' 9 10 def get_proxy(): 11 """ 12 在本地flask生成的網站上獲取一個隨機可用的 IP 13 :return: 代理IP 14 """ 15 r = requests.get(PROXY_POOL_URL) 16 proxy = bs(r.text, 'lxml').get_text() 17 return proxy 18 19 def crawl(url, proxy): 20 """ 21 測試代理是否用 22 :param url: 測試的目標網站 23 :param proxy: 被測試的代理IP 24 :return: 目標網站的源代碼 25 """ 26 proxies = {'http': proxy, 'https': proxy} 27 try: 28 r = requests.get(url, proxies=proxies) 29 return r.text 30 except requests.exceptions.ConnectionError as e: 31 print('Error', e.args) 32 33 def main(): 34 proxy = get_proxy() 35 html = crawl(TEST_URL, proxy) 36 print(html) 37 38 if __name__ == '__main__': 39 main()
最后通過一個隨機的代理IP去請求指定網址的代碼如下所示:
1 import requests 2 from example import get_proxy 3 #from ..proxypool.setting import TEST_URL
4 TEST_URL = 'https://www.baidu.com'
5
6 proxy = get_proxy() # 獲取代理IP
7
8 proxies = { 9 'http': 'http://' + proxy, 10 'https:': 'https://' + proxy 11 } 12
13 print(TEST_URL) 14 response = requests.get(TEST_URL, proxies=proxies, verify=False) 15 if response.status_code == 200: 16 print('Successfully') 17 print(response.text)
三、 使用代理爬取微信公眾號文章
鏈接是:https://weixin.sogou.com
目標:利用代理爬取微信公眾號文章,提取正文、發表日期、公眾號等內容,將結果保存到MySQL數據庫
需要用到代理池,要用到的Python庫有:aiohttp、requests、redis-py、pyquery、Flask、PyMySQL。
1、 爬取分析
搜狗對微信公眾平台的公眾號和文章做了整合。可通過上面的鏈接搜索到相關的公眾號和文章。例如搜索 python,可以搜索到最新的文章。點擊搜索后,搜索結果的URL中有很多無關GET請求參數,將無關的參數去掉,只保留 type 和 query 參數即可,例如https://weixin.sogou.com/weixin?type=2&query=python,類型為2 代表搜索微信文章,query為python代表搜索關鍵詞為python。
下拉網頁,點擊下一頁即可翻頁。要注意的是,沒有登錄只能看到10頁的內容,登錄后可以看到100頁的內容。如果爬取更多內容,就需要登錄並使用Cookies來爬取。搜狗微信的反爬能力很強,如連接刷新,站點就會彈出驗證碼頁面,如圖1-1所示。
圖1-1 驗證碼頁面
這時網絡請求出現了302跳轉,返回狀態碼為302,跳轉的鏈接開頭為https://weixin.sogou.com/antispider/,這是一個反爬蟲的驗證頁面。所以基本可以確定,如果服務器返回狀態碼是302而非200,則IP訪問次數太高,IP被封禁,這些請求就是失敗了。這種情況可以選擇識別驗證碼,也可使用代理直接切換IP。
遇到這情況,這次使用代理切換IP。代理使用前面搭建的代理池,還要更改檢測的URL為搜狗微信的站點。對於反爬能力很強的網站,遇到這種返回狀態需要重試。所以要采用另一種反爬方式,借助數據庫構造一個爬取隊列,待爬取的請求都放到隊列里,如果請求失敗了重新放回隊列,就會重新調度爬取。
這里采用 Redis 的隊列數據結構,新的請求加入隊列,有需要重試的請求也放回隊列。調度時如果隊列不為空,就把一個個請求取出來執行,得到響應后再進行解析,提取出想要的結果。
這次采用MySQL存儲,使用pymysql庫,將抓取結果構造為一個字典,實現動態存儲。
經過上述分析,這次實現的功能有如下幾點:
修改代理池檢測鏈接為搜狗微信站點;
構造 Redis 爬取隊列,用隊列實現請求的存取;
實現異常處理,失敗的請求重新加入隊列;
實現翻頁和提取文章列表,並把對應的請求加入隊列;
實現微信文章的信息提取;
將提取的信息保存到MySQL。
2、 請求構造
首先實現一個請求 Request 的數據結構,這個請求要包含一些必要信息,如請求鏈接、請求方式、請求頭、超時時間等。此外,對於某個請求,還需要實現對的方法來處理它的響應,所以要再加一個 Calllback回調函數。每次翻頁請求需要代理來實現,所以還需要一個參數 NeedProxy。如果一個請求失敗次數太多,就不再重新請求,還需要加失敗次數的記錄。
這些字段都需要作為Request的一部分,組成一個完整的Request對象放入隊列去調度,這樣從隊列獲取出來的時候直接執行這個Request對象就行。
使用繼承requests庫中的 Request 對象的方式來實現這個數據結構。requests 庫中已經有了 Request對象,它將請求 Request作為一個整體對象去執行,得到響應后再返回。在requests庫中的get()、post() 等方法都是通過執行Request對象實現的。
先來看一下 Request對象的源碼:
1 class Request(RequestHooksMixin): 2 def __init__(self, 3 method=None, url=None, headers=None, files=None, data=None, 4 params=None, auth=None, cookies=None, hooks=None, json=None): 5
6 # Default empty dicts for dict params.
7 data = [] if data is None else data 8 files = [] if files is None else files 9 headers = {} if headers is None else headers 10 params = {} if params is None else params 11 hooks = {} if hooks is None else hooks 12
13 self.hooks = default_hooks() 14 for (k, v) in list(hooks.items()): 15 self.register_hook(event=k, hook=v) 16
17 self.method = method 18 self.url = url 19 self.headers = headers 20 self.files = files 21 self.data = data 22 self.json = json 23 self.params = params 24 self.auth = auth 25 self.cookies = cookies
這是 requests 庫中 Request 對象的構造方法。這個 Request包含了請求方式、請求鏈接、請求頭幾個屬性,但是相比前面分析的還差了幾個。另外還需要實現一個特定的數據結構,在原先的基礎上加入上文所提到的額外幾個屬性。這里需要繼承 Request 對象重新實現一個請求,將這個類定義為 WeixinRequest,代碼如下所示:
1 from requests import Request 2 TIMEOUT = 10
3 class WeixinRequest(Request): 4 def __init__(self, url, callback, method='GET', headers=None, need_proxy=False, fail_time=0, 5 timeout=TIMEOUT): 6 Request.__init__(self, method, url, headers) 7 self.callback = callback 8 self.need_proxy = need_proxy 9 self.fail_time = fail_time 10 self.timeout = timeout
這段代碼中實現了 WeixinRequest 數據結構。在__init__()方法中先調用 Request 的 __init__()方法,然后加入額外的幾個參數,定義為 callback、need_proxy、fail_time、timeout,分別代表回調函數、是否需要代理爬取、失敗次數、超時時間。
可將WeixinRequest作為一個整體來執行,一個個WeixinRequest對象都是獨立的,每個請求都有自己的屬性。例如,調用它的callback,就可知道這個請求響應應該用什么方法來處理,調用 fail_time 就可知道這個請求失敗了多少次,判斷失敗次數是不是到了閾值,該不該丟棄這個請求。
3、 請求隊列實現
構造請求隊列,實現請求存取。存取就兩個操作,一個是存,一個是取,使用Redis的 rpush() 和 lpop() 方法即可。在存取時不能直接存Request對象,Redis里面存的是字符串。在存Request對象前先將其序列化,取出時再將其反序列化,這個過程用pickle模塊實現。代碼如下所示:
1 from redis import StrictRedis 2 from pickle import dumps, loads 3 from .request import WeixinRequest 4
5 REDIS_HOST = '192.168.64.50'
6 REDIS_PORT = 6379
7 REDIS_PASSWORD = None 8 REDIS_KEY = 'weixin'
9
10 class RedisQueue(): 11 def __init__(self): 12 """
13 初始化Redis 14 """
15 self.db = StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD) 16
17 def add(self, request): 18 """
19 向隊列添加序列化后的Request 20 :param request: 請求對象 21 :return: 添加結果 22 """
23 if isinstance(request, WeixinRequest): 24 return self.db.rpush(REDIS_KEY, dumps(request)) 25 return False 26
27 def pop(self): 28 """
29 取出一個Request並反序列化 30 :return: Request or None 31 """
32 if self.db.llen(REDIS_KEY): 33 return loads(self.db.lpop(REDIS_KEY)) 34 else: 35 return False 36
37 def clear(self): 38 """刪除數據庫"""
39 self.db.delete(REDIS_KEY) 40
41 def empty(self): 42 """判斷隊列是否為空"""
43 return self.db.llen(REDIS_KEY) == 0 44
45 if __name__ == '__main__': 46 db = RedisQueue() 47 start_url = 'https://www.baidu.com'
48 weixin_request = WeixinRequest(url=start_url, callback='hello', need_proxy=True) 49 db.add(weixin_request) 50 request = db.pop() 51 print(request) 52 print(request.callback, request.need_proxy)
在代碼中 RedisQueue 類中的初始化方法中初始化了一個 StrictRedis對象。接着實現了 add() 方法,先判斷 Request 的類型,如果是 WeixinRequest,就用 pickle 的 dumps() 方法序列化,然后調用 rpush() 方法加入隊列。調用 pop() 方法將從隊列中取出,再調用 pickle的 loads() 方法將其轉化為 WeixinRequest 對象。empty() 方法返回隊列是否為空,只要判斷隊列長度是否為 0 即可。
4、 修改代理池
在生成請求開始爬取之前,先找一些可用代理。將代理池檢測的URL修改成搜狗微信站點,將被搜狗微信封禁的代理剔除掉,留下可用代理。將代理池的中的 TEST_URL 修改為 https://weixin.sogou.com/weixin?type=2&query=python,被本站點封禁的代理就會減分,正常請求的代理就會賦值為100,最后留下可用代理。
修改后將獲取模塊、檢測模塊、接口模塊的開關都設置為True,讓代理池先運行一會。這時數據庫中留下的100分代理就是針對搜狗微信的可用代理。現在訪問代理接口,接口設置為5555,訪問 http://127.0.0.1:5555/random即可獲取隨機可用代理。
現在來定義一個函數get_proxy()獲取隨機代理,該函數封裝在Spider() 類中。
1 import requests 2 PROXY_POOL_URL = 'http://192.168.64.50:5555/random'
3 def get_proxy(self): 4 """
5 從代理池獲取代理 6 :return: 7 """
8 try: 9 response = requests.get(PROXY_POOL_URL) 10 if response.status_code == 200: 11 print('Get Proxy', response.text) 12 return response.text 13 return None 14 except requests.ConnectionError: 15 return None
5、 第一個請求
前面的工作准備好后,下面就構造一個請求放到隊列里以供調度。這里定義一個Spider類,前面的 get_proxy() 函數封裝在這個類中,接下來實現start()方法,代碼如下所示:
1 from requests import Session 2 from redisdb import RedisQueue 3 from mysql import MySQL 4 from request import WeixinRequest 5 from urllib.parse import urlencode 6 import requests 7 8 PROXY_POOL_URL = 'http://192.168.64.50:5555/random' 9 10 class Spider(): 11 base_url = 'https://weixin.sogou.com/weixin' 12 keyword = 'python' 13 headers = { 14 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 15 'Accept-Encoding': 'gzip, deflate, br', 16 'Accept-Language': 'zh-CN,zh;q=0.9', 17 'Cache-Control': 'max-age=0', 18 'Connection': 'keep-alive', 19 'Cookie': 'CXID=339A2898FD48047A6358656B1D96964B; SUID=53C8B0755B6358656B6ED3DA000C9B6B; SUV=005D2CADB63586565BF21BEFCBBE3709; pgv_pvi=8799300608; ssuid=6762987792; ld=Elllllllll2t033xlllllVhER71lVlllTCKCjZllll9lVlllVllll5@@@@@@@@@@; LSTMV=51%2C491; LCLKINT=18260; ABTEST=0|1557396157|v1; weixinIndexVisited=1; ppinf=5|1557989778|1559199378|dHJ1c3Q6MToxfGNsaWVudGlkOjQ6MjAxN3x1bmlxbmFtZTozNzolRTclODMlQkQlRTclODElQUIuJUU4JUJFJUI5JUU1JTlGJThFfGNydDoxMDoxNTU3OTg5Nzc4fHJlZm5pY2s6Mzc6JUU3JTgzJUJEJUU3JTgxJUFCLiVFOCVCRSVCOSVFNSU5RiU4RXx1c2VyaWQ6NDQ6bzl0Mmx1R3RUUlI5TjZ4TnlHbU1lM3luRnpUUUB3ZWl4aW4uc29odS5jb218; pprdig=pDwOzQhJnqC3Kr9aBUzytoY1poJ3CRl9cWXScYt2JCLOhSqZrBEDERHkOrt1190yzK_IKSdAPdUFyo5AOvwgG-XzKCvBh7JJs2Xhg2_LmZA_kp7MvDaiyfXumeWcNjtVRVbkbKYutAzfIPkg1sYOfjcc8L_VCyv4lnLLT8sd8Kc; sgid=29-40667169-AVzdAZL0PsNicLSKyFy900f0; SNUID=CBFF0A6FD8DD5F2BCFC93687D80ADC83; ppmdig=155805432800000015f265f013d784e1a66564ab6b844c13; IPLOC=CN5100; sct=42; JSESSIONID=aaa_Q0ixqZPPi_V9ND1Qw', 20 'Host': 'weixin.sogou.com', 21 'Upgrade-Insecure-Requests': '1', 22 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 23 } 24 session = Session() 25 queue = RedisQueue() 26 mysql = MySQL() 27 28 def get_proxy(self):... # 該函數代碼見前面 29 30 def start(self): 31 """ 32 初始化工作 33 :return: 34 """ 35 # 全局更新Headers 36 self.session.headers.update(self.headers) 37 start_url = self.base_url + "?" + urlencode({'query': self.keyword, 'type': 2}) 38 weixin_request = WeixinRequest(url=start_url, callback=self.parse_index, need_proxy=True) 39 # 調度第一個請求,添加到調度隊列 40 self.queue.add(weixin_request)
在這個Spider類中,設置了較多的全局變量,如keyword設置為python,headers為請求頭。請求頭信息可在瀏覽器里登錄賬號后,在開發者工具里將請求頭復制出來,一定要帶上Cookie字段,這樣才能爬取100頁的內容。接着初始了 Session 和 RedisQueue對象,它們分別用來執行請求和存儲請求。另外還初始化了MySQL對象,這個類在后面進行定義。
在start() 方法中,首先全局更新 headers,使得所有請求都能應用 Cookies。接着構造一個起始URL:https://weixin.sogou.com/weixin?type=2&query=python,隨后修改 URL 構造了一個 WeixinRequest對象。回調函數是 Spider 類的parse_index()方法,也當這個請求成功后調用parse_index()來處理和解析。need_proxy 參數設置為 True,代表執行這個請求需要用到代理。隨后調用 RedisQueue的add()方法,將這個請求加入隊列,等待調度。
6、 調度請求
加入第一個請求后,調度開始。首先從隊列中取出這個請求,將它的結果解析出來,生成新的請求加入隊列,然后拿出新的請求,再生成新的請求加入隊列,這樣循環執行,直到隊列中沒有請求,則代表爬取結束。代碼如下所示:
1 VALID_STATUSES = [200] 2 def schedule(self): 3 """
4 調度請求 5 :return: 6 """
7 while not self.queue.empty(): # 數據庫隊列不為空
8 weixin_request = self.queue.pop() # 從數據庫取出一個WeixinRequest對象,此時 weixin_request 是 WeixiRequest類的實例(對象)
9 callback = weixin_request.callback # 獲取回調函數方法
10 print('Schedule', weixin_request.rul) 11 response = self.request(weixin_request) # 調用Spider類的request方法,該方法返回的是響應
12 if response and response.status_code in VALID_STATUSES: 13 results = list(callback(response)) # 調用回調函數,回調函數指向兩個,分別是:parse_detail()和parse_index()
14 if results: 15 for result in results: 16 print('New Result', type(result)) 17 if isinstance(result, WeixinRequest): # 18 self.queue.add(result) # 調用 RedisQueue 類的add方法添加到數據庫隊列中
19 if isinstance(result, dict): # 如果返回對象是字典類型,就將文章保存到MySQL數據庫中
20 self.mysql.insert('articles', result) # 調用MySQL類中的insert方法
21 else: 22 self.error(weixin_request) # 調用Spider類中的error函數
23 else: 24 self.error(weixin_request)
這里的 schedule() 方法,其內部是一個循環,循環的判斷是隊列不為空。當隊列不空就調用 pop() 方法取出下一個請求,調用Spider類的 request() 方法執行這個請求,request() 方法實現代碼如下:
1 from requests import ReadTimeout, ConnectionError 2 def request(self, weixin_request): 3 """
4 執行請求 5 :param weixin_request: 請求 6 :return: 響應 7 """
8 try: 9 if weixin_request.need_proxy: 10 proxy = self.get_proxy() 11 if proxy: 12 proxies = { 13 'http': 'http://' + proxy, 14 'https': 'https://' + proxy, 15 } 16 return self.session.send(weixin_request.prepare(), # 調用的是 Request類的 prepare方法
17 timeout=weixin_request.timeout, allow_redirects=False, proxies=proxies) 18 return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=False) 19 except (ConnectionError, ReadTimeout) as e: 20 print(e.args) 21 return False
在request()中,首先判斷請求是否需要代理,如果需要就調用前面定義的 get_proxy() 方法獲取代理,然后調用Session的send()方法執行這個請求。這里請求調用了 prepare() 方法轉化為 Prepared Request,該用法的具體信息參考 https://2.python-requests.org//en/master/user/advanced/#prepared-requests,同時設置 allow_redirects 為 False,timeout是該請求的超時時間,最后響應返回。
執行 request() 方法后會得到兩種結果:一種是False,請求失敗,連接錯誤;另一種是 Response對象,還需判斷狀態碼,如果狀態碼合法,就進行解析,否則重新將請求加回隊列。狀態碼合法就調用 WeixinRequest的回調函數進行解析,這里回調函數是 parse_index(),其代碼如下所示:
1 from pyquery import PyQuery as pq 2 def parse_index(self, response): 3 """
4 解析索引頁面 5 :param response: 響應 6 :return: 新的響應 7 """
8 doc = pq(response.text) 9 # 找class為news-box標簽下的class為news-list標簽下的li標簽下的class為txt-box標簽下的h3標簽下的a標簽
10 items = doc('.news-box .news-list li .txt-box h3 a').items() 11 for item in items: 12 # 循環執行完后,再去執行下面的 if 語句,也就是當前頁請求完后,再請求下一頁
13 url = item.attr('href') # 獲取文章的鏈接
14 weixin_request = WeixinRequest(url=url, callback=self.parse_detail) # parse_detail是Spider類中的方法,通過回調函數獲取頁面的詳細內容
15 yield weixin_request 16 # 獲取下一頁的標簽連接,該標簽是a標簽,有id屬性。值是 sogou_next
17 next = doc('#sogou_next').attr('href') 18 if next: 19 url = self.base_url + str(next) # 拼接 url,請求這個 url 切換到下一頁
20 # 初始化 WeixinRequest,並傳入下一頁的連接,回調函數調用自身,這時進入到下一頁的爬取
21 weixin_request = WeixinRequest(url=url, callback=self.parse_index, need_proxy=True) 22 yield weixin_request
這個parse_index()方法做了兩件事,一件事是獲取本頁的所有微信文章鏈接,另一件事是獲取下一頁的鏈接,再構造成WeixinRequest對象后yield返回。然后 schedule()方法將返回的結果進行遍歷,利用 isinstance()方法判斷返回結果,如果返回結果是WeixinRequest,就其重新加入隊列。
這時,第一次循環結束。while循環繼續執行。隊列已經包含第一頁內容的文章詳情頁請求和下一頁的請求,所以第二次循環得到下一個請求就是文章詳情頁的請求,程序重新調用 request() 方法獲取其響應,然后調用其對應的回調函數解析。解析詳情頁的回調函數方法一樣,這次是 parse_detail() 方法,該方法也封裝在 Spider 類,實現代碼如下所示:
1 def parse_detail(self, response): 2 """
3 解析詳情頁 4 :param response: 響應 5 :return: 微信公眾號文章 6 """
7 doc = pq(response.text) # 將文本內容轉化為 pyquery 對象
8 # 將文章內容構造成字典,以便於保存到數據庫
9 data = { 10 'title': doc('.rich_media_title').text(), 11 'content': doc('.rich_media_content').text(), 12 'date': doc('#publish_time').text(), 13 'nickname': doc('#js_profile_qrcode > div > strong').text(), 14 'wechat': doc('#js_profile_qrcode > div > p:nth-child(3) > span').text() 15 } 16 yield data
這個 parse_detail() 方法解析微信文章詳情頁內容,提取出標題、正文文本、發布日期、發布人昵稱、微信公眾號名稱,將這些信息組合成一個字典返回。
返回結果后還需要判斷類型,如果是字典類型,程序就調用 mysql 對象的 insert() 方法將數據存入數據庫。這時,第二次循環執行完成。第三次循環、第四次循環,循環往復,每個請求都有各自的回調函數,索引頁解析完成后繼續生成后續請求,詳情頁解析完成后返回結果以便存儲,直到抓取完畢。到此,整個調度基本完成。下面進一步完善整個Spider類代碼,完整Spider類代碼如下所示:
1 from requests import Session 2 from .redisdb import RedisQueue 3 from .mysql import MySQL 4 from .request import WeixinRequest 5 from urllib.parse import urlencode 6 import requests 7 from pyquery import PyQuery as pq 8 from requests import ReadTimeout, ConnectionError 9
10 PROXY_POOL_URL = 'http://192.168.64.50:5555/random'
11 VALID_STATUSES = [200] 12 MAX_FAILED_TIME = 20
13
14 class Spider(): 15 base_url = 'https://weixin.sogou.com/weixin'
16 keyword = 'python'
17 headers = { 18 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 19 'Accept-Encoding': 'gzip, deflate, br', 20 'Accept-Language': 'zh-CN,zh;q=0.9', 21 'Cache-Control': 'max-age=0', 22 'Connection': 'keep-alive', 23 'Cookie': 'CXID=339A2898FD48047A5738650B1D96964B; SUID=53C8B0755B68860A5B6ED3DA000C9B6B; SUV=005D2CADB7DD27195BF21BEFCBBE3709; pgv_pvi=8799300608; ssuid=6762987792; ld=Elllllllll2t033xlllllVhER71lllllTCKCjZllll9lllllVllll5@@@@@@@@@@; LSTMV=51%2C491; LCLKINT=18260; ABTEST=0|1557396157|v1; weixinIndexVisited=1; ppinf=5|1557989778|1559199378|dHJ1c3Q6MToxfGNsaWVudGlkOjQ6MjAxN3x1bmlxbmFtZTozNzolRTclODMlQkQlRTclODElQUIuJUU4JUJFJUI5JUU1JTlGJThFfGNydDoxMDoxNTU3OTg5Nzc4fHJlZm5pY2s6Mzc6JUU3JTgzJUJEJUU3JTgxJUFCLiVFOCVCRSVCOSVFNSU5RiU4RXx1c2VyaWQ6NDQ6bzl0Mmx1R3RUUlI5TjZ4TnlHbU1lM3luRnpUUUB3ZWl4aW4uc29odS5jb218; pprdig=pDwOzQhJnqC3Kr9aBUzytoY1poJ3QAl9cWXScYt2JCLOhSqZrBEDERHkOrt1190yzK_IKSdAPdUFyo5AOvwgG-XzKCvBh7JJs2Xhg2_LmZA_kp7MvDaiyfXumeWcNjtVRVbkbKYutAzfIPkg1sYOfjcc8L_VCyv4lnLLT8sd8Kc; sgid=29-40669169-AVzdCZL0PsNicLSKyFy900f0; SNUID=CBFF0A6FD8DD5F2BCFC93687D80ADC83; ppmdig=155805432800000015f265f013d784e1a66564ab6b844c13; IPLOC=CN5100; sct=42; JSESSIONID=aaa_Q0ixqZPPi_V9ND1Qw', 24 'Host': 'weixin.sogou.com', 25 'Upgrade-Insecure-Requests': '1', 26 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 27 } 28 session = Session() 29 queue = RedisQueue() 30 mysql = MySQL() 31
32 def get_proxy(self): 33 """
34 從代理池獲取代理 35 :return: 36 """
37 try: 38 response = requests.get(PROXY_POOL_URL) 39 if response.status_code == 200: 40 print('Get Proxy', response.text) 41 return response.text 42 return None 43 except requests.ConnectionError: 44 return None 45
46 def start(self): 47 """
48 初始化工作 49 :return: 50 """
51 # 全局更新Headers
52 self.session.headers.update(self.headers) 53 start_url = self.base_url + "?" + urlencode({'query': self.keyword, 'type': 2}) 54 weixin_request = WeixinRequest(url=start_url, callback=self.parse_index, need_proxy=True) 55 # 調度第一個請求,添加到高度隊列
56 self.queue.add(weixin_request) 57
58 def parse_index(self, response): 59 """
60 解析索引頁面 61 :param response: 響應 62 :return: 新的響應 63 """
64 doc = pq(response.text) 65 # 找class為news-box標簽下的class為news-list標簽下的li標簽下的class為txt-box標簽下的h3標簽下的a標簽
66 items = doc('.news-box .news-list li .txt-box h3 a').items() 67 for item in items: 68 # 循環執行完后,再去執行下面的 if 語句,也就是當前頁請求完后,再請求下一頁
69 url = item.attr('href') # 獲取文章的鏈接
70 weixin_request = WeixinRequest(url=url, callback=self.parse_detail) # parse_detail是Spider類中的方法,通過回調函數獲取頁面的詳細內容
71 yield weixin_request 72 # 獲取下一頁的標簽連接,該標簽是a標簽,有id屬性。值是 sogou_next
73 next = doc('#sogou_next').attr('href') 74 if next: 75 url = self.base_url + str(next) # 拼接 url,請求這個 url 切換到下一頁
76 # 初始化 WeixinRequest,並傳入下一頁的連接,回調函數調用自身,這時進入到下一頁的爬取
77 weixin_request = WeixinRequest(url=url, callback=self.parse_index, need_proxy=True) 78 yield weixin_request 79
80 def parse_detail(self, response): 81 """
82 解析詳情頁 83 :param response: 響應 84 :return: 微信公眾號文章 85 """
86 doc = pq(response.text) # 將文本內容轉化為 pyquery 對象
87 # 將文章內容構造成字典,以便於保存到數據庫
88 data = { 89 'title': doc('.rich_media_title').text(), 90 'content': doc('.rich_media_content').text(), 91 'date': doc('#publish_time').text(), 92 'nickname': doc('#js_profile_qrcode > div > strong').text(), 93 'wechat': doc('#js_profile_qrcode > div > p:nth-child(3) > span').text() 94 } 95 yield data 96
97 def request(self, weixin_request): 98 """
99 執行請求 100 :param weixin_request: 請求 101 :return: 響應 102 """
103 try: 104 if weixin_request.need_proxy: 105 proxy = self.get_proxy() 106 if proxy: 107 proxies = { 108 'http': 'http://' + proxy, 109 'https': 'https://' + proxy, 110 } 111 return self.session.send(weixin_request.prepare(), # 調用的是 Request類的 prepare方法
112 timeout=weixin_request.timeout, allow_redirects=False, proxies=proxies) 113 return self.session.send(weixin_request.prepare(), timeout=weixin_request.timeout, allow_redirects=False) 114 except (ConnectionError, ReadTimeout) as e: 115 print(e.args) 116 return False 117
118 def error(self, weixin_request): 119 """
120 錯誤處理 121 :param weixin_request: 請求 122 :return: 123 """
124 weixin_request.fail_time = weixin_request.fail_time + 1
125 print('Request Failed', weixin_request.fail_time, 'Times', weixin_request.url) 126 if weixin_request.fail_time < MAX_FAILED_TIME: 127 self.queue.add(weixin_request) # 小於最大請求失敗次數,重新加入數據庫隊列
128
129 def schedule(self): 130 """
131 調度請求 132 :return: 133 """
134 while not self.queue.empty(): # 數據庫隊列不為空
135 weixin_request = self.queue.pop() # 從數據庫取出一個WeixinRequest對象,此時 weixin_request 是 WeixiRequest類的實例(對象)
136 callback = weixin_request.callback # 獲取回調函數方法
137 print('Schedule', weixin_request.rul) 138 response = self.request(weixin_request) # 調用Spider類的request方法,該方法返回的是響應
139 if response and response.status_code in VALID_STATUSES: 140 results = list(callback(response)) # 調用回調函數,回調函數指向兩個,分別是:parse_detail()和parse_index()
141 if results: 142 for result in results: 143 print('New Result', type(result)) 144 if isinstance(result, WeixinRequest): # 145 self.queue.add(result) # 調用 RedisQueue 類的add方法添加到數據庫隊列中
146 if isinstance(result, dict): # 如果返回對象是字典類型,就將文章保存到MySQL數據庫中
147 self.mysql.insert('articles', result) # 調用MySQL類中的insert方法
148 else: 149 self.error(weixin_request) # 調用Spider類中的error函數
150 else: 151 self.error(weixin_request) 152
153 def run(self): 154 """
155 程序主入口 156 :return: 157 """
158 self.start() 159 self.schedule() 160
161
162 if __name__ == '__main__': 163 spider = Spider() 164 spider.run()
最后加入的一個run() 方法作為入口,啟動時只需要執行Spider的run()方法即可。
7、 MySQL存儲
調度模塊完成后,還要定義一個MySQL類供存儲數據。代碼實現如下:
1 import pymysql 2
3 MYSQL_HOST = '192.168.54.50'
4 MYSQL_PORT = 3508
5 MYSQL_USER = 'root'
6 MYSQL_PASSWORD = 'wyic123456'
7 MYSQL_DATABASE = 'weixin'
8
9 class MySQL(): 10 def __init__(self, host=MYSQL_HOST, username=MYSQL_USER, password=MYSQL_PASSWORD, port=MYSQL_PORT, 11 database=MYSQL_DATABASE): 12 """
13 MySQL初始化 14 :param host: 15 :param username: 16 :param password: 17 :param port: 18 :param database: 19 """
20 try: 21 self.db = pymysql.connect(host, username, password, database, charset='utf8', port=port) 22 self.cursor = self.db.cursor() 23 except pymysql.MySQLError as e: 24 print(e.args) 25
26 def insert(self, table, data): 27 """
28 插入數據 29 :param table: 表名 30 :param data: 數據 31 :return: 32 """
33 keys = ', '.join(data.keys()) 34 values = ', '.join(['%s'] * len(data)) 35 sql_query = 'insert into %s (%s) values (%s)' % (table, keys, values) 36 try: 37 self.cursor.excute(sql_query, tuple(data.values())) 38 self.db.commit() 39 except pymysql.MySQLError as e: 40 print(e.args) 41 self.db.rollback()
在MySQL類中,初始化方法初始化了MySQL的連接,需要提供MySQL的用戶名、密碼、端口、數據庫名等信息。數據庫名為 weixin,需
要在數據庫中提前創建。創建命令是:
create database weixin charset utf8;
insert()方法傳入表名和字典即可動態構造SQL,SQL構造之后執行即可插入數據。另外還需要提前在數據庫中建立一個數據表,表名是articles,建表的SQL命令是:
create table articles (
id int(11) NOT NULL,
title varchar(255) NOT NULL,
content text NOT NULL,
date varchar(255),
wechat varchar(255) NOT NULL,
nickname varchar(255) NOT NULL
) default charset=utf8;
alter table articles add primary key (`id`);
到此,這個爬蟲項目基本算是完成了,接下來就是運行這個項目。
8、運行
首先運行前面搭建的代理池,待代理池運行一會兒后,再運行Spider類中的主程序入口。代碼運行成功,但爬取失敗,不知是不是搜狗微信的反爬措施太強而造成的。