This crawler starts a process pool and runs multiple processes to download images from a website. While crawling, it writes status messages to the log file "Photo_Galleries.log".
In main():
- multiprocessing.Process() starts a process process_logger running task_logger(). The process's only interface is a queue created with multiprocessing.Manager().Queue(): it receives (level, message) tuples from queue and writes them to the log file.
- The site's main page is searched for the required elements, producing the list of date options date_data.
- Looping up to the length of date_data, tasks are submitted asynchronously to the process pool pool; each task_gallery() call searches for image URLs, downloads the images, and sends status messages to queue.
- After pool has finished, process_logger is shut down. (A minimal sketch of this process layout is shown below, before the full script.)
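
The following is a minimal sketch of that layout. The worker bodies here are placeholders: task_logger() and task_gallery() below are stand-ins for the real functions in the full script that follows. It shows the three moving parts: a Manager().Queue() for messages, a dedicated logging process, and a pool that runs one crawling task per date value.

import multiprocessing
import time


def task_logger(log_queue):
    '''Stand-in logger process: drain (level, message) tuples from the queue.'''
    while True:
        level, message = log_queue.get(block=True, timeout=60)
        print(level, message)          # the real version dispatches to logger.debug()/info()/...


def task_gallery(log_queue, date):
    '''Stand-in crawling task: report what it would do via the queue.'''
    log_queue.put(('INFO', 'crawling galleries for ' + date), block=True, timeout=60)


def main():
    # A Manager().Queue() proxy can be handed to Pool workers;
    # a plain multiprocessing.Queue() cannot be passed through apply_async().
    log_queue = multiprocessing.Manager().Queue(8)
    pool = multiprocessing.Pool()

    # One dedicated process for logging.
    process_logger = multiprocessing.Process(target=task_logger, args=(log_queue,))
    process_logger.start()

    # One asynchronous pool task per date value (placeholder dates here).
    for date in ['2019-01', '2019-02']:
        pool.apply_async(task_gallery, args=(log_queue, date))

    pool.close()
    pool.join()

    time.sleep(5)                 # give the logger time to drain the queue
    process_logger.terminate()    # then stop it, as in the full script below
    process_logger.join()


if __name__ == '__main__':
    main()

The full script follows.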
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import os
import re
import sys
import time

import requests
from requests_html import HTMLSession

COOKIE = 'Hm_lvt_dbc355aef238b6c32b43eacbbf161c3c=1547555739,1547555783; Hm_lpvt_dbc355aef238b6c32b43eacbbf161c3c=1547735534'
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'


def get_directory(file_dir):
    '''Return the directory part (with trailing backslash) of a full file path.'''
    file_name = file_dir.split('\\', -1)[-1]
    return file_dir[:len(file_dir) - len(file_name)]


def logger(
    log_file="",
    log_console=False,
    log_format="%(asctime)s - %(levelname)s - %(message)s",
    log_setlevel=logging.DEBUG,
):
    '''Build and return a logger writing to a file and/or the console.'''
    # If neither a log file nor the console is requested, there is nothing to do.
    if log_file == "" and log_console is False:
        return None
    # Basic logging setup.
    logger = logging.getLogger(__name__)          # create a logging object
    logger.setLevel(level=log_setlevel)           # set the logging level
    logging_formatter = logging.Formatter(log_format)
    # Attach a file handler if a log file was given.
    if log_file != "":
        logging_file = logging.FileHandler(log_file)
        logging_file.setLevel(level=log_setlevel)
        logging_file.setFormatter(logging_formatter)
        logger.addHandler(logging_file)
    # Attach a console handler if requested.
    if log_console is True:
        logging_console = logging.StreamHandler(stream=sys.stdout)
        logging_console.setLevel(level=log_setlevel)
        logging_console.setFormatter(logging_formatter)
        logger.addHandler(logging_console)
    return logger


main_dir = get_directory(__file__)
logger = logger(log_file=main_dir + 'Photo_Galleries.log', log_console=True)


def task_logger(queue):
    '''Logging process: read (level, message) tuples from the queue and write them out.'''
    while True:
        # NOTE: raises queue.Empty if no message arrives within 60 seconds.
        logger_level, logger_message = queue.get(block=True, timeout=60)
        if logger_level == 'DEBUG':
            logger.debug(logger_message)
        if logger_level == 'INFO':
            logger.info(logger_message)
        if logger_level == 'WARNING':
            logger.warning(logger_message)
        if logger_level == 'ERROR':
            logger.error(logger_message)
        if logger_level == 'CRITICAL':
            logger.critical(logger_message)


def create_directory(queue, main_dir, sub_dir):
    '''Create sub_dir under main_dir and return its full path.'''
    # Strip characters that are not allowed in directory names: \/:*?"<>|
    sub_dir_name = re.sub(r'[\/:*?"<>|]', '', sub_dir)
    dir = main_dir + sub_dir_name + '\\'
    # Create the directory that will hold the images.
    if os.path.exists(dir) is False:
        try:
            os.makedirs(dir, 0o777)
            queue.put(('INFO', 'Created directory ' + dir + '.'), block=True, timeout=60)
            return dir
        except Exception:
            queue.put(('DEBUG', 'Failed to create directory ' + dir + '.'), block=True, timeout=60)
            return None
    else:
        queue.put(('INFO', 'Directory ' + dir + ' already exists.'), block=True, timeout=60)
        return dir


def get_web_page_elements(queue, url, headers, xpath_expression, timeout):
    '''Fetch a web page and return the elements matching an XPath expression.'''
    session = HTMLSession()
    err_status = 0
    task_finished = False
    while task_finished is False and err_status < 10:
        try:
            response = session.get(url, headers=headers, timeout=timeout)
            elements_list = response.html.xpath(xpath_expression)
            task_finished = True
            queue.put(('INFO', 'Searching ' + url + ' for ' + xpath_expression + ' succeeded.'),
                      block=True, timeout=60)
            return elements_list
        except Exception:
            err_status += 1
            queue.put(('DEBUG', 'Searching ' + url + ' for ' + xpath_expression +
                       ' failed. Sleeping 15 seconds before retrying.'), block=True, timeout=60)
            time.sleep(15)
        finally:
            session.close()
    # All retries failed.
    queue.put(('DEBUG', 'Searching ' + url + ' for ' + xpath_expression +
               ' failed 10 times. Giving up on this search task.'), block=True, timeout=60)
    return None


def download_img_file(queue, img_dir, img_url, img_headers, timeout):
    '''Download one image file into img_dir.'''
    img_file_downloaded = False
    err_status = 0
    while img_file_downloaded is False and err_status < 10:
        try:
            img_response = requests.get(img_url, headers=img_headers, timeout=timeout)
            img_name = img_url.split('/', -1)[-1]
            with open(img_dir + img_name, 'wb') as img_file:
                img_file.write(img_response.content)
            img_file_downloaded = True
        except Exception:
            err_status += 1
            queue.put(('DEBUG', 'Downloading image from ' + img_url +
                       ' failed. Sleeping 15 seconds before retrying.'), block=True, timeout=60)
            time.sleep(15)
    if img_file_downloaded:
        queue.put(('INFO', 'Downloaded image from ' + img_url + '.'), block=True, timeout=60)
    else:
        queue.put(('DEBUG', 'Downloading image from ' + img_url +
                   ' failed 10 times. Giving up on this download task.'), block=True, timeout=60)


def task_gallery(queue, main_dir, web_page_url, date):
    '''Crawling task: download all galleries published on a given date.'''
    # Create a directory for this date.
    gallery_date_dir = create_directory(queue, main_dir, date)
    if gallery_date_dir is not None:
        gallery_page_url = web_page_url + '?s=' + date
        gallery_page_referer = gallery_page_url
        gallery_page_headers = {
            'Referer': gallery_page_referer,
            'Cookie': COOKIE,
            'User-Agent': USER_AGENT
        }
        star_page_xpath_expression = "//div[@class='gal_list']/a"
        star_page_data = get_web_page_elements(queue, gallery_page_url, gallery_page_headers,
                                               star_page_xpath_expression, 30)
        for each_star_page in star_page_data or []:   # empty if the search failed
            star_name = each_star_page.text                                             # model name
            star_page_url = 'https://www.kindgirls.com' + each_star_page.attrs['href']  # full model page URL
            star_photo_dir = create_directory(queue, gallery_date_dir, star_name)       # directory for this model
            if star_photo_dir is not None:
                star_page_referer = star_page_url
                star_page_headers = {
                    'Referer': star_page_referer,
                    'Cookie': COOKIE,
                    'User-Agent': USER_AGENT
                }
                star_photos_xpath_expression = "//div[@class='gal_list']/a[2]"
                star_photos_data = get_web_page_elements(queue, star_page_url, star_page_headers,
                                                         star_photos_xpath_expression, 30)
                for each in star_photos_data or []:   # empty if the search failed
                    star_photo_url = each.attrs['href']
                    star_photo_referer = star_page_url
                    star_photo_headers = {
                        'Referer': star_photo_referer,
                        'Cookie': COOKIE,
                        'User-Agent': USER_AGENT
                    }
                    download_img_file(queue, star_photo_dir, star_photo_url, star_photo_headers, 30)
            time.sleep(1)


def main(main_dir):
    '''Main function.'''
    # Message queue shared with the pool workers (maxsize 8).
    queue = multiprocessing.Manager().Queue(8)
    # Process pool (size defaults to the number of CPU cores).
    pool = multiprocessing.Pool()
    # Start the logging process.
    process_logger = multiprocessing.Process(target=task_logger, args=(queue,))
    process_logger.start()
    # pool.apply_async(task_logger, args=(queue,))
    # Page data.
    web_page_url = '網站地址'  # placeholder for the target site's base URL
    web_page_referer = web_page_url
    web_page_headers = {
        'Referer': web_page_referer,
        'Cookie': COOKIE,
        'User-Agent': USER_AGENT
    }
    date_xpath_expression = "//select[@name='s']/option"
    date_data = get_web_page_elements(queue, web_page_url, web_page_headers, date_xpath_expression, 30)
    date_data_length = len(date_data)
    # One asynchronous pool task per date value.
    for date_index in range(0, date_data_length):
        date = date_data[date_index].attrs['value']
        pool.apply_async(task_gallery, args=(queue, main_dir, web_page_url, date))
    pool.close()
    pool.join()
    queue.put(('INFO', 'All tasks finished. Shutting down the logging process.'), block=True, timeout=60)
    time.sleep(5)
    process_logger.terminate()
    process_logger.join()


if __name__ == '__main__':
    # Program entry point.
    main(main_dir)
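
Two notes on the design above. The message queue comes from multiprocessing.Manager().Queue() rather than multiprocessing.Queue(), because a plain Queue object cannot be passed as an argument to Pool workers. And the logging process is stopped with terminate() after a fixed five-second sleep, which can in principle kill it while messages are still queued; a common alternative, not used in this script, is to push a sentinel value and let task_logger() exit on its own. A minimal sketch of that variant, assuming None as the sentinel:

import multiprocessing

SENTINEL = None  # pushed into the queue once all work is done


def task_logger(queue):
    '''Variant of task_logger() that exits on a sentinel instead of being terminated.'''
    while True:
        item = queue.get(block=True, timeout=60)
        if item is SENTINEL:
            break                      # clean shutdown, nothing left to log
        level, message = item
        print(level, message)          # the real version dispatches to logger.*()


def main():
    queue = multiprocessing.Manager().Queue(8)
    process_logger = multiprocessing.Process(target=task_logger, args=(queue,))
    process_logger.start()

    queue.put(('INFO', 'task finished'), block=True, timeout=60)
    queue.put(SENTINEL)                # replaces time.sleep(5) + terminate()
    process_logger.join()              # returns once the sentinel is consumed


if __name__ == '__main__':
    main()

With the sentinel, process_logger.join() returns only after every queued message has been handled, so the time.sleep(5) and terminate() calls are no longer needed.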
