在Scrapy中使用Django的ORM異步存儲數據


在Scrapy中使用Django的ORM異步存儲數據

django的orm可以脫離django使用,只要我們將django的環境舒適化就可以了。

在scrapy中使用

首先我們的創建一個django項目,然后在創建一個scrapy項目。

然后再scrapy中初始化django的環境

一般我們在scrapy的項目的__init__.py里面初始化

import django
import os
import sys

# 將django的項目路徑加入到當前的環境
sys.path.insert(0, os.path.dirname(os.getcwd()))

# django項目舒適化
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject.settings')

django.setup()

初始化完成之后,我們就可以直接在scrapy中導入django的orm並使用了。

異步存儲

因為scrapy是異步的爬蟲框架,如果我們在里面直接使用django的orm會有io阻塞的操作。這時候我們就借助asyncio這個包來幫助我們在scrapy中運行同步阻塞的代碼。

因為我們scrapy中處理數據都放在pipline中處理,所以我放在pipline中展示

from concurrent.futures import ThreadPoolExecutor
import asyncio
from goods import models
from . import items


class WebspidersPipeline:
    '''todo 異步存儲'''
    
    # 創建事件循環對象
    loop = asyncio.get_event_loop()
    # 創建線程池
    executor = ThreadPoolExecutor()
    # 任務隊列
    tasks = []
	
    # 處理不同的pipline
    async def process_item(self, item, spider):
        if isinstance(item, items.GoodsItem):
            return self.process_goods_item(item, spider)
        elif isinstance(item, items.GoodsSizeItem):
            return self.process_goods_size_item(item, spider)
        elif isinstance(item, items.GoodsStockItem):
            return self.process_goods_stock_item(item, spider)
        return item
	
    def process_goods_item(self, item, spider):
        '''將保存數據的處理方法加入到任務隊列'''
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.Goods, item), )
        self.tasks.append(task)
        return item

    def process_goods_size_item(self, item, spider):
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsSize, item), )
        self.tasks.append(task)
        return item

    def process_goods_stock_item(self, item, spider):
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsStock, item), )
        self.tasks.append(task)
        return item

    @staticmethod
    def executor_func(model, item):
        '''主要作用是將有參數的函數轉換為無參數的函數返回,方便run_in_executor方法調用,這個方法它只接受位置傳參,不接受關鍵字傳參'''
        def func():
            return model.objects.create(**item)

        return func

    def close_spider(self, spider):
        '''當爬蟲關閉的時候調用這個方法保存數據'''
        self.loop.run_until_complete(asyncio.wait(self.tasks))

運行結果

之前直接使用同步的方法存儲的時候,2000個請求+數據存儲花費了大約10分鍾(sqlite3)

后面使用異步存儲的時候,使用sqlite3會報錯,因為sqlite3是單線程的,我們是一個線程池對象,並發存儲會被sqlite3拒絕(database was locked)

后面改用了mysql存儲,2000個請求+數據存儲花費了大約40s,這個提升量還是很驚人的。

后面分析了一下,在scrapy中使用同步的方式存儲會導致scrapy的異步請求會等待同步的存儲完成之后才去執行,大量的時間浪費了等待上面。

后面單獨執行網絡請求部分,沒有數據存儲,2000個請求花費了大約25s旁邊。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM