爬蟲連接mongodb、多線程多進程的使用


一、連接mongodb

 

1、            設置數據庫 client=pymongo.MongoClient(‘localhost’)

2、            db=client[‘lagou’]設置連接的數據庫名稱

POSITION_NAME=’’ 、PAGE_SUM 、PAGE_SIZE 等為你設置的變量名稱。
3、DATA_NAME=’dataposition’   # # 指定數據庫的名字
4、設置保存在mongo數據庫中的數據:
def save_to_mongo(data):
    if db[DATA_NAME].update({'positionId': data['positionId']}, {'$set': data}, True):
        print('Saved to Mongo', data['positionId'])
    else:
        print('Saved to Mongo Failed', data['positionId'])
這是以positionId為唯一標識,如果數據庫里面已經存在有positionId,說明數據已經爬過了,不再更新。
 
二、多進程設置和使用:
1、導入多進程:from multiprocessing import Pool
導入時間  import time
2、start_time = time.time()
pool = Pool()  # pool()參數:進程個數:默認的是電腦cpu的核的個數,如果要指定進程個數,這個進程個數要小於等於cpu的核數
# 第一個參數是一個函數體,不需要加括號,也不需指定參數。。
#  第二個參數是一個列表,列表中的每個參數都會傳給那個函數體
pool.map(to_mongo_pool,[i for i in range(PAGE_SUM)])
# close它只是把進程池關閉
pool.close()
# join起到一個阻塞的作用,主進程要等待子進程運行完,才能接着往下運行
pool.join()
end_time = time.time()
print("總耗費時間%.2f秒" % (end_time - start_time))
 

to_mongo_pool:這個函數要設計好,就一個參數就夠了,然后把它的參數放在列表里面,通過map高階函數一次傳給to_mongo_pool

 

多線程的使用:

多線程要配合隊列使用:

 

# coding=utf-8
import requests
from lxml import etree 
import threading 導入線程
from queue import Queue  導入隊列



# https://docs.python.org/3/library/queue.html#module-queue
# 隊列使用方法簡介
# q.qsize() 返回隊列的大小
# q.empty() 如果隊列為空,返回True,反之False
# q.full() 如果隊列滿了,返回True,反之False
# q.full 與 maxsize 大小對應
# q.get([block[, timeout]]) 獲取隊列,timeout等待時間
# q.get_nowait() 相當q.get(False)
# q.put(item) 寫入隊列,timeout等待時間
# q.put_nowait(item) 相當q.put(item, False)
# q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
# q.join() 實際上意味着等到隊列為空,再執行別的操作


class Lianjia:
    def __init__(self):
        self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"
       
self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_queue = Queue()

    def get_url_list(self):
        # return [self.url_temp.format(i) for i in range(1,14)]
       
for i in range(1, 14):
            # 把13個索引頁面的Url放進url_queue隊列里
           
self.url_queue.put(self.url_temp.format(i))

 

   定義運行函數

def run(self):  # 實現主要邏輯
    thread_list = []
    # 1.url_list
    # threading.Thread不需要傳參數,參數都是從隊列里面取得
    t_url = threading.Thread(target=self.get_url_list)
    thread_list.append(t_url)
    # 2.遍歷,發送請求,獲取響應
    for i in range(20):  # 添加20個線程
        t_parse = threading.Thread(target=self.parse_url)
        thread_list.append(t_parse)
    # 3.提取數據
    for i in range(2):  # 添加2個線程
        t_html = threading.Thread(target=self.get_content_list)
        thread_list.append(t_html)
    # 4.保存
    t_save = threading.Thread(target=self.save_content_list)
    thread_list.append(t_save)
    for t in thread_list:
        t.setDaemon(True)  # 把子線程設置為守護線程,該線程不重要,主線程結束,子線程結束(子線程是while true不會自己結束)
        t.start()

    for q in [self.url_queue, self.html_queue, self.content_queue]:
        q.join()  # 讓主線程等待阻塞,等待隊列的任務完成(即隊列為空時 )之后再進行主線程

    print("主線程結束")

 代碼如下:

# coding=utf-8
import requests
from lxml import etree
import threading
from queue import Queue


# https://docs.python.org/3/library/queue.html#module-queue
# 隊列使用方法簡介
# q.qsize() 返回隊列的大小
# q.empty() 如果隊列為空,返回True,反之False
# q.full() 如果隊列滿了,返回True,反之False
# q.full 與 maxsize 大小對應
# q.get([block[, timeout]]) 獲取隊列,timeout等待時間
# q.get_nowait() 相當q.get(False)
# q.put(item) 寫入隊列,timeout等待時間
# q.put_nowait(item) 相當q.put(item, False)
# q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號
# q.join() 實際上意味着等到隊列為空,再執行別的操作


class Lianjia:
    def __init__(self):
        self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_queue = Queue()

    def get_url_list(self):
        # return [self.url_temp.format(i) for i in range(1,14)]
        for i in range(1, 14):
            # 把13個索引頁面的Url放進url_queue隊列里
            self.url_queue.put(self.url_temp.format(i))

    def parse_url(self):
        while True:
            # get方法和task_done搭配使用
            # 在put是隊列+1,get和task_done一起使用時隊列才會-1
            url = self.url_queue.get()
            print(url)
            response = requests.get(url, headers=self.headers)
            # 然后把索引頁的響應頁面放進html_queue隊列里
            self.html_queue.put(response.content.decode())
            self.url_queue.task_done()

    def get_content_list(self):  # 提取數據
        while True:
            # 先從索引頁響應頁面html_queue隊列里面取出索引頁面
            html_str = self.html_queue.get()

            html = etree.HTML(html_str)
            div_list = html.xpath('//li[@class="clear LOGCLICKDATA"]')  # 分組
            content_list = []
            for div in div_list:
                item = {}
                item['title'] = div.xpath('.//div[@class="title"]/a/text()')
                item['href'] = div.xpath('.//div[@class="title"]/a/@href')
                item['totalPrice'] = div.xpath('.//div[@class="totalPrice"]/span/text()')
                item['houseInfo'] = div.xpath('.//div[@class="houseInfo"]/text()')
                content_list.append(item)
            # 把content_list放進content_queue里面
            self.content_queue.put(content_list)
            self.html_queue.task_done()

    def save_content_list(self):  # 保存
        while True:
            content_list = self.content_queue.get()
            for i in content_list:
                print(i)
                pass
            self.content_queue.task_done()

    def run(self):  # 實現主要邏輯
        thread_list = []
        # 1.url_list
        # threading.Thread不需要傳參數,參數都是從隊列里面取得
        t_url = threading.Thread(target=self.get_url_list)
        thread_list.append(t_url)
        # 2.遍歷,發送請求,獲取響應
        for i in range(20):  # 添加20個線程
            t_parse = threading.Thread(target=self.parse_url)
            thread_list.append(t_parse)
        # 3.提取數據
        for i in range(2):  # 添加2個線程
            t_html = threading.Thread(target=self.get_content_list)
            thread_list.append(t_html)
        # 4.保存
        t_save = threading.Thread(target=self.save_content_list)
        thread_list.append(t_save)
        for t in thread_list:
            t.setDaemon(True)  # 把子線程設置為守護線程,該線程不重要,主線程結束,子線程結束(子線程是while true不會自己結束)
            t.start()

        for q in [self.url_queue, self.html_queue, self.content_queue]:
            q.join()  # 讓主線程等待阻塞,等待隊列的任務完成(即隊列為空時 )之后再進行主線程

        print("主線程結束")


if __name__ == '__main__':
    qiubai =Lianjia()
    qiubai.run()

# 所沒有tast_done方法,程序最終會卡着不動,無法終止

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM