一、寫在前面
在上一篇博客中提到過對於網絡爬蟲這種包含大量網絡請求的任務,是可以用Celery來做到加速爬取的,那么,這一篇博客就要具體說一下怎么用Celery來對我們的爬蟲進行一個加速!
二、知識補充
1.class celery.group
group這個類表示創建一組要並行執行的任務,不過一組任務是懶惰的,所以你需要運行並對其進行評估。要了解這個類,可以查看文檔,或者在Pycharm中直接Ctrl+左鍵就能直接查看源碼了,如下圖:
當然了,直接看源碼還不夠,最好還是自己動下手。所以先創建一個test.py,其中代碼如下:
1 from celery import Celery 2 3 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") 5 6 7 @app.task 8 def add(x, y): 9 return x + y 10 11 12 if __name__ == '__main__': 13 app.start()
然后運行Celery服務器,再在test.py所在目錄下創建一個test_run.py用於測試,其中代碼如下:
1 from celery import group 2 from .test import add 3 4 5 lazy_group = group(add.s(2, 2), add.s(4, 4)) 6 print(type(lazy_group)) 7 result = lazy_group() 8 print(result) 9 print(type(result)) 10 print(result.get())
在Pycharm中運行test_run.py,得到的結果如下:
<class 'celery.canvas.group'>
fe54f453-eb9c-4b24-87e3-a26fab75967f
<class 'celery.result.GroupResult'>
[4, 8]
通過查看源碼可以知道,是可以往group中傳入一個由任務組成的可迭代對象的,所以這就進行一下測試,對上面的代碼進行一點修改:
1 from celery import group 2 from CelerySpider.test import add 3 4 5 lazy_group = group(add.s(x, y) for x, y in zip([1, 3, 5, 7, 9], [2, 4, 6, 8, 10])) 6 result = lazy_group() 7 print(result) 8 print(result.get())
運行之后得到了我們想要的結果:
f03387f1-af00-400b-b58a-37901563251d
[3, 7, 11, 15, 19]
2.celer.result.collect()
在Celery中有一個類result,這個類包含了任務運行的結果和狀態等,而在這個類中就有一個collect()方法,使用該方法能在結果返回時收集結果。和之前一樣的步驟,先看看源碼:
這里看源碼也是看得一頭霧水,不如動手寫代碼試試看。創建一個app.py,其中代碼如下:
1 from celery import Celery, group, result 2 3 4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379") 5 6 7 @app.task(trail=True) 8 def A(how_many): 9 return group(B.s(i) for i in range(how_many))() 10 11 12 @app.task(trail=True) 13 def B(i): 14 return pow2.delay(i) 15 16 17 @app.task(trail=True) 18 def pow2(i): 19 return i ** 2 20 21 22 if __name__ == '__main__': 23 app.start()
可以看到在設置任務的時候都加了參數trail=True,這是為了存儲子任務列表運行后的結果,雖然是默認設置,但這里明確啟用。在運行Celery服務器之中,進入app.py同級目錄,輸入python,然后執行如下代碼:
>>> from app import A
>>> res = A.delay(10)
>>> [i[1] for i in res.collect() if isinstance(i[1], int)]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
三、具體步驟
1.項目結構
這個爬蟲項目的基本文件如下:
其中app.py用於創建Celery實例,celeryconfig.py是Celery需要使用的配置文件,tasks.py里面的則是具體的任務,crawl.py是爬蟲腳本,在打開Celery服務器之后,運行此文件即可。
2.主要代碼
首先是app.py,代碼如下,其中config_from_object()方法用於配置Celery,傳入的參數是一個可被導入的模塊:
1 from celery import Celery 2 3 4 app = Celery("spiders", include=["CelerySpider.tasks"]) 5 # 導入配置文件 6 app.config_from_object("CelerySpider.celeryconfig") 7 8 9 if __name__ == '__main__': 10 app.start()
下面是tasks.py中的代碼,其中包含了發送請求和解析網頁的代碼:
1 import requests 2 from lxml import etree 3 from celery import group 4 from CelerySpider.app import app 5 6 7 headers = { 8 "Cookie": "__cfduid=d5d815918f19b7370d14f80fc93f1f27e1566719058; UM_distinctid=16cc7bba92f7b6-0aac860ea9b9a7-7373e61-144000-16cc7bba930727; CNZZDATA1256911977=1379501843-1566718872-https%253A%252F%252Fwww.baidu.com%252F%7C1566718872; XSRF-TOKEN=eyJpdiI6InJvNVdZM0krZ1wvXC9BQjg3YUk5aGM1Zz09IiwidmFsdWUiOiI5WkI4QU42a0VTQUxKU2ZZelVxK1dFdVFydlVxb3g0NVpicEdkSGtyN0Uya3VkXC9pUkhTd2plVUtUTE5FNWR1aCIsIm1hYyI6Ijg4NjViZTQzNGRhZDcxNTdhMDZlMWM5MzI4NmVkOGZhNmRlNTBlYWM0MzUyODIyOWQ4ZmFhOTUxYjBjMTRmNDMifQ%3D%3D; doutula_session=eyJpdiI6IjFoK25pTG50azEwOXlZbmpWZGtacnc9PSIsInZhbHVlIjoiVGY2MU5Ob2pocnJsNVBLZUNMTWw5OVpjT0J6REJmOGVpSkZwNFlUZVwvd0tsMnZsaiszWEpTbEdyZFZ6cW9UR1QiLCJtYWMiOiIxZGQzNTJlNzBmYWE0MmQzMzQ0YzUzYmYwYmMyOWY3YzkxZjJlZTllNDdiZTlkODA2YmQ3YWRjNGRmZDgzYzNmIn0%3D", 9 "Referer": "https://www.doutula.com/article/list/?page=1", 10 "UserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36" 11 } 12 13 14 @app.task(trail=True) 15 def main(urls): 16 # 主函數 17 return group(call.s(url) for url in urls)() 18 19 20 @app.task(trail=True) 21 def call(url): 22 # 發送請求 23 try: 24 res = requests.get(url, headers=headers) 25 parse.delay(res.text) 26 except Exception as e: 27 print(e) 28 29 30 @app.task(trail=True) 31 def parse(html): 32 # 解析網頁 33 et = etree.HTML(html) 34 href_list = et.xpath('//*[@id="home"]/div/div[2]/a/@href') 35 result = [] 36 for href in href_list: 37 href_res = requests.get(href, headers=headers) 38 href_et = etree.HTML(href_res.text) 39 src_list = href_et.xpath('//*[@class="artile_des"]/table/tbody/tr/td/a/img/@src') 40 result.extend(src_list) 41 return result
最后是crawl.py中的代碼:
1 import time 2 from CelerySpider.tasks import main 3 4 5 start_time = time.time() 6 7 8 url_list = ["https://www.doutula.com/article/list/?page={}".format(i) for i in range(1, 31)] 9 res = main.delay(url_list) 10 all_src = [] 11 for i in res.collect(): 12 if isinstance(i[1], list) and isinstance(i[1][0], str): 13 all_src.extend(i[1]) 14 15 print("Src count: ", len(all_src)) 16 17 18 end_time = time.time() 19 print("Cost time: ", end_time - start_time)
此次爬取的網站是一個表情包網站,url_list就表示要爬取的url,這里我選擇爬取30頁來測試。all_src用於存儲表情包圖片的資源鏈接,通過collect()方法提取出要爬取的鏈接,然后將這些表情包下載下來,最后打印出下載的圖片數量和整個程序所耗費的時間。
四、運行結果
當運行Celery服務后,再運行crawl.py文件,會看到如下信息打印出來:
當整個爬蟲運行完畢后,會打印出所耗費的時間:
完整代碼已上傳到GitHub!