Background: a simple spider I wrote to crawl movie torrent listings, storing the data in MySQL.
Version: Python 3
IDE: PyCharm
Environment: Windows 10
Project: Scrapy spider
Note: if you reuse this code, please adapt it to your own setup.
The utility file (db_util.py) is as follows:
""" 數據庫連接工具類 # """ import pymysql import traceback from DBUtils.PooledDB import PooledDB from scrapy.utils.project import get_project_settings class MysqlUtil(object): # 獲取setting文件中的配置 settings = get_project_settings() config = { 'host': settings.get('MYSQL_HOST'), 'port': settings.get('MYSQL_PORT'), 'database': settings.get('MYSQL_DATABASE'), 'user': settings.get('MYSQL_USER'), 'password': settings.get('MYSQL_PASSWORD'), 'charset': settings.get('MYSQL_CHARSET') } """ MYSQL數據庫對象,負責產生數據庫連接 , 此類中的連接采用連接池實現獲取連接對象:conn = Mysql.getConn() 釋放連接對象;conn.close()或del conn """ # 連接池對象 __pool = None def __init__(self): # 數據庫構造函數,從連接池中取出連接,並生成操作游標 self._conn = MysqlUtil.get_conn() self._cursor = self._conn.cursor() # 獲取鏈接 @staticmethod def get_conn(): """ @summary: 靜態方法,從連接池中取出連接 @return MySQLdb.connection """ if MysqlUtil.__pool is None: __pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host=MysqlUtil.config['host'], port=MysqlUtil.config['port'], user=MysqlUtil.config['user'], passwd=MysqlUtil.config['password'], db=MysqlUtil.config['database'], charset=MysqlUtil.config['charset']) return __pool.connection() # 查詢所有數據 def get_all(self, sql, param=None): """ @summary: 執行查詢,並取出所有結果集 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list(字典對象)/boolean 查詢到的結果集 """ try: if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchall() else: result = False return result except Exception as e: traceback.print_exc(e) # 查詢某一個數據 def get_one(self, sql, param=None): """ @summary: 執行查詢,並取出第一條 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list/boolean 查詢到的結果集 """ try: if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchone() else: result = False return result except Exception as e: traceback.print_exc(e) # 查詢數量 def get_count(self, sql, param=None): """ @summary: 執行查詢,返回結果數 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param param: 可選參數,條件列表值(元組/列表) @return: result list/boolean 查詢到的結果集 """ try: if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) return count except Exception as e: traceback.print_exc(e) # 查詢部分 def get_many(self, sql, num, param=None): """ @summary: 執行查詢,並取出num條結果 @param sql:查詢SQL,如果有查詢條件,請只指定條件列表,並將條件值使用參數[param]傳遞進來 @param num:取得的結果條數 @param param: 可選參數,條件列表值(元組/列表) @return: result list/boolean 查詢到的結果集 """ try: if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchmany(num) else: result = False return result except Exception as e: traceback.print_exc(e) # 插入一條數據 def insert_one(self, sql, value): """ @summary: 向數據表插入一條記錄 @param sql:要插入的SQL格式 @param value:要插入的記錄數據tuple/list @return: insertId 受影響的行數 """ try: row_count = self._cursor.execute(sql, value) return row_count except Exception as e: traceback.print_exc(e) self.end("rollback") # 插入多條數據 def insert_many(self, sql, values): """ @summary: 向數據表插入多條記錄 @param sql:要插入的SQL格式 @param values:要插入的記錄數據tuple(tuple)/list[list] @return: count 受影響的行數 """ try: row_count = self._cursor.executemany(sql, values) return row_count except Exception as e: traceback.print_exc(e) self.end("rollback") # def __get_insert_id(self): # """ # 獲取當前連接最后一次插入操作生成的id,如果沒有則為0 # """ # self._cursor.execute("SELECT @@IDENTITY AS id") # result = 
self._cursor.fetchall() # return result[0]['id'] # 執行sql def __query(self, sql, param=None): try: if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) return count except Exception as e: traceback.print_exc(e) # 更新 def update(self, sql, param=None): """ @summary: 更新數據表記錄 @param sql: SQL格式及條件,使用(%s,%s) @param param: 要更新的 值 tuple/list @return: count 受影響的行數 """ return self.__query(sql, param) # 刪除 def delete(self, sql, param=None): """ @summary: 刪除數據表記錄 @param sql: SQL格式及條件,使用(%s,%s) @param param: 要刪除的條件 值 tuple/list @return: count 受影響的行數 """ return self.__query(sql, param) def begin(self): """ @summary: 開啟事務 """ self._conn.autocommit(0) def end(self, option='commit'): """ @summary: 結束事務 """ if option == 'commit': self._conn.commit() else: self._conn.rollback() def dispose(self, is_end=1): """ @summary: 釋放連接池資源 """ if is_end == 1: self.end('commit') else: self.end('rollback') self._cursor.close() self._conn.close()
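For reference, a minimal usage sketch of the class above outside a pipeline; some_table and its columns are placeholders I made up, only the import path is taken from the pipeline further down:

from torrentSpider.utils.db_util import MysqlUtil

# Minimal usage sketch; some_table and its columns are placeholders.
db = MysqlUtil()
try:
    db.insert_one("insert into some_table(name) values (%s)", ("demo",))
    db.end("commit")          # commit the insert explicitly (autocommit is off by default)
    rows = db.get_all("select id, name from some_table")
    print(rows)
except Exception:
    db.end("rollback")        # undo the insert on failure
finally:
    db.dispose()              # close the cursor and return the connection to the pool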
The MySQL and logger configuration is as follows (in settings.py; configure the rest yourself):
import datetime
# start MySQL database configure setting
MYSQL_HOST = "127.0.0.1"
MYSQL_PORT = 3306
MYSQL_DATABASE = "crawler_db"
MYSQL_USER = "admin"
MYSQL_PASSWORD = "admin"
MYSQL_CHARSET = "utf8"
# end of MySQL database configure setting

# start logger configure setting
current_day = datetime.datetime.now()
LOG_ENABLED = True  # enable logging
LOG_ENCODING = 'utf-8'
LOG_FILE = "C:/xxx_spider.{}-{}-{}.log".format(current_day.year, current_day.month, current_day.day)
LOG_LEVEL = "INFO"
LOG_STDOUT = True  # redirect stdout (e.g. print) to the log file
# end logger configure setting
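For the pipeline below to run, it also has to be enabled in settings.py; a minimal sketch, assuming the class lives in torrentSpider/pipelines.py (adjust the dotted path to wherever MySqlPipeline is actually defined):

# Enable the MySQL pipeline; the dotted path is an assumption, adjust to your project layout
ITEM_PIPELINES = {
    'torrentSpider.pipelines.MySqlPipeline': 300,
}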
The pipeline that uses the utility class is as follows:
# -*- coding: utf-8 -*-
from torrentSpider.utils.db_util import MysqlUtil
import traceback
import logging


class MySqlPipeline(object):
    pool = None

    def __init__(self):
        pass

    # Connect to the database when the spider opens
    def open_spider(self, spider):
        self.pool = MysqlUtil()

    # Process each scraped item
    def process_item(self, item, spider):
        try:
            # Example query for debugging:
            # rows = self.pool.get_all("select * from torrent_ye", None)
            # print('Query result: ' + str(rows))

            # Check the database first; skip the insert if the record already exists
            sql_select = """select 1 from torrent_ye where torrent_url = %(torrent_url)s"""
            params_select = {'torrent_url': item['torrent_url']}
            flag = self.pool.get_count(sql_select, params_select)
            if flag > 0:
                logging.info('Record already exists: [%s][%s]', item['torrent_title'], item['torrent_url'])
                return item
            sql_insert = """insert into torrent_ye(torrent_title, torrent_name, torrent_director, torrent_actor,
                                torrent_language, torrent_type, torrent_region, torrent_update_time, torrent_status,
                                torrent_show_time, torrent_introduction, torrent_url)
                            values (%(torrent_title)s, %(torrent_name)s, %(torrent_director)s, %(torrent_actor)s,
                                %(torrent_language)s, %(torrent_type)s, %(torrent_region)s, %(torrent_update_time)s,
                                %(torrent_status)s, %(torrent_show_time)s, %(torrent_introduction)s, %(torrent_url)s)"""
            params = {'torrent_title': item['torrent_title'],
                      'torrent_name': item['torrent_name'],
                      'torrent_director': item['torrent_director'],
                      'torrent_actor': item['torrent_actor'],
                      'torrent_language': item['torrent_language'],
                      'torrent_type': item['torrent_type'],
                      'torrent_region': item['torrent_region'],
                      'torrent_update_time': item['torrent_update_time'],
                      'torrent_status': item['torrent_status'],
                      'torrent_show_time': item['torrent_show_time'],
                      'torrent_introduction': item['torrent_introduction'],
                      'torrent_url': item['torrent_url']}
            self.pool.insert_one(sql_insert, params)
            self.pool.end("commit")
        except Exception as e:
            logging.error('Exception occurred: [%s]', e)
            traceback.print_exc()
            self.pool.end("rollback")
        return item

    # Called when the spider closes
    def close_spider(self, spider):
        pass
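The pipeline reads the fields above from the item, so items.py needs a matching item class; a sketch of what it might look like (the class name TorrentItem is my guess, the field names come straight from the pipeline):

import scrapy


class TorrentItem(scrapy.Item):
    # Field names match what MySqlPipeline reads; the class name is assumed.
    torrent_title = scrapy.Field()
    torrent_name = scrapy.Field()
    torrent_director = scrapy.Field()
    torrent_actor = scrapy.Field()
    torrent_language = scrapy.Field()
    torrent_type = scrapy.Field()
    torrent_region = scrapy.Field()
    torrent_update_time = scrapy.Field()
    torrent_status = scrapy.Field()
    torrent_show_time = scrapy.Field()
    torrent_introduction = scrapy.Field()
    torrent_url = scrapy.Field()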