# -*- coding: utf-8 -*-
from scrapy.conf import settings
from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi
import json


class DoubanmeinvPipeline(object):
    """Scrapy pipeline that stores feed and user items in a database.

    Uses a Twisted adbapi ConnectionPool (driver and connect args come from
    the project settings ``DB_SERVER`` / ``DB_CONNECT``).  Ids already stored
    are cached in-memory so duplicate items can be dropped cheaply; after
    ``max_dropcount`` duplicates the spider is asked to shut down.
    """

    # Column names for the two tables; order must match the item/user keys.
    feed_key = ['feedId', 'userId', 'createOn', 'title', 'thumbUrl', 'href', 'description', 'pics']
    user_key = ['userId', 'name', 'avatar']

    # INSERT templates: the two %s get the column list and the placeholder list.
    insertFeed_sql = '''insert into MeiziFeed (%s) values (%s)'''
    insertUser_sql = '''insert into MeiziUser (%s) values (%s)'''
    feed_query_sql = "select * from MeiziFeed where feedId = %s"
    user_query_sql = "select * from MeiziUser where userId = %s"
    feed_seen_sql = "select feedId from MeiziFeed"
    user_seen_sql = "select userId from MeiziUser"

    # After this many duplicate feeds, ask the spider to close down.
    max_dropcount = 50
    current_dropcount = 0

    def __init__(self):
        # Initialize the seen-id caches eagerly: the pre-load queries below run
        # asynchronously, and process_item may be called before they finish.
        # (Original code left these attributes unset until the interaction ran.)
        self.feed_ids_seen = set()
        self.user_ids_seen = set()
        dbargs = settings.get('DB_CONNECT')
        db_server = settings.get('DB_SERVER')
        self.dbpool = adbapi.ConnectionPool(db_server, **dbargs)
        # Pre-load ids already present in the DB so duplicates are dropped early.
        d = self.dbpool.runInteraction(self.update_feed_seen_ids)
        d.addErrback(self._database_error)
        u = self.dbpool.runInteraction(self.update_user_seen_ids)
        u.addErrback(self._database_error)

    def __del__(self):
        # Guard: __init__ may have raised before self.dbpool was assigned.
        pool = getattr(self, 'dbpool', None)
        if pool is not None:
            pool.close()

    def update_feed_seen_ids(self, tx):
        """Load every feedId already stored in MeiziFeed into the cache."""
        tx.execute(self.feed_seen_sql)
        rows = tx.fetchall()
        # Each row is a 1-tuple, hence row[0].
        self.feed_ids_seen = set(int(row[0]) for row in rows) if rows else set()

    def update_user_seen_ids(self, tx):
        """Load every userId already stored in MeiziUser into the cache."""
        tx.execute(self.user_seen_sql)
        rows = tx.fetchall()
        # Each row is a 1-tuple, hence row[0].
        self.user_ids_seen = set(int(row[0]) for row in rows) if rows else set()

    def process_item(self, item, spider):
        """Drop duplicate feeds; schedule new items for insertion.

        Raises DropItem for feeds whose id is already cached; after
        ``max_dropcount`` drops the spider is flagged to close down.
        """
        # Check the cache BEFORE scheduling any DB work, so duplicates never
        # cost a pool round-trip (the original queued the insert first).
        if int(item['feedId']) in self.feed_ids_seen:
            self.current_dropcount += 1
            if self.current_dropcount >= self.max_dropcount:
                spider.close_down = True
            raise DropItem("重复的数据:%s" % item['feedId'])
        query = self.dbpool.runInteraction(self._conditional_insert, item)
        query.addErrback(self._database_error, item)
        return item

    def _conditional_insert(self, tx, item):
        """Insert the feed and its user, each only if not already in the DB.

        Runs inside a pool transaction; ``tx`` is the DB-API cursor.
        """
        # Feed: insert only when the existence probe finds nothing.
        # NOTE: params must be a tuple — (x,) not (x) — per DB-API 2.0.
        tx.execute(self.feed_query_sql, (item['feedId'],))
        if tx.fetchone() is None:
            # Pass tx so the insert happens inside THIS transaction instead of
            # being scheduled separately on the pool (original behavior bug).
            self.insert_data(item, self.insertFeed_sql, self.feed_key, tx)
        else:
            print("该feed已存在数据库中:%s" % item['feedId'])
        # Record the id so process_item drops future duplicates.
        self.feed_ids_seen.add(int(item['feedId']))

        # User: same pattern as the feed above.
        user = item['userInfo']
        tx.execute(self.user_query_sql, (user['userId'],))
        if tx.fetchone() is None:
            self.insert_data(user, self.insertUser_sql, self.user_key, tx)
        else:
            print("该用户已存在数据库:%s" % user['userId'])
        self.user_ids_seen.add(int(user['userId']))

    def insert_data(self, item, insert, sql_key, tx=None):
        """Build and execute the parameterized INSERT for ``item``.

        ``sql_key`` lists the columns; values are bound as DB-API parameters
        (never string-formatted into the SQL).  When ``tx`` is given the
        statement runs on that cursor (same transaction as the caller);
        otherwise it is scheduled on the pool as a standalone operation,
        preserving the original call signature for existing callers.
        """
        fields = u','.join(sql_key)
        placeholders = u','.join([u'%s'] * len(sql_key))
        sql = insert % (fields, placeholders)
        data = [item[k] for k in sql_key]
        if tx is not None:
            return tx.execute(sql, data)
        return self.dbpool.runOperation(sql, data)

    def _database_error(self, e, item=None):
        """Errback for pool failures.

        ``item`` is optional because process_item registers this errback with
        an extra argument (``addErrback(self._database_error, item)``); the
        original one-argument signature made that callback raise TypeError.
        """
        print("Database error: %s" % e)