Multi-threaded Crawlers


Application Scenarios

1. Multiprocessing: CPU-bound programs
2. Multithreading: crawlers (network I/O), local disk I/O

Knowledge Review

Queue

# import the module
from queue import Queue
# usage
q = Queue()
q.put(url)
q.get()    # blocks when the queue is empty
q.empty()  # check whether the queue is empty, returns True/False
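
By default q.get() blocks forever on an empty queue. A minimal sketch of the non-blocking alternative (standard library only; the URL is a placeholder) that worker threads can use to avoid hanging:

from queue import Queue, Empty

q = Queue()
q.put('http://example.com/page1')   # placeholder URL

try:
    # block=False raises queue.Empty immediately instead of waiting
    url = q.get(block=False)
    print(url)
except Empty:
    print('queue is empty, the worker can exit')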

Threading Module

# import the module
from threading import Thread

# usage
t = Thread(target=func_name)  # create a thread object
t.start()  # start the thread
t.join()   # block until the thread finishes and is reclaimed
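
Putting the two modules together, here is a minimal worker-pool sketch of the pattern used throughout this article (standard library only; the URLs are placeholders):

from queue import Queue, Empty
from threading import Thread

q = Queue()
for i in range(10):
    q.put('http://example.com/page/{}'.format(i))   # placeholder URLs

def worker():
    # each thread keeps pulling URLs until the queue is drained
    while True:
        try:
            url = q.get(block=False)
        except Empty:
            break
        print('crawling', url)

threads = [Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()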

Xiaomi App Store Scraping (Multi-threaded)

Goal

  1. URL: search Baidu for the Xiaomi App Store (小米應用商店), open the official site, then go to App Categories - Chat & Social (聊天社交)
  2. Goal: scrape each app's name and link

Implementation Steps

1. Confirm whether the page is dynamically loaded

   1. When switching pages, only part of the page refreshes.

   2. Right-click to view the page source and search for a keyword shown on the page; it cannot be found, so the site is dynamically loaded and we need to capture and analyze the network packets. A quick check is sketched below.
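
A minimal sketch of that check, assuming requests is installed; the category URL and the keyword are assumptions, so substitute the address and an app name you actually see in the browser:

import requests

url = 'http://app.mi.com/category/2'   # assumed Chat & Social category page
headers = {'User-Agent': 'Mozilla/5.0'}
html = requests.get(url, headers=headers).text

# if an app name that is visible in the browser does not appear in the source,
# the list is rendered by JavaScript from an Ajax/JSON interface
keyword = '微信'   # hypothetical keyword; use any app name you can see on the page
print(keyword in html)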

2. Capture the network packets with F12

   1. Capture the URL that returns the JSON data (the Request URL under Headers):

  http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30

   2. Inspect the query parameters (Query String Parameters under Headers). Only page changes (0, 1, 2, 3, ...), so we can build all of the JSON URLs simply by controlling the value of page, as sketched after the parameter list below.

  page: 1

  categoryId: 2

  pageSize: 30
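
A minimal sketch of that idea, assuming requests is installed: it formats page into the captured URL and reads the count field carried by the JSON response (the field used later to compute the number of pages):

import requests

base = 'http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30'
headers = {'User-Agent': 'Mozilla/5.0'}

# page 0 is enough to inspect the JSON structure and the total count
data = requests.get(base.format(0), headers=headers).json()
print(data['count'])            # total number of apps in this category
for page in range(3):           # the first three page URLs
    print(base.format(page))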

 

3. Save the scraped data to a CSV file

Pay attention to locking when multiple threads write to the same file:

from threading import Lock

lock = Lock()
lock.acquire()
lock.release()
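
The lock can also be used as a context manager. A minimal sketch of how each thread might guard its CSV write (writer and app_list are passed in, matching the spider below):

from threading import Lock

lock = Lock()

def write_rows(writer, app_list):
    # acquire() and release() are handled automatically, even if writerows() raises
    with lock:
        writer.writerows(app_list)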

Overall Approach

  1. Create the file object in __init__(self); all threads write to this single object
  2. After scraping a page of data, each thread writes it to the file; the write must be protected by the lock
  3. Close the file after all data has been scraped
import requests
from threading import Thread
from queue import Queue
import time
from lxml import etree
import csv
from threading import Lock
from fake_useragent import UserAgent


class XiaomiSpider(object):
    def __init__(self):
        self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId={}&pageSize=30'
        self.q = Queue()  # queue holding all page URLs to crawl
        self.i = 0
        self.id_list = []  # list of (category id, pages) tuples
        # open the output file once; all threads share this file object
        self.f = open('xiaomi.csv', 'a', newline="")
        self.writer = csv.writer(self.f)
        self.lock = Lock()  # lock protecting the CSV writes
        self.ua = UserAgent()

    def get_cateid(self):
        # request the home page
        url = 'http://app.mi.com/'
        headers = {'User-Agent': self.ua.random}
        html = requests.get(url=url, headers=headers).text
        # parse the category list
        parse_html = etree.HTML(html)
        li_list = parse_html.xpath('//ul[@class="category-list"]/li')
        for li in li_list:
            typ_name = li.xpath('./a/text()')[0]              # category name (not used later)
            typ_id = li.xpath('./a/@href')[0].split('/')[-1]  # category id taken from the href
            pages = self.get_pages(typ_id)  # number of pages in this category
            self.id_list.append((typ_id, pages))

        self.url_in()  # put the page URLs into the queue

    # read the count value and compute the number of pages
    def get_pages(self, typ_id):
        # the JSON returned for every page contains the key 'count'
        url = self.url.format(0, typ_id)
        html = requests.get(url=url, headers={'User-Agent': self.ua.random}).json()
        count = html['count']            # total number of apps in this category
        pages = int(count) // 30 + 1     # 30 apps per page

        return pages

    # put the page URLs into the queue
    def url_in(self):
        for id in self.id_list:
            # id is a tuple: (typ_id, pages) --> ('2', pages)
            # only the first 2 pages of each category are queued here;
            # use range(id[1]) to queue every page of the category
            for page in range(2):
                url = self.url.format(page, id[0])
                print(url)
                self.q.put(url)

    # thread worker: get() - request - parse - process data
    def get_data(self):
        while True:
            # fetch URLs until the queue is drained
            # (with many threads, empty() followed by get() can race;
            #  q.get(block=False) with queue.Empty is the safer pattern)
            if not self.q.empty():
                url = self.q.get()
                headers = {'User-Agent': self.ua.random}
                html = requests.get(url=url, headers=headers).json()
                self.parse_html(html)
            else:
                break

    # parse one page of JSON data and write it to the CSV file
    def parse_html(self, html):
        # collect one page of rows, then write them with a single writerows() call
        app_list = []
        for app in html['data']:
            # app name + link + category
            name = app['displayName']
            link = 'http://app.mi.com/details?id=' + app['packageName']
            typ_name = app['level1CategoryName']
            app_list.append([name, typ_name, link])
            print(name, typ_name)
            self.i += 1

        # write one page of rows; the lock keeps rows from different threads from interleaving
        self.lock.acquire()
        self.writer.writerows(app_list)
        self.lock.release()

    # main function
    def main(self):
        self.get_cateid()       # fill the URL queue
        t_list = []
        # create the worker threads (only 1 here; raise the range for more concurrency)
        for i in range(1):
            t = Thread(target=self.get_data)
            t_list.append(t)
            t.start()

        # wait for all threads to finish
        for t in t_list:
            t.join()

        # close the output file
        self.f.close()
        print('count:', self.i)


if __name__ == '__main__':
    start = time.time()
    spider = XiaomiSpider()
    spider.main()
    end = time.time()
    print('elapsed time: %.2f' % (end - start))

Tencent Recruitment Data Scraping (Ajax)

Determine the URL Addresses and the Goal

Requirements and Analysis

  1. Viewing the page source shows that all of the required data is loaded dynamically via Ajax
  2. Capture the network packets with F12 and analyze them
  3. Data scraped from the first-level page: job title
  4. Data scraped from the second-level page: responsibilities and requirements

First-level page JSON URL (pageIndex changes; the timestamp was not examined)

https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn

Second-level page URL (postId changes; it is obtained from the first-level page)

https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn
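
A minimal sketch of how the two URLs relate, assuming requests is installed: it reads one first-level page, takes the PostId of the first post and fetches its second-level detail (the field names are the ones used by the spiders below):

import requests

one_url = ('https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089'
           '&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId='
           '&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn')
two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
headers = {'User-Agent': 'Mozilla/5.0'}

posts = requests.get(one_url.format(1), headers=headers).json()['Data']['Posts']
post_id = posts[0]['PostId']                        # taken from the first-level page
detail = requests.get(two_url.format(post_id), headers=headers).json()['Data']
print(posts[0]['RecruitPostName'])                  # job title
print(detail['Responsibility'], detail['Requirement'])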

The useragents.py file

ua_list = [
  'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1',
  'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0',
  'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)',
]

Let's first review the original single-threaded Tencent recruitment spider code:

import time
import json
import random
import requests
from useragents import ua_list


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        self.f = open('tencent.json', 'a')  # open the output file
        self.item_list = []  # list holding all scraped item dicts

    # fetch a URL and return the parsed JSON response
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        html = json.loads(html)  # convert the JSON string into Python data

        return html

    # main function: scrape all data from one first-level page
    def parse_page(self, one_url):
        html = self.get_page(one_url)
        for job in html['Data']['Posts']:
            # a new dict for every job; a single shared dict would make
            # item_list hold many references to the same (last) item
            item = {}
            item['name'] = job['RecruitPostName']  # job title
            post_id = job['PostId']  # PostId is needed to build the second-level URL
            # build the second-level URL and get responsibilities and requirements
            two_url = self.two_url.format(post_id)
            item['duty'], item['require'] = self.parse_two_page(two_url)
            print(item)
            self.item_list.append(item)  # append to the big list

    # parse the second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        duty = html['Data']['Responsibility']  # job responsibilities
        duty = duty.replace('\r\n', '').replace('\n', '')  # strip line breaks
        require = html['Data']['Requirement']  # job requirements
        require = require.replace('\r\n', '').replace('\n', '')  # strip line breaks

        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1  # 10 posts per page

        return numbers

    def main(self):
        number = self.get_numbers()
        # only pages 1-2 are crawled here for demonstration;
        # use range(1, number + 1) to crawl every page
        for page in range(1, 3):
            one_url = self.one_url.format(page)
            self.parse_page(one_url)

        # save the results to a local JSON file with json.dump
        json.dump(self.item_list, self.f, ensure_ascii=False)
        self.f.close()


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed time: %.2f' % (end - start))

Multi-threaded Implementation

The multi-threaded version submits all first-level page URLs to a queue and then scrapes them with multiple threads.

Code Implementation

import requests
import json
import time
import random
from useragents import ua_list
from threading import Thread
from queue import Queue


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        self.q = Queue()  # queue holding the first-level page URLs
        self.i = 0  # counter of scraped jobs

    # fetch a URL and return the parsed JSON response
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        # json.loads() converts the JSON string into Python data
        html = json.loads(html)

        return html

    # main worker function: scrape all data
    def parse_page(self):
        while True:
            if not self.q.empty():
                one_url = self.q.get()
                html = self.get_page(one_url)
                for job in html['Data']['Posts']:
                    item = {}  # a new dict for every job
                    item['name'] = job['RecruitPostName']  # job title
                    post_id = job['PostId']  # needed to build the second-level URL
                    # build the second-level URL and get responsibilities and requirements
                    two_url = self.two_url.format(post_id)
                    item['duty'], item['require'] = self.parse_two_page(two_url)
                    print(item)
                    self.i += 1  # count the scraped jobs
                # sleep a random amount after finishing each page
                time.sleep(random.uniform(0, 1))
            else:
                break

    # parse the second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        # strip line breaks from the responsibilities text
        duty = html['Data']['Responsibility']
        duty = duty.replace('\r\n', '').replace('\n', '')
        # strip line breaks from the requirements text
        require = html['Data']['Requirement']
        require = require.replace('\r\n', '').replace('\n', '')

        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1

        return numbers

    def main(self):
        # put all first-level page URLs into the queue
        number = self.get_numbers()
        for page in range(1, number + 1):
            one_url = self.one_url.format(page)
            self.q.put(one_url)

        t_list = []
        for i in range(5):
            t = Thread(target=self.parse_page)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()

        print('count:', self.i)


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed time: %.2f' % (end - start))

Multi-process Implementation

import requests
import json
import time
import random
from useragents import ua_list
# queue.Queue is not shared across processes; the multiprocessing Queue is used instead
from multiprocessing import Process, Queue


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        self.q = Queue()  # multiprocessing queue holding the first-level page URLs

    # fetch a URL and return the parsed JSON response
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        # convert the JSON string into Python data
        html = json.loads(html)

        return html

    # main worker function: scrape all data
    def parse_page(self):
        while True:
            if not self.q.empty():
                one_url = self.q.get()
                html = self.get_page(one_url)
                for job in html['Data']['Posts']:
                    item = {}  # a new dict for every job
                    # job title
                    item['name'] = job['RecruitPostName']
                    # PostId, needed to build the second-level URL
                    post_id = job['PostId']
                    # build the second-level URL and get responsibilities and requirements
                    two_url = self.two_url.format(post_id)
                    item['duty'], item['require'] = self.parse_two_page(two_url)

                    print(item)
            else:
                break

    # parse the second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        # strip line breaks from the responsibilities text
        duty = html['Data']['Responsibility']
        duty = duty.replace('\r\n', '').replace('\n', '')
        # strip line breaks from the requirements text
        require = html['Data']['Requirement']
        require = require.replace('\r\n', '').replace('\n', '')

        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1

        return numbers

    def main(self):
        # put all first-level page URLs into the queue
        number = self.get_numbers()
        for page in range(1, number + 1):
            one_url = self.one_url.format(page)
            self.q.put(one_url)

        t_list = []
        for i in range(4):
            t = Process(target=self.parse_page)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed time: %.2f' % (end - start))

 

