Python 批量拷貝文件


普通批量拷貝文件

import os
import shutil
import logging
from logging import handlers
from colorama import Fore, Style, init

import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入環境變量
from utils.time_utils import run_time
from conf import settings


class Colorlog(object):
    """
    Logger wrapper that prefixes every message with a colorama color code.

    Each instance configures the named ``logging`` logger with a console
    handler and a daily-rotating file handler. The log level, log directory
    and log file name are read from ``settings``.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color per message severity
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        :param name: logger name handed to ``logging.getLogger``
        """
        # Log record layout
        log_format = '[%(asctime)s - %(levelname)s - %(name)s  ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(settings.LOG_LEVEL)

        console_handler = logging.StreamHandler()
        log_dir = os.path.join(settings.LOG_DIR, "log")
        logfile_path = os.path.join(log_dir, settings.LOG_FILE)
        # Create the log directory if needed. The check must be on the
        # directory, not on the log file: the file can be missing while the
        # directory already exists, in which case os.mkdir would raise.
        os.makedirs(log_dir, exist_ok=True)
        # Rotate once per day, keeping at most 20 old log files
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')

        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log ``message`` (a str) at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log ``message`` (a str) at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log ``message`` (a str) at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log ``message`` (a str) at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


# Module-level colored logger named "cp", used by the functions in this script.
cp_log = Colorlog("cp")


def copy_file(local_file_path, dst_file_path):
    """
    Copy a single file.

    :param local_file_path: path of the source file
    :param dst_file_path: destination path handed to ``shutil.copy``
    :return: None
    """
    # The file size used to be computed here but was never used; dropping it
    # saves one stat call per file.
    shutil.copy(local_file_path, dst_file_path)


@run_time
def upload_file(src_path, dst_path):
    """
    Recursively copy every file under ``src_path`` into ``dst_path``.

    The directory layout below ``src_path`` is reproduced below ``dst_path``;
    destination directories are created only as needed to hold copied files.

    :param src_path: source directory to copy from
    :param dst_path: destination directory; created when it does not exist
    :return: None
    """
    cp_log.info('upload_file %s   %s' % (src_path, dst_path))
    # Create the destination root when it does not exist yet
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that is missing here, so report the source path
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    all_file_nums = 0
    # os.walk yields a (dirpath, dirnames, filenames) tuple for every
    # directory below src_path, including src_path itself
    for root, dirs, files in os.walk(src_path):
        if not files:
            continue
        # Map the source directory onto the destination through its path
        # relative to src_path. A plain str.replace(src_path, dst_path) would
        # also rewrite later occurrences of the source string in the path.
        dst_dir = os.path.abspath(
            os.path.join(dst_path, os.path.relpath(root, src_path)))
        if not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
            cp_log.debug('Create Dest Dir %s' % dst_dir)

        for f in files:
            local_file_path = os.path.join(root, f)  # e.g. /src/q.txt
            dst_file_path = os.path.join(dst_dir, f)  # e.g. /dst/q.txt
            copy_file(local_file_path, dst_file_path)
            cp_log.info('copy file {} complete '.format(local_file_path))
            all_file_nums += 1

    cp_log.info(
        'copy all files complete , files count = {}'.format(all_file_nums))


def bytes2human(n):
    """
    Format a byte count using binary (1024-based) unit suffixes.

    :param n: number of bytes
    :return: value with one decimal and the largest fitting suffix
             (e.g. ``'1.5K'``), or ``'<n>Bytes'`` when ``n`` is below 1024
    """
    units = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Try the largest unit first; unit number k stands for 2 ** (10 * k)
    for exponent in range(len(units), 0, -1):
        threshold = 1 << (exponent * 10)
        if n >= threshold:
            return '%.1f%s' % (float(n) / threshold, units[exponent - 1])
    return "%sBytes" % n


if __name__ == '__main__':
    # Script entry point: copy the hard-coded source directory to the
    # hard-coded destination directory.
    src = 'D://test1'
    dst = 'D://copytest2'
    upload_file(src, dst)

 

輸出結果 

[2018-06-29 15:14:04  - INFO - cp  ] upload_file D://test1   D://copytest2 
[2018-06-29 15:14:04  - INFO - cp  ] Create Dest Dir D://copytest2 
[2018-06-29 15:14:04  - DEBUG - cp  ] Create Dest Dir D://copytest2 
[2018-06-29 15:14:04  - INFO - cp  ] copy file D://test1\20180601\20180601_test.txt complete  
[2018-06-29 15:14:04  - DEBUG - cp  ] Create Dest Dir D://copytest2 
[2018-06-29 15:14:19  - INFO - cp  ] copy file D://test1\20180601\wmv\01文件操作和異常.wmv.pbb complete  
[2018-06-29 15:14:19  - DEBUG - cp  ] Create Dest Dir D://copytest2 
[2018-06-29 15:14:19  - INFO - cp  ] copy file D://test1\20180602\20180602_test.txt complete 
……
[2018-06-29 15:16:20  - INFO - cp  ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete  
[2018-06-29 15:16:20  - INFO - cp  ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete  
[2018-06-29 15:16:20  - INFO - cp  ] copy all files complete , files count = 164 
[2018-06-29 15:16:20  - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run {  135.2727}s  

  

使用多線程批量拷貝文件

#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/6/29 10:28
# @Author  : hyang
# @File    : batch_copy.py
# @Software: PyCharm

import os
import shutil
import logging
from logging import handlers
from colorama import Fore, Style, init
from multiprocessing.dummy import Pool as ThreadPool
import queue

import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入環境變量
from utils.time_utils import run_time
from conf import settings


class Colorlog(object):
    """
    Logger wrapper that prefixes every message with a colorama color code.

    Each instance configures the named ``logging`` logger with a console
    handler and a daily-rotating file handler. The log level, log directory
    and log file name are read from ``settings``.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color per message severity
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        :param name: logger name handed to ``logging.getLogger``
        """
        # Log record layout
        log_format = '[%(asctime)s - %(levelname)s - %(name)s  ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(settings.LOG_LEVEL)

        console_handler = logging.StreamHandler()
        log_dir = os.path.join(settings.LOG_DIR, "log")
        logfile_path = os.path.join(log_dir, settings.LOG_FILE)
        # Create the log directory if needed. The check must be on the
        # directory, not on the log file: the file can be missing while the
        # directory already exists, in which case os.mkdir would raise.
        os.makedirs(log_dir, exist_ok=True)
        # Rotate once per day, keeping at most 20 old log files
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')

        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log ``message`` (a str) at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log ``message`` (a str) at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log ``message`` (a str) at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log ``message`` (a str) at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


# Module-level colored logger named "cp", used by the functions in this script.
cp_log = Colorlog("cp")


def copy_file(local_file_path, dst_file_path, q):
    """
    Copy a single file and announce its completion on a queue.

    :param local_file_path: path of the source file
    :param dst_file_path: destination path handed to ``shutil.copy``
    :param q: queue that receives ``local_file_path`` after the copy
    :return: None
    """
    # The file size used to be computed here but was never used; dropping it
    # saves one stat call per file.
    shutil.copy(local_file_path, dst_file_path)
    # Only reached when the copy did not raise
    q.put(local_file_path)


@run_time
def upload_file(src_path, dst_path):
    """
    Recursively copy every file under ``src_path`` into ``dst_path`` using a
    pool of 3 worker threads, logging progress as files finish.

    The directory layout below ``src_path`` is reproduced below ``dst_path``;
    destination directories are created only as needed to hold copied files.

    :param src_path: source directory to copy from
    :param dst_path: destination directory; created when it does not exist
    :return: None
    """
    cp_log.info('upload_file %s   %s' % (src_path, dst_path))
    # Create the destination root when it does not exist yet
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that is missing here, so report the source path
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    # The pool is created only after the source has been validated, so the
    # early return above does not leave idle worker threads behind.
    pool = ThreadPool(3)  # 3 worker threads
    q = queue.Queue()  # workers report finished files here
    all_file_nums = 0
    # os.walk yields a (dirpath, dirnames, filenames) tuple for every
    # directory below src_path, including src_path itself
    for root, dirs, files in os.walk(src_path):
        if not files:
            continue
        # Map the source directory onto the destination through its path
        # relative to src_path. A plain str.replace(src_path, dst_path) would
        # also rewrite later occurrences of the source string in the path.
        dst_dir = os.path.abspath(
            os.path.join(dst_path, os.path.relpath(root, src_path)))
        if not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
            cp_log.debug('Create Dest Dir %s' % dst_dir)

        for f in files:
            all_file_nums += 1
            local_file_path = os.path.join(root, f)  # e.g. /src/q.txt
            dst_file_path = os.path.join(dst_dir, f)  # e.g. /dst/q.txt
            pool.apply_async(
                func=copy_file,
                args=(local_file_path, dst_file_path, q),
                # A failed copy never reaches q.put() inside copy_file, so
                # report it here; the progress loop below then still
                # receives exactly one item per submitted file.
                error_callback=lambda exc, path=local_file_path:
                    q.put((path, exc)))

    pool.close()  # no further tasks will be submitted

    print('all_file_nums ', all_file_nums)
    failed = 0
    # Exactly one queue item arrives per file, so a counted loop with a
    # blocking get() replaces the busy-wait on q.empty() and also ends
    # immediately when there are no files at all.
    for num in range(1, all_file_nums + 1):
        item = q.get()
        if isinstance(item, tuple):
            failed_path, exc = item
            failed += 1
            cp_log.error('copy file {} failed: {}'.format(failed_path, exc))
        else:
            cp_log.info('copy file {} complete '.format(item))
        copy_rate = float(num / all_file_nums) * 100
        cp_log.warn("\r 進度為:%.2f%%" % copy_rate)

    pool.join()  # all tasks are done; wait for the worker threads to exit
    if failed:
        cp_log.warn('{} files failed to copy'.format(failed))
    cp_log.info(
        'copy all files complete , files count = {}'.format(all_file_nums))


def bytes2human(n):
    """
    Format a byte count using binary (1024-based) unit suffixes.

    :param n: number of bytes
    :return: value with one decimal and the largest fitting suffix
             (e.g. ``'1.5K'``), or ``'<n>Bytes'`` when ``n`` is below 1024
    """
    units = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Try the largest unit first; unit number k stands for 2 ** (10 * k)
    for exponent in range(len(units), 0, -1):
        threshold = 1 << (exponent * 10)
        if n >= threshold:
            return '%.1f%s' % (float(n) / threshold, units[exponent - 1])
    return "%sBytes" % n


if __name__ == '__main__':
    # Script entry point: copy the hard-coded source directory to the
    # hard-coded destination directory.
    src = 'D://test1'
    dst = 'D://copy_thread_test2'
    upload_file(src, dst)

  輸出結果 

[2018-06-29 15:26:13  - INFO - cp  ] copy file D://test1\20180601\20180601_test.txt complete  
 進度為:0.61% 
[2018-06-29 15:26:13  - INFO - cp  ] copy file D://test1\20180602\20180602_test.txt complete  
 進度為:1.22% 
[2018-06-29 15:26:13  - INFO - cp  ] copy file D://test1\20180602\教程目錄及說明.txt complete  
 進度為:1.83% 
all_file_nums  164
[2018-06-29 15:26:15  - INFO - cp  ] copy file D://test1\20180602\MongoDB權威指南(中文版).pdf complete  
 進度為:2.44% 
[2018-06-29 15:26:15  - INFO - cp  ] copy file D://test1\ibooks\AIX_HACMP_40pages.pdf complete  
 進度為:3.05% 
……
[2018-06-29 15:29:02  - INFO - cp  ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete  
 進度為:99.39% 
[2018-06-29 15:29:02  - INFO - cp  ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete  
 進度為:100.00% 
[2018-06-29 15:29:02  - INFO - cp  ] copy all files complete , files count = 164 
[2018-06-29 15:29:02  - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run {  168.7767}s  

使用協程批量拷貝文件

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


from gevent import monkey;monkey.patch_all()
import os
import shutil
import logging
import time
from functools import wraps
from logging import handlers
from colorama import Fore, Style, init
from multiprocessing.pool import ThreadPool
import queue
import gevent

import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # 加入環境變量


class Colorlog(object):
    """
    Logger wrapper that prefixes every message with a colorama color code.

    Each instance configures the named ``logging`` logger at DEBUG level
    with a console handler and a daily-rotating file handler writing to
    ``test.log``.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color per message severity
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        :param name: logger name handed to ``logging.getLogger``
        """
        # Log record layout
        log_format = '[%(asctime)s - %(levelname)s - %(name)s  ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        console_handler = logging.StreamHandler()
        # Log file path; a relative path, so it is resolved against the
        # current working directory
        logfile_path = 'test.log'

        # Rotate once per day, keeping at most 20 old log files
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d %H:%M:%S ')

        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log ``message`` (a str) at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log ``message`` (a str) at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log ``message`` (a str) at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log ``message`` (a str) at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


# Module-level colored logger named "cp", used by the functions in this script.
cp_log = Colorlog("cp")


def run_time(func):
    """
    Decorator that prints how long each call of ``func`` takes.

    :param func: callable to wrap
    :return: wrapper that calls ``func``, prints the elapsed wall-clock
             seconds and returns the result of ``func``
    """

    @wraps(func)
    def timed_call(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        elapsed = time.time() - began
        print("func {%s} run {%10.4f}s " % (func.__name__, elapsed))
        return outcome

    return timed_call


def copy_file(local_file_path, dst_file_path):
    """
    Copy a single file, then log the source path and the copied file's size.

    :param local_file_path: path of the source file
    :param dst_file_path: destination path handed to ``shutil.copy``
    :return: None
    """
    shutil.copy(local_file_path, dst_file_path)
    # The reported size is read back from the destination file
    copied_size = bytes2human(os.path.getsize(dst_file_path))
    message = 'copy file {} ,  size= {} complete '.format(
        local_file_path, copied_size)
    cp_log.info(message)


def getdirsize(dir):
    """
    Sum the sizes of all files below a directory, recursively.

    :param dir: directory to measure
    :return: total size formatted by ``bytes2human``
    """
    total = 0
    for folder, _subdirs, names in os.walk(dir):
        for name in names:
            total += os.path.getsize(os.path.join(folder, name))
    return bytes2human(total)


@run_time
def upload_file(src_path, dst_path):
    """
    Recursively copy every file under ``src_path`` into ``dst_path``,
    spawning one gevent greenlet per file.

    The directory layout below ``src_path`` is reproduced below ``dst_path``;
    destination directories are created only as needed to hold copied files.

    :param src_path: source directory to copy from
    :param dst_path: destination directory; created when it does not exist
    :return: None
    """

    cp_log.info('upload_file %s   %s' % (src_path, dst_path))
    # Create the destination root when it does not exist yet
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that is missing here, so report the source path
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    tasklist = []  # spawned greenlets
    all_file_nums = 0
    all_file_size = getdirsize(src_path)
    cp_log.info('all_file_size = %s' % all_file_size)
    # os.walk yields a (dirpath, dirnames, filenames) tuple for every
    # directory below src_path, including src_path itself
    for root, dirs, files in os.walk(src_path):
        if not files:
            continue
        # Map the source directory onto the destination through its path
        # relative to src_path. A plain str.replace(src_path, dst_path) would
        # also rewrite later occurrences of the source string in the path.
        dst_dir = os.path.abspath(
            os.path.join(dst_path, os.path.relpath(root, src_path)))
        if not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
            cp_log.debug('Create Dest Dir %s' % dst_dir)

        for f in files:
            all_file_nums += 1
            local_file_path = os.path.join(root, f)  # e.g. /src/q.txt
            dst_file_path = os.path.join(dst_dir, f)  # e.g. /dst/q.txt
            tasklist.append(
                gevent.spawn(
                    copy_file,
                    local_file_path,
                    dst_file_path))  # start a greenlet for this file

    gevent.joinall(tasklist)  # block until every greenlet has finished

    print('all_file_nums ', all_file_nums)

    # The reported size is measured over the whole destination directory
    cp_log.info(
        'copy all files complete , files count = {} ,  size = {}'.format(
            all_file_nums, getdirsize(dst_path)))


def bytes2human(n):
    """
    Format a byte count using binary (1024-based) unit suffixes.

    :param n: number of bytes
    :return: value with one decimal and the largest fitting suffix
             (e.g. ``'1.5K'``), or ``'<n>B'`` when ``n`` is below 1024
    """
    units = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Try the largest unit first; unit number k stands for 2 ** (10 * k)
    for exponent in range(len(units), 0, -1):
        threshold = 1 << (exponent * 10)
        if n >= threshold:
            return '%.1f%s' % (float(n) / threshold, units[exponent - 1])
    return "%sB" % n


if __name__ == '__main__':
    # Script entry point: copy the hard-coded source directory to the
    # hard-coded destination directory.
    src = 'C://pythonStudy/python爬蟲參考資料'
    dst = 'C://pythonStudy/copy_thread_test2'
    upload_file(src, dst)

 輸出結果 

"C:\Program Files\Python36\python.exe" batch_copy.py
[2018-06-29 22:50:22  - INFO - cp  ] upload_file C://pythonStudy/python爬蟲參考資料   C://pythonStudy/copy_thread_test2 
[2018-06-29 22:50:22  - INFO - cp  ] Create Dest Dir C://pythonStudy/copy_thread_test2 
[2018-06-29 22:50:22  - INFO - cp  ] all_file_size = 620.6M 
[2018-06-29 22:50:22  - DEBUG - cp  ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master 
[2018-06-29 22:50:22  - DEBUG - cp  ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter1 
[2018-06-29 22:50:22  - DEBUG - cp  ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter10 
[2018-06-29 22:50:22  - DEBUG - cp  ] Create Dest Dir 

……
[2018-06-29 22:50:23  - INFO - cp  ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter12\2-seleniumCookies.py ,  size= 528B complete  
[2018-06-29 22:50:23  - INFO - cp  ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter12\3-honeypotDetection.py ,  size= 539B complete  
[2018-06-29 22:50:23  - INFO - cp  ] copy file 
[2018-06-29 22:50:24  - INFO - cp  ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter9\5-BasicAuth.py ,  size= 229B complete  
all_file_nums  130
[2018-06-29 22:50:24  - INFO - cp  ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\files\test.csv ,  size= 114B complete  
func {upload_file} run {    1.2971}s 
[2018-06-29 22:50:24  - INFO - cp  ] copy all files complete , files count = 130 ,  size = 620.6M 

Process finished with exit code 0

  

 

工具文件

time_utils.py

def run_time(func):
    """
    Decorator that logs, at DEBUG level, how long each call of ``func`` takes.

    :param func: callable to wrap
    :return: wrapper that calls ``func``, logs the elapsed wall-clock
             seconds and returns the result of ``func``
    """
    @wraps(func)
    def decor(*args, **kwargs):
        began = time.time()
        outcome = func(*args, **kwargs)
        elapsed = time.time() - began
        log.debug("func {%s} run {%10.4f}s " % (func.__name__, elapsed))
        return outcome

    return decor

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM