普通批量拷貝文件
import os
import shutil
import logging
from logging import handlers
from colorama import Fore, Style, init
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # put the project root on sys.path so utils/ and conf/ import
from utils.time_utils import run_time
from conf import settings


class Colorlog(object):
    """
    Logger wrapper that writes to the console and to a daily-rotated file,
    prefixing each message with a colorama color chosen by log level.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color/style prefix per log level.
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        Attach a console handler and a rotating file handler to the logger.

        :param name: logger name passed to logging.getLogger
        """
        log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(settings.LOG_LEVEL)
        console_handler = logging.StreamHandler()

        # The directory is what must exist before the handler opens the file.
        # Testing for the log file and then calling os.mkdir raised
        # FileExistsError whenever the directory existed but the file did not.
        log_dir = os.path.join(settings.LOG_DIR, "log")
        os.makedirs(log_dir, exist_ok=True)
        logfile_path = os.path.join(log_dir, settings.LOG_FILE)

        # Rotate once a day and keep at most 20 old files.
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d  %H:%M:%S ')
        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log message at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log message at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log message at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log message at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


cp_log = Colorlog("cp")


def copy_file(local_file_path, dst_file_path):
    """
    Copy a single file.

    :param local_file_path: path of the file to copy
    :param dst_file_path: path to copy it to
    :return: None
    """
    shutil.copy(local_file_path, dst_file_path)


@run_time
def upload_file(src_path, dst_path):
    """
    Copy every file under src_path into dst_path, keeping the directory layout.

    :param src_path: source directory
    :param dst_path: destination directory, created when it does not exist
    :return: None
    """
    cp_log.info('upload_file %s %s' % (src_path, dst_path))
    # Create the destination root if it is missing.
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that failed the check, so report the source.
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    all_file_nums = 0
    for root, dirs, files in os.walk(src_path):
        # Map the walked directory onto the destination through its path
        # relative to src_path. A plain str.replace would also rewrite any
        # later occurrence of the source string inside the path.
        dst_dir = os.path.abspath(
            os.path.join(dst_path, os.path.relpath(root, src_path)))
        # Only directories that actually contain files are created.
        if files and not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
            cp_log.debug('Create Dest Dir %s' % dst_dir)
        for f in files:
            local_file_path = os.path.join(root, f)
            dst_file_path = os.path.join(dst_dir, f)
            copy_file(local_file_path, dst_file_path)
            cp_log.info('copy file {} complete '.format(local_file_path))
            all_file_nums += 1
    cp_log.info(
        'copy all files complete , files count = {}'.format(all_file_nums))


def bytes2human(n):
    """
    Format a byte count with 1024-based unit suffixes.

    :param n: number of bytes
    :return: e.g. '9.8K' for 10000; values below 1024 come back as '<n>Bytes'
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # 1 << 10 is 1024, 1 << 20 is 1024 ** 2, and so on for each suffix.
    prefix = {s: 1 << (i + 1) * 10 for i, s in enumerate(symbols)}
    # Try the largest unit first so the biggest fitting suffix wins.
    for s in reversed(symbols):
        if n >= prefix[s]:
            value = float(n) / prefix[s]
            return '%.1f%s' % (value, s)
    return "%sBytes" % n


if __name__ == '__main__':
    src = 'D://test1'
    dst = 'D://copytest2'
    upload_file(src, dst)
輸出結果
[2018-06-29 15:14:04 - INFO - cp ] upload_file D://test1 D://copytest2 [2018-06-29 15:14:04 - INFO - cp ] Create Dest Dir D://copytest2 [2018-06-29 15:14:04 - DEBUG - cp ] Create Dest Dir D://copytest2 [2018-06-29 15:14:04 - INFO - cp ] copy file D://test1\20180601\20180601_test.txt complete [2018-06-29 15:14:04 - DEBUG - cp ] Create Dest Dir D://copytest2 [2018-06-29 15:14:19 - INFO - cp ] copy file D://test1\20180601\wmv\01文件操作和異常.wmv.pbb complete [2018-06-29 15:14:19 - DEBUG - cp ] Create Dest Dir D://copytest2 [2018-06-29 15:14:19 - INFO - cp ] copy file D://test1\20180602\20180602_test.txt complete …… [2018-06-29 15:16:20 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete [2018-06-29 15:16:20 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete [2018-06-29 15:16:20 - INFO - cp ] copy all files complete , files count = 164 [2018-06-29 15:16:20 - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run { 135.2727}s
使用多線程批量拷貝文件
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/6/29 10:28
# @Author  : hyang
# @File    : batch_copy.py
# @Software: PyCharm

import os
import shutil
import logging
from logging import handlers
from colorama import Fore, Style, init
from multiprocessing.dummy import Pool as ThreadPool
import queue
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # put the project root on sys.path so utils/ and conf/ import
from utils.time_utils import run_time
from conf import settings


class Colorlog(object):
    """
    Logger wrapper that writes to the console and to a daily-rotated file,
    prefixing each message with a colorama color chosen by log level.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color/style prefix per log level.
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        Attach a console handler and a rotating file handler to the logger.

        :param name: logger name passed to logging.getLogger
        """
        log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(settings.LOG_LEVEL)
        console_handler = logging.StreamHandler()

        # The directory is what must exist before the handler opens the file.
        # Testing for the log file and then calling os.mkdir raised
        # FileExistsError whenever the directory existed but the file did not.
        log_dir = os.path.join(settings.LOG_DIR, "log")
        os.makedirs(log_dir, exist_ok=True)
        logfile_path = os.path.join(log_dir, settings.LOG_FILE)

        # Rotate once a day and keep at most 20 old files.
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d  %H:%M:%S ')
        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log message at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log message at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log message at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log message at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


cp_log = Colorlog("cp")


def copy_file(local_file_path, dst_file_path, q):
    """
    Copy a single file and announce its completion on a queue.

    :param local_file_path: path of the file to copy
    :param dst_file_path: path to copy it to
    :param q: queue that receives local_file_path once the copy succeeded
    :return: None
    """
    shutil.copy(local_file_path, dst_file_path)
    q.put(local_file_path)


def _failure_reporter(q, local_file_path):
    """Build an error_callback that puts (local_file_path, exception) on q."""
    def report(exc):
        q.put((local_file_path, exc))
    return report


@run_time
def upload_file(src_path, dst_path):
    """
    Copy every file under src_path into dst_path using a pool of 3 threads,
    logging each finished file and the overall progress percentage.

    :param src_path: source directory
    :param dst_path: destination directory, created when it does not exist
    :return: None
    """
    cp_log.info('upload_file %s %s' % (src_path, dst_path))
    # Create the destination root if it is missing.
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that failed the check, so report the source.
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    pool = ThreadPool(3)  # three worker threads
    q = queue.Queue()  # carries one item per finished task
    all_file_nums = 0
    failed = 0
    try:
        for root, dirs, files in os.walk(src_path):
            # Map the walked directory onto the destination through its path
            # relative to src_path. A plain str.replace would also rewrite
            # any later occurrence of the source string inside the path.
            dst_dir = os.path.abspath(
                os.path.join(dst_path, os.path.relpath(root, src_path)))
            # Only directories that actually contain files are created.
            if files and not os.path.isdir(dst_dir):
                os.makedirs(dst_dir)
                cp_log.debug('Create Dest Dir %s' % dst_dir)
            for f in files:
                all_file_nums += 1
                local_file_path = os.path.join(root, f)
                dst_file_path = os.path.join(dst_dir, f)
                # A successful task puts its path on q from copy_file; a
                # failing one puts a (path, exception) tuple on q through
                # error_callback. Every task therefore yields exactly one
                # item and the loop below cannot wait forever.
                pool.apply_async(
                    func=copy_file,
                    args=(local_file_path, dst_file_path, q),
                    error_callback=_failure_reporter(q, local_file_path))
        pool.close()  # no further tasks will be submitted

        print('all_file_nums ', all_file_nums)
        # Block on the queue instead of spinning on q.empty(). With zero
        # files the loop body never runs, so there is no division by zero.
        for num in range(1, all_file_nums + 1):
            item = q.get()
            if isinstance(item, tuple):
                failed += 1
                cp_log.error('copy file {} failed: {!r}'.format(*item))
            else:
                cp_log.info('copy file {} complete '.format(item))
            copy_rate = float(num / all_file_nums) * 100
            cp_log.warn("\r 進度為:%.2f%%" % copy_rate)
    finally:
        # Release the worker threads even if the walk or a mkdir raised.
        pool.terminate()
        pool.join()

    if failed:
        cp_log.error(
            'copy failed for {} of {} files'.format(failed, all_file_nums))
    cp_log.info(
        'copy all files complete , files count = {}'.format(all_file_nums))


def bytes2human(n):
    """
    Format a byte count with 1024-based unit suffixes.

    :param n: number of bytes
    :return: e.g. '9.8K' for 10000; values below 1024 come back as '<n>Bytes'
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # 1 << 10 is 1024, 1 << 20 is 1024 ** 2, and so on for each suffix.
    prefix = {s: 1 << (i + 1) * 10 for i, s in enumerate(symbols)}
    # Try the largest unit first so the biggest fitting suffix wins.
    for s in reversed(symbols):
        if n >= prefix[s]:
            value = float(n) / prefix[s]
            return '%.1f%s' % (value, s)
    return "%sBytes" % n


if __name__ == '__main__':
    src = 'D://test1'
    dst = 'D://copy_thread_test2'
    upload_file(src, dst)
輸出結果
[2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180601\20180601_test.txt complete 進度為:0.61% [2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180602\20180602_test.txt complete 進度為:1.22% [2018-06-29 15:26:13 - INFO - cp ] copy file D://test1\20180602\教程目錄及說明.txt complete 進度為:1.83% all_file_nums 164 [2018-06-29 15:26:15 - INFO - cp ] copy file D://test1\20180602\MongoDB權威指南(中文版).pdf complete 進度為:2.44% [2018-06-29 15:26:15 - INFO - cp ] copy file D://test1\ibooks\AIX_HACMP_40pages.pdf complete 進度為:3.05% …… [2018-06-29 15:29:02 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\nobatch complete 進度為:99.39% [2018-06-29 15:29:02 - INFO - cp ] copy file D://test1\Tesseract-OCR\tessdata\tessconfigs\segdemo complete 進度為:100.00% [2018-06-29 15:29:02 - INFO - cp ] copy all files complete , files count = 164 [2018-06-29 15:29:02 - DEBUG - runtime - time_utils.py - decor- 59 ] func {upload_file} run { 168.7767}s
使用協程批量拷貝文件
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Patch the standard library before anything else is imported.
from gevent import monkey
monkey.patch_all()

import os
import shutil
import logging
import time
from functools import wraps
from logging import handlers
from colorama import Fore, Style, init
from multiprocessing.pool import ThreadPool
import queue
import gevent
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)  # put the project root on sys.path


class Colorlog(object):
    """
    Logger wrapper that writes to the console and to a daily-rotated file,
    prefixing each message with a colorama color chosen by log level.
    """
    init(autoreset=True)  # initialise colorama with automatic color reset

    # One color/style prefix per log level.
    info_color = Fore.GREEN + Style.BRIGHT
    warn_color = Fore.YELLOW + Style.BRIGHT
    debug_color = Fore.MAGENTA + Style.BRIGHT
    error_color = Fore.RED + Style.BRIGHT

    def __init__(self, name):
        """
        Attach a console handler and a rotating file handler to the logger.

        :param name: logger name passed to logging.getLogger
        """
        log_format = '[%(asctime)s - %(levelname)s - %(name)s ] %(message)s '
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        console_handler = logging.StreamHandler()
        # Log file path, relative to the current working directory.
        logfile_path = 'test.log'
        # Rotate once a day and keep at most 20 old files.
        file_handler = handlers.TimedRotatingFileHandler(
            logfile_path, when="D", interval=1, backupCount=20)
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        file_format = logging.Formatter(fmt=log_format)
        console_format = logging.Formatter(
            fmt=log_format, datefmt='%Y-%m-%d  %H:%M:%S ')
        console_handler.setFormatter(console_format)
        file_handler.setFormatter(file_format)

    def warn(self, message):
        """Log message at WARNING level in the warning color."""
        self.logger.warning(Colorlog.warn_color + message)

    def info(self, message):
        """Log message at INFO level in the info color."""
        self.logger.info(Colorlog.info_color + message)

    def error(self, message):
        """Log message at ERROR level in the error color."""
        self.logger.error(Colorlog.error_color + message)

    def debug(self, message):
        """Log message at DEBUG level in the debug color."""
        self.logger.debug(Colorlog.debug_color + message)


cp_log = Colorlog("cp")


def run_time(func):
    """
    Decorator that prints how long each call to func takes.

    :param func: callable to time
    :return: wrapper that calls func, prints the elapsed seconds and returns
             func's result
    """
    @wraps(func)
    def decor(*args, **kwargs):
        # perf_counter is the clock intended for measuring intervals;
        # time.time() can jump when the system clock is adjusted.
        start = time.perf_counter()
        res = func(*args, **kwargs)
        end = time.perf_counter()
        print("func {%s} run {%10.4f}s " % (func.__name__, (end - start)))
        return res
    return decor


def copy_file(local_file_path, dst_file_path):
    """
    Copy a single file and log the size of the resulting copy.

    :param local_file_path: path of the file to copy
    :param dst_file_path: path to copy it to
    :return: None
    """
    shutil.copy(local_file_path, dst_file_path)
    cp_log.info(
        'copy file {} , size= {} complete '.format(
            local_file_path,
            bytes2human(os.path.getsize(dst_file_path))))


def getdirsize(dir):
    """
    Total size of all files below a directory.

    :param dir: directory to walk
    :return: the summed file size, formatted by bytes2human
    """
    size = 0
    for root, dirs, files in os.walk(dir):
        size += sum(os.path.getsize(os.path.join(root, name))
                    for name in files)
    return bytes2human(size)


@run_time
def upload_file(src_path, dst_path):
    """
    Copy every file under src_path into dst_path, one greenlet per file.

    :param src_path: source directory
    :param dst_path: destination directory, created when it does not exist
    :return: None
    """
    cp_log.info('upload_file %s %s' % (src_path, dst_path))
    # Create the destination root if it is missing.
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
        cp_log.info('Create Dest Dir %s' % dst_path)

    if not os.path.isdir(src_path):
        # It is the source that failed the check, so report the source.
        cp_log.warn('Dir is not exists %s' % src_path)
        return

    tasklist = []  # one greenlet per file
    all_file_nums = 0
    all_file_size = getdirsize(src_path)
    cp_log.info('all_file_size = %s' % all_file_size)
    for root, dirs, files in os.walk(src_path):
        # Map the walked directory onto the destination through its path
        # relative to src_path. A plain str.replace would also rewrite any
        # later occurrence of the source string inside the path.
        dst_dir = os.path.abspath(
            os.path.join(dst_path, os.path.relpath(root, src_path)))
        # Only directories that actually contain files are created.
        if files and not os.path.isdir(dst_dir):
            os.makedirs(dst_dir)
            cp_log.debug('Create Dest Dir %s' % dst_dir)
        for f in files:
            all_file_nums += 1
            local_file_path = os.path.join(root, f)
            dst_file_path = os.path.join(dst_dir, f)
            tasklist.append(
                gevent.spawn(copy_file, local_file_path, dst_file_path))

    gevent.joinall(tasklist)  # block until every greenlet has finished
    # joinall does not raise for failed greenlets by default, so check each
    # one; otherwise a partial copy would be reported as complete.
    failed = [task for task in tasklist if not task.successful()]
    for task in failed:
        cp_log.error('copy task failed: {!r}'.format(task.exception))
    if failed:
        cp_log.error(
            'copy failed for {} of {} files'.format(
                len(failed), all_file_nums))

    print('all_file_nums ', all_file_nums)
    cp_log.info(
        'copy all files complete , files count = {} ,  size = {}'.format(
            all_file_nums, getdirsize(dst_path)))


def bytes2human(n):
    """
    Format a byte count with 1024-based unit suffixes.

    :param n: number of bytes
    :return: e.g. '9.8K' for 10000; values below 1024 come back as '<n>B'
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # 1 << 10 is 1024, 1 << 20 is 1024 ** 2, and so on for each suffix.
    prefix = {s: 1 << (i + 1) * 10 for i, s in enumerate(symbols)}
    # Try the largest unit first so the biggest fitting suffix wins.
    for s in reversed(symbols):
        if n >= prefix[s]:
            value = float(n) / prefix[s]
            return '%.1f%s' % (value, s)
    return "%sB" % n


if __name__ == '__main__':
    src = 'C://pythonStudy/python爬蟲參考資料'
    dst = 'C://pythonStudy/copy_thread_test2'
    upload_file(src, dst)
輸出結果
"C:\Program Files\Python36\python.exe" batch_copy.py
[2018-06-29 22:50:22 - INFO - cp ] upload_file C://pythonStudy/python爬蟲參考資料 C://pythonStudy/copy_thread_test2
[2018-06-29 22:50:22 - INFO - cp ] Create Dest Dir C://pythonStudy/copy_thread_test2
[2018-06-29 22:50:22 - INFO - cp ] all_file_size = 620.6M
[2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master
[2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter1
[2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir C:\pythonStudy\copy_thread_test2\python-scraping-master\chapter10
[2018-06-29 22:50:22 - DEBUG - cp ] Create Dest Dir
……
[2018-06-29 22:50:23 - INFO - cp ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter12\2-seleniumCookies.py , size= 528B complete
[2018-06-29 22:50:23 - INFO - cp ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter12\3-honeypotDetection.py , size= 539B complete
[2018-06-29 22:50:23 - INFO - cp ] copy file
[2018-06-29 22:50:24 - INFO - cp ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\chapter9\5-BasicAuth.py , size= 229B complete
all_file_nums 130
[2018-06-29 22:50:24 - INFO - cp ] copy file C://pythonStudy/python爬蟲參考資料\python-scraping-master\files\test.csv , size= 114B complete
func {upload_file} run { 1.2971}s
[2018-06-29 22:50:24 - INFO - cp ] copy all files complete , files count = 130 , size = 620.6M
Process finished with exit code 0
工具文件
time_utils.py
def run_time(func):
    """
    Decorator that logs how long each call to func takes.

    :param func: callable to time
    :return: wrapper that calls func, reports the elapsed seconds through
             log.debug (log is defined outside this snippet) and returns
             func's result
    """
    @wraps(func)
    def decor(*args, **kwargs):
        # perf_counter is the clock intended for measuring intervals;
        # time.time() can jump when the system clock is adjusted.
        start = time.perf_counter()
        res = func(*args, **kwargs)
        end = time.perf_counter()
        log.debug("func {%s} run {%10.4f}s " % (func.__name__, (end - start)))
        return res
    return decor