Application Scenarios
1. Multiprocessing: CPU-bound programs
2. Multithreading: crawlers (network I/O), local disk I/O (see the sketch below)
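A minimal sketch of why this rule of thumb holds, assuming a made-up CPU-bound workload count(): because of the GIL, the threaded run is serialized, while the processes can run on separate cores.

import time
from threading import Thread
from multiprocessing import Process

def count(n=10_000_000):
    # a stand-in CPU-bound workload (hypothetical, for the demo only)
    while n > 0:
        n -= 1

def run(worker_cls):
    # start 4 workers of the given kind and time how long they take
    workers = [worker_cls(target=count) for _ in range(4)]
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    print('threads  : %.2fs' % run(Thread))   # serialized by the GIL
    print('processes: %.2fs' % run(Process))  # can use multiple cores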
Review of Key Points
Queue
# import the module
from queue import Queue

# usage
q = Queue()
q.put(url)
q.get()    # blocks when the queue is empty
q.empty()  # check whether the queue is empty, returns True/False
Threading Module
# import the module
from threading import Thread

# usage flow
t = Thread(target=func_name)  # create a thread object
t.start()                     # start the thread
t.join()                      # block until the thread finishes, then reclaim it
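A minimal sketch tying the two modules together, with a hypothetical url_list: the URLs are loaded into a Queue up front and several worker threads drain it. This is the same producer/consumer pattern the spiders below use.

from queue import Queue, Empty
from threading import Thread

url_list = ['http://httpbin.org/get?page=%d' % i for i in range(10)]  # placeholder URLs

q = Queue()
for url in url_list:
    q.put(url)

def worker():
    while True:
        try:
            url = q.get(block=False)  # non-blocking; raises Empty once the queue is drained
        except Empty:
            break
        print('handling', url)  # a real spider would request and parse here

threads = [Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()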
Xiaomi App Store Scraping (Multithreaded)
Goal
- Site: search Baidu for "Xiaomi App Store", open the official site, then go to App Categories - Chat & Social
- Goal: scrape the app name and app link for each app
Implementation Steps
1. Confirm whether the content is dynamically loaded
  1. The page refreshes only partially
  2. Right-click and view the page source: searching for the target keywords finds nothing, so this site is dynamically loaded and its network packets need to be captured and analyzed
2. Capture the network packets with F12
  1. Grab the URL that returns the JSON data (the Request URL under Headers):
http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30
  2. Inspect and analyze the query parameters (the Query String Parameters under Headers). Only page changes (0, 1, 2, 3, ...), so we can build every JSON URL simply by controlling the value of page:
page: 1
categoryId: 2
pageSize: 30
3. Save the scraped data to a CSV file
  Watch out for thread locking when multiple threads write to the file (a sketch follows the snippet below):
from threading import Lock

lock = Lock()
lock.acquire()  # acquire the lock before writing
lock.release()  # release the lock after writing
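A minimal sketch of lock-protected CSV writing, with hypothetical rows: using the lock as a context manager (with lock:) is equivalent to the acquire()/release() pair above, and it also releases the lock if the write raises.

import csv
from threading import Lock

lock = Lock()
f = open('demo.csv', 'a', newline='')
writer = csv.writer(f)

def save(rows):
    # only one thread can be inside this block at a time
    with lock:
        writer.writerows(rows)

# hypothetical data in the same shape the spider below collects
save([['app1', 'chat', 'http://example.com/1'],
      ['app2', 'social', 'http://example.com/2']])
f.close()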
Overall Approach
- Create the file object in __init__(self); all threads operate on this one object when writing
- After a thread scrapes its data it writes the data to the file, and the write must be done while holding the lock
- Close the file once all the data has been scraped
import time
import csv
import requests
from threading import Thread, Lock
from queue import Queue, Empty
from lxml import etree
from fake_useragent import UserAgent


class XiaomiSpider(object):
    def __init__(self):
        self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId={}&pageSize=30'
        self.q = Queue()   # queue holding all the URLs
        self.i = 0         # counter for scraped apps
        self.id_list = []  # list of all the category ids
        # open the output file; every thread writes through this one object
        self.f = open('xiaomi.csv', 'a', newline="")
        self.writer = csv.writer(self.f)
        self.lock = Lock()  # create the lock
        self.ua = UserAgent()

    def get_cateid(self):
        # request
        url = 'http://app.mi.com/'
        headers = {'User-Agent': self.ua.random}
        html = requests.get(url=url, headers=headers).text
        # parse
        parse_html = etree.HTML(html)
        li_list = parse_html.xpath('//ul[@class="category-list"]/li')
        for li in li_list:
            typ_name = li.xpath('./a/text()')[0]
            typ_id = li.xpath('./a/@href')[0].split('/')[-1]
            pages = self.get_pages(typ_id)  # number of pages in this category
            self.id_list.append((typ_id, pages))
        self.url_in()  # fill the queue

    # read the value of count and compute the number of pages
    def get_pages(self, typ_id):
        # every page of JSON data carries the key "count"
        url = self.url.format(0, typ_id)
        html = requests.get(url=url, headers={'User-Agent': self.ua.random}).json()
        count = html['count']         # total number of apps in this category
        pages = int(count) // 30 + 1  # 30 apps per page
        return pages

    # put the URLs into the queue
    def url_in(self):
        for typ_id, pages in self.id_list:
            # only the first 2 pages per category for the demo;
            # use range(pages) to crawl every page
            for page in range(2):
                url = self.url.format(page, typ_id)
                print(url)
                self.q.put(url)

    # thread worker: get() - request - parse - process the data
    def get_data(self):
        while True:
            try:
                # take a URL without blocking; Empty means the work is done
                url = self.q.get(block=False)
            except Empty:
                break
            headers = {'User-Agent': self.ua.random}
            html = requests.get(url=url, headers=headers).json()
            self.parse_html(html)

    # parse function
    def parse_html(self, html):
        # collect one page of data, then write it to the CSV file
        app_list = []
        for app in html['data']:
            # app name + link + category
            name = app['displayName']
            link = 'http://app.mi.com/details?id=' + app['packageName']
            typ_name = app['level1CategoryName']
            # collect every record in app_list so we can use writerows()
            app_list.append([name, typ_name, link])
            print(name, typ_name)
            self.i += 1
        # write one page of data (app_list) while holding the lock
        self.lock.acquire()
        self.writer.writerows(app_list)
        self.lock.release()

    # main function
    def main(self):
        self.get_cateid()  # fill the URL queue
        t_list = []
        # create the worker threads (1 here; raise the range for more)
        for i in range(1):
            t = Thread(target=self.get_data)
            t_list.append(t)
            t.start()
        # reclaim all the threads
        for t in t_list:
            t.join()
        # close the file
        self.f.close()
        print('count:', self.i)


if __name__ == '__main__':
    start = time.time()
    spider = XiaomiSpider()
    spider.main()
    end = time.time()
    print('elapsed: %.2f' % (end - start))
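Note the design of parse_html(): a whole page of records is buffered in app_list and written with a single writerows() call, so the lock is held only briefly and the threads spend their time on network I/O rather than waiting for the file.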
Tencent Recruitment Data Scraping (Ajax)
Determine the URL and the Goal
- URL: search Baidu for "Tencent Recruitment" and view the job postings: https://careers.tencent.com/search.html
- Goal: job title, job responsibilities, job requirements
Requirements and Analysis
- Viewing the page source shows that all the required data is loaded dynamically via Ajax
- Capture the network packets with F12 and analyze them
- Data scraped from the first-level page: job title
- Data scraped from the second-level pages: job responsibilities and job requirements
First-level page JSON URL (pageIndex varies; timestamp was not checked):
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn
Second-level page URL (postId varies; it can be taken from the first-level page):
https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn
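A minimal sketch of the two-level flow, with a placeholder User-Agent string: fetch one first-level page, take each PostId from it, and use the id to fetch the second-level detail JSON.

import requests

one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
headers = {'User-Agent': 'Mozilla/5.0'}  # placeholder UA string

# first-level page: the job list with PostId values
html = requests.get(one_url.format(1), headers=headers).json()
for job in html['Data']['Posts']:
    # second-level page: responsibilities and requirements for one job
    detail = requests.get(two_url.format(job['PostId']), headers=headers).json()
    print(job['RecruitPostName'], detail['Data']['Responsibility'][:30])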
The useragents.py file
ua_list = [
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1',
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)',
]
Let's first recall the original single-threaded Tencent recruitment spider code.

import time
import json
import random
import requests
from useragents import ua_list


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        self.f = open('tencent.json', 'a')  # open the output file
        self.item_list = []                 # holds the scraped item dicts

    # fetch the response content
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        html = json.loads(html)  # convert the JSON string to a Python object
        return html

    # main flow: fetch all the data
    def parse_page(self, one_url):
        html = self.get_page(one_url)
        for job in html['Data']['Posts']:
            # a fresh dict per job, so every list entry stays distinct
            item = {}
            item['name'] = job['RecruitPostName']  # job title
            post_id = job['PostId']  # PostId is needed to build the second-level URL
            # build the second-level URL and fetch responsibilities and requirements
            two_url = self.two_url.format(post_id)
            item['duty'], item['require'] = self.parse_two_page(two_url)
            print(item)
            self.item_list.append(item)  # append to the big list

    # parse a second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        duty = html['Data']['Responsibility']              # job responsibilities
        duty = duty.replace('\r\n', '').replace('\n', '')  # strip newlines
        require = html['Data']['Requirement']              # job requirements
        require = require.replace('\r\n', '').replace('\n', '')
        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1  # 10 postings per page
        return numbers

    def main(self):
        number = self.get_numbers()
        # only the first 2 pages for the demo; use range(1, number + 1) for all pages
        for page in range(1, 3):
            one_url = self.one_url.format(page)
            self.parse_page(one_url)
        # save to a local JSON file with json.dump
        json.dump(self.item_list, self.f, ensure_ascii=False)
        self.f.close()


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed: %.2f' % (end - start))
Multithreaded Implementation
The multithreaded version pushes all of the first-level page URLs into a queue and then scrapes them with multiple threads.
Code Implementation
import time
import json
import random
import requests
from useragents import ua_list
from threading import Thread
from queue import Queue, Empty


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        self.q = Queue()
        self.i = 0  # counter for scraped jobs

    # fetch the response content
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        # json.loads() converts the JSON string to a Python object
        html = json.loads(html)
        return html

    # main flow: fetch all the data
    def parse_page(self):
        while True:
            try:
                # take a URL without blocking; Empty is the exit signal
                one_url = self.q.get(block=False)
            except Empty:
                break
            html = self.get_page(one_url)
            for job in html['Data']['Posts']:
                item = {}
                item['name'] = job['RecruitPostName']  # job title
                post_id = job['PostId']  # needed to build the second-level URL
                # build the second-level URL and fetch responsibilities and requirements
                two_url = self.two_url.format(post_id)
                item['duty'], item['require'] = self.parse_two_page(two_url)
                print(item)
                self.i += 1
            # sleep for a random interval after finishing each page
            time.sleep(random.uniform(0, 1))

    # parse a second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        # strip the special characters with replace
        duty = html['Data']['Responsibility']
        duty = duty.replace('\r\n', '').replace('\n', '')
        # handle the requirements
        require = html['Data']['Requirement']
        require = require.replace('\r\n', '').replace('\n', '')
        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1
        return numbers

    def main(self):
        # put the one_url addresses into the queue
        number = self.get_numbers()
        for page in range(1, number + 1):
            one_url = self.one_url.format(page)
            self.q.put(one_url)

        t_list = []
        for i in range(5):
            t = Thread(target=self.parse_page)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()

        print('count:', self.i)


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed: %.2f' % (end - start))
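Each worker takes URLs with q.get(block=False) and treats the Empty exception as its signal to exit. Checking q.empty() first and then calling a blocking q.get() separately would not be safe here: with five workers racing for the last URL, a thread can pass the empty() check and then block forever inside get().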
Multiprocess Implementation
import time
import json
import random
import requests
from useragents import ua_list
from multiprocessing import Process, Queue
from queue import Empty


class TencentSpider(object):
    def __init__(self):
        self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
        self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
        # a multiprocessing.Queue, so the URL queue is shared across processes
        self.q = Queue()

    # fetch the response content
    def get_page(self, url):
        headers = {'User-Agent': random.choice(ua_list)}
        html = requests.get(url=url, headers=headers).text
        # JSON string -> Python object
        html = json.loads(html)
        return html

    # main flow: fetch all the data
    def parse_page(self):
        while True:
            try:
                # a short timeout also covers the lag of the shared queue
                one_url = self.q.get(block=True, timeout=3)
            except Empty:
                break
            html = self.get_page(one_url)
            for job in html['Data']['Posts']:
                item = {}
                # job title
                item['name'] = job['RecruitPostName']
                # postId
                post_id = job['PostId']
                # build the second-level URL and fetch responsibilities and requirements
                two_url = self.two_url.format(post_id)
                item['duty'], item['require'] = self.parse_two_page(two_url)
                print(item)

    # parse a second-level page
    def parse_two_page(self, two_url):
        html = self.get_page(two_url)
        # strip the special characters with replace
        duty = html['Data']['Responsibility']
        duty = duty.replace('\r\n', '').replace('\n', '')
        # handle the requirements
        require = html['Data']['Requirement']
        require = require.replace('\r\n', '').replace('\n', '')
        return duty, require

    # get the total number of pages
    def get_numbers(self):
        url = self.one_url.format(1)
        html = self.get_page(url)
        numbers = int(html['Data']['Count']) // 10 + 1
        return numbers

    def main(self):
        # put the URLs into the queue
        number = self.get_numbers()
        for page in range(1, number + 1):
            one_url = self.one_url.format(page)
            self.q.put(one_url)

        t_list = []
        for i in range(4):
            t = Process(target=self.parse_page)
            t_list.append(t)
            t.start()

        for t in t_list:
            t.join()


if __name__ == '__main__':
    start = time.time()
    spider = TencentSpider()
    spider.main()
    end = time.time()
    print('elapsed: %.2f' % (end - start))
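Queue is imported from multiprocessing rather than from the queue module: a plain queue.Queue lives in a single process's memory, so each child process would end up with its own private copy of the URLs instead of sharing one work queue. The Empty exception raised by get(timeout=3) still comes from the standard queue module.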