[Original] This tutorial is for entertainment purposes only. If it infringes on any rights, please contact me and I will take it down!
If it's not your thing, please don't flame. [doge]
Last updated: 2021-05-10
Image downloads
1. Anime image download (Konachan)
(1) Basic usage

import os
import time
import requests
from uuid import uuid4


class Konachan(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/漫画/"

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Fetch the image list
    def get_images(self, number):
        url = "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1".format(number)
        try:
            self.create_folder(self.base_path)
            response = requests.get(url=url)
            json_data = response.json()
            data_list = json_data["posts"]
            for data in data_list:
                self.download_image(data["file_url"])
        except Exception as e:
            print("Failed to fetch the image list:", e)

    # Download one image
    def download_image(self, url):
        print("Downloading image:", url)
        start = time.time()  # download start time
        # Build the file path
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        # Request and save
        response = requests.get(url, timeout=10)
        print(response.status_code)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('Done! Took %.2f seconds' % (end - start))


if __name__ == '__main__':
    print("Start fetching images")
    kc = Konachan()
    kc.get_images(100)
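Note on paging: the script above grabs everything in one request by pushing the limit tag up to 100. If you need more than one page, the Moebooru-style API that Konachan exposes appears to also accept plain page and limit query parameters; the sketch below is an untested assumption along those lines (without api_version=2, post.json returns a plain JSON array instead of a "posts" key).

# Hedged sketch: paging post.json with "page"/"limit" (assumed to be supported
# by the Moebooru-style API; verify against the live site before relying on it).
import requests

def fetch_posts(page=1, limit=100):
    response = requests.get(
        "https://konachan.net/post.json",
        params={"page": page, "limit": limit},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()  # without api_version=2 this is a plain list of posts

if __name__ == '__main__':
    for page in range(1, 4):  # first three pages
        for post in fetch_posts(page):
            print(post["file_url"])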
(2) Thread pool

import os
import requests
from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor, wait


class Konachan(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/漫画/"
        # Create a thread pool (at most 5 threads)
        self.pool = ThreadPoolExecutor(5)

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Fetch the image list
    def get_images(self, number):
        url = "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1".format(number)
        try:
            self.create_folder(self.base_path)
            response = requests.get(url=url)
            json_data = response.json()
            data_list = json_data["posts"]
            # Hand the downloads to the thread pool
            tasks = [self.pool.submit(self.download_image, data["file_url"]) for data in data_list]
            wait(tasks)
        except Exception as e:
            print("Failed to fetch the image list:", e)

    # Download one image
    def download_image(self, url):
        print("Downloading image:", url)
        # Build the file path
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        # Request and save
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    print("Start fetching images")
    kc = Konachan()
    kc.get_images(1000)
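If you want progress output while the pool is still working, concurrent.futures.as_completed() is an alternative to wait(): it yields each future as soon as it finishes. A minimal sketch (download_one is a stand-in for the download_image method above):

# Minimal sketch: report progress as downloads finish instead of blocking on wait().
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_all(urls, download_one, workers=5):
    with ThreadPoolExecutor(workers) as pool:
        futures = {pool.submit(download_one, url): url for url in urls}
        for done, future in enumerate(as_completed(futures), start=1):
            future.result()  # re-raises any exception from the worker thread
            print(f"{done}/{len(futures)} finished: {futures[future]}")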
(3) Asynchronous download

import os
import time
import aiohttp
import asyncio
from uuid import uuid4


class Konachan(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/漫画/"

    # Create the download directory
    async def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Fetch the image list
    async def get_images(self, number):
        url = "https://konachan.net/post.json?tags=holds%3Afalse%20%20limit%3A{}&api_version=2&filter=1&include_tags=1&include_votes=1&include_pools=1".format(number)
        try:
            await self.create_folder(self.base_path)
            # Asynchronous request
            async with aiohttp.ClientSession() as session:
                async with session.get(url, verify_ssl=False) as response:
                    json_data = await response.json()
                data_list = json_data["posts"]
                tasks = [asyncio.create_task(self.download_image(session, data["file_url"])) for data in data_list]
                await asyncio.wait(tasks)
        except Exception as e:
            print("Failed to fetch the image list:", e)

    # Download one image
    async def download_image(self, session, url):
        print("Downloading image:", url)
        # Build the file path
        file_suffix = str(url).split('.')[-1]
        file_name = str(uuid4()) + '.' + file_suffix
        file_path = self.base_path + file_name
        # Request and save
        async with session.get(url, verify_ssl=False) as response:
            content = await response.content.read()
            with open(file_path, mode='wb') as file_object:
                file_object.write(content)


if __name__ == '__main__':
    start = time.time()  # download start time
    print("Start fetching images")
    kc = Konachan()
    asyncio.run(kc.get_images(1000))
    end = time.time()  # download end time
    print('Done! Took %.2f seconds' % (end - start))
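One caveat with the asyncio version: creating 1000 tasks at once opens up to 1000 connections. A small sketch (not part of the original script) that caps in-flight downloads with asyncio.Semaphore:

# Minimal sketch: cap concurrent downloads with a semaphore (assumed limit of 5).
import asyncio
import aiohttp

async def download(session, sem, url, path):
    async with sem:  # at most `limit` downloads run at the same time
        async with session.get(url) as response:
            data = await response.read()
        with open(path, 'wb') as f:  # blocking write; fine for small files
            f.write(data)

async def main(urls, limit=5):
    sem = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(download(session, sem, url, f"{i}.jpg")
                               for i, url in enumerate(urls)))

# asyncio.run(main(["https://konachan.net/...jpg"]))  # example call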
2. Mzitu image download
(1) Basic usage

# The site rate-limits to 50 requests per minute
import os
import time
import requests
from bs4 import BeautifulSoup


# Mzitu downloader
class MeiZiTu():
    def __init__(self):
        self.base_url = "https://www.mzitu.com/"
        self.headers = {
            "cookie": None,
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36"
        }
        self.image_url = None
        self.base_path = "C:/Users/admin/Desktop/images/"

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Set cookies
    def set_cookie(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["cookie"] = '; '.join(cookie_list)

    # Fetch the album list
    def get_images(self, url=None):
        url = url if url else self.base_url
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            # Rate limited: wait a minute, refresh the cookies, retry
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".postlist > ul > li > a")
            for a in a_list:
                image = a.select_one("img")
                print("Image title: ", image.attrs["alt"])
                print("Image URL: ", image.attrs["data-original"])
                print("Detail page: ", a.attrs["href"])
                # Create the album directory
                path = self.base_path + image.attrs["alt"] + '/'
                self.create_folder(path)
                # Download the cover image
                self.download_image(image.attrs["data-original"], path=path, ref=self.base_url)
                # Page through the album
                next_page = self.get_max_page(url=a.attrs["href"])
                for i in range(1, next_page):
                    url = a.attrs["href"] + "/" + str(i)
                    self.get_images_details(url, path)
            # Go to the next list page
            next_page = soup.select_one(".nav-links > .next")
            if next_page:
                self.get_images(next_page.attrs["href"])

    # Get the last page number of an album
    def get_max_page(self, url):
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            return self.get_max_page(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            pagenavi = soup.select(".pagenavi > a > span")[-2]
            next_page = pagenavi.string
            return int(next_page)

    # Fetch an image detail page
    def get_images_details(self, url, path):
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            self.get_images_details(url, path)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            image = soup.select_one(".main-image > p > a > img")
            self.download_image(image.attrs["src"], path=path, ref=url)

    # Download one image
    def download_image(self, url, path, ref):
        print("Downloading image:", url)
        headers = {
            "Referer": ref,
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
        }
        video_name = str(url).split('/')[-1]
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            file_path = path + str(video_name)
            with open(file_path, mode='wb') as f:
                f.write(response.content)


if __name__ == '__main__':
    meizitu = MeiZiTu()
    meizitu.get_images()
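The script only reacts to the rate limit after the fact (non-200 response, then sleep 60 seconds and refresh cookies). Based on the "50 requests per minute" figure in the comment at the top, a small client-side limiter (my own sketch, not from the original) can space requests out so the ban rarely triggers:

# Minimal sketch: client-side rate limiting at ~50 requests per minute.
import time

class RateLimiter:
    def __init__(self, max_per_minute=50):
        self.interval = 60.0 / max_per_minute  # seconds between requests
        self.last_request = 0.0

    def wait(self):
        delay = self.interval - (time.time() - self.last_request)
        if delay > 0:
            time.sleep(delay)
        self.last_request = time.time()

# limiter = RateLimiter(50)
# limiter.wait()  # call right before every requests.get(...)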
(2) Thread pool

# The site rate-limits to 50 requests per minute
import re
import os
import time
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Create a thread pool (at most 5 threads)
pool = ThreadPoolExecutor(5)


# Mzitu downloader
class MeiZiTu():
    def __init__(self):
        self.base_url = "https://www.mzitu.com/"
        self.image_url = None
        self.base_path = "G:/crawler/images/"
        self.headers = {
            "cookie": None,
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36"
        }

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Set cookies
    def set_cookie(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["cookie"] = '; '.join(cookie_list)

    # Fetch the album list
    def get_images(self, url=None):
        url = url if url else self.base_url
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".postlist > ul > li > a")
            for a in a_list[1:4]:
                image = a.select_one("img")
                print("Image title: ", image.attrs["alt"])
                # Create the album directory
                path = self.base_path + image.attrs["alt"] + '/'
                self.create_folder(path)
                # Download the thumbnail
                self.download_image(image.attrs["data-original"], path=path, ref=self.base_url)
                # Build the list of full-size image URLs from the first image's URL pattern
                image_url_list = []
                max_page, image_url = self.get_max_page(url=a.attrs["href"])
                print(image_url)
                file_name = str(image_url).split('/')[-1]
                prefix = str(image_url).replace(file_name, '')
                name = re.findall(r"\D+", file_name)
                res = re.findall(r"\d+", file_name)[0]
                for i in range(1, max_page + 1):
                    page = "0" + str(i) if i < 10 else str(i)
                    ref = a.attrs["href"] if i == 1 else a.attrs["href"] + "/" + str(i)
                    data = {
                        "image_url": prefix + res + name[0] + page + name[1],
                        "ref": ref
                    }
                    image_url_list.append(data)
                # Hand the downloads to the thread pool
                tasks = [pool.submit(self.download_image, data["image_url"], path, data["ref"])
                         for data in image_url_list]
                wait(tasks)
            # Go to the next list page
            # next_page = soup.select_one(".nav-links > .next")
            # if next_page:
            #     self.get_images(next_page.attrs["href"])

    # Get the album's last page number and the URL of its first image
    def get_max_page(self, url):
        response = requests.get(url=url, headers=self.headers)
        if response.status_code != 200:
            time.sleep(60)
            self.set_cookie()
            return self.get_max_page(url)
        else:
            soup = BeautifulSoup(response.text, 'lxml')
            pagenavi = soup.select(".pagenavi > a > span")[-2]
            image = soup.select_one(".main-image > p > a > img")
            max_page = int(pagenavi.string)
            return max_page, image.attrs["src"]

    # Download one image
    def download_image(self, url, path, ref):
        print("Downloading image:", url)
        headers = {
            "Referer": ref,
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
        }
        print(headers)
        video_name = str(url).split('/')[-1]
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            file_path = path + str(video_name)
            with open(file_path, mode='wb') as f:
                f.write(response.content)


if __name__ == '__main__':
    meizitu = MeiZiTu()
    meizitu.get_images()

"""
Example full-size image URL:
https://imgpc.iimzt.com/2020/05/28b06.jpg
"""
3. Netbian wallpaper download
(1) Basic usage

import os
import time
import requests
from random import choice
from bs4 import BeautifulSoup


class BianWallpaper(object):
    def __init__(self):
        self.user_agent_list = [
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.1 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2227.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2226.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2224.3 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.124 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2049.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 4.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2049.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.67 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.67 Safari/537.36",
            "Mozilla/5.0 (X11; OpenBSD i386) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1944.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.3319.102 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.2309.372 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.2117.157 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1866.237 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.137 Safari/4E423F",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.116 Safari/537.36 Mozilla/5.0 (iPad; U; CPU OS 3_2 like Mac OS X; en-us) AppleWebKit/531.21.10 (KHTML, like Gecko) Version/4.0.4 Mobile/7B334b Safari/531.21.10",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.517 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1667.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.16 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1623.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.17 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.62 Safari/537.36",
            "Mozilla/5.0 (X11; CrOS i686 4319.74.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.57 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.2 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1468.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1467.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1464.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1500.55 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.90 Safari/537.36",
            "Mozilla/5.0 (X11; NetBSD) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36",
            "Mozilla/5.0 (X11; CrOS i686 3912.101.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.60 Safari/537.17",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1309.0 Safari/537.17",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.15 (KHTML, like Gecko) Chrome/24.0.1295.0 Safari/537.15",
            "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.14 (KHTML, like Gecko) Chrome/24.0.1292.0 Safari/537.14"
        ]
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Host": "www.netbian.com",
            "Upgrade-Insecure-Requests": "1",
            "Cookie": None,
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
        }
        self.base_url = "http://www.netbian.com"
        self.start_url = "http://www.netbian.com/"
        self.base_path = "G:/crawler/videos/"

    # Refresh the cookie and rotate the User-Agent
    def set_cookie(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["Cookie"] = '; '.join(cookie_list)
        self.headers["User-Agent"] = choice(self.user_agent_list)

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Get every wallpaper category
    def get_category_list(self):
        # Set cookies
        self.set_cookie()
        category_list = []
        response = requests.get(self.start_url, headers=self.headers)
        # Detect the page encoding
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select('#header > div.head > ul > li:nth-child(1) > div > a')
            for a in a_list:
                # Skip the "4k壁纸" (4K wallpapers) entry
                if a.text != "4k壁纸":
                    category_list.append({"title": a.text, "url": self.base_url + a.attrs["href"]})
            for index, category in enumerate(category_list):
                print(index, '---', category["title"])
            category_index = int(input("Pick the number of the category to download: "))
            category_url = category_list[category_index]["url"]
            self.base_path = self.base_path + category_list[category_index]["title"] + '/'
            self.create_folder(self.base_path)
            self.get_image_list(category_url)
        else:
            time.sleep(60)
            self.set_cookie()
            self.get_category_list()

    # Get the image list of a category
    def get_image_list(self, url):
        response = requests.get(url=url, headers=self.headers)
        # Detect the page encoding
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            a_list = soup.select(".list > ul > li > a")
            prev = soup.select(".prev")[-1]
            for a in a_list:
                link = self.base_url + a.attrs["href"]
                self.get_wallpaper(link)
            # "下一页" is the site's "next page" link text
            if "下一页" in prev.string:
                next_url = self.base_url + prev.attrs["href"]
                self.get_image_list(next_url)
        else:
            time.sleep(60)
            self.set_cookie()
            self.get_image_list(url)

    # Get the wallpaper URL from a detail page
    def get_wallpaper(self, url):
        response = requests.get(url=url, headers=self.headers)
        # Detect the page encoding
        response.encoding = response.apparent_encoding
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'lxml')
            wallpaper_url = soup.select_one("#main > div.endpage > div > p > a > img").attrs["src"]
            self.download_image(wallpaper_url)
        else:
            time.sleep(60)
            self.set_cookie()
            self.get_wallpaper(url)

    # Download a wallpaper
    def download_image(self, url):
        print("Downloading image:", url)
        start = time.time()  # download start time
        file_name = str(url).split('/')[-1]
        file_path = self.base_path + file_name
        # Request and save
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('Done! Took %.2f seconds' % (end - start))


if __name__ == '__main__':
    bian = BianWallpaper()
    bian.get_category_list()
4. Doutula meme download
(1) Basic usage

import os
import time
import requests
from bs4 import BeautifulSoup


class DouTu(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/斗图/"
        self.base_url = "https://www.doutula.com"
        self.headers = {
            "Cookie": "_agep=1617332993; _agfp=394e543fd58b15645b20d85328c7c13a; _agtk=ea484a0f61d912d369bd5765299b9702; Hm_lvt_2fc12699c699441729d4b335ce117f40=1617332992,1618563143; Hm_lpvt_2fc12699c699441729d4b335ce117f40=1618563729",
            "Host": "www.doutula.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
        }
        self.create_folder(self.base_path)

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Set cookies
    def set_cookie(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["Cookie"] = '; '.join(cookie_list)

    # Generate the list-page URLs
    def get_url_list(self, number):
        for i in range(1, number + 1):
            yield f"https://www.doutula.com/photo/list/?page={i}"

    # Fetch the image list of one page
    def get_images(self, url):
        try:
            response = requests.get(url=url, headers=self.headers)
            print(response.status_code)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'lxml')
                a_list = soup.select(".list-group-item > div > div > a")
                for a in a_list:
                    img = a.select('img')[-1]
                    self.download_image(img.attrs["data-backup"])
            else:
                time.sleep(60)
                self.set_cookie()
                self.get_images(url)
        except Exception as e:
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)

    # Download one image
    def download_image(self, url):
        print("Downloading image:", url)
        start = time.time()  # download start time
        file_path = self.base_path + str(url).split('/')[-1]
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)
        end = time.time()  # download end time
        print('Done! Took %.2f seconds' % (end - start))


if __name__ == '__main__':
    dt = DouTu()
    number = int(input("How many pages to crawl: "))
    url_list = dt.get_url_list(number)
    for url in url_list:
        dt.get_images(url)
(2) Thread pool download

import os
import time
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Create a thread pool (at most 5 threads)
pool = ThreadPoolExecutor(5)


class DouTu(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/斗图/"
        self.base_url = "https://www.doutula.com"
        self.headers = {
            "Cookie": "_agep=1617332993; _agfp=394e543fd58b15645b20d85328c7c13a; _agtk=ea484a0f61d912d369bd5765299b9702; Hm_lvt_2fc12699c699441729d4b335ce117f40=1617332992,1618563143; Hm_lpvt_2fc12699c699441729d4b335ce117f40=1618563729",
            "Host": "www.doutula.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36",
        }
        self.create_folder(self.base_path)

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    # Set cookies
    def set_cookie(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["Cookie"] = '; '.join(cookie_list)

    # Generate the list-page URLs
    def get_url_list(self, number):
        for i in range(1, number + 1):
            yield f"https://www.doutula.com/photo/list/?page={i}"

    # Fetch the image list of one page
    def get_images(self, url):
        try:
            response = requests.get(url=url, headers=self.headers)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'lxml')
                a_list = soup.select(".list-group-item > div > div > a")
                # Hand the downloads to the thread pool
                tasks = [pool.submit(self.download_image, a.select('img')[-1].attrs["data-backup"]) for a in a_list]
                wait(tasks)
            else:
                time.sleep(60)
                self.set_cookie()
                self.get_images(url)
        except Exception as e:
            time.sleep(60)
            self.set_cookie()
            self.get_images(url)

    # Download one image
    def download_image(self, url):
        print(url)
        file_path = self.base_path + str(url).split('/')[-1]
        response = requests.get(url, timeout=10)
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    dt = DouTu()
    number = int(input("How many pages to crawl: "))
    url_list = dt.get_url_list(number)
    for url in url_list:
        dt.get_images(url)
5. League of Legends champion skin download
(1) Basic usage

import os
import requests


class LOL(object):
    def __init__(self):
        self.base_path = "C:/Users/admin/Desktop/images/英雄联盟/"
        self.hero_list_url = "http://game.gtimg.cn/images/lol/act/img/js/heroList/hero_list.js"
        self.hero_url = "https://game.gtimg.cn/images/lol/act/img/js/hero/{}.js"

    # Create the download directory
    def create_folder(self, path):
        if not os.path.exists(path):
            os.makedirs(path)

    def get_hero_list(self):
        response = requests.get(url=self.hero_list_url)
        res_json = response.json()
        hero_list = res_json.get("hero")
        # Note: starts at index 27; iterate over hero_list to fetch every champion
        for hero in hero_list[27:]:
            heroId = hero.get("heroId")
            self.get_hero(heroId)
            print("----------------------------------------------------------------------------------------------")

    def get_hero(self, heroId):
        response = requests.get(url=self.hero_url.format(heroId))
        json_data = response.json()
        # Basic champion info
        hero = json_data.get("hero")
        heroId = hero["heroId"]
        name = hero["name"]
        alias = hero["alias"]
        title = hero["title"]
        shortBio = hero["shortBio"]
        print(f"{heroId} - {name} - {alias} - {title} - {shortBio}")
        download_path = self.base_path + name
        self.create_folder(download_path)
        # Skin info
        skins = json_data.get("skins")
        for skin in skins:
            skin_name = skin["name"]
            skin_image = skin["mainImg"]
            description = skin["description"]
            if skin_image:
                print("\t", f"{skin_name} - {skin_image} - {description}")
                self.download_image(skin_image, download_path, skin_name)

    # Download one image
    def download_image(self, url, path, filename):
        filename = filename.replace('/', '')
        response = requests.get(url)
        file_path = f"{path}/{filename}.jpg"
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    lol = LOL()
    lol.get_hero_list()
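Skin names may contain other characters that Windows rejects in file names besides the '/' handled above. A hedged sketch of a stricter sanitizer (safe_filename is my own helper, not part of the original script):

# Minimal sketch: strip every character Windows forbids in file names.
import re

def safe_filename(name: str) -> str:
    return re.sub(r'[\\/:*?"<>|]', '', name).strip()

# file_path = f"{path}/{safe_filename(skin_name)}.jpg"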
Video downloads
1. Douyin
(1) Download all of a user's videos from their profile page (no watermark)

import os
import time
from urllib import parse
from concurrent.futures import ThreadPoolExecutor, wait

import requests

# Create a thread pool (at most 5 threads)
pool = ThreadPoolExecutor(5)


class DouYin(object):
    def __init__(self):
        # Default request headers
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1"
        }
        # User profile API
        self.user_info_url = "https://www.iesdouyin.com/web/api/v2/user/info/"
        # User posts API
        self.video_list_url = "https://www.iesdouyin.com/web/api/v2/aweme/post/"
        # User liked-posts API
        self.like_list_url = "https://www.iesdouyin.com/web/api/v2/aweme/like/"
        # Challenge (hashtag) API
        self.challenge_list_url = 'https://www.iesdouyin.com/web/api/v2/challenge/aweme/'
        # Signature
        self.sign = "mIrLFQAA-OUypYuVfKXa6ZiKyw"
        # Download directory
        self.base_path = "G:/crawler/videos/"

    # Create the download directory
    def create_folder(self):
        if not os.path.exists(self.base_path):
            os.makedirs(self.base_path)

    # Follow the share-link redirect to get sec_uid
    def get_link(self, url):
        response = requests.get(url=url, headers=self.headers, allow_redirects=False)
        location = response.headers["location"]
        params = {
            "sec_uid": dict(parse.parse_qsl(parse.urlsplit(location).query))["sec_uid"]
        }
        self.get_user_info(params)

    # Get the user's profile info
    def get_user_info(self, params):
        response = requests.get(url=self.user_info_url, params=params)
        response_data = response.json()
        user_info = response_data["user_info"]
        print("Douyin ID:", user_info["unique_id"])
        print("Short ID:", user_info["short_id"])
        print("Nickname:", user_info["nickname"])
        print("Signature:", user_info["signature"])
        print("Total likes received:", user_info["total_favorited"])
        print("Followers:", user_info["follower_count"])
        print("Posts:", user_info["aweme_count"])
        self.base_path = self.base_path + user_info["unique_id"] + "/"
        self.create_folder()
        print("Fetching the user's posts....")
        self.get_aweme_list(sec_uid=params["sec_uid"])
        # print("Fetching the user's liked posts....")
        # self.get_like_aweme_list(sec_uid=params["sec_uid"])
        # print("Fetching the user's challenge posts....")
        # self.get_challenge_aweme_list(ch_id=params["sec_uid"])

    # Get the user's posts
    def get_aweme_list(self, sec_uid, count=10, max_cursor=0, aid=1128, dytk=""):
        params = {
            "sec_uid": sec_uid,
            "count": count,
            "max_cursor": max_cursor,
            "aid": aid,
            "_signature": self.sign,
            "dytk": dytk,
        }
        response = requests.get(url=self.video_list_url, params=params)
        response_data = response.json()
        # Hand the downloads to the thread pool
        tasks = [pool.submit(self.download_video, aweme) for aweme in response_data["aweme_list"]]
        wait(tasks)
        # Cursor for the next page
        next_page = response_data["max_cursor"]
        if next_page:
            self.get_aweme_list(sec_uid=sec_uid, max_cursor=next_page)
        else:
            print("Download finished.....")

    # Get the user's liked posts (not currently supported)
    def get_like_aweme_list(self, sec_uid, count=10, max_cursor=0, aid=1128, dytk=""):
        params = {
            "sec_uid": sec_uid,
            "count": count,
            "max_cursor": max_cursor,
            "aid": aid,
            "_signature": self.sign,
            "dytk": dytk,
        }
        response = requests.get(url=self.like_list_url, params=params)
        response_data = response.json()
        if len(response_data["aweme_list"]) <= 0:
            print("This user has no liked posts, or they are not visible.....")
        else:
            # Hand the downloads to the thread pool
            tasks = [pool.submit(self.download_video, aweme) for aweme in response_data["aweme_list"]]
            wait(tasks)
            # Cursor for the next page
            next_page = response_data["max_cursor"]
            if next_page:
                self.get_like_aweme_list(sec_uid=sec_uid, max_cursor=next_page)
            else:
                print("Download finished.....")

    # Get challenge (hashtag) posts
    def get_challenge_aweme_list(self, ch_id, count=10, cursor=0, aid=1128, screen_limit=3, download_click_limit=0):
        params = {
            "ch_id": ch_id,
            "count": count,
            "cursor": cursor,
            "aid": aid,
            "screen_limit": screen_limit,
            "download_click_limit": download_click_limit,
            "_signature": self.sign
        }
        response = requests.get(url=self.challenge_list_url, params=params)
        response_data = response.json()
        for aweme in response_data["aweme_list"]:
            try:
                print("Post ID: ", aweme["aweme_id"])
                print("Group ID: ", aweme["group_id"])
                print("Description: ", aweme["desc"])
                print("Author nickname: ", aweme["author"]["nickname"])
                print("Author signature: ", aweme["author"]["signature"])
                print("Audio URL: ", aweme["music"]["play_url"]["uri"])
                print("Video URL: ", aweme["video"]["play_addr"]["url_list"][0])
                print("\n")
            except Exception:
                pass
        # Page through with the cursor
        if response_data["has_more"]:
            cursor += count
            self.get_challenge_aweme_list(ch_id=ch_id, cursor=cursor)

    # Download a video (called from the thread pool)
    def download_video(self, aweme):
        url = aweme["video"]["play_addr_lowbr"]["url_list"][0]
        video_name = aweme["aweme_id"]
        response = requests.get(url)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == '__main__':
    url = input("Enter the share link, e.g. https://v.douyin.com/xxxxxxx/ \n : ")
    douyin = DouYin()
    douyin.get_link(url=url)
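download_video() reads the low-bitrate address (play_addr_lowbr). If some items turn out not to carry that field (an assumption, not something verified here), a small fallback keeps a whole batch from failing:

# Hedged sketch: prefer the low-bitrate address, fall back to the normal one.
def pick_video_url(aweme):
    video = aweme["video"]
    addr = video.get("play_addr_lowbr") or video.get("play_addr")
    return addr["url_list"][0]

# url = pick_video_url(aweme)  # drop-in replacement inside download_video()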
(2) Douyin video link parsing

import re
import requests


class VideoParsing(object):
    def __init__(self):
        self.headers = {
            'User-Agent': 'mozilla/5.0 (iphone; cpu iphone os 14_4 like mac os x) applewebkit/605.1.15 (khtml, like gecko) version/14.0.3 mobile/15e148 safari/604.1'
        }

    # Handles all HTTP requests, with retries on timeout
    def get_response(self, url):
        try:
            response = requests.get(url=url, headers=self.headers, timeout=5)
            if response.status_code == 200:
                return response
        except Exception as e:
            print(e)
            for i in range(1, 10):
                print(f'Request to {url} timed out, retry #{i}')
                response = requests.get(url, headers=self.headers, timeout=5)
                if response.status_code == 200:
                    return response

    def parsing(self, video_url_share):
        # Extract the link from the share text
        video_url_share = re.findall('https.*/', video_url_share)[0]
        # Follow the redirect
        video_url_redirect = self.get_response(video_url_share).url
        # Extract the video id
        video_id = re.findall(r'video/(\d+)/', str(video_url_redirect))[0]
        # Build the item-info API URL
        video_url_api = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}'
        # Fetch the JSON data
        video_url_json = self.get_response(video_url_api).json()
        # Extract the video URL
        video_url = video_url_json.get('item_list')[0].get('video').get('play_addr').get('url_list')[0]
        # Swap parameters: playwm (watermarked) -> play (no watermark)
        video_url = video_url.replace('playwm', 'play').replace('&ratio=720p', '')
        # Resolve the final URL
        video_url_web = self.get_response(video_url).url.replace('http:', 'https:')
        # Build the video file name
        video_name = f'douyin{video_id}.mp4'
        print(video_name, video_url_web)


if __name__ == '__main__':
    # Example share link: https://v.douyin.com/eB8SLLk/
    video_parsing = VideoParsing()
    while True:
        video_url_share = input("Enter the share link to parse\n:")
        video_parsing.parsing(video_url_share)
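parsing() only prints the resolved URL. Saving the file is then a plain requests download; a short follow-up sketch (the save_video helper and its parameter names are mine, not from the original) that reuses the same mobile headers:

# Minimal sketch: download the resolved no-watermark URL to disk.
import requests

def save_video(video_url_web, video_name, headers):
    response = requests.get(video_url_web, headers=headers, timeout=30)
    with open(video_name, 'wb') as f:
        f.write(response.content)

# save_video(video_url_web, video_name, video_parsing.headers)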
(3) Douyin rankings

import requests


class DouYin(object):
    def __init__(self):
        # Hot searches
        self.HOT_SEARCH_URL = 'https://aweme.snssdk.com/aweme/v1/hot/search/list/'
        # Hot celebrities
        self.HOT_STAR_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/star/billboard/'
        # Hot live streams
        self.HOT_LIVE_URL = 'https://webcast.amemv.com/webcast/ranklist/hot/'
        self.BRAND_CATEGORY_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/brand/category/'
        self.HOT_BRAND_URL = 'https://aweme.snssdk.com/aweme/v1/hotsearch/brand/billboard/'
        self.HOT_MUSIC_URL = 'https://aweme.snssdk.com/aweme/v1/chart/music/list/'
        self.HEADERS = {
            'user-agent': 'okhttp3'
        }
        self.QUERIES = {
            'device_platform': 'android',
            'version_name': '13.2.0',
            'version_code': '130200',
            'aid': '1128'
        }

    def get_hot_search(self):
        """Hot searches"""
        response = requests.get(self.HOT_SEARCH_URL, params=self.QUERIES)
        response_data = response.json()
        items = [item for item in response_data["data"]["word_list"]]
        return items

    def get_hot_star(self):
        """Hot celebrities"""
        response = requests.get(self.HOT_STAR_URL, params=self.QUERIES)
        response_data = response.json()
        items = [item for item in response_data["user_list"]]
        return items

    def get_hot_live(self):
        """Hot live streams"""
        response = requests.get(self.HOT_LIVE_URL, params=self.QUERIES)
        response_data = response.json()
        items = [item for item in response_data["data"]["ranks"]]
        return items

    def get_brand_category(self):
        """Brand categories"""
        response = requests.get(self.BRAND_CATEGORY_URL, params=self.QUERIES)
        response_data = response.json()
        items = [item for item in response_data["category_list"]]
        return items

    def get_hot_brand(self, category: int):
        """Brand leaderboard"""
        params = self.QUERIES.copy()
        params.update({'category_id': str(category)})
        response = requests.get(self.HOT_BRAND_URL, params=params)
        response_data = response.json()
        items = [item for item in response_data["brand_list"]]
        return items

    def get_hot_music(self):
        """Hot music"""
        params = self.QUERIES.copy()
        params.update({'chart_id': '6853972723954146568', 'count': '100'})
        response = requests.get(self.HOT_MUSIC_URL, params=params)
        response_data = response.json()
        items = [item for item in response_data["music_list"]]
        return items


def run():
    douyin = DouYin()
    # # Hot searches
    # items = douyin.get_hot_search()
    # for item in items:
    #     print(item)
    # # Hot celebrities
    # items = douyin.get_hot_star()
    # for item in items:
    #     print(item)
    # # Hot live streams
    # items = douyin.get_hot_live()
    # for item in items:
    #     print(item)
    # # Brand categories
    # items = douyin.get_brand_category()
    # for item in items:
    #     print(item)
    # # Brand leaderboard
    # items = douyin.get_hot_brand(1)
    # for item in items:
    #     print(item)
    # Hot music
    items = douyin.get_hot_music()
    for item in items:
        print("Song ID: ", item["music_info"]["id"])
        print("Song title: ", item["music_info"]["title"])
        print("Artist: ", item["music_info"]["author"])
        print("Song URL: ", item["music_info"]["play_url"]["uri"])
        print('\n')


if __name__ == '__main__':
    run()
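For the brand leaderboard, the category id passed to get_hot_brand() comes from get_brand_category(); the exact key that holds the id is not shown above, so printing one category dict first is the safest way to find it. A hedged usage sketch:

# Hedged usage sketch: inspect the categories, then query one leaderboard.
douyin = DouYin()
for category in douyin.get_brand_category():
    print(category)  # look for the id field in this dict
# items = douyin.get_hot_brand(1)  # 1 = an id taken from the printout above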
2. AcFun video download (four quality levels)
(1) Single video download

import re
from uuid import uuid4

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Create a thread pool (at most 5 threads)
pool = ThreadPoolExecutor(5)


# AcFun video downloader
class SpiderAcfunVideo(object):
    def __init__(self):
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25"
        }
        self.video_url = "https://api-new.acfunchina.com/rest/app/play/playInfo/mp4?videoId={}&resourceId={}&resourceType=2&mkey=AAHewK3eIAAyMjAzNTI2NDMAAhAAMEP1uwS3Vi7NYAAAAJumF4MyTTFh5HGoyjW6ZpdjKymALUy9jZbsMTBVx-F10EhxyvpMtGQbBCYipvkMShM3iMNwbMd9DM6r2rnOYRVEdr6MaJS4yxxlA_Sl3JNWup57qBCQzOSC7SZnbEsHTQ%3D%3D&market=xiaomi&product=ACFUN_APP&sys_version=10&app_version=6.20.0.915&boardPlatform=sdm845&sys_name=android&socName=UNKNOWN&appMode=0"
        self.base_path = "C:/Users/admin/Desktop/videos/"

    def get_videos_msg(self, page_url):
        response = requests.get(url=page_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'lxml')
        # print(soup)
        # Extract the video title
        title = soup.find("title").string
        # Extract the video ID
        video_id = re.search(r"(?<=\"vid\":\")\d+(?=\",)", response.text).group()
        # Extract the resource ID
        resource_id = re.search(r"(?<=\"ac\":\")\d+(?=\",)", response.text).group()
        self.get_video_url(title, video_id, resource_id)

    # Get the video stream URLs
    def get_video_url(self, title, video_id, resource_id):
        url = self.video_url.format(video_id, resource_id)
        response = requests.get(url=url, headers=self.headers)
        streams = response.json()["playInfo"]["streams"]
        print("Video title: ", title)
        # Download every quality level in parallel
        tasks = [pool.submit(self.download_video, stream["playUrls"][0]) for stream in streams]
        wait(tasks)

    # Download a video
    def download_video(self, url):
        video_name = str(uuid4())
        response = requests.get(url)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == "__main__":
    while True:
        # Example: https://www.acfun.cn/v/ac16986343
        page_url = input("Enter the video URL (q to quit)>>>: ")
        if page_url == 'q':
            print("Exiting...")
            break
        video = SpiderAcfunVideo()
        video.get_videos_msg(page_url=page_url)
(2) Batch download via search

import re
from uuid import uuid4

import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, wait

# Create a thread pool (at most 5 threads)
pool = ThreadPoolExecutor(5)


# AcFun video downloader
class SpiderAcfunVideo(object):
    def __init__(self):
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25",
            "cookie": None
        }
        self.search_url = "https://www.acfun.cn/search"
        self.base_url = "https://www.acfun.cn/"
        self.video_url = "https://api-new.acfunchina.com/rest/app/play/playInfo/mp4?videoId={}&resourceId={}&resourceType=2&mkey=AAHewK3eIAAyMjAzNTI2NDMAAhAAMEP1uwS3Vi7NYAAAAJumF4MyTTFh5HGoyjW6ZpdjKymALUy9jZbsMTBVx-F10EhxyvpMtGQbBCYipvkMShM3iMNwbMd9DM6r2rnOYRVEdr6MaJS4yxxlA_Sl3JNWup57qBCQzOSC7SZnbEsHTQ%3D%3D&market=xiaomi&product=ACFUN_APP&sys_version=10&app_version=6.20.0.915&boardPlatform=sdm845&sys_name=android&socName=UNKNOWN&appMode=0"
        self.base_path = "G:/crawler/videos/"

    # Get cookies
    def get_cookies(self):
        response = requests.get(url=self.base_url, headers=self.headers)
        cookie_list = []
        for key, value in response.cookies.items():
            cookie_list.append(key + "=" + value)
        self.headers["cookie"] = '; '.join(cookie_list)

    # Search for videos by keyword and queue each result
    def search_videos(self, keyword, type='video'):
        params = {
            "type": type,
            "keyword": keyword
        }
        print(self.headers)
        response = requests.get(url=self.search_url, params=params, headers=self.headers)
        response_text = str(response.text).replace('\\', '')
        pattern = re.compile(r'<a href="/v(.*?)"')
        link_list = pattern.findall(response_text)
        for link in link_list:
            page_url = self.base_url + "v" + link
            self.get_videos_msg(page_url)

    def get_videos_msg(self, page_url):
        response = requests.get(url=page_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'lxml')
        # print(soup)
        # Extract the video title
        title = soup.find("title").string
        # Extract the video ID
        video_id = re.search(r"(?<=\"vid\":\")\d+(?=\",)", response.text).group()
        # Extract the resource ID
        resource_id = re.search(r"(?<=\"ac\":\")\d+(?=\",)", response.text).group()
        self.get_video_url(title, video_id, resource_id)

    # Get the video stream URLs
    def get_video_url(self, title, video_id, resource_id):
        url = self.video_url.format(video_id, resource_id)
        response = requests.get(url=url, headers=self.headers)
        streams = response.json()["playInfo"]["streams"]
        print("Video title: ", title)
        # Download every quality level in parallel
        tasks = [pool.submit(self.download_video, stream["playUrls"][0]) for stream in streams]
        wait(tasks)

    # Download a video
    def download_video(self, url):
        video_name = str(uuid4())
        response = requests.get(url)
        file_path = self.base_path + str(video_name) + '.mp4'
        with open(file_path, mode='wb') as f:
            f.write(response.content)


if __name__ == "__main__":
    while True:
        # Example video page: https://www.acfun.cn/v/ac16986343
        keyword = input("Enter a search keyword (q to quit)>>>: ")
        if keyword == 'q':
            print("Exiting...")
            break
        video = SpiderAcfunVideo()
        video.get_cookies()
        video.search_videos(keyword=keyword)