python 下載文件的幾種方式


1 、一般同步下載

示例代碼:

import requests
import os

def downlaod(url, file_path):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
    }
    r = requests.get(url=url, headers=headers)
    with open(file_path, "wb") as f:
        f.write(r.content)
        f.flush()

2、 使用流式請求,requests.get方法的stream

默認情況下是stream的值為false,它會立即開始下載文件並存放到內存當中,倘若文件過大就會導致內存不足的情況,程序就會報錯。
當把get函數的stream參數設置成True時,它不會立即開始下載,當你使用iter_content或iter_lines遍歷內容或訪問內容屬性時才開始下載,需要注意一點:文件沒有下載之前,它也需要保持連接。

iter_content:一塊一塊的遍歷要下載的內容
iter_lines:一行一行的遍歷要下載的內容

使用上面兩個函數下載大文件可以防止占用過多的內存,因為每次只下載小部分數據。

示例代碼:

3 、異步下載文件

由於request的請求是阻塞式的,所以要用aiohttp模塊來發起請求。

示例代碼:

import aiohttp
import asyncio
import os


async def handler(url, file_path):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
    }
    async with aiohttp.ClientSession() as session:
        r = await session.get(url=url, headers=headers)
        with open(file_path, "wb") as f:
            f.write(await r.read())
            f.flush()
            os.fsync(f.fileno())


loop = asyncio.get_event_loop()
loop.run_until_complete(handler(url, file_path))

4、 異步拆分下載文件

上面用的是一個協程下載一個文件,下面的方法是將文件分成幾部分,每個部分用一個協程下載,最后再寫入文件。

下面這個例子用的是流式寫入,即把內容寫入到磁盤里面。

import aiohttp
import asyncio
import time
import os


async def consumer(queue):
    option = await queue.get()
    start = option["start"]
    end = option["end"]
    url = option["url"]
    filename = option["filename"]
    i = option["i"]

    print(f"第{i}個任務開始運行")
    async with aiohttp.ClientSession() as session:
        headers = {"Range": f"bytes={start}-{end}"}
        r = await session.get(url=url, headers=headers)
        with open(filename, "rb+") as f:
            f.seek(start)
            while True:
                chunk = await r.content.read(end - start)
                if not chunk:
                    break
                f.write(chunk)
                f.flush()
                os.fsync(f.fileno())
                print(f"第{i}個任務正在寫入中ing")
        queue.task_done()
        print(f"第{i}個任務寫入成功")


async def producer(url, headers, filename, queue, coro_num):
    async with aiohttp.ClientSession() as session:
        resp = await session.head(url=url, headers=headers)
        file_size = int(resp.headers["content-length"])
        # 創建一個文件
        with open(filename, "wb") as f:
            pass
        part = file_size // coro_num
        for i in range(coro_num):
            start = part * i
            if i == coro_num - 1:
                end = file_size
            else:
                end = start + part
            info = {
                "start": start,
                "end": end,
                "url": url,
                "filename": filename,
                "i": i,
            }
            queue.put_nowait(info)


async def main():
    # 需要填的有url,filename,coro_num
    url = ""
    filename = ""
    coro_num = 0
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:68.0) Gecko/20100101 Firefox/68.0"
    }
    queue = asyncio.Queue(coro_num)
    await producer(url, headers, filename, queue, coro_num)
    task_list = []
    for i in range(coro_num):
        task = asyncio.create_task(consumer(queue))
        task_list.append(task)
    await queue.join()
    for i in task_list:
        i.cancel()
    await asyncio.gather(*task_list)


startt = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time() - startt
print(f"用了{end}秒")

5、注意

以上的示例都是介紹思路,程序並不健壯,健壯的程序需要加入錯誤捕獲和錯誤處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM