Python multi-threaded crawler: design and implementation examples


The basic steps of a crawler are: fetch, parse, store. Assume here that fetching and storing are I/O-bound (network access and database writes) and that parsing is CPU-bound. There are then two main designs for a multi-threaded crawler. In the first, one thread performs all three steps for an item, and several such threads run in parallel. In the second, each step gets its own threads, for example N threads for fetching, a single thread for parsing (switching between many parsing threads would reduce efficiency), and N threads for storing.
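
As a rough sketch of the first scheme (the fetch/parse/store bodies below are placeholders, i.e. assumptions, not the site-specific code used later in this article): each worker thread runs the whole fetch -> parse -> store sequence for one search term, and a thread pool runs many such workers. The second scheme instead connects separate groups of threads through queues, as implemented in section II below.

import time
from concurrent.futures import ThreadPoolExecutor

def fetch(word):        # I/O-bound step (an HTTP request in the real crawler)
    time.sleep(0.1)
    return '<html>%s</html>' % word

def parse(html):        # CPU-bound step (BeautifulSoup parsing in the real crawler)
    return [html.strip()]

def store(items):       # I/O-bound step (a MySQL insert in the real crawler)
    print('stored', items)

def crawl_one(word):    # scheme 1: one thread handles all three steps for one term
    store(parse(fetch(word)))

words = ['MFCD00000001', 'MFCD00000002']   # hypothetical search terms
with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(crawl_one, words))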

Below we try to scrape in-stock drug information from http://www.chembridge.com/.

First, determine the URL: http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1, where line is the drug identifier to search for (the identifiers to search are stored in a local txt file, one per line). The requests library is used for the HTTP request; the code for fetching the page is as follows:

url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
response = requests.get(url,headers=self.headers[0],timeout=20)
html_doc=response.text
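
A variant of the same request, as a sketch that assumes the same line and self.headers as above: letting requests build the query string from a params dict also URL-encodes the search term read from the file, which is safer than concatenating it into the URL by hand.

params = {
    'searchType': 'MFCD',
    'query': line,      # search term read from the local txt file
    'type': 'phrase',
    'results': 10,
    'search': 1,
}
response = requests.get('http://www.chembridge.com/search/search.php',
                        params=params, headers=self.headers[0], timeout=20)
html_doc = response.text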

The page is parsed with the BeautifulSoup library; partial code:

soup = BeautifulSoup(html_doc, 'lxml')
div=soup.find(id='BBResults')
if div:
    links=div.select('a.chemical')
    for link in links:
        try:
            self.get_page_link(link,line)
        except Exception as e:
            print('%s failed to store:'%line,e)
            time.sleep(self.relay*2)
            print('re-storing %s'%line)
            self.get_page_link(link,line)
            continue
print('%s search finished'%line)
def get_page_link(self,link,line):
    res=[]
    href=link.get('href')
    print(href)
    time.sleep(self.relay*2*random.randint(5,15)/10)
    r=requests.get(href,headers=self.headers[1],timeout=20)
    if r.status_code==200:
        parse_html=r.text
        soup1=BeautifulSoup(parse_html, 'lxml')
        catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]# get catalog names
        # print(catalogs)
        table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
        if 'AmountPriceQty.' in table_headers:
            index=table_headers.index('AmountPriceQty.')
            catalog=catalogs[0]
            trs=soup1.select('.form tbody tr')
            if len(catalogs)>1:
                catalog=catalogs[index]
            for tr in trs:
                if len(tr.select('td'))>1:
                    row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                    res.append(row)

Finally, res is saved to the MySQL database. Each row in res is a 4-tuple (catalog, amount, price, qty), matching the four %s placeholders in the INSERT statement:

conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
cursor = conn.cursor()
sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
cursor.executemany(sql,res)
print('inserting into database')
conn.commit()
cursor.close()
conn.close()
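
The INSERT above assumes the chembridge table already exists with four columns. A minimal sketch of creating it (the column names follow the scraped fields catalog, amount, price and qty; the VARCHAR lengths are assumptions):

import mysql.connector

conn = mysql.connector.connect(host='localhost', user='root', passwd='password', db='test')
cursor = conn.cursor()
cursor.execute('''
    CREATE TABLE IF NOT EXISTS chembridge (
        catalog VARCHAR(64),
        amount  VARCHAR(64),
        price   VARCHAR(255),
        qty     VARCHAR(64)
    )
''')
conn.commit()
cursor.close()
conn.close()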

 

I. Complete code for the single-threaded crawler, wrapped in a Spider class:

# -*- coding:utf-8 -*-
import requests,random,time
from bs4 import BeautifulSoup
import mysql.connector

class Spider:
    def __init__(self):
        self.headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]
        self.filename='MDL.txt'

    def get_page_link(self,link):
        res=[]
        href=link.get('href')
        print(href)
        parse_html=requests.get(href,headers=self.headers[1]).text
        soup1=BeautifulSoup(parse_html, 'lxml')
        catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]# get catalog names
        print(catalogs)
        table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
        print(table_headers)
        index=table_headers.index('AmountPriceQty.')
        catalog=catalogs[0]
        trs=soup1.select('.form tbody tr')
        # print(trs)
        if len(catalogs)>1:
            catalog=catalogs[index]
        for tr in trs:
            if len(tr.select('td'))>1:
                row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                res.append(row)
        print(res)
        conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
        cursor = conn.cursor()
        sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
        cursor.executemany(sql,res)
        conn.commit()
        cursor.close()
        conn.close()

    def get_page(self,line):
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers[0],timeout=20)
            print(response.status_code)
            html_doc=response.text
            # print(html_doc)
            soup = BeautifulSoup(html_doc, 'lxml')
            div=soup.find(id='BBResults')
            if div:
                links=div.select('a.chemical')
                for link in links:
                    self.get_page_link(link)
            relay=random.randint(2,5)/10
            print(relay)
            time.sleep(relay)
        except Exception as e:
            print('except:', e)

    def get_file(self,filename):
        i=0
        f=open(filename,'r')
        for line in f.readlines():
            line=line.strip()
            print(line)
            self.get_page(line)
            i=i+1
            print('item %s done'%(i))
        f.close()

    def run(self):
        self.get_file(self.filename)

spider=Spider()
starttime=time.time()
spider.run()
print('elapsed: %f s'%(time.time()-starttime))

II. Multi-threaded crawler designs

1. Implementation of the first design:

 

# -*- coding:utf-8 -*-
from threading import Thread
import threading
from queue import Queue, Empty
import os,time,random
import requests,mysql.connector
from bs4 import BeautifulSoup
from openpyxl.workbook import Workbook
from openpyxl.styles import Font

class ThreadCrawl(Thread):
    def __init__(self,tname,relay):
        Thread.__init__(self)
        #super(MyThread2, self).__init__()
        # self.queue=queue
        # self.lock=lock
        # self.conn=conn
        self.relay=relay*random.randint(5,15)/10
        self.tname=tname
        self.num_retries=3  # number of retries for a failed search
        self.headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]

    def run(self):
        print('%s started crawling'%self.tname)
        # pop search terms from the shared words list until it is empty;
        # the check and the pop happen under the lock so two threads never take the same term
        while True:
            lock.acquire()
            if not words:
                lock.release()
                break
            line = words.pop(0)
            lock.release()
            self.get_page(line,self.num_retries)
            time.sleep(self.relay*random.randint(5,15)/10)

        # re-crawl the terms that failed every retry; the timeout avoids blocking forever
        # when another thread grabs the last item between the empty() check and the get()
        while not my_queue.empty():
            try:
                line=my_queue.get(timeout=1)
            except Empty:
                break
            print('re-crawling %s...'%line)
            self.get_page(line,num_retries=1)
        print('%s finished'%self.tname)


    # fetch the search results page
    def get_page(self,line,num_retries=2):
        print('%s is searching %s...'%(self.tname,line))
        # write this thread task
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers[0],timeout=20)
            status=response.status_code
            if status==200:
                html_doc=response.text
                # print(html_doc)
                soup = BeautifulSoup(html_doc, 'lxml')
                div=soup.find(id='BBResults')
                if div:
                    links=div.select('a.chemical')
                    for link in links:
                        try:
                            self.get_page_link(link,line)
                        except Exception as e:
                            print('%s failed to store:'%line,e)
                            time.sleep(self.relay*2)
                            print('re-storing %s'%line)
                            self.get_page_link(link,line)
                            continue
                print('%s search finished'%line)
                lock.acquire()
                global count
                count=count+1
                print('finished %s so far'%count)
                lock.release()
                # time.sleep(self.relay*random.randint(5,15)/10)
            else:
                print('network error while %s searched %s, status code: %s'%(self.tname,line,status))
                # time.sleep(self.relay*random.randint(5,15)/10)
                if num_retries>0:
                    print('%s retrying search for %s'%(self.tname,line))
                    time.sleep(self.relay*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    print('all four search attempts for %s failed!!!'%line)
                    my_queue.put(line)
                    # error_list.append(line)

        except Exception as e:
            print('exception while %s searched %s, error:'%(self.tname,line), e)
            # time.sleep(self.relay*random.randint(5,15)/10)
            if num_retries>0:
                print('%s retrying search for %s'%(self.tname,line))
                time.sleep(self.relay*random.randint(5,15)/10)
                self.get_page(line,num_retries-1)
            else:
                print('all four search attempts for %s failed!!!'%line)
                my_queue.put(line)
                # error_list.append(line)
        # self.queue.task_done()

    # follow the product detail link, parse it and store the results
    def get_page_link(self,link,line):
        res=[]
        href=link.get('href')
        print(href)
        time.sleep(self.relay*2*random.randint(5,15)/10)
        r=requests.get(href,headers=self.headers[1],timeout=20)
        if r.status_code==200:
            parse_html=r.text
            soup1=BeautifulSoup(parse_html, 'lxml')
            catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]# get catalog names
            # print(catalogs)
            table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
            if 'AmountPriceQty.' in table_headers:
                index=table_headers.index('AmountPriceQty.')
                catalog=catalogs[0]
                trs=soup1.select('.form tbody tr')
                if len(catalogs)>1:
                    catalog=catalogs[index]
                for tr in trs:
                    if len(tr.select('td'))>1:
                        row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                        res.append(row)
                # print(res)
                lock.acquire()
                conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
                cursor = conn.cursor()
                try:
                    print('storing %s: %s...'%(line,catalog))
                    sql = 'INSERT INTO chembridge VALUES(%s,%s,%s,%s)'
                    cursor.executemany(sql,res)
                    conn.commit()
                except Exception as e:
                    print(e)
                finally:
                    cursor.close()
                    conn.close()
                    lock.release()

def writeToExcel(datas,filename):
    # create a workbook object in memory
    result_wb = Workbook()
    # the first sheet is ws1
    ws1 = result_wb.worksheets[0]
    # ws1=wb1.create_sheet('result',0)
    # set the sheet name
    ws1.title = "Crawl Results"
    row0 = ['catalog', 'amount', 'price', 'qty']
    ft = Font(name='Arial', size=11, bold=True)
    for k in range(len(row0)):
        ws1.cell(row=1,column=k+1).value=row0[k]
        ws1.cell(row=1,column=k+1).font=ft
    for i in range(1,len(datas)+1):
        for j in range(1,len(row0)+1):
            ws1.cell(row=i+1,column=j).value=datas[i-1][j-1]
    # save the workbook to disk
    result_wb.save(filename = filename)

if __name__ == '__main__':
    starttime=time.time()
    lock = threading.Lock()

    words=[] # list of search terms
    basedir=os.path.abspath(os.path.dirname(__file__))
    filename='MDL.txt'
    file=os.path.join(basedir,filename) # file path
    f=open(file,'r')
    for line in f.readlines():
        line=line.strip()
        words.append(line)
    f.close()

    count=0  # progress counter
    # global my_queue
    my_queue = Queue() # FIFO queue of terms whose search failed; Queue is thread-safe
    error_list=[] # terms that ultimately failed
    threads=[]

    # clear the chembridge table before the crawl starts
    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    print('clearing table...')
    cursor.execute('delete from chembridge')
    conn.commit()
    cursor.close()
    conn.close()

    num_threads=10  # number of crawler threads
    relay=10  # base crawl delay; actual delay = relay * (a random factor between 0.5 and 1.5)
    threadList = []
    for i in range(1,num_threads+1):
        threadList.append('crawler-%s'%i)
    # start the crawler threads
    for tName in threadList:
        thread = ThreadCrawl(tName,relay)
        thread.setDaemon(True)
        thread.start()
        threads.append(thread)
        time.sleep(1)
    # block the main thread until all worker threads have finished
    for t in threads:
        t.join()

    # save the data to an Excel file
    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    cursor.execute('select * from chembridge')
    datas=cursor.fetchall()
    conn.commit()
    cursor.close()
    conn.close()
    writeToExcel(datas,'result.xlsx')

    # summarize the results
    while not my_queue.empty():
        error_line=my_queue.get()
        error_list.append(error_line)
    print('crawl finished!\n')
    if len(error_list)==0:
        print('failed searches: 0')
    else:
        print('%s searches failed in total:'%len(error_list),','.join(error_list))
    print('elapsed: %f s'%(time.time()-starttime))

words is the list that holds the search terms. When a search fails it is retried immediately; num_retries is the maximum number of retries per term. If a term still fails after num_retries retries, the failed word is put into the my_queue queue.

After all words have been processed, the threads re-search every word left in my_queue, looping until my_queue is empty (that is, until every word has been searched successfully).

Note: be aware of how Python threads share state. Even under the GIL, an operation like count = count + 1 is not atomic, so modifying the same global variable from multiple threads must be protected by a lock.
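
A minimal sketch of that locking pattern (the with-statement form is equivalent to the lock.acquire()/lock.release() pair used in the code above): without the lock, two threads can read the same value of count and both write back count+1, losing an increment.

import threading

lock = threading.Lock()
count = 0

def finish_one():
    global count
    with lock:          # same effect as lock.acquire() ... lock.release()
        count = count + 1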

(Screenshot of a sample run omitted.)

 

2. Implementation of the second design

The three queues urls_queue, html_queue and item_queue hold, respectively, the URLs to fetch, the pages to parse, and the scraped results. Three classes are defined: the Fetcher class fetches pages for the given URLs, the Parser class parses the fetched content into items to be saved, and the Saver class stores the items. When urls_queue, html_queue and item_queue are all empty at the same time, the worker threads are stopped and the job is done.

# coding=utf-8
import threading
import queue,requests
import time,random
import mysql.connector
from bs4 import BeautifulSoup

class Fetcher(threading.Thread):
    def __init__(self,urls_queue,html_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.urls_queue = urls_queue
        self.html_queue = html_queue
        self.num_retries=3  # number of retries for a failed search
        self.headers={
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        }

    def run(self):
        while not self.urls_queue.empty():
            try:
                # use a timeout so the thread is not blocked forever if another
                # Fetcher takes the last url between the empty() check and the get()
                line=self.urls_queue.get(timeout=1)
            except queue.Empty:
                break
            print(line)
            time.sleep(2*random.randint(5,15)/10)
            self.get_page(line,self.num_retries)
    def get_page(self,line,num_retries=2):
        url='http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1'
        try:
            response = requests.get(url,headers=self.headers,timeout=20)
            status=response.status_code
            if status==200:
                html_doc=response.text
                print(html_doc)
                self.html_queue.put(html_doc)
                # self.urls_queue.task_done()
                print('%s search finished'%line)
            else:
                print('network error while searching %s, status code: %s'%(line,status))
                if num_retries>0:
                    print('retrying search for %s'%(line))
                    time.sleep(2*random.randint(5,15)/10)
                    self.get_page(line,num_retries-1)
                else:
                    print('all four search attempts for %s failed!!!'%line)
                    self.urls_queue.put(line)

        except Exception as e:
            print('exception while searching %s, error:'%line,e)
            if num_retries>0:
                print('retrying search for %s'%(line))
                time.sleep(2*random.randint(5,15)/10)
                self.get_page(line,num_retries-1)
            else:
                print('all four search attempts for %s failed!!!'%line)
                self.urls_queue.put(line)

    def stop(self):
        self.__running.clear()

class Parser(threading.Thread):
    def __init__(self, html_queue,item_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.html_queue = html_queue
        self.item_queue = item_queue
        self.num_retries=3  # number of retries for a failed search
        self.headers={
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }
    def run(self):
        while self.__running.isSet():
            print('html_queue length: ',self.html_queue.qsize())
            try:
                # use a timeout so the thread can notice stop() instead of blocking on an empty queue
                html_doc=self.html_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                soup = BeautifulSoup(html_doc, 'lxml')
                div=soup.find(id='BBResults')
                if div:
                    links=div.select('a.chemical')
                    for link in links:
                        self.get_page_link(link,self.num_retries)
                relay=random.randint(20,50)/10
                time.sleep(relay)
            except Exception as e:
                self.html_queue.put(html_doc)

    def get_page_link(self,link,num_retries=2):
        time.sleep(2*random.randint(5,15)/10)
        res=[]
        href=link.get('href')
        print(href)
        response=requests.get(href,headers=self.headers,timeout=20)
        status=response.status_code
        if status==200:
            parse_html=response.text
            soup1=BeautifulSoup(parse_html, 'lxml')
            catalogs=[catalog.get_text() for catalog in soup1.select('form div.matter h2')]# get catalog names
            # print(catalogs)
            table_headers=[table_header.get_text(strip=True) for table_header in soup1.select('form .matter thead tr')]
            # print(table_headers)
            if 'AmountPriceQty.' in table_headers:
                index=table_headers.index('AmountPriceQty.')
                catalog=catalogs[0]
                trs=soup1.select('.form tbody tr')
                # print(trs)
                if len(catalogs)>1:
                    catalog=catalogs[index]
                for tr in trs:
                    if len(tr.select('td'))>1:
                        row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.select('td'))
                        res.append(row)
                # print(res)
                self.item_queue.put(res)
        else:
            print('network error while fetching %s, status code: %s'%(link,status))
            # time.sleep(self.relay*random.randint(5,15)/10)
            if num_retries>0:
                print('retrying %s'%(link))
                time.sleep(random.randint(5,15)/10)
                self.get_page_link(link,num_retries-1)
            else:
                print('all four attempts to fetch %s failed!!!'%link)
    def stop(self):
        self.__running.clear()

class Saver(threading.Thread):
    def __init__(self, item_queue):
        threading.Thread.__init__(self)
        self.__running=threading.Event()
        self.__running.set()
        self.item_queue = item_queue

    def run(self):
        while self.__running.isSet():
            print('item_queue length: ',self.item_queue.qsize())
            try:
                # use a timeout so the thread can notice stop() instead of blocking on an empty queue
                res=self.item_queue.get(timeout=1)
            except queue.Empty:
                continue
            print(res)
            conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
            cursor = conn.cursor()
            sql = 'INSERT INTO chembridge_test2 VALUES(%s,%s,%s,%s)'
            cursor.executemany(sql,res)
            print('inserted into database')
            conn.commit()
            cursor.close()
            conn.close()
    def stop(self):
        self.__running.clear()


if __name__ == '__main__':
    starttime=time.time()
    lock = threading.Lock()
    urls_queue = queue.Queue()
    html_queue = queue.Queue()
    item_queue = queue.Queue()

    conn=mysql.connector.connect(host='localhost',user='root', passwd='password', db='test')
    cursor = conn.cursor()
    print('clearing table...')
    cursor.execute('delete from chembridge_test2')
    conn.commit()
    cursor.close()
    conn.close()

    print('start...')

    f=open('MDL1.txt','r')
    for line in f.readlines():
        line=line.strip()
        urls_queue.put(line)
    f.close()

    threads=[]
    for j in range(8):
        thread1 = Fetcher(urls_queue,html_queue)
        thread1.setDaemon(True)
        thread1.start()
        threads.append(thread1)
    for j in range(1):
        thread1 = Parser(html_queue,item_queue)
        thread1.setDaemon(True)
        thread1.start()
        threads.append(thread1)
    for j in range(2):
        thread1 = Saver(item_queue)
        thread1.setDaemon(True)
        thread1.start()
        threads.append(thread1)


    # while not urls_queue.empty():
    #     while not html_queue.empty():
    #         while not item_queue.empty():
    #             pass
    while True:
        time.sleep(0.5)
        if urls_queue.empty() and html_queue.empty() and item_queue.empty():
            break

    print('done!')
    for t in threads:
        t.stop()
    for t in threads:
        t.join()
    print('end')
    print('elapsed: %f s'%(time.time()-starttime))

Set the number of threads according to your network conditions, so that the requests calls do not end up blocked waiting on the network.

In addition, here is an implementation of the same crawler using Scrapy.

items.py

import scrapy

class ChemItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    catalog=scrapy.Field()
    amount=scrapy.Field()
    price=scrapy.Field()
    qty=scrapy.Field()

quotes_spider.py

# -*- coding: utf-8 -*-
import scrapy
from scrapy.selector import Selector
from tutorial.items import ChemItem

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    # allowed_domains = ["chembridge.com"]
    headers=[{
            'Host':'www.chembridge.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate',
            'Referer':'http://www.chembridge.com/search/search.php?search=1',
            'Connection':'keep-alive',
            'Upgrade-Insecure-Requests':'1'
        },
        {
            'Host':'www.hit2lead.com',
            'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:53.0) Gecko/20100101 Firefox/53.0',
            'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language':'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Accept-Encoding':'gzip, deflate, br'
        }]
    def start_requests(self):
        start_urls = []
        f=open('MDL.txt','r')
        for line in f.readlines():
            line=line.strip()
            print(line)
            start_urls.append('http://www.chembridge.com/search/search.php?searchType=MFCD&query='+line+'&type=phrase&results=10&search=1')
        for url in start_urls:
            yield scrapy.Request(url=url, callback=self.parse,headers=self.headers[0])

    def parse(self, response):
        links=response.css('#BBResults a.chemical::attr(href)').extract()
        for link in links:
            yield scrapy.Request(url=link,callback=self.parse_dir_contents,headers=self.headers[1])

    def parse_dir_contents(self, response):
        items=[]
        catalogs=response.css('form div.matter h2::text').extract()
        table_headers=[''.join(res.re(r'>(.*)</td>')) for res in response.css('form div.matter thead tr')]
        print(table_headers)
        index=table_headers.index('AmountPriceQty.')
        catalog=catalogs[0]
        trs=response.css('.form tbody tr')
        if len(catalogs)>1:
            catalog=catalogs[index]
        for tr in trs:
            if len(tr.css('td'))>1:
                item=ChemItem()
                # print(tr.css('td::text').extract())
                # row=tuple([catalog])+tuple(td.get_text("|", strip=True) for td in tr.css('td'))
                item['catalog']=catalog
                item['amount']=tr.css('td')[0].css('::text').extract()[0]
                item['price']='|'.join(tr.css('td')[1].css('::text').extract())
                print(len(tr.css('td::text')))
                item['qty']=tr.css('td')[2].css('::text').extract()[0] if len(tr.css('td')[2].css('::text').extract())==1 else tr.css('td')[2].css('::attr(value)').extract()[0]
                # self.log('Saved result %s' % item)
                # print(tr.css('td::text')[0].extract())
                yield item
                # items.append(item)
        # return items

pipelines.py

# store the scraped data into the MySQL database
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors
from scrapy import log

class MySQLStorePipeline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    # database parameters
    @classmethod
    def from_settings(cls, settings):
        dbargs = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            charset='utf8',
            cursorclass = MySQLdb.cursors.DictCursor,
            use_unicode= True,
        )
        dbpool = adbapi.ConnectionPool('MySQLdb', **dbargs)
        return cls(dbpool)


    # # database parameters (hard-coded alternative)
    # def __init__(self):
    #     dbargs = dict(
    #          host = 'localhost',
    #          db = 'test',
    #          user = 'root',
    #          passwd = 'password',
    #          cursorclass = MySQLdb.cursors.DictCursor,
    #          charset = 'utf8',
    #          use_unicode = True
    #         )
    #     self.dbpool = adbapi.ConnectionPool('MySQLdb',**dbargs)

    '''
    The default pipeline invoke function
    '''
    def process_item(self, item,spider):
        res = self.dbpool.runInteraction(self.insert_into_table,item)
        res.addErrback(self.handle_error)
        return item
    # target table for the inserts; the table must be created beforehand
    def insert_into_table(self,conn,item):
        conn.execute('insert into chembridge(catalog, amount, price,qty) values(%s,%s,%s,%s)', (
            item['catalog'],
            item['amount'],
            item['price'],
            item['qty']
            ))
    def handle_error(self,e):
        log.err(e)

settings.py

FEED_EXPORTERS = {
    'csv': 'tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter',
} # tutorial is the project name

FIELDS_TO_EXPORT = [
    'catalog',
    'amount',
    'price',
    'qty'
]

LINETERMINATOR='\n'


ITEM_PIPELINES = {
   'tutorial.pipelines.MySQLStorePipeline': 300,
}

# start MySQL database configure setting
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'test'
MYSQL_USER = 'root'
MYSQL_PASSWD = 'password'
# end of MySQL database configure setting
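
FEED_EXPORTERS above points at tutorial.spiders.csv_item_exporter.MyProjectCsvItemExporter, which is not shown here. A possible sketch of such an exporter follows (an assumption based on the FIELDS_TO_EXPORT and LINETERMINATOR settings; import paths differ between Scrapy versions, so treat it as a starting point rather than the original file):

# tutorial/spiders/csv_item_exporter.py (hypothetical sketch)
from scrapy.conf import settings
from scrapy.exporters import CsvItemExporter

class MyProjectCsvItemExporter(CsvItemExporter):
    def __init__(self, *args, **kwargs):
        # export only the configured fields, in the configured order,
        # and pass the configured line terminator through to the csv writer
        kwargs['fields_to_export'] = settings.getlist('FIELDS_TO_EXPORT') or None
        kwargs['lineterminator'] = settings.get('LINETERMINATOR', '\n')
        super(MyProjectCsvItemExporter, self).__init__(*args, **kwargs)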

main.py

# -*- coding: utf-8 -*-
from scrapy import cmdline
cmdline.execute("scrapy crawl quotes -o items.csv -t csv".split())

Finally, run main.py; the results are saved both to the CSV file and to the MySQL database.

 

