基於Scrapy的B站爬蟲
最近又被叫去做爬蟲了,不得不拾起兩年前搞的東西。
說起來那時也是突發奇想,想到做一個B站的爬蟲,然后用的都是最基本的Python的各種庫。
不過確實,實現起來還是有點麻煩的,單純一個下載,就有很多麻煩事。
這回要快速實現一個爬蟲,於是想到基於現成的框架來開發。
Scrapy是以前就常聽說的一個爬蟲框架,另一個是PySpider。
不過以前都沒有好好學過框架。
這回學習了一波,順便擼出來一個小Demo。
這個Demo功能不多,只能爬取B站的視頻列表,不過主要在於學習、記錄、交流,不在於真的要爬B站。。
然后代碼都在GitHub了:
https://github.com/wangzb96/Scrapy-Bilibili
爬蟲的定義
爬蟲的定義有以下兩點:
- 自動爬取網絡資源 (html、json、...)
- 模擬瀏覽器行為
第一點是常規的定義,第二點是進階版的定義,因為如果爬蟲要持久穩定地爬取數據,那么就要模擬真人使用瀏覽器的行為,模擬得越像越好,越不容易被封。
爬蟲的流程
- 頁面分析
- 工具
- 谷歌瀏覽器
- 360極速瀏覽器
- 問題
- 哪些數據需要爬取?
- 這些數據存放在什么文件上?
- 這些文件的鏈接是什么?
- 鏈接的生成規則是什么?
- 存放在其他頁面文件
- 通過某種簡單的規則生成 (如遞增的數字)
- 工具
- 獲取鏈接
- 通過解析網頁文件得到鏈接
- 通過模版生成不同的鏈接
- 下載資源
requests
asyncio
- 頁面解析
json
bs4.BeautifulSoup
pyquery.PyQuery
re
- 數據存儲
- 文件
- 數據庫
Scrapy框架介紹
Scrapy是一個用於實現爬蟲的Python框架,它將爬蟲運行過程抽象成幾個組件,如圖(圖片來自官網):

其中主要包括:
- Engine (不需要用戶實現)
- 驅動組件運行
- Scheduler (不需要用戶實現)
- 接收請求
- 調度請求
- 返回請求
- Downloader (不需要用戶實現)
- 請求網絡資源
- 返回響應
- Spider (需要用戶實現)
- 返回初始請求
- 頁面解析
- 返回Item對象
- 返回新請求
- Item Pipeline (需要用戶實現)
- Item對象清洗
- Item對象驗證
- Item對象保存
- Middleware (需要用戶實現)
- Downloader Middleware
- Spider Middleware
- 在組件運行的一些子過程中執行額外操作
當應用Scrapy實現爬蟲時,由於Scrapy已經實現了Engine、Scheduler、Downloader等組件,所以用戶無需實現這些組件,用戶主要要實現Spider,以及按需實現Item Pipeline、Middleware,另外還需要實現Item類。
基於Scrapy的B站爬蟲實現
以下介紹一個B站美食區視頻列表爬蟲實現的案例。
開始一個Scrapy項目
首先在命令行或終端中輸入:
scrapy startproject scrapy_bilibili
Scrapy會在當前目錄下生成如下的目錄:
- scrapy_bilibili
- scrapy_bilibili
- spiders
- __init__.py
- __init__.py
- items.py
- pipelines.py
- middlewares.py
- settings.py
- spiders
- scrapy.cfg
- scrapy_bilibili
其中斜體的是文件夾,我們把加粗的文件夾設置成項目的根目錄。
B站美食區視頻列表頁面分析
B站美食區的鏈接地址是固定的:
https://www.bilibili.com/v/life/food/?#/all/default/0
進去后里面有個視頻列表,我們使用360極速瀏覽器分析:

分析后,發現一個“newlist”鏈接:
https://api.bilibili.com/x/web-interface/newlist?rid=76&type=0&ps=100&pn=1
點開后,可以看到這個鏈接返回了一個json文件,里面記錄了視頻列表及其中每一個視頻的信息,包括視頻的標題、id、播放量等:

分析一下這個鏈接的參數,rid
是美食區的id,type
是按日期排序還是按熱度排序,ps
表示每頁視頻數量,pn
表示第幾頁。
然后觀察B站的視頻頁面:

發現視頻頁面的鏈接地址是由固定模版生成的:
https://www.bilibili.com/video/{bvid}
其中bvid是每個視頻的id,可以通過“newlist”鏈接獲得。
如果要爬取視頻頁面信息,那么應用以上方法分析一下就可以了。
B站視頻列表Item類實現
Scrapy的Item類,在概念上相當於C/C++的結構體、Java的POJO。
這里簡單起見,我們將視頻列表json文件中每個元素感興趣的信息均存放在一個Item對象中,代碼如下:
點擊查看詳情
from scrapy import Item, Field
class BilibiliVideoListItem(Item):
# 視頻信息
aid = Field() # 視頻ID
bvid = Field() # 視頻ID
tid = Field() # 區
pic = Field() # 封面
title = Field() # 標題
desc = Field() # 簡介
duration = Field() # 總時長,所有分P時長總和
videos = Field() # 分P數
pubdate = Field() # 發布時間
view = Field() # 播放數
danmaku = Field() # 彈幕數
reply = Field() # 評論數
like = Field() # 點贊數
dislike = Field() # 點踩數
coin = Field() # 投幣數
favorite = Field() # 收藏數
share = Field() # 分享數
cid = Field() # 未知
# UP主信息
mid = Field() # UP主ID
name = Field() # 昵稱
face = Field() # 頭像
B站Spider類實現
Spider類是實現爬蟲的關鍵。
首先返回初始鏈接,這里我們直接返回“newlist”第一頁的鏈接;
然后實現頁面解析邏輯,由於返回的頁面是json文件,我們直接將它轉成Python對象,之后依次取出感興趣的屬性,最后封裝成Item對象就可以了;
再之后要返回新的請求對象,這里直接返回下一頁鏈接,並且判斷是否已將所有視頻都爬取了。
點擊查看詳情
from scrapy import Spider, Request
from scrapy_bilibili.items import BilibiliVideoListItem
from util import json2obj
class BilibiliSpider(Spider):
# Spider名字
name = 'BilibiliSpider'
# 視頻列表鏈接模版 (三個參數)
url_fmt = r'https://api.bilibili.com/x/web-interface/newlist?' \
r'rid={rid}&type=0&ps={ps}&pn={pn}'
def __init__(self, *args, rid: int=None, ps: int=None, **kwargs):
"""初始化
Args:
rid: 區ID,默認76,表示美食區
ps: 視頻列表每頁視頻數量,默認100
"""
super().__init__(*args, **kwargs)
if rid is None: rid = 76
if ps is None: ps = 100
self.rid = rid
self.ps = ps
# 視頻列表鏈接模版 (一個參數)
self.url = self.url_fmt.format(rid=rid, ps=ps, pn='{}')
# 初始鏈接
self.start_urls = [self.url.format(1)]
def parse(self, response):
"""頁面解析"""
url = response.url
pn = int(url.rsplit('=', 1)[-1]) # 視頻列表頁碼
page = response.body.decode('UTF-8') # 響應對象中的json文件
obj = json2obj(page) # 轉成Python對象
data = obj['data']
count = data['page']['count'] # 該區當前視頻總數
archives = data['archives']
for i in archives:
aid = i['aid']
bvid = i['bvid'].strip()
tid = i['tid']
pic = i['pic'].strip()
title = i['title'].strip()
desc = i['desc'].strip()
duration = i['duration']
videos = i['videos']
pubdate = i['pubdate']
stat = i['stat']
view = stat['view']
danmaku = stat['danmaku']
reply = stat['reply']
like = stat['like']
dislike = stat['dislike']
coin = stat['coin']
favorite = stat['favorite']
share = stat['share']
cid = i['cid']
owner = i['owner']
mid = owner['mid']
name = owner['name'].strip()
face = owner['face'].strip()
# 封裝成Item對象
item = BilibiliVideoListItem(
aid=aid,
bvid=bvid,
tid=tid,
pic=pic,
title=title,
desc=desc,
duration=duration,
videos=videos,
pubdate=pubdate,
view=view,
danmaku=danmaku,
reply=reply,
like=like,
dislike=dislike,
coin=coin,
favorite=favorite,
share=share,
cid=cid,
mid=mid,
name=name,
face=face,
)
yield item
if pn*self.ps<count: # 如果當前爬取的視頻數量少於視頻總數
url = self.url.format(pn+1) # 下一頁的頁碼
req = Request(url, callback=self.parse) # 下一頁的請求對象
yield req
其中用到的一個函數json2obj
的實現如下:
點擊查看詳情
import json
def json2obj(s: str, enc: str=None):
"""json字符串 -> Python對象
Args:
s: 輸入的json字符串
enc: 字符串編碼格式,默認UTF-8
Returns:
Python對象
"""
if enc is None: enc = 'UTF-8'
return json.loads(s, encoding=enc)
B站Pipeline類實現
接下來要將獲取到的Item對象去重並存入數據庫。
這里我們使用Redis(Windows系統下用Memurai代替)中的Set來實現去重功能,我們用Set存儲視頻的bvid,當一個新的Item對象傳入進來,判斷其bvid是否已在Set中,如果已在則丟棄,如果不在則更新Set,並將Item對象存入數據庫。
數據庫采用MongoDB,每次存數據需要傳遞一個字典或列表對象,所以我們將Item對象轉換成字典對象,並存入數據庫中。
點擊查看詳情
from database import MongoDataBase
from container import Redis
class BilibiliPipeline:
def __init__(self):
"""初始化"""
# 數據庫對象
self.dataBase = MongoDataBase()
# 數據表對象,負責數據保存
self.datas = self.dataBase.getDatas('bilibili', 'video_list')
# 緩存對象
self.redis = Redis(cp=True)
# 集合對象,負責數據去重
self.set = self.redis.getSet('bilibili_video_list')
def process_item(self, item, spider):
"""處理Item對象
對Item對象用Redis的Set進行去重,然后存入MongoDB。
"""
bvid = item['bvid'] # 視頻ID
if bvid not in self.set: # 如果視頻ID不在集合中
self.set.insert(bvid) # 視頻ID加入集合
self.datas.insert(dict(item)) # Item對象轉成字典存入數據庫
return item
其中,container
的實現如下:
點擊查看詳情
from typing import Generator
import redis
class Container:
def __len__(self) -> int:
"""返回容器中元素個數"""
return self.size()
def __contains__(self, *args, **kwargs) -> bool:
"""判斷元素是否存在於容器中"""
return self.has(*args, **kwargs)
def __iter__(self) -> Generator:
"""迭代訪問容器中的所有元素"""
return self.iter()
def size(self) -> int:
"""返回容器中元素個數"""
pass
def has(self, *args, **kwargs) -> bool:
"""判斷元素是否存在於容器中"""
pass
def iter(self) -> Generator:
"""迭代訪問容器中的所有元素"""
pass
class Set(Container):
def insert(self, *args, **kwargs):
"""插入一個元素"""
pass
def delete(self, *args, **kwargs):
"""刪除一個元素"""
pass
def inserts(self, *args, **kwargs):
"""插入多個元素"""
pass
def deletes(self, *args, **kwargs):
"""刪除多個元素"""
pass
class Redis:
def __init__(self, cp: bool=None, cs: int=None, *args, **kwargs):
"""初始化
Args:
cp: 是否使用連接池,默認否
cs: 連接池的最大連接數,默認8
"""
kwargs['decode_responses'] = True # 使Redis默認返回字符串
if cp:
if cs is None: cs = 8
cp = redis.ConnectionPool(max_connections=cs)
kwargs['connection_pool'] = cp
self.redis = redis.Redis(*args, **kwargs)
def getSet(self, key: str):
"""返回集合容器
Args:
key: 集合的名字
"""
return Redis.Set(self.redis, key)
class Container:
def __init__(self, redis, key: str):
self.redis = redis
self.key = key
self.pipeline = None
def getRedis(self):
if self.pipeline: return self.pipeline
return self.redis
def getPipeline(self):
if self.pipeline: return False
self.pipeline = self.redis.pipeline()
return True
def execute(self):
if self.pipeline:
r = self.pipeline.execute()
self.pipeline = None
return r
class Set(Container, Set):
def __init__(self, redis, key: str):
super().__init__(redis, key)
def size(self):
return self.getRedis().scard(self.key)
def has(self, x):
return self.getRedis().sismember(self.key, x)
def iter(self):
return self.getRedis().smembers(self.key)
def insert(self, x):
return self.inserts(x)
def delete(self, x):
return self.deletes(x)
def inserts(self, x, *args):
return self.getRedis().sadd(self.key, x, *args)
def deletes(self, x, *args):
return self.getRedis().srem(self.key, x, *args)
database
的實現如下:
點擊查看詳情
import pymongo
class DataBase:
def getDatas(self, *args, **kwargs):
"""返回數據表對象"""
pass
class Datas:
def insert(self, *args, **kwargs):
"""插入一個數據"""
pass
def delete(self, *args, **kwargs):
"""刪除一個數據"""
pass
def update(self, *args, **kwargs):
"""更新一個數據"""
pass
def inserts(self, *args, **kwargs):
"""插入多個數據"""
pass
def deletes(self, *args, **kwargs):
"""刪除多個數據"""
pass
def updates(self, *args, **kwargs):
"""更新多個數據"""
pass
def find(self, *args, **kwargs):
"""查找數據"""
pass
class MongoDataBase(DataBase):
def __init__(self, *args, **kwargs):
"""初始化"""
self.mongo = pymongo.MongoClient(*args, **kwargs)
def getDatas(self, db_key: str, datas_key: str):
"""返回數據表對象
Args:
db_key: 數據庫名字
datas_key: 數據表名字
"""
return MongoDataBase.MongoDatas(self.mongo[db_key][datas_key])
class MongoDatas(DataBase.Datas):
def __init__(self, datas):
self.datas = datas
def insert(self, d):
return self.datas.insert_one(d)
def delete(self, c):
return self.datas.delete_one(c)
def update(self, c, d):
return self.datas.update_one(c, d)
def inserts(self, d):
return self.datas.insert_many(d)
def deletes(self, c):
return self.datas.delete_many(c)
def updates(self, c, d):
return self.datas.update_many(c, d)
def find(self, *args, **kwargs):
r = self.datas.find(*args, **kwargs)
for i in r:
del i['_id'] # 刪除_id屬性
yield i
最后要在scrapy_bilibili/scrapy_bilibili/settings.py中設置一下Pipeline:
點擊查看詳情
ITEM_PIPELINES = {
'scrapy_bilibili.pipelines.BilibiliPipeline': 100,
}
運行爬蟲
好不容易實現了爬蟲,接下來就來跑一跑吧。
命令行或終端輸入:
scrapy crawl BilibiliSpider
如果要傳入參數,則可以輸入:
scrapy crawl BilibiliSpider -a rid=17
注意要切換到項目的根目錄,並且保證Redis和MongoDB的服務都已經開啟了。

控制台正在瘋狂輸出...
過了十分鍾,我們來看看Redis和MongoDB的情況:

上面是Redis的情況,下了5萬多條數據,然后下面是MongoDB的情況:

整體情況順利。