2年前的實驗室項目需要對exploit-db進行爬蟲,這里回顧一下知識。
基本思路,使用urllib.request訪問exploit-db,使用BeautifulSoup對Response進行解析,然后將提取出的內容存儲至Mysql中。
urllib
寫這個demo的時候Python2還沒有廢棄,這里將代碼移植至Python3中。
由於exploit-db中漏洞頁面的url是https://www.exploit-db.com/exploits/
+ eid
的方式構成的,因此遍歷eid即可爬取所有的漏洞。
構造Request與網頁訪問
Request文檔
class urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)
headers可以在構造函數中指定,也可以通過add_header
方法進行添加
urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
urlopen()函數既可以接收url,也可以接收Request。urlopen()將返回一個字節對象,需要我們自行處理編碼。
def spider(spider_url):
# 構造request
user_agent = random.choice(ua_list)
spider_request = request.Request(spider_url)
spider_request.add_header('User-Agent', user_agent)
spider_response = request.urlopen(spider_request, timeout=30)
html = spider_response.read().decode('utf-8')
異常處理
urllib.request中的異常
爬取過程中遇到的一些異常
- URLError //頁面不存在
- socket.timeout //read()超時
- UnicodeDecodeError //目標頁面是pdf,decode('utf-8')錯誤
def spider(spider_url):
# 構造request
user_agent = random.choice(ua_list)
spider_request = request.Request(spider_url)
spider_request.add_header('User-Agent', user_agent)
try:
spider_response = request.urlopen(spider_request, timeout=30)
except error.URLError as e:
return 'error, URLError'
# noinspection PyBroadException
try:
html = spider_response.read().decode('utf-8')
except socket.timeout as e:
return 'error, socket.timeout'
except UnicodeDecodeError as e:
return 'error, UnicodeDecodeError'
except Exception as e:
return 'error, Exception: %s' % e
return html
BeautifulSoup
exploit-db在這段時間也更新了頁面,之前寫的解析函數已經無法運行。
BeautifulSoup的安裝和詳細使用方法可以參考官方文檔,這里對使用的函數進行說明:
BeautifulSoup通過將html/xml文件轉變成一個BeautifulSoup
對象,然后根據該對象提供的一些方法對html/xml進行查找和修改。
BeautifulSoup可以通過.
訪問標簽,通過[]
訪問屬性,通過find()和find_all()選擇需要的標簽,然后提取其中的信息。
Chrome提供的檢查工具可以很容易確定元素的位置,分析html中需要的標簽的位置,然后選擇合適的過濾器。
def bs4html(html):
# 實現對html的解析
soup = BeautifulSoup(html, 'html.parser')
for div in soup.find_all('div', class_='col-sm-12 col-md-6 col-lg-3 d-flex align-items-stretch'):
for h in div.find_all('div', class_='col-6 text-center'):
print(h.h4.get_text().strip() + h.h6.get_text().strip())
for s in div.find_all('div', class_='stats h5 text-center'):
if s.strong.string.strip() == 'EDB Verified:':
if s.i['class'] == ['mdi', 'mdi-24px', 'mdi-check']:
print('EDB Verified: Yes')
else:
print('EDB Verified: No')
elif s.strong.string.strip() == 'Exploit:':
print(s.strong.string.strip() + s.a['href'])
else:
if s.find('a') is None:
print(s.strong.string.strip())
else:
print(s.strong.string.strip() + s.a['href'])
數據庫存儲
ORM也就將數據庫映射成對象,然后使用對象的方式操作SQL語句,這里使用SQLalchemy框架。
需要實現兩個類,一個類用於和數據庫通信,完成增刪改查等操作,另一個類是映射類,將數據庫中的表與之形成映射。
數據庫中的表
class DBPoc(Base):
__tablename__ = 'exp_poc_info'
id = Column(Integer, primary_key=True)
eid = Column(Integer)
cve = Column(String)
title = Column(String)
author = Column(String)
published_time = Column(String)
verified = Column(String)
platform = Column(String)
exploit_type = Column(String)
exploit_url = Column(String)
exploit_app = Column(String)
def __init__(self, eid, cve,
title, author, published_time, verified,
platform, exploit_type, exploit_url, exploit_app):
self.eid = eid
self.cve = cve
self.title = title
self.author = author
self.published_time = published_time
self.verified = verified
self.platform = platform
self.exploit_type = exploit_type
self.exploit_url = exploit_url
self.exploit_app = exploit_app
與數據庫的通信
create_engine()與數據庫進行連接,而具體的增刪改查需要使用session進行操作
class DBEngine(object):
def __init__(self):
#
self.engine = create_engine('sqlite:///exploit_db.sqlite', echo=False)
db_session = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
self.session = db_session()
def close_db(self):
#
self.session.close()
# interface
# lower words connected with '_'
def add_poc(self, Poc):
# 添加poc
self.session.add(Poc)
self.session.commit()
def del_poc(self, eid):
# 刪除poc
poc = self.session.query(DBPoc).filter(DBPoc.eid == eid).first()
try:
self.session.delete(poc)
self.session.commit()
except Exception as e:
print(e)
def is_eid_exist(self, eid):
# exist True
# not exist False
if self.session.query(DBPoc).filter(DBPoc.eid == eid).first() is None:
return False
else:
return True
def view_all_poc(self):
print('DBPoc:')
all_poc = self.session.query(DBPoc)
for poc in all_poc:
print(poc)
完整的代碼見github