是用redis做任務隊列時,要思考:
- 用什么數據類型來做任務隊列
- 怎樣才能防止重復爬取
上一篇文章已經決定使用list來做任務隊列,但是去重問題沒有得到解決。這里可以用set來解決思考二的問題,就是防止重復爬取的問題。
使用list當作未完成任務隊列,存儲還沒有爬的url(或者是用戶id,文章id等唯一標識)
使用set當作已完成任務隊列,存儲已經爬取的url
每次爬蟲程序從list未完成任務隊列獲取任務的時候,都去set已完成任務隊列里面驗證一下,如果已完成隊列里已經有了,就舍棄掉,如果沒有,就開始爬取,並將這個url加入到已爬取的任務隊列
這樣做的方便之處在於:每當我往list未完成任務隊列里加任務的時候,我不用考慮這個任務有沒有爬過,這個任務是不是已經在未爬取任務隊列了,我只需要往里加就行了,當爬蟲去取的時候,讓爬蟲程序去做這個操作。
以下是具體代碼
算是一個生產消費把,master往隊列里塞任務,parser使用get_html的返回值進行解析,然后入庫。
協程爬取貼吧里發帖內容(redis做任務隊列,mongo存儲)
1 import requests 2 from lxml import etree 3 import redis 4 import asyncio,aiohttp 5
6 import pymongo 7 conn = pymongo.MongoClient('localhost',27017) 8
9 db = conn.nicedb # 指定數據庫名稱,連接nicedb數據庫,沒有則自動創建
10 my_set = db.test_set # 使用test_set集合,沒有則自動創建
11 # 以上兩步都是延時操作,當往數據庫插入第一條數據的時候,才會真正的創建數據庫和集合
12
13 # decode_responses=True,記得加這個參數,不加的話取出來的數據都是bytes類型的
14 r = redis.StrictRedis(host = '127.0.0.1', port = 6379, db = 2,decode_responses=True) 15 # pool = redis.ConnectionPool(host = '127.0.0.1', port = 6379, db = 2)
16 # r = redis.StrictRedis(connection_pool=pool,decode_responses=True)
17
18 def master(page): 19 url = 'https://tieba.baidu.com/f?kw=美女&ie=utf-8&pn={}'.format(page*50) 20 base = 'https://tieba.baidu.com'
21 res = requests.get(url).text 22 html = etree.HTML(res) 23 half_urls = html.xpath("//div[@class='threadlist_title pull_left j_th_tit ']/a/@href") 24 full_urls = [base + i for i in half_urls] 25 for url in full_urls: 26 # 從url_list列表頭部塞任務,也就是url
27 r.lpush('url_list',url) 28 #print(r.llen('url_list'))
29
30 async def get_html(url): 31 async with asyncio.Semaphore(5): # 限制並發數為5個
32 async with aiohttp.ClientSession() as session: 33 async with session.get(url) as html: 34 # errors='ignore',不加這個參數的話,會報錯,具體錯誤內容見下面圖片
35 response = await html.text(encoding='utf-8',errors='ignore') 36 return response 37 async def parse(): 38 while True: 39 # 從redis的url_list列表取任務,從右邊開始取
40 url = r.rpop('url_list') 41 if url == None: 42 break
43 # 判斷這個任務是否已經做過了,也就是判斷這個url在沒在redis的history集合里
44 if r.sismember('history',url) == 1: 45 continue
46 response = await get_html(url) 47 html = etree.HTML(response) 48 content = html.xpath("//div[@class='left_section']/div[2]/div[1]//cc/div[1]/text()")[0].strip() 49 if content != '': 50 # 當內容不為空時,將內容存到mongo里
51 my_set.save({'content':content}) 52 #print(content)
53 # 將爬取過的任務放到redis的history集合里,也就是已完成任務隊列
54 r.sadd('history', url) 55 t1 = time.time() 56 # 爬取前10頁
57 for i in range(10): 58 master() 59
60 # async的一些步驟
61 loop = asyncio.get_event_loop() 62 tasks = [parse() for _ in range(15)] 63 loop.run_until_complete(asyncio.wait(tasks)) 64 loop.close() 65
66 t2 = time.time() 67 print(t2-t1) 68 # 最后用時:32.930299043655396
69 # 把mongo數據庫換成mysql后,用時:43.06192493438721
70
71 原文:https://blog.csdn.net/fiery_heart/article/details/82121237