A Windows-to-Linux folder synchronizer written in Python. It provides automatic sync, much faster transfer of large numbers of small files, and richer upload-filter settings.


The existing tooling is unsatisfactory. I use PyCharm's automatic deployment, but it only syncs files I have edited myself or saved manually with Ctrl+S; files newly pulled down with git are not uploaded to Linux automatically. To avoid missing files, this forces frequent full uploads, which are very slow.

Because I often use a Linux interpreter directly from PyCharm on Windows and need to test quickly, constantly doing git push/pull between the local machine and Linux is inconvenient. The test environment does use git, but during development a mapped-folder sync is simply more convenient than git.

Compared with a single thread and a single Linux connection uploading tiny files one by one, the connection-pool approach improves upload speed by tens of times.
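The pooling idea can be sketched without paramiko: pre-create N connection objects, keep them in a queue, and have worker threads borrow one per upload and return it when done. Everything below (the `FakeConnection` class, `upload_one`) is illustrative stand-in code, not part of the real tool.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Stand-in for a paramiko SFTP client (illustrative only)."""

    def put(self, local_path, remote_path):
        return (local_path, remote_path)


pool = queue.Queue()
for _ in range(5):          # pre-build 5 connections up front
    pool.put(FakeConnection())

results = []


def upload_one(path):
    conn = pool.get()       # borrow a connection (blocks if none is free)
    try:
        results.append(conn.put(path, '/remote/' + path))
    finally:
        pool.put(conn)      # always return it to the pool


with ThreadPoolExecutor(max_workers=5) as ex:
    list(ex.map(upload_one, [f'file_{i}.txt' for i in range(20)]))

print(len(results))  # 20 uploads completed, never more than 5 concurrent connections
```

The queue doubles as the free-list: `get()` blocks when all connections are busy, so concurrency is naturally capped at the pool size.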


Single Linux connection upload:

"""
Automatically synchronize a folder to a Linux machine.
"""
import json
import os
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: exclude files whose names end with these suffixes
        :param file_volume_limit: maximum file size in bytes; files larger than this are not uploaded
        :param path_pattern_exluded_tuple: a more powerful, flexible exclusion than suffix matching; each entry is a Python regular expression matched against the full path
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds of the current time
        :param cycle_interval: how many seconds between scans for files that need uploading
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        self.logger.warning('establishing Linux connection')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} uploaded successfully, size {round(os.path.getsize(file) / 1024)} KB, upload time {round(time.time() - time_start, 2)} s')
                break
            except FileNotFoundError:
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                stdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                self.build_connect()  # reconnect after e.g. "OSError: Socket is closed"

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        self.logger.warning(f'number of files to upload: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} KB, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warning('done')
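
The suffix/regex exclusion logic in `_judge_need_filter_a_file` can be exercised in isolation. A minimal standalone sketch of the same logic (the tuples mirror the constructor defaults; paths are made up):

```python
import re

FILE_SUFFIX_TUPLE_EXCLUDED = ('.pyc', '.log', '.gz')
PATH_PATTERN_EXCLUDED_TUPLE = ('/.git/', '/.idea/')


def judge_need_filter_a_file(filename: str) -> bool:
    """Return True if the file should be skipped (same logic as the class method)."""
    ext = filename.split('.')[-1]
    if '.' + ext in FILE_SUFFIX_TUPLE_EXCLUDED:
        return True
    for pattern in PATH_PATTERN_EXCLUDED_TUPLE:
        if re.search(pattern, filename):
            return True
    return False


print(judge_need_filter_a_file('D:/project/app/main.pyc'))   # True, excluded suffix
print(judge_need_filter_a_file('D:/project/.git/config'))    # True, excluded path pattern
print(judge_need_filter_a_file('D:/project/app/main.py'))    # False, gets uploaded
```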


Connection pool plus multi-threaded upload:


"""
Automatically synchronize a folder to a Linux machine.
This version is sharper: a connection pool plus a thread pool dramatically speeds up uploading large numbers of small files.
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    Synchronization between two local folders.
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = 'local windows copy'

    def upload(self, file: str):
        file_remote = file.replace(self._local_dir, self._remote_dir)
        if not Path(file_remote).parent.exists():
            os.makedirs(str(Path(file_remote).parent))
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'copied {file} to {file_remote} successfully, size {round(os.path.getsize(file) / 1024)} KB')
        else:
            self.logger.debug(f'{file} not copied to {file_remote}: unchanged')

    @staticmethod
    def get_file_md5(file):
        m = hashlib.md5()
        while True:
            # if the file were opened in text mode, each chunk would need encoding first:
            # data = f.read(1024).encode('utf-8')
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()


@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password):  # flyweight: identical connection parameters share one pool
        self.logger_extra_suffix = host
        self.logger.warning(f'initializing Linux connection pool for {host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)
        self.queue_ssh_free = queue.Queue(100)
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        self.logger.warning('establishing Linux sftp connection...')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'establishing Linux sftp connection took {round(time.time() - t_start, 2)} s')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):
        self.logger.warning('establishing Linux ssh connection...')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'establishing Linux ssh connection took {round(time.time() - t_start, 2)} s')

    def build_connect(self):
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        self.queue_ssh_free.put(ssh)


class LinuxRemoteUploader(LocalCopier):
    """
    Synchronize from Windows to Linux.
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            stdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
        else:
            stderr_bytes = stderr.read()
            # self.logger.debug(stderr_bytes)
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        self.logger.debug(f'free sftp connections: {self.linux_conn_pool.queue_sftp_free.qsize()}, free ssh connections: {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))

        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} uploaded successfully, size {round(os.path.getsize(file) / 1024)} KB, upload time {round(time.time() - time_start, 2)} s')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # rebuild after e.g. "OSError: Socket is closed"


class Synchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: exclude files whose names end with these suffixes.
        :param file_volume_limit: maximum file size in bytes; files larger than this are not uploaded.
        :param path_pattern_exluded_tuple: a more powerful, flexible exclusion than suffix matching; each entry is a Python regular expression matched against the full path.
        :param only_upload_within_the_last_modify_time: only upload files modified within this many seconds of the current time.
        :param cycle_interval: how many seconds between scans for files that need uploading.
        :param just_windows_copy: only copy between local windows folders; do not upload to linux.
        """
        self.logger_extra_suffix = host if not just_windows_copy else 'local'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        return sth if isinstance(sth, int) else eval(sth)  # config values may be arithmetic strings such as '7 * 24 * 3600'; trusted config only

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time and volume < self._file_volume_limit and (file_full_name
                                                                                                                                             not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60):
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_lsat_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_lsat_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_lsat_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'number of files to {"copy" if self._just_windows_copy else "upload"}: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} KB, '
                                f'scan took {round(time.time() - t_start, 2)} s, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)


# noinspection PyPep8
if __name__ == '__main__':
    """
    The config file format is as follows; multiple folder mappings are supported.
    [
      {
        "host": "112.90.xx.xx",
        "port": 10005,
        "username": "root",
        "password": "@0^Lc97MewI3i7xxxxxx",
        "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
        "remote_dir": "/home/ydf/hotelf15",
        "file_suffix_tuple_exluded": [
          ".pyc",
          ".log",
          ".gz"
        ],
        "path_pattern_exluded_tuple": [
          "/.git/",
          "/.idea/",
          "cnbooking_cn_all.json"
        ],
        "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
        "file_volume_limit": "2 * 1000 * 1000",
        "cycle_interval": 10
      }
    ]
    """

    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller. First add the PYTHONPATH variable, then run the command from another folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

    # cd ..
    # set PYTHONPATH=D:\coding2\hotel_fares
    # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
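
Config values such as `only_upload_within_the_last_modify_time` and `file_volume_limit` may be arithmetic strings; `_compute_result` turns them into numbers with `eval`. A minimal standalone sketch of the same idea (note: `eval` on config strings is only acceptable when the config file is trusted):

```python
from typing import Union


def compute_result(sth: Union[str, int]) -> int:
    """Accept either an int or an arithmetic string such as '7 * 24 * 60 * 60'."""
    return sth if isinstance(sth, int) else eval(sth)  # trusted config only


print(compute_result('7 * 24 * 60 * 60'))   # 604800 seconds = one week
print(compute_result('2 * 1000 * 1000'))    # 2000000 bytes, about 2 MB
print(compute_result(10))                   # ints pass through unchanged
```

Writing the values as arithmetic keeps the JSON readable; the trade-off is that a hostile config file could execute arbitrary code, so this pattern only fits a personal tool like this one.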

The actual config file contents look like this:

[
  {
    "host": "112.xx.89.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    "remote_dir": "/home/ydf/hotelf18",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  },
  {
    "host": "112.90.xx.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
    "remote_dir": "/home/ydf/movie_data2",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  }
]
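
Loading such a config is just `json.load` over the list, as in the `__main__` block above. A minimal sketch that also validates the keys each entry must carry (the sample entry and `REQUIRED_KEYS` set are illustrative):

```python
import json

# keys every mapping entry must provide (illustrative minimum)
REQUIRED_KEYS = {'host', 'port', 'username', 'password', 'local_dir', 'remote_dir'}

config_text = '''
[
  {"host": "192.0.2.1", "port": 22, "username": "root", "password": "xxx",
   "local_dir": "D:/code/project", "remote_dir": "/home/user/project",
   "cycle_interval": 1}
]
'''

config_items = json.loads(config_text)
for item in config_items:
    missing = REQUIRED_KEYS - item.keys()
    assert not missing, f'config entry missing keys: {missing}'

print(len(config_items))  # 1 valid entry
```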

The first run does a full upload of every file modified within the configured time window; after that, every 2 seconds (configurable via the JSON file) it scans and uploads to Linux any file changed within the last 10 minutes.
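
That scan rule can be expressed as a pure function over mtimes, mirroring the condition in `find_all_files_meet_the_conditions`. The function name and arguments below are illustrative, not from the real code:

```python
import time

TEN_MINUTES = 10 * 60


def should_upload(file_mtime: float, seen_mtimes: dict, path: str,
                  window_seconds: float, now: float = None) -> bool:
    """First pass: anything inside window_seconds. Later passes: only recent edits with a changed mtime."""
    now = time.time() if now is None else now
    if now - file_mtime >= window_seconds:
        return False                      # outside the configured window entirely
    if path in seen_mtimes and now - file_mtime >= TEN_MINUTES:
        return False                      # already uploaded and not recently touched
    return seen_mtimes.get(path) != file_mtime  # skip if mtime is unchanged


now = 1_000_000.0
seen = {}
print(should_upload(now - 3600, seen, 'a.py', 7 * 24 * 3600, now))  # True: first scan, inside window
seen['a.py'] = now - 3600
print(should_upload(now - 3600, seen, 'a.py', 7 * 24 * 3600, now))  # False: seen before, older than 10 min
print(should_upload(now - 60, seen, 'a.py', 7 * 24 * 3600, now))    # True: fresh edit, new mtime
```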

