一、選題背景
為什么要選擇此選題呢?
針對爬蟲數量日益增長這一現象,通過控制台控制的腳本雜亂無序,本文設計了一種基於可視化頁面來管理爬蟲腳本的系統。該系統利用Scrapyd,它是一個部署和運行 Scrapy 爬蟲的應用,允許使用 HTTP JSON API 部署 Scrapy 項目並控制其爬蟲。
二、基於scrapy框架的爬蟲系統的設計方案
該系統采用Python作為第一編程語言,后台使用Flask框架,前台使用Vue框架,任務調度、服務器注冊、Scrapy執行模塊、Scrapy爬蟲腳本、本系統是以主服務器為主,從服務器為次,服務器中心的主要功能是,由於大部分的腳本都是在主節點Master上運行,如果遇到需要分布式的爬蟲可以在現在服務器上搭建完環境上傳完腳本,通過服務器注冊中心添加節點后,即可在系統上管理。主節點與其他服務器通信方式采用輪訓請求判斷服務器是否在線。在服務器節點信息發生變更時通知任務調度模塊
任務調度模塊負責所有已注冊服務器的爬蟲任務的創建、更改和刪除,以及爬蟲任務的配置、執行和調度。通過調度請求將調度信息向各個服務器上的Scrapy執行模塊發出調度請求。同時還支持對任務調度日志以及執行日志的監控。
執行器模塊負責接收調度模塊發出的調度請求並能執行任務的業務邏輯,由於需要用到Scrapy框架的調度,所以每個爬蟲項目都必須嚴格按照Scrapy框架進行開發,開發完成后通過Scrapyd 上傳到各個服務器。執行爬蟲的原理很簡單,其實就是一個 shell 命令。用戶在爬蟲中輸入執行爬蟲的 shell 命令,例如scrapy some_spider,Scrapt執行器會讀取這個命令,並在 shell 中直接執行。因此,每一次運行爬蟲任務就是運行一次shell命令,區別在於需要安裝 Python 跟Scrapy。
其中、任務管理中心按划分的功能又可以分為三個小模塊:任務管理模塊,調度管理模塊、日志管理模塊。
任務管理:如何執行、調度爬蟲抓取任務,以及如何監控任務,包括日志監控等等;
爬蟲管理:包括爬蟲部署,即將開發好的爬蟲部署(打包或復制)到相應的節點上,以及爬蟲配置和版本管理;
節點管理:包括節點(服務器/機器)的注冊和監控,以及節點之間的通信,如何監控節點性能狀況等;
前端應用:包括一個可視化 UI 界面,讓用戶可通過與其交互,與后台應用進行通信。
三、基於scrapy框架的爬蟲系統的實現方案
1. 定時任務模塊
該模塊主要功能點是爬蟲定時任務的查看、修改、刪除以及任務的調度,任務的啟停。前端UI部分使用VUE。
圖3.1 定時任務模塊列表圖
圖3.2 定時任務模塊操作管理圖
該頁面展示了關閉調度器,暫停調度器,移除所有任務按鈕,爬蟲任務列表,其中對應着每個任務的序號、服務器、項目名、定時類型、定時長度、成功運行的次數、最近一次運行的時間&下次運行的時間,修改的時間,調度狀態,以及對該任務的操作方法,新增任務作為移動到爬蟲列表里的設定定時任務里面。
圖3.3 爬蟲列表管理圖
爬蟲腳本任務的功能主要分為新建、修改、刪除和查看功能。在scheduler調度累分別對應了addJob、remove_job、jobDetail的方法。這部分代碼作為暴露給外部的API接口的只是調用了scheduler各種的方法,具體代碼如下:
def run_job(): global scheduler job_id = request.args.get("job_id") job = scheduler.get_job(job_id) job_info = get_job_info(job) if job_info: scheduler.add_job(run_spider, kwargs=job_info) message = "運行成功" message_type = "success" else: message = "運行失敗" message_type = "warning" data = { "message": message, "message_type": message_type } return jsonify(data) @scheduler_app.route("/removeJob") def remove_job(): global scheduler job_id = request.args.get("job_id") scheduler.remove_job(job_id) return jsonify( { "message": "任務移除成功" } ) @scheduler_app.route("/pauseJob") def pause_job(): global scheduler job_id = request.args.get("job_id") scheduler.pause_job(job_id) return jsonify( { "message": "暫停成功", "message_type": "warning" } ) @scheduler_app.route("/resumeJob") def resume_job(): global scheduler job_id = request.args.get("job_id") scheduler.resume_job(job_id) return jsonify( { "message": "繼續運行", "message_type": "success" } )
爬蟲任務調度是,將配置好的爬蟲任務放入調度列表中,按照自定義的觸發規則來調度任務。該方法會先獲取scheduler類的set_schedule方法數據插入后會將任務插入到調度列表中,並根據此方法的返回值來返回改任務的調度狀態具體代碼如下:
if trigger == "cron": crontabs = parse_crontab(cron) if not crontabs: return None minute, hour, day, month, day_of_week = crontabs scheduler.add_job( run_spider, kwargs=kwargs, id=job_id, replace_existing=True, trigger="cron", minute=minute, hour=hour, day=day, month=month, day_of_week=day_of_week ) # 時間間隔方式執行 elif trigger == "interval": try: interval = int(interval) except Exception: return None scheduler.add_job( run_spider, kwargs=kwargs, id=job_id, replace_existing=True, trigger="interval", minutes=interval ) # 執行一次 elif trigger == "date": try: run_datetime = datetime.strptime(run_datetime, "%Y-%m-%d %H:%M:%S") except Exception: return None scheduler.add_job( run_spider, kwargs=kwargs, id=job_id, replace_existing=True, trigger="date", run_date=run_datetime ) # 隨機時間間隔執行 用於擬人操作 elif trigger == "random": try: randoms = random_time.split("-") random_start, random_end = [int(rand.strip()) for rand in randoms] except Exception: return None random_delay = random.randint(random_start, random_end) scheduler.add_job( run_spider, kwargs=kwargs, id=job_id, replace_existing=True, trigger="interval", minutes=random_delay ) else: return None return job_id
setschedule 方法首先獲取了全局的 scheduler 調度器,判斷是否是更新還是新增操作,如果是新增操作就通過 hash 生成一個 uuid 當作 jobid ,通過將傳入的參數加jobid 封裝成一個數組,系統設定定時的類型有四種分別是單次任務、周期任務、間隔任務、隨機任務,按傳入的參數 trigger 判斷定時任務的類型,最后調用 scheduler 的 addjob 方法將任務加入到Scrapy調度中心。
2. 日志管理模塊實現的頁面以及代碼
日志模塊分為日志的查看,清理這兩個功能。細分下來日志查看還分為調度模塊統計查看和腳本執行過程日志的查看,在Controller中對應的方法是getschedulehistory,將查詢到的調度信息以 JSON 的形式返回,前端利用Echart 圖標的折線圖顯示,通過點擊任務的調度歷史按鈕 能夠實時的查看某個任務的最新執行日志情況折線圖,調度模塊調度歷史日志展示如圖。
圖3.4 調度模塊調度歷史日志展示圖1
圖3.3 調度模塊調度歷史日志展示圖2
其中從Scrapyd讀取調度歷史的方法 getschedulehistory 的實現細節如下:
job_id = request.args.get("job_id") count = request.args.get("count", "30") with scheduler_history.lock: result = history.select(job_id, count) fmt = "%H:%M" now = datetime.now() min_time = now spider_name = None schedule_list = defaultdict(int) for row in result: if spider_name is None: spider_name = row["spider_name"] schedule_time = row["schedule_time"] if schedule_time < min_time: min_time = schedule_time t = schedule_time.strftime(fmt) schedule_list[t] += 1 time_list = [] while min_time <= now: time_list.append(min_time.strftime(fmt)) min_time += timedelta(minutes=1) data_list = [] for time_item in time_list: data_list.append(schedule_list.get(time_item, 0)) data = { "title": spider_name, "values": data_list, "keys": time_list } return jsonify(data)
getschedulehistory先獲取了jobid,並設置了要獲取的條數,鎖定Scrapy阻止線程新增,然后獲取該任務下的設定的調度任務的條數。通過實例化一個 dict 對獲取的調度任務歷史列表進行遍歷,判斷符合時間段的條件后,拼接成一個list 返回符合 echart 圖標的 json 數據返回給前端展示。
通過該任務定時列表的查看日志文件可以獲取到最新的爬蟲運行日志文件,點擊日志目錄可以獲取最近五條的腳本日志文件,如果想查詢腳本運行日志可以從任務列表里面查看。
圖3.6 查看單個日志
日志的清理目前是設置超過3個閾值后采用先進先出的刪除策略,還有一種
更優的解決方案是設定每個每個項目日志的大小,然后新增一個定時周期的腳本來清除數據。
3. 服務器注冊實現的頁面以及代碼
服務器模塊分為服務器的查看,注冊和刪除這三個功能。服務器詳情頁頁面如下圖所示。
圖3.7 服務器列表圖
如果開發者想要添加服務器沒有將自己的服務器信息填寫完整,提示用戶請填寫完整。服務器列表添加頁面。如下圖所示。
圖3.8 服務器狀態圖
具體代碼如下:
@api_app.route("/addServer", methods=["POST"]) def add_server(): server_host = request.json.get("server_host", "") server_name = request.json.get("server_name", "") server_username = request.json.get("server_username", "") server_password = request.json.get("server_password", "") server_name = server_name.strip() server_host = server_host.strip() server_username = server_username.strip() server_password = server_password.strip() # 參數校驗,以及服務器校驗, # 添加的時候不需要校驗服務器正確性,允許先添加地址和端口再啟動服務 # if (not all([server_name, server_host])) or (check_server(server_host) is False): if not all([server_name, server_host]): message = "添加失敗,請檢查服務器地址是否正確" message_type = "warning" else: user_server_table.insert( { "server_name": server_name, "server_host": server_host, "server_username": server_username, "server_password": server_password, } ) message = "添加成功" message_type = "success" data = { "message": message, "message_type": message_type } return jsonify(data)
3.3 爬蟲腳本主體功能的具體實現
3.3.1 爬蟲腳本的實現過程
以爬取京東為例,當前爬蟲腳本主要有4個模塊,數據字段定義(items.py)、數據獲取腳本(spider.py)、數據加工程序(piplines.py)以及配置文件(settings.py)。展示系爬蟲腳本組成結構:
圖3.9 爬蟲腳本結構圖
數據字段定義程序
Scrapy系統中的item類是主要設置數據字段的類,其組成與字典類型相近。在對網站內容進行爬取時,對象由項目類型定義,並且項目只有一個 Feild 類的對象。對於京東目標網站來說,數據描述模塊一共標識了7個對象,如以下代碼所示。
class ProductItem(Item): # define the fields for your item here like: name = scrapy.Field() dp = Field() title = Field() price = Field() comment = Field() url = Field() type = Field() Pass
數據獲取&解析腳本
在數據爬取的程序中,Scrapy 提供解析的摸板,我們定義了爬蟲腳本的開始URL、用於 CSS 或者 XPATH 提取規則以及網頁的加載的設置等等。下面我們直接根據下圖展示的網頁來對字段進行提取。
圖3.10 京東爬取目標頁1
圖3.11 京東爬取目標頁2
下面根據 CSS 選擇器的抽取數據方法依次對字段進行解析,具體代碼如下:
def parse(self, response): soup = BeautifulSoup(response.text, 'lxml') lis = soup.find_all(name='li', class_="gl-item") for li in lis: proc_dict = {} dp = li.find(name='span', class_="J_im_icon") if dp: proc_dict['dp'] = dp.get_text().strip() else: continue id = li.attrs['data-sku'] title = li.find(name='div', class_="p-name p-name-type-2") proc_dict['title'] = title.get_text().strip() price = li.find(name='strong', class_="J_" + id) proc_dict['price'] = price.get_text() comment = li.find(name='a', id="J_comment_" + id) proc_dict['comment'] = comment.get_text() + '條評論' url = 'https://item.jd.com/' + id + '.html' proc_dict['url'] = url proc_dict['type'] = 'JINGDONG' yield proc_dict
數據加工處理程序
在 Pipeline 中定義了數據持久化的方法。本文基於 Twisted 框架實現了異步存儲數據的 MysqlPipline,使得從 Spider 中解析出的 item 數據異步插入到 MySQL 數據庫中。其核心步驟的偽代碼如下:
class MongoPipeline(object): def __init__(self, mongo_url, mongo_db, collection): self.mongo_url = mongo_url self.mongo_db = mongo_db self.collection = collection @classmethod # from_crawler是一個類方法,由 @classmethod標識,是一種依賴注入的方式,它的參數就是crawler # 通過crawler我們可以拿到全局配置的每個配置信息,在全局配置settings.py中的配置項都可以取到。 # 所以這個方法的定義主要是用來獲取settings.py中的配置信息 def from_crawler(cls, crawler): return cls( mongo_url=crawler.settings.get('MONGO_URL'), mongo_db=crawler.settings.get('MONGO_DB'), collection=crawler.settings.get('COLLECTION') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_url) self.db = self.client[self.mongo_db] def process_item(self, item, spider): # name = item.__class__.collection name = self.collection self.db[name].insert(dict(item)) return item def close_spider(self, spider): self.client.close()
配置文件
在抓取京東頁面時,Scrapy 提供了輔助配置項來完成爬蟲系統的抓取流程,下面將介紹幾個主要配置項:
ROBOTSTXT_OBEY:如果為真,爬蟲將遵守機器人的協議。它通常設置為real,表示它不遵守robots協議。
COOKIES_ENABLED :在登錄過程中需要決定是否使用cookie。
SCHEDULER_PERSIST :在 Redis 中保存 Scrapy-Redis 的各個隊列,即在暫停爬蟲時,不清空請求隊列。
數據連接的基本信息:數據庫的 IP 地址,數據庫端口等信息。本文所實現的京東爬蟲采用mongodb進行存儲,故設置一下配置:
KEYWORDS = ['phone']
MAX_PAGE = 2
MONGO_URL = 'localhost'
MONGO_DB = 'test'
COLLECTION = 'PhoneItem'
SELENIUM_TIMEOUT = 30
DEFAULT_REQUEST_HEADERS :設置默認的瀏覽器頭部
3.3 數據存儲模塊的實現
腳本編寫完成后將腳本打包上傳到系統並運行,就可以在數據庫中獲取數據來,下圖是已經成功抓取並存入mongodb的數據。
圖3.13 Mongodb數據存儲圖
四、總結
本次作業在設計過程中,我學習到了許多的新技術,如Flask框架, Vue框架, Scrapy執行模塊、Scrapy爬蟲腳本等。同時也收獲了許多新的編程思維,在日后進行Python開發時有更多的思路及方法。本次作業發現我對Python的掌握確實猶如滄海一粟,在以后的學習生涯定要更加努力。