主类
import os import re import threading import paramiko from stat import S_ISDIR import logging2 # 定义一个类,表示一台远端linux主机 class Linux(threading.Thread): # 通过IP, 用户名,密码,超时时间初始化一个远程Linux主机 def __init__(self, thread_id, thread_name, ip, username, password, remote_dir, local_dir, pattern, isRmRemoteFile = 0, timeout=30): threading.Thread.__init__(self) self.thread_id = thread_id self.thread_name = thread_name self.ip = ip self.username = username self.password = password self.timeout = timeout self.remote_dir = remote_dir self.local_dir = local_dir self.pattern = pattern #是否sftp成功后删除远程文件,1-删除,0-不删除 self.isRmRemoteFile = isRmRemoteFile # transport和chanel self.t = '' self.chan = '' # 链接失败的重试次数 self.try_times = 3 # 调用该方法连接远程主机 def connect(self): pass # 断开连接 def close(self): pass # 发送要执行的命令 def send(self, cmd): pass # get单个文件 def sftp_get(self, remotefile, localfile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.get(remotefile, localfile) t.close() # put单个文件 def sftp_put(self, localfile, remotefile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.put(localfile, remotefile) t.close() #----获取远端linux主机上指定目录及其子目录下的所有文件------ def __get_all_files_in_remote_dir(self, sftp, remote_dir, pattern = '.*'): # 保存所有文件的列表 all_files = list() # 去掉路径字符串最后的字符'/',如果有的话 if remote_dir[-1] == '/': remote_dir = remote_dir[0:-1] # 获取当前指定目录下的所有目录及文件,包含属性值 files = sftp.listdir_attr(remote_dir) for x in files: # remote_dir目录中每一个文件或目录的完整路径 filename = remote_dir + '/' + x.filename # 如果是目录,则递归处理该目录,这里用到了stat库中的S_ISDIR方法,与linux中的宏的名字完全一致 if S_ISDIR(x.st_mode): all_files.extend(self.__get_all_files_in_remote_dir(sftp, filename)) else: #matchObj = re.match('.*\.py$', filename, re.M|re.I) matchObj = re.match(pattern, filename, re.M|re.I) if matchObj: all_files.append(filename) return all_files def sftp_get_dir(self): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) # 获取远端linux主机上指定目录及其子目录下的所有文件 all_files = self.__get_all_files_in_remote_dir(sftp, self.remote_dir, self.pattern) # 依次get每一个文件 for x in all_files: filename = x.split('/')[-1] local_filename = os.path.join(self.local_dir, filename) logging2.debug('正在下载远程文件:[%s],本地文件:[%s]', x, local_filename) sftp.get(x, local_filename) if self.isRmRemoteFile == 1 and os.path.exists(local_filename) == True: logging2.debug('已下载本地[%s],删除远程文件[%s]', local_filename, x) sftp.remove(x) t.close() def run(self): # 将远端remote_path目录中的所有文件get到本地local_path目录 Linux.sftp_get_dir(self) if __name__ == '__main__': logging2.init("collect", "DEBUG") remote_path1 = r'/public/jxbill/shell/python/data' local_path1 = r'/home/zxl/public/code/python/sftp/data' pattern1 = r'.*' host1 = Linux(1, 'thread_27', '192.168.161.27', 'user', 'password', remote_path1, local_path1, pattern1) remote_path2 = r'/public/ahbill/shell/python/data' local_path2 = r'/home/zxl/public/code/python/sftp/data' pattern2 = r'.*\.py$' host2 = Linux(1, 'thread_26', '192.168.161.26', 'user', 'password', remote_path2, local_path2, pattern2, 1) host1.start() host2.start() host1.join() host2.join()
日志类logging2.py
import os import threading import queue import time import datetime import logging from logging.handlers import RotatingFileHandler class logging2(threading.Thread): AQueue = queue.Queue(100000) nPID = os.getpid() Adt = datetime.datetime.now().strftime('%Y%m%d') nCount = 1 def __init__(self, threadID, name, module, logLevel): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.module = module print("set loglevel: [%s]" % (logLevel) ) formatter = logging.Formatter('%(asctime)s|%(name)s|%(process)d|%(levelname)s|%(message)s') logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(logging2.Adt) + ".log" self.logger = logging.getLogger(__name__) self.rHandler = RotatingFileHandler(logfile, maxBytes = 10*1024*1024, backupCount = 10) self.rHandler.setFormatter(formatter) self.console = logging.StreamHandler() self.console.setFormatter(formatter) if logLevel == 'DEBUG' : self.logger.setLevel(level = logging.DEBUG) self.rHandler.setLevel(logging.DEBUG) self.console.setLevel(logging.DEBUG) elif logLevel == 'INFO' : self.logger.setLevel(level = logging.INFO) self.rHandler.setLevel(logging.INFO) self.console.setLevel(logging.INFO) elif logLevel == 'WARNING' : self.logger.setLevel(level = logging.WARN) self.rHandler.setLevel(logging.WARN) self.console.setLevel(logging.WARN) elif logLevel == 'ERROR' : self.logger.setLevel(level = logging.ERROR) self.rHandler.setLevel(logging.ERROR) self.console.setLevel(logging.ERROR) self.logger.addHandler(self.rHandler) self.logger.addHandler(self.console) #如果跨天了,则重新生成新的文件名 def reSetLog(self): AdtTemp = datetime.datetime.now().strftime('%Y%m%d') #比较新的时间 if AdtTemp == logging2.Adt: return(True) logging2.Adt = AdtTemp logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(AdtTemp) + ".log" self.rHandler = RotatingFileHandler(logfile, maxBytes = 1*1024, backupCount = 10) self.logger.addHandler(self.rHandler) self.logger.addHandler(self.console) logging2.nCount += 1 def run(self): print ("开启日志线程:" + self.name) i = 0 while True: #data = "queue test data" #debug(data) #print("Queuesize: %s" % (logging2.AQueue.qsize())) self.reSetLog() if logging2.AQueue.empty() == False: #从队列获取日志消息 data = logging2.AQueue.get() #解析日志消息,格式:日志级别,内容 level = list(data.keys())[0] content = data.get(level) #把内容按分隔符|解析成list传入参数 lstContent = list(content.split('|')) if level == 'DEBUG' : self.logger.debug(*lstContent) elif level == 'INFO' : self.logger.info(*lstContent) elif level == 'WARNING' : self.logger.warn(*lstContent) elif level == 'ERROR' : self.logger.error(*lstContent) else: time.sleep(0.5) print ("退出线程:" + self.name) def debug(*content): logMsg = "" #传入多个参数用竖线分隔符分开 for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'DEBUG':logMsg}) def info(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'INFO':logMsg}) def warn(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'WARNING':logMsg}) def error(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'ERROR':logMsg}) def init(module, level): # 创建新线程 thread1 = logging2(1, "Thread-log", module, level) # 开启新线程 thread1.start() # thread1.join()
结果显示:
日志文件:
-rw-rw-r--. 1 zxl zxl 2684 6月 28 15:20 log_collect_17744_20200628.log
日志内容:
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ah_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ah.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_jx_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_jx.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ln_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ln.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_xz_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz_5G.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_xz.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/logserv.py],本地文件:[/home/zxl/public/code/python/sftp/data/logserv.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/monitorSys.py],本地文件:[/home/zxl/public/code/python/sftp/data/monitorSys.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/MvSvnLibToNew.py],本地文件:[/home/zxl/public/code/python/sftp/data/MvSvnLibToNew.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/teleCMD.py],本地文件:[/home/zxl/public/code/python/sftp/data/teleCMD.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/1.a],本地文件:[/home/zxl/public/code/python/sftp/data/1.a]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/2.py],本地文件:[/home/zxl/public/code/python/sftp/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/2.py],删除远程文件[/public/ahbill/shell/python/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/1.py],本地文件:[/home/zxl/public/code/python/sftp/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/1.py],删除远程文件[/public/ahbill/shell/python/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/3.py],本地文件:[/home/zxl/public/code/python/sftp/data/3.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/3.py],删除远程文件[/public/ahbill/shell/python/data/3.py]