一個exploit-db的爬蟲demo


2年前的實驗室項目需要對exploit-db進行爬蟲,這里回顧一下知識。

基本思路,使用urllib.request訪問exploit-db,使用BeautifulSoup對Response進行解析,然后將提取出的內容存儲至Mysql中。

urllib

寫這個demo的時候Python2還沒有廢棄,這里將代碼移植至Python3中。

由於exploit-db中漏洞頁面的url是https://www.exploit-db.com/exploits/ + eid的方式構成的,因此遍歷eid即可爬取所有的漏洞。

構造Request與網頁訪問
Request文檔

class urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)

headers可以在構造函數中指定,也可以通過add_header方法進行添加

urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)

urlopen()函數既可以接收url,也可以接收Request。urlopen()將返回一個字節對象,需要我們自行處理編碼。

def spider(spider_url):
    # 構造request
    user_agent = random.choice(ua_list)
    spider_request = request.Request(spider_url)
    spider_request.add_header('User-Agent', user_agent)

    spider_response = request.urlopen(spider_request, timeout=30)

    html = spider_response.read().decode('utf-8')

異常處理
urllib.request中的異常
爬取過程中遇到的一些異常

  • URLError //頁面不存在
  • socket.timeout //read()超時
  • UnicodeDecodeError //目標頁面是pdf,decode('utf-8')錯誤
def spider(spider_url):
    # 構造request
    user_agent = random.choice(ua_list)
    spider_request = request.Request(spider_url)
    spider_request.add_header('User-Agent', user_agent)

    try:
        spider_response = request.urlopen(spider_request, timeout=30)
    except error.URLError as e:
        return 'error, URLError'

    # noinspection PyBroadException
    try:
        html = spider_response.read().decode('utf-8')
    except socket.timeout as e:
        return 'error, socket.timeout'
    except UnicodeDecodeError as e:
        return 'error, UnicodeDecodeError'
    except Exception as e:
        return 'error, Exception: %s' % e

    return html

BeautifulSoup

exploit-db在這段時間也更新了頁面,之前寫的解析函數已經無法運行。
BeautifulSoup的安裝和詳細使用方法可以參考官方文檔,這里對使用的函數進行說明:
BeautifulSoup通過將html/xml文件轉變成一個BeautifulSoup對象,然后根據該對象提供的一些方法對html/xml進行查找和修改。

BeautifulSoup可以通過.訪問標簽,通過[]訪問屬性,通過find()和find_all()選擇需要的標簽,然后提取其中的信息。

Chrome提供的檢查工具可以很容易確定元素的位置,分析html中需要的標簽的位置,然后選擇合適的過濾器。

def bs4html(html):
    # 實現對html的解析
    soup = BeautifulSoup(html, 'html.parser')
    for div in soup.find_all('div', class_='col-sm-12 col-md-6 col-lg-3 d-flex align-items-stretch'):
        for h in div.find_all('div', class_='col-6 text-center'):
            print(h.h4.get_text().strip() + h.h6.get_text().strip())
        for s in div.find_all('div', class_='stats h5 text-center'):
            if s.strong.string.strip() == 'EDB Verified:':
                if s.i['class'] == ['mdi', 'mdi-24px', 'mdi-check']:
                    print('EDB Verified: Yes')
                else:
                    print('EDB Verified: No')
            elif s.strong.string.strip() == 'Exploit:':
                print(s.strong.string.strip() + s.a['href'])
            else:
                if s.find('a') is None:
                    print(s.strong.string.strip())
                else:
                    print(s.strong.string.strip() + s.a['href'])

數據庫存儲

ORM也就將數據庫映射成對象,然后使用對象的方式操作SQL語句,這里使用SQLalchemy框架。
需要實現兩個類,一個類用於和數據庫通信,完成增刪改查等操作,另一個類是映射類,將數據庫中的表與之形成映射。
數據庫中的表

class DBPoc(Base):
    __tablename__ = 'exp_poc_info'

    id = Column(Integer, primary_key=True)
    eid = Column(Integer)
    cve = Column(String)

    title = Column(String)
    author = Column(String)
    published_time = Column(String)

    verified = Column(String)

    platform = Column(String)
    exploit_type = Column(String)
    exploit_url = Column(String)
    exploit_app = Column(String)

    def __init__(self, eid, cve,
                 title, author, published_time, verified,
                 platform, exploit_type, exploit_url, exploit_app):
        self.eid = eid
        self.cve = cve

        self.title = title
        self.author = author
        self.published_time = published_time
        self.verified = verified

        self.platform = platform
        self.exploit_type = exploit_type
        self.exploit_url = exploit_url
        self.exploit_app = exploit_app

與數據庫的通信
create_engine()與數據庫進行連接,而具體的增刪改查需要使用session進行操作

class DBEngine(object):
    def __init__(self):
        #
        self.engine = create_engine('sqlite:///exploit_db.sqlite', echo=False)
        db_session = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        self.session = db_session()

    def close_db(self):
        #
        self.session.close()

    # interface
    # lower words connected with '_'
    def add_poc(self, Poc):
        # 添加poc
        self.session.add(Poc)
        self.session.commit()

    def del_poc(self, eid):
        # 刪除poc
        poc = self.session.query(DBPoc).filter(DBPoc.eid == eid).first()
        try:
            self.session.delete(poc)
            self.session.commit()
        except Exception as e:
            print(e)

    def is_eid_exist(self, eid):
        # exist True
        # not exist False
        if self.session.query(DBPoc).filter(DBPoc.eid == eid).first() is None:
            return False
        else:
            return True

    def view_all_poc(self):
        print('DBPoc:')
        all_poc = self.session.query(DBPoc)
        for poc in all_poc:
            print(poc)

完整的代碼見github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM