【原創】本教程僅供娛樂 - 如有侵權,請聯系本人予以刪除!
不喜勿噴【手動狗頭】
更新時間 - 2021-05-10
下載圖片相關
1、漫畫下載
(1) 基本使用

import os
import time
import requests
from uuid import uuid4


class Konachan(object):
    """Download posts from the konachan.net JSON API into a local folder."""

    def __init__(self):
        # Target directory for downloaded images.
        self.base_path = "C:/Users/admin/Desktop/images/漫畫/"

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def get_images(self, number):
        """Fetch the post list (up to *number* posts) and download each file.

        Network or JSON errors are reported instead of silently swallowed
        (the original used a bare ``except: pass``).
        """
        url = (
            "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}"
            "&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1"
        ).format(number)
        try:
            self.create_folder(self.base_path)
            response = requests.get(url=url)
            response.raise_for_status()  # fail loudly on HTTP errors
            data_list = response.json()["posts"]
            for data in data_list:
                self.download_image(data["file_url"])
        except (requests.RequestException, KeyError, ValueError) as e:
            # Narrow handler: report the failure rather than hiding it.
            print("獲取圖片列表失敗:", e)

    def download_image(self, url):
        """Download one image, saving it under a random UUID filename."""
        print("正在下載圖片:", url)
        start = time.time()  # download start time
        # Keep the original extension; name the file with a fresh UUID.
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        response = requests.get(url, timeout=10)
        print(response.status_code)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('下載完成!耗時: %.2f秒' % (end - start))


if __name__ == '__main__':
    print("開始獲取圖片")
    kc = Konachan()
    kc.get_images(100)
(2) 線程池

import os
import requests
from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor, wait


class Konachan(object):
    """Download posts from the konachan.net JSON API using a thread pool."""

    def __init__(self):
        # Target directory for downloaded images.
        self.base_path = "C:/Users/admin/Desktop/images/漫畫/"
        # Thread pool capped at 5 concurrent downloads.
        self.pool = ThreadPoolExecutor(5)

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def get_images(self, number):
        """Fetch the post list and download every file concurrently.

        Errors are reported instead of silently swallowed (the original
        used a bare ``except: pass``).
        """
        url = (
            "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}"
            "&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1"
        ).format(number)
        try:
            self.create_folder(self.base_path)
            response = requests.get(url=url)
            response.raise_for_status()  # fail loudly on HTTP errors
            data_list = response.json()["posts"]
            # Fan the downloads out across the pool, then block until done.
            tasks = [self.pool.submit(self.download_image, data["file_url"])
                     for data in data_list]
            wait(tasks)
        except (requests.RequestException, KeyError, ValueError) as e:
            print("獲取圖片列表失敗:", e)

    def download_image(self, url):
        """Download one image, saving it under a random UUID filename."""
        print("正在下載圖片:", url)
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    print("開始獲取圖片")
    kc = Konachan()
    kc.get_images(1000)
(3) 異步下載

import os
import time
import aiohttp
import asyncio
from uuid import uuid4


class Konachan(object):
    """Download posts from the konachan.net JSON API with aiohttp/asyncio."""

    def __init__(self):
        # Target directory for downloaded images.
        self.base_path = "C:/Users/admin/Desktop/images/漫畫/"

    async def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    async def get_images(self, number):
        """Fetch the post list and download every file concurrently.

        Errors are reported instead of silently swallowed (the original
        used a bare ``except: pass``). An empty post list no longer hits
        ``asyncio.wait([])``, which raises ValueError.
        """
        url = (
            "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}"
            "&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1"
        ).format(number)
        try:
            await self.create_folder(self.base_path)
            # One session shared by the listing request and all downloads.
            async with aiohttp.ClientSession() as session:
                async with session.get(url, verify_ssl=False) as response:
                    json_data = await response.json()
                data_list = json_data["posts"]
                tasks = [asyncio.create_task(self.download_image(session, data["file_url"]))
                         for data in data_list]
                if tasks:  # asyncio.wait raises on an empty set
                    await asyncio.wait(tasks)
        except (aiohttp.ClientError, KeyError, ValueError) as e:
            print("獲取圖片列表失敗:", e)

    async def download_image(self, session, url):
        """Download one image, saving it under a random UUID filename."""
        print("正在下載圖片:", url)
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        async with session.get(url, verify_ssl=False) as response:
            content = await response.content.read()
            with open(file_path, mode='wb') as file_object:
                file_object.write(content)


if __name__ == '__main__':
    start = time.time()  # download start time
    print("開始獲取圖片")
    kc = Konachan()
    asyncio.run(kc.get_images(1000))
    end = time.time()  # download end time
    print('下載完成!耗時: %.2f秒' % (end - start))
2、妹子圖下載
(1) 基本使用

# 網站限制1分鍾 50 次  (the site rate-limits roughly 50 requests per minute)
import os
import time
import requests
from bs4 import BeautifulSoup


# Album downloader for mzitu.com.
class MeiZiTu():
    """Crawl album listings on mzitu.com and download every image."""

    def __init__(self):
        self.base_url = "https://www.mzitu.com/"
        self.headers = {
            "cookie": None,
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36"
        }
        self.image_url = None
        self.base_path = "C:/Users/admin/Desktop/images/"

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def set_cookie(self):
        """Refresh the session cookie from the site's landing page."""
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["cookie"] = '; '.join(cookie_list)

    def get_images(self, url=None):
        """Walk the album list page by page, downloading each album.

        On a non-200 response (rate limit) it sleeps a minute, refreshes
        the cookie, and retries the same URL.
        """
        url = url if url else self.base_url
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".postlist > ul > li > a")
            for a in a_list:
                image = a.select_one("img")
                print("圖片標題: ", image.attrs["alt"])
                print("圖片地址: ", image.attrs["data-original"])
                print("詳情頁鏈接: ", a.attrs["href"])
                # One folder per album, named after the cover's alt text.
                path = self.base_path + image.attrs["alt"] + '/'
                self.create_folder(path)
                # Download the cover thumbnail first.
                self.download_image(image.attrs["data-original"], path=path, ref=self.base_url)
                # Walk the album's detail pages.
                next_page = self.get_max_page(url=a.attrs["href"])
                for i in range(1, next_page):
                    url = a.attrs["href"] + "/" + str(i)
                    self.get_images_details(url, path)
            # Follow the listing's "next" link, if any.
            next_page = soup.select_one(".nav-links > .next")
            if next_page:
                self.get_images(next_page.attrs["href"])

    def get_max_page(self, url):
        """Return the album's page count (int), retrying on rate limits."""
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            # BUG FIX: the original dropped this result, returning None and
            # crashing the caller's range(1, next_page).
            return self.get_max_page(url)
        soup = BeautifulSoup(response.text, 'lxml')
        pagenavi = soup.select(".pagenavi > a > span")[-2]
        return int(pagenavi.string)

    def get_images_details(self, url, path):
        """Download the full-size image from one album detail page."""
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            self.get_images_details(url, path)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            image = soup.select_one(".main-image > p > a > img")
            self.download_image(image.attrs["src"], path=path, ref=url)

    def download_image(self, url, path, ref):
        """Download *url* into *path*; *ref* is sent as the anti-leech Referer."""
        print("正在下載圖片:", url)
        headers = {
            "Referer": ref,
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
        }
        video_name = str(url).split('/')[-1]
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            file_path = path + str(video_name)
            with open(file_path, mode='wb') as f:
                f.write(response.content)


if __name__ == '__main__':
    meizitu = MeiZiTu()
    meizitu.get_images()
(2) 線程池

# 網站限制1分鍾 50 次  (the site rate-limits roughly 50 requests per minute)
import re
import os
import time
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Thread pool capped at 5 concurrent downloads.
pool = ThreadPoolExecutor(5)


# Album downloader for mzitu.com (thread-pool variant).
class MeiZiTu():
    """Crawl mzitu.com albums, deriving image URLs from the first image's
    filename pattern so each page needn't be fetched individually."""

    def __init__(self):
        self.base_url = "https://www.mzitu.com/"
        self.image_url = None
        self.base_path = "G:/crawler/images/"
        self.headers = {
            "cookie": None,
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36"
        }

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def set_cookie(self):
        """Refresh the session cookie from the site's landing page."""
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["cookie"] = '; '.join(cookie_list)

    def get_images(self, url=None):
        """Walk the album list and download albums via the thread pool."""
        url = url if url else self.base_url
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".postlist > ul > li > a")
            for a in a_list[1:4]:  # demo: only a few albums per listing page
                image = a.select_one("img")
                print("圖片標題: ", image.attrs["alt"])
                path = self.base_path + image.attrs["alt"] + '/'
                self.create_folder(path)
                # Download the cover thumbnail first.
                self.download_image(image.attrs["data-original"], path=path, ref=self.base_url)
                # Derive every page's image URL from the first image's name,
                # e.g. ".../28b01.jpg" -> prefix="28", letters=["b", ".jpg"].
                image_url_list = []
                max_page, image_url = self.get_max_page(url=a.attrs["href"])
                print(image_url)
                file_name = str(image_url).split('/')[-1]
                prefix = str(image_url).replace(file_name, '')
                name = re.findall(r"\D+", file_name)
                res = re.findall(r"\d+", file_name)[0]
                for page in range(1, max_page + 1):
                    # Page numbers in filenames are zero-padded to 2 digits.
                    page_str = "0" + str(page) if page < 10 else str(page)
                    # BUG FIX: the original compared the *padded string* to the
                    # int 1 ("01" == 1 is always False), so page 1 never got
                    # the bare album URL as its Referer.
                    ref = a.attrs["href"] if page == 1 else a.attrs["href"] + "/" + page_str
                    data = {
                        "image_url": prefix + res + name[0] + page_str + name[1],
                        "ref": ref
                    }
                    image_url_list.append(data)
                # Fan the downloads out across the pool, then wait.
                tasks = [pool.submit(self.download_image, data["image_url"], path, data["ref"])
                         for data in image_url_list]
                wait(tasks)
            # 翻頁
            # next_page = soup.select_one(".nav-links > .next")
            # if next_page:
            #     self.get_images(next_page.attrs["href"])
            # else:
            #     pass

    def get_max_page(self, url):
        """Return (page count, first image URL), retrying on rate limits."""
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            # BUG FIX: the original dropped this result, returning None and
            # crashing the caller's tuple unpacking.
            return self.get_max_page(url)
        soup = BeautifulSoup(response.text, 'lxml')
        pagenavi = soup.select(".pagenavi > a > span")[-2]
        image = soup.select_one(".main-image > p > a > img")
        max_page = int(pagenavi.string)
        return max_page, image.attrs["src"]

    def download_image(self, url, path, ref):
        """Download *url* into *path*; *ref* is sent as the anti-leech Referer."""
        print("正在下載圖片:", url)
        headers = {
            "Referer": ref,
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
        }
        print(headers)
        video_name = str(url).split('/')[-1]
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            file_path = path + str(video_name)
            with open(file_path, mode='wb') as f:
                f.write(response.content)


if __name__ == '__main__':
    meizitu = MeiZiTu()
    meizitu.get_images()

"""
https://imgpc.iimzt.com/2020/05/28b06.jpg
"""
3、彼岸壁紙下載
(1) 基本使用

import os
import time
import requests
from random import choice
from bs4 import BeautifulSoup


class BianWallpaper(object):
    """Download wallpapers from netbian.com, one chosen category at a time."""

    def __init__(self):
        # Pool of User-Agent strings; one is picked at random per cookie refresh.
        self.user_agent_list = [
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2226.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2224.3 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.124 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2049.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 4.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2049.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.67 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.67 Safari/537.36",
            "Mozilla/5.0 (X11; OpenBSD i386) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1944.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.2309.372 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.2117.157 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1866.237 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/4E423F",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.116 Safari/537.36 Mozilla/5.0 (iPad; U; CPU OS 3_2 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Version/4.0.4 Mobile/7B334b Safari/531.21.10",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.517 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1667.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.16 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1623.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.17 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.62 Safari/537.36",
            "Mozilla/5.0 (X11; CrOS i686 4319.74.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.57 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.2 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1467.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1464.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1500.55 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.90 Safari/537.36",
            "Mozilla/5.0 (X11; NetBSD) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36",
            "Mozilla/5.0 (X11; CrOS i686 3912.101.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.60 Safari/537.17",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1309.0 Safari/537.17",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.15 (KHTML, like Gecko) Chrome/24.0.1295.0 Safari/537.15",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.14 (KHTML, like Gecko) Chrome/24.0.1292.0 Safari/537.14"
        ]
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Host": "www.netbian.com",
            "Upgrade-Insecure-Requests": "1",
            "Cookie": None,
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
        }
        self.base_url = "http://www.netbian.com"
        self.start_url = "http://www.netbian.com/"
        self.base_path = "G:/crawler/videos/"

    def set_cookie(self):
        """Refresh the Cookie header and rotate the User-Agent."""
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["Cookie"] = '; '.join(cookie_list)
        self.headers["User-Agent"] = choice(self.user_agent_list)

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def get_category_list(self):
        """List wallpaper categories, prompt the user for one, then crawl it."""
        self.set_cookie()
        category_list = []
        response = requests.get(self.start_url, headers=self.headers)
        # Detect the page's real encoding (site is not UTF-8).
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select('#header > div.head > ul > li:nth-child(1) > div > a')
            for a in a_list:
                # The "4k壁紙" entry links to a different site section; skip it.
                if a.text != "4k壁紙":
                    category_list.append({"title": a.text, "url": self.base_url + a.attrs["href"]})
            for index, category in enumerate(category_list):
                print(index, '---', category["title"])
            category_index = int(input("請選擇你要下載的圖片分類: "))
            category_url = category_list[category_index]["url"]
            self.base_path = self.base_path + category_list[category_index]["title"] + '/'
            self.create_folder(self.base_path)
            self.get_image_list(category_url)
        else:
            # Rate limited: back off, refresh identity, retry.
            time.sleep(60)
            self.set_cookie()
            self.get_category_list()

    def get_image_list(self, url):
        """Crawl one listing page, then follow the "下一頁" link recursively."""
        response = requests.get(url=url, headers=self.headers)
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".list > ul > li > a")
            prev = soup.select(".prev")[-1]
            for a in a_list:
                link = self.base_url + a.attrs["href"]
                self.get_wallpaper(link)
            # BUG FIX: guard against prev.string being None (last page),
            # which made `in` raise TypeError in the original.
            if prev.string and "下一頁" in prev.string:
                next_url = self.base_url + prev.attrs["href"]
                self.get_image_list(next_url)
        else:
            time.sleep(60)
            self.set_cookie()
            self.get_image_list(url)

    def get_wallpaper(self, url):
        """Resolve a detail page to its full-size image URL and download it."""
        response = requests.get(url=url, headers=self.headers)
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            wallpaper_url = soup.select_one("#main > div.endpage > div > p > a > img").attrs["src"]
            self.download_image(wallpaper_url)
        else:
            time.sleep(60)
            self.set_cookie()
            self.get_wallpaper(url)

    def download_image(self, url):
        """Download one wallpaper, keeping its original filename."""
        print("正在下載圖片:", url)
        start = time.time()  # download start time
        file_name = str(url).split('/')[-1]
        file_path = self.base_path + file_name
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('下載完成!耗時: %.2f秒' % (end - start))


if __name__ == '__main__':
    bian = BianWallpaper()
    bian.get_category_list()
4、斗圖下載
(1) 基本使用

import os
import time
import requests
from bs4 import BeautifulSoup


class DouTu(object):
    """Download meme images ("斗圖") from doutula.com."""

    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/斗圖/"
        self.base_url = "https://www.doutula.com"
        self.headers = {
            "Cookie": "_agep=1617332993; _agfp=394e543fd58b15645b20d85328c7c13a; _agtk=ea484a0f61d912d369bd5765299b9702; Hm_lvt_2fc12699c699441729d4b335ce117f40=1617332992,1618563143; Hm_lpvt_2fc12699c699441729d4b335ce117f40=1618563729",
            "Host": "www.doutula.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
        }
        self.create_folder(self.base_path)

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def set_cookie(self):
        """Refresh the Cookie header from the site's landing page."""
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["Cookie"] = '; '.join(cookie_list)

    def get_url_list(self, number):
        """Yield listing-page URLs for pages 1..number inclusive.

        BUG FIX: the original used range(1, number), which yielded one page
        fewer than the user asked for (entering 1 crawled nothing).
        """
        for i in range(1, number + 1):
            yield f"https://www.doutula.com/photo/list/?page={i}"

    def get_images(self, url):
        """Download every image on one listing page, retrying on failure."""
        try:
            response = requests.get(url=url, headers=self.headers)
            print(response.status_code)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'lxml')
                a_list = soup.select(".list-group-item > div > div > a")
                for a in a_list:
                    # The last <img> holds the "data-backup" full-size URL.
                    img = a.select('img')[-1]
                    self.download_image(img.attrs["data-backup"])
            else:
                time.sleep(60)
                self.set_cookie()
                self.get_images(url)
        except Exception:
            # Best-effort retry on any network/parse error (rate limiting).
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)

    def download_image(self, url):
        """Download one image, keeping its original filename."""
        print("正在下載圖片:", url)
        start = time.time()  # download start time
        file_path = self.base_path + str(url).split('/')[-1]
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('下載完成!耗時: %.2f秒' % (end - start))


if __name__ == '__main__':
    dt = DouTu()
    number = int(input("請輸入爬取的頁數: "))
    url_list = dt.get_url_list(number)
    for url in url_list:
        dt.get_images(url)
(2) 線程池下載

import os
import time
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Thread pool capped at 5 concurrent downloads.
pool = ThreadPoolExecutor(5)


class DouTu(object):
    """Download meme images ("斗圖") from doutula.com via a thread pool."""

    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/斗圖/"
        self.base_url = "https://www.doutula.com"
        self.headers = {
            "Cookie": "_agep=1617332993; _agfp=394e543fd58b15645b20d85328c7c13a; _agtk=ea484a0f61d912d369bd5765299b9702; Hm_lvt_2fc12699c699441729d4b335ce117f40=1617332992,1618563143; Hm_lpvt_2fc12699c699441729d4b335ce117f40=1618563729",
            "Host": "www.doutula.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
        }
        self.create_folder(self.base_path)

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def set_cookie(self):
        """Refresh the Cookie header from the site's landing page."""
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["Cookie"] = '; '.join(cookie_list)

    def get_url_list(self, number):
        """Yield listing-page URLs for pages 1..number inclusive.

        BUG FIX: the original used range(1, number), which yielded one page
        fewer than the user asked for (entering 1 crawled nothing).
        """
        for i in range(1, number + 1):
            yield f"https://www.doutula.com/photo/list/?page={i}"

    def get_images(self, url):
        """Download every image on one listing page concurrently."""
        try:
            response = requests.get(url=url, headers=self.headers)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'lxml')
                a_list = soup.select(".list-group-item > div > div > a")
                # The last <img> in each anchor holds the "data-backup" URL.
                tasks = [pool.submit(self.download_image, a.select('img')[-1].attrs["data-backup"])
                         for a in a_list]
                wait(tasks)
            else:
                time.sleep(60)
                self.set_cookie()
                self.get_images(url)
        except Exception:
            # Best-effort retry on any network/parse error (rate limiting).
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)

    def download_image(self, url):
        """Download one image, keeping its original filename."""
        print(url)
        file_path = self.base_path + str(url).split('/')[-1]
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    dt = DouTu()
    number = int(input("請輸入爬取的頁數: "))
    url_list = dt.get_url_list(number)
    for url in url_list:
        dt.get_images(url)
5、英雄聯盟英雄皮膚下載
(1) 基本使用

import os
import requests


class LOL(object):
    """Download every champion's skin splash art from the LoL static API."""

    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/英雄聯盟/"
        # JSON listing of all champions.
        self.hero_list_url = "http://game.gtimg.cn/images/lol/act/img/js/heroList/hero_list.js"
        # Per-champion detail JSON (skins, bio), keyed by heroId.
        self.hero_url = "https://game.gtimg.cn/images/lol/act/img/js/hero/{}.js"

    def create_folder(self, path):
        """Create *path* (and parents) if it does not already exist."""
        os.makedirs(path, exist_ok=True)

    def get_hero_list(self, start=27):
        """Fetch the champion list and process each one from index *start*.

        *start* defaults to 27 to match the original resume point; pass 0
        to download everything.
        """
        response = requests.get(url=self.hero_list_url)
        res_json = response.json()
        hero_list = res_json.get("hero")
        for hero in hero_list[start:]:
            heroId = hero.get("heroId")
            self.get_hero(heroId)
            print("----------------------------------------------------------------------------------------------")

    def get_hero(self, heroId):
        """Print one champion's info and download all of its skin images."""
        response = requests.get(url=self.hero_url.format(heroId))
        json_data = response.json()
        # Champion basics.
        hero = json_data.get("hero")
        heroId = hero["heroId"]
        name = hero["name"]
        alias = hero["alias"]
        title = hero["title"]
        shortBio = hero["shortBio"]
        print(f"{heroId} - {name} - {alias} - {title} - {shortBio}")
        download_path = self.base_path + name
        self.create_folder(download_path)
        # Skins: some entries (chromas) have no mainImg and are skipped.
        skins = json_data.get("skins")
        for skin in skins:
            skin_name = skin["name"]
            skin_image = skin["mainImg"]
            description = skin["description"]
            if skin_image:
                print("\t", f"{skin_name} - {skin_image} - {description}")
                self.download_image(skin_image, download_path, skin_name)

    def download_image(self, url, path, filename):
        """Download *url* into *path* as "<filename>.jpg".

        BUG FIX: the original sanitized *filename* but then never used it —
        every skin was written to the same hard-coded file, so each download
        overwrote the previous one.
        """
        filename = filename.replace('/', '')  # strip path-breaking slashes
        response = requests.get(url)
        file_path = f"{path}/{filename}.jpg"
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    lol = LOL()
    lol.get_hero_list()
下載視頻相關
1、抖音
(1) 根據用戶主頁下載用戶所有視頻(無水印)

import os
import time
from urllib import parse
from concurrent.futures import ThreadPoolExecutor, wait

import requests

# Thread pool capped at 5 concurrent downloads.
pool = ThreadPoolExecutor(5)


class DouYin(object):
    """Download all of a Douyin user's videos (watermark-free) given a share link."""

    def __init__(self):
        # Default mobile UA for the share-link redirect.
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1"
        }
        # User profile endpoint.
        self.user_info_url = "https://www.iesdouyin.com/web/api/v2/user/info/"
        # User posts endpoint.
        self.video_list_url = "https://www.iesdouyin.com/web/api/v2/aweme/post/"
        # User liked-posts endpoint.
        self.like_list_url = "https://www.iesdouyin.com/web/api/v2/aweme/like/"
        # Challenge posts endpoint.
        self.challenge_list_url = 'https://www.iesdouyin.com/web/api/v2/challenge/aweme/'
        # Hard-coded request signature; presumably expires — TODO confirm.
        self.sign = "mIrLFQAA-OUypYuVfKXa6ZiKyw"
        # Download root.
        self.base_path = "G:/crawler/videos/"

    def create_folder(self):
        """Create the download directory if it does not already exist."""
        os.makedirs(self.base_path, exist_ok=True)

    def get_link(self, url):
        """Resolve a v.douyin.com share link to the user's sec_uid."""
        response = requests.get(url=url, headers=self.headers, allow_redirects=False)
        location = response.headers["location"]
        params = {
            "sec_uid": dict(parse.parse_qsl(parse.urlsplit(location).query))["sec_uid"]
        }
        self.get_user_info(params)

    def get_user_info(self, params):
        """Print the user's profile and start downloading their posts."""
        response = requests.get(url=self.user_info_url, params=params)
        response_data = response.json()
        user_info = response_data["user_info"]
        print("用戶抖音號:", user_info["unique_id"])
        print("用戶排名ID:", user_info["short_id"])
        print("用戶昵稱:", user_info["nickname"])
        print("用戶簽名:", user_info["signature"])
        print("用戶獲贊數:", user_info["total_favorited"])
        print("用戶粉絲數:", user_info["follower_count"])
        print("用戶作品數:", user_info["aweme_count"])
        self.base_path = self.base_path + user_info["unique_id"] + "/"
        self.create_folder()
        print("開始獲取用戶作品....")
        self.get_aweme_list(sec_uid=params["sec_uid"])
        # print("開始獲取用戶喜歡作品....")
        # self.get_like_aweme_list(sec_uid=params["sec_uid"])
        # print("開始獲取用戶挑戰作品....")
        # self.get_challenge_aweme_list(ch_id=params["sec_uid"])

    def get_aweme_list(self, sec_uid, count=10, max_cursor=0, aid=1128, dytk=""):
        """Page through the user's posts, downloading each batch concurrently."""
        params = {
            "sec_uid": sec_uid,
            "count": count,
            "max_cursor": max_cursor,
            "aid": aid,
            "_signature": self.sign,
            "dytk": dytk,
        }
        response = requests.get(url=self.video_list_url, params=params)
        response_data = response.json()
        tasks = [pool.submit(self.download_video, aweme)
                 for aweme in response_data["aweme_list"]]
        wait(tasks)
        # max_cursor is the pagination token; 0/empty means no more pages.
        next_page = response_data["max_cursor"]
        if next_page:
            self.get_aweme_list(sec_uid=sec_uid, max_cursor=next_page)
        else:
            print("下載完成.....")

    def get_like_aweme_list(self, sec_uid, count=10, max_cursor=0, aid=1128, dytk=""):
        """Page through the user's liked posts (often private/empty)."""
        params = {
            "sec_uid": sec_uid,
            "count": count,
            "max_cursor": max_cursor,
            "aid": aid,
            "_signature": self.sign,
            "dytk": dytk,
        }
        response = requests.get(url=self.like_list_url, params=params)
        response_data = response.json()
        if len(response_data["aweme_list"]) <= 0:
            print("當前用戶沒有喜歡的作品,或不能查看.....")
        else:
            # BUG FIX: the original passed type='/likes', a keyword argument
            # download_video does not accept — every worker raised TypeError
            # that the futures silently swallowed.
            tasks = [pool.submit(self.download_video, aweme)
                     for aweme in response_data["aweme_list"]]
            wait(tasks)
            next_page = response_data["max_cursor"]
            if next_page:
                # BUG FIX: the original recursed into get_aweme_list here,
                # paginating the wrong endpoint.
                self.get_like_aweme_list(sec_uid=sec_uid, max_cursor=next_page)
            else:
                print("下載完成.....")

    def get_challenge_aweme_list(self, ch_id, count=10, cursor=0, aid=1128,
                                 screen_limit=3, download_click_limit=0):
        """Print post metadata for a challenge, paging while has_more is set."""
        params = {
            "ch_id": ch_id,
            "count": count,
            "cursor": cursor,
            "aid": aid,
            "screen_limit": screen_limit,
            "download_click_limit": download_click_limit,
            "_signature": self.sign
        }
        response = requests.get(url=self.challenge_list_url, params=params)
        response_data = response.json()
        while response_data["has_more"]:
            for aweme in response_data["aweme_list"]:
                try:
                    print("作品ID: ", aweme["aweme_id"])
                    print("分組ID: ", aweme["group_id"])
                    print("作品簡介: ", aweme["desc"])
                    print("作者昵稱: ", aweme["author"]["nickname"])
                    print("作者簽名: ", aweme["author"]["signature"])
                    print("音頻鏈接: ", aweme["music"]["play_url"]["uri"])
                    print("視頻鏈接: ", aweme["video"]["play_addr"]["url_list"][0])
                    print("\n")
                except KeyError:
                    # Some entries lack music/video fields; skip them.
                    pass
            cursor += count
            self.get_challenge_aweme_list(ch_id=ch_id, cursor=cursor)

    def download_video(self, aweme):
        """Download one post's low-bitrate video, named by its aweme_id."""
        url = aweme["video"]["play_addr_lowbr"]["url_list"][0]
        video_name = aweme["aweme_id"]
        response = requests.get(url)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    url = input("請輸入你要抓取的鏈接如: https://v.douyin.com/xxxxxxx/ \n : ")
    douyin = DouYin()
    douyin.get_link(url=url)
(2) 抖音視頻解析

import re
import requests


class VideoParsing(object):
    """Resolve a Douyin share link to the watermark-free video URL."""

    def __init__(self):
        self.headers = {
            'User-Agent': 'mozilla/5.0 (iphone; cpu iphone os 14_4 like mac os x) applewebkit/605.1.15 (khtml, like gecko) version/14.0.3 mobile/15e148 safari/604.1'
        }

    def get_response(self, url):
        """GET *url* with retries; return the Response, or None if all fail.

        BUG FIX: the original's retry loop was not wrapped in try/except, so
        a second timeout raised straight through instead of retrying.
        """
        try:
            response = requests.get(url=url, headers=self.headers, timeout=5)
            if response.status_code == 200:
                return response
        except Exception as e:
            print(e)
        for i in range(1, 10):
            print(f'請求{url}超時,第{i}次重復請求')
            try:
                response = requests.get(url, headers=self.headers, timeout=5)
                if response.status_code == 200:
                    return response
            except Exception as e:
                print(e)
        return None  # explicit: caller must handle total failure

    def parsing(self, video_url_share):
        """Extract, resolve and print the final video URL from a share message."""
        # Pull the https URL out of the share text.
        video_url_share = re.findall('https.*/', video_url_share)[0]
        # Follow the short-link redirect to the canonical video page.
        response = self.get_response(video_url_share)
        if response is None:
            # BUG FIX: the original dereferenced .url on None after retries
            # were exhausted, crashing with AttributeError.
            print(f'請求{video_url_share}失敗')
            return
        video_url_redirect = response.url
        # The numeric video id lives in the redirected path.
        video_id = re.findall(r'video/(\d+)/', str(video_url_redirect))[0]
        # Item-info API for that id.
        video_url_api = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}'
        api_response = self.get_response(video_url_api)
        if api_response is None:
            print(f'請求{video_url_api}失敗')
            return
        video_url_json = api_response.json()
        video_url = video_url_json.get('item_list')[0].get('video').get('play_addr').get('url_list')[0]
        # 'playwm' -> 'play' strips the watermark; drop the 720p ratio pin.
        video_url = video_url.replace('playwm', 'play').replace('&ratio=720p', '')
        final_response = self.get_response(video_url)
        if final_response is None:
            print(f'請求{video_url}失敗')
            return
        video_url_web = final_response.url.replace('http:', 'https:')
        video_name = f'douyin{video_id}.mp4'
        print(video_name, video_url_web)


if __name__ == '__main__':
    """
    https://v.douyin.com/eB8SLLk/
    """
    video_parsing = VideoParsing()
    while True:
        video_url_share = input("請輸入要解析的鏈接\n:")
        video_parsing.parsing(video_url_share)
(3) 抖音排行

import requests


class DouYin(object):
    """Fetch Douyin ranking boards (hot search, stars, live, brands, music)
    through the mobile-app API endpoints, spoofing the okhttp client."""

    def __init__(self):
        # Trending searches
        self.HOT_SEARCH_URL = 'https://aweme.snssdk.com/aweme/v1/hot/search/list/'
        # Trending celebrities
        self.HOT_STAR_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/star/billboard/'
        # Trending live streams
        self.HOT_LIVE_URL = 'https://webcast.amemv.com/webcast/ranklist/hot/'
        self.BRAND_CATEGORY_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/brand/category/'
        self.HOT_BRAND_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/brand/billboard/'
        self.HOT_MUSIC_URL = 'https://aweme.snssdk.com/aweme/v1/chart/music/list/'
        # Pretend to be the Android app's HTTP client.
        self.HEADERS = {
            'user-agent': 'okhttp3'
        }
        # Query string every endpoint expects (device/app identification).
        self.QUERIES = {
            'device_platform': 'android',
            'version_name': '13.2.0',
            'version_code': '130200',
            'aid': '1128'
        }

    def _fetch_list(self, url, path, params=None):
        """GET *url* and return the list found at *path* in the JSON body.

        *path* is a tuple of keys to descend through, e.g. ("data", "word_list").
        Shared helper replacing six copy-pasted request blocks; adds a timeout
        so a stalled server cannot hang the caller forever.
        """
        response = requests.get(url, params=params if params is not None else self.QUERIES,
                                headers=self.HEADERS, timeout=10)
        data = response.json()
        for key in path:
            data = data[key]
        return list(data)

    def get_hot_search(self):
        """Trending search terms."""
        return self._fetch_list(self.HOT_SEARCH_URL, ("data", "word_list"))

    def get_hot_star(self):
        """Trending celebrities."""
        return self._fetch_list(self.HOT_STAR_URL, ("user_list",))

    def get_hot_live(self):
        """Trending live streams."""
        return self._fetch_list(self.HOT_LIVE_URL, ("data", "ranks"))

    def get_brand_category(self):
        """Brand-board categories."""
        return self._fetch_list(self.BRAND_CATEGORY_URL, ("category_list",))

    def get_hot_brand(self, category: int):
        """Brand board for the given category id."""
        params = self.QUERIES.copy()
        params.update({'category_id': str(category)})
        return self._fetch_list(self.HOT_BRAND_URL, ("brand_list",), params=params)

    def get_hot_music(self):
        """Hot music chart (top 100 of the default chart)."""
        params = self.QUERIES.copy()
        params.update({'chart_id': '6853972723954146568', 'count': '100'})
        return self._fetch_list(self.HOT_MUSIC_URL, ("music_list",), params=params)


def run():
    """Demo entry point: print the hot music chart.

    The other boards work the same way: get_hot_search(), get_hot_star(),
    get_hot_live(), get_brand_category(), get_hot_brand(category_id)
    each return a list of dict items.
    """
    douyin = DouYin()
    items = douyin.get_hot_music()
    for item in items:
        print("歌曲ID: ", item["music_info"]["id"])
        print("歌曲名稱: ", item["music_info"]["title"])
        print("歌曲作者: ", item["music_info"]["author"])
        print("歌曲鏈接: ", item["music_info"]["play_url"]["uri"])
        print('\n')


if __name__ == '__main__':
    run()
2、A站視頻下載(四種清晰度)
(1) 單個視頻下載

import re
from uuid import uuid4

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Thread pool shared by all downloads (at most 5 concurrent workers).
pool = ThreadPoolExecutor(5)


# AcFun danmaku site
class SpiderAcfunVideo(object):
    """Download every quality stream of one AcFun video page.

    Scrapes the page for the video/resource ids, asks the app play-info
    API for all stream qualities, and downloads each in a pool thread.
    """

    def __init__(self):
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25"
        }
        # App API URL template; {} slots are videoId and resourceId.
        # NOTE(review): the mkey token is hard-coded and presumably expires — confirm.
        self.video_url = "https://api-new.acfunchina.com/rest/app/play/playInfo/mp4?videoId={}&resourceId={}&resourceType=2&mkey=AAHewK3eIAAyMjAzNTI2NDMAAhAAMEP1uwS3Vi7NYAAAAJumF4MyTTFh5HGoyjW6ZpdjKymALUy9jZbsMTBVx-F10EhxyvpMtGQbBCYipvkMShM3iMNwbMd9DM6r2rnOYRVEdr6MaJS4yxxlA_Sl3JNWup57qBCQzOSC7SZnbEsHTQ%3D%3D&market=xiaomi&product=ACFUN_APP&sys_version=10&app_version=6.20.0.915&boardPlatform=sdm845&sys_name=android&socName=UNKNOWN&appMode=0"
        self.base_path = "C:/Users/admin/Desktop/videos/"

    def get_videos_msg(self, page_url):
        """Fetch the video page and extract title, video id and resource id.

        Prints a message and returns on pages where the ids cannot be found
        (the original crashed with AttributeError on `.group()` of None).
        """
        response = requests.get(url=page_url, headers=self.headers, timeout=10)
        soup = BeautifulSoup(response.text, 'lxml')
        # Page <title> doubles as the video title.
        title = soup.find("title").string
        # The page embeds "vid":"<digits>" and "ac":"<digits>" in inline JSON.
        vid_match = re.search(r"(?<=\"vid\":\")\d+(?=\",)", response.text)
        ac_match = re.search(r"(?<=\"ac\":\")\d+(?=\",)", response.text)
        if vid_match is None or ac_match is None:
            print("未能解析視頻信息: ", page_url)
            return
        self.get_video_url(title, vid_match.group(), ac_match.group())

    # Resolve the playable stream URLs for every quality.
    def get_video_url(self, title, video_id, resource_id):
        url = self.video_url.format(video_id, resource_id)
        response = requests.get(url=url, headers=self.headers, timeout=10)
        streams = response.json()["playInfo"]["streams"]
        print("視頻標題: ", title)
        # One pool task per quality; block until all finish.
        tasks = [pool.submit(self.download_video, stream["playUrls"][0]) for stream in streams]
        wait(tasks)

    # Download one stream to a random .mp4 filename.
    def download_video(self, url):
        video_name = str(uuid4())
        response = requests.get(url, timeout=30)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == "__main__":
    while True:
        # e.g. https://www.acfun.cn/v/ac16986343 ('q' quits)
        page_url = input("請輸入視頻地址>>>: ")
        if page_url == 'q':
            print("退出...")
            break
        video = SpiderAcfunVideo()
        video.get_videos_msg(page_url=page_url)
(2) 搜索批量下載

import re
from uuid import uuid4

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Thread pool shared by all downloads (at most 5 concurrent workers).
pool = ThreadPoolExecutor(5)


# AcFun danmaku site — search a keyword and batch-download every result.
class SpiderAcfunVideo(object):
    """Search AcFun by keyword and download all stream qualities of each hit."""

    def __init__(self):
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25",
            # Filled in by get_cookies() before searching.
            "cookie": None
        }
        self.search_url = "https://www.acfun.cn/search"
        self.base_url = "https://www.acfun.cn/"
        # App API URL template; {} slots are videoId and resourceId.
        # NOTE(review): the mkey token is hard-coded and presumably expires — confirm.
        self.video_url = "https://api-new.acfunchina.com/rest/app/play/playInfo/mp4?videoId={}&resourceId={}&resourceType=2&mkey=AAHewK3eIAAyMjAzNTI2NDMAAhAAMEP1uwS3Vi7NYAAAAJumF4MyTTFh5HGoyjW6ZpdjKymALUy9jZbsMTBVx-F10EhxyvpMtGQbBCYipvkMShM3iMNwbMd9DM6r2rnOYRVEdr6MaJS4yxxlA_Sl3JNWup57qBCQzOSC7SZnbEsHTQ%3D%3D&market=xiaomi&product=ACFUN_APP&sys_version=10&app_version=6.20.0.915&boardPlatform=sdm845&sys_name=android&socName=UNKNOWN&appMode=0"
        self.base_path = "G:/crawler/videos/"

    # Obtain a session cookie from the home page (search requires one).
    def get_cookies(self):
        response = requests.get(url=self.base_url, headers=self.headers, timeout=10)
        cookie_list = [key + "=" + value for key, value in response.cookies.items()]
        self.headers["cookie"] = '; '.join(cookie_list)

    def search_videos(self, keyword, type='video'):
        """Search *keyword* and process every result page found.

        `type` keeps the original (builtin-shadowing) parameter name for
        backward compatibility with keyword callers.
        """
        params = {
            "type": type,
            "keyword": keyword
        }
        print(self.headers)
        response = requests.get(url=self.search_url, params=params,
                                headers=self.headers, timeout=10)
        # Result links are JSON-escaped in the page; strip backslashes first.
        response_text = str(response.text).replace('\\', '')
        pattern = re.compile(r'<a href="/v(.*?)"')
        link_list = pattern.findall(response_text)
        for link in link_list:
            page_url = self.base_url + "v" + link
            self.get_videos_msg(page_url)

    def get_videos_msg(self, page_url):
        """Fetch one video page and extract title, video id and resource id.

        Skips pages where the ids cannot be found (the original crashed with
        AttributeError on `.group()` of None, aborting the whole batch).
        """
        response = requests.get(url=page_url, headers=self.headers, timeout=10)
        soup = BeautifulSoup(response.text, 'lxml')
        # Page <title> doubles as the video title.
        title = soup.find("title").string
        # The page embeds "vid":"<digits>" and "ac":"<digits>" in inline JSON.
        vid_match = re.search(r"(?<=\"vid\":\")\d+(?=\",)", response.text)
        ac_match = re.search(r"(?<=\"ac\":\")\d+(?=\",)", response.text)
        if vid_match is None or ac_match is None:
            print("未能解析視頻信息: ", page_url)
            return
        self.get_video_url(title, vid_match.group(), ac_match.group())

    # Resolve the playable stream URLs for every quality.
    def get_video_url(self, title, video_id, resource_id):
        url = self.video_url.format(video_id, resource_id)
        response = requests.get(url=url, headers=self.headers, timeout=10)
        streams = response.json()["playInfo"]["streams"]
        print("視頻標題: ", title)
        # One pool task per quality; block until all finish.
        tasks = [pool.submit(self.download_video, stream["playUrls"][0]) for stream in streams]
        wait(tasks)

    # Download one stream to a random .mp4 filename.
    def download_video(self, url):
        video_name = str(uuid4())
        response = requests.get(url, timeout=30)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == "__main__":
    while True:
        # e.g. https://www.acfun.cn/v/ac16986343 ('q' quits)
        keyword = input("請輸入關鍵字>>>: ")
        if keyword == 'q':
            print("退出...")
            break
        video = SpiderAcfunVideo()
        video.get_cookies()
        video.search_videos(keyword=keyword)