主類
import os import re import threading import paramiko from stat import S_ISDIR import logging2 # 定義一個類,表示一台遠端linux主機 class Linux(threading.Thread): # 通過IP, 用戶名,密碼,超時時間初始化一個遠程Linux主機 def __init__(self, thread_id, thread_name, ip, username, password, remote_dir, local_dir, pattern, isRmRemoteFile = 0, timeout=30): threading.Thread.__init__(self) self.thread_id = thread_id self.thread_name = thread_name self.ip = ip self.username = username self.password = password self.timeout = timeout self.remote_dir = remote_dir self.local_dir = local_dir self.pattern = pattern #是否sftp成功后刪除遠程文件,1-刪除,0-不刪除 self.isRmRemoteFile = isRmRemoteFile # transport和chanel self.t = '' self.chan = '' # 鏈接失敗的重試次數 self.try_times = 3 # 調用該方法連接遠程主機 def connect(self): pass # 斷開連接 def close(self): pass # 發送要執行的命令 def send(self, cmd): pass # get單個文件 def sftp_get(self, remotefile, localfile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.get(remotefile, localfile) t.close() # put單個文件 def sftp_put(self, localfile, remotefile): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) sftp.put(localfile, remotefile) t.close() #----獲取遠端linux主機上指定目錄及其子目錄下的所有文件------ def __get_all_files_in_remote_dir(self, sftp, remote_dir, pattern = '.*'): # 保存所有文件的列表 all_files = list() # 去掉路徑字符串最后的字符'/',如果有的話 if remote_dir[-1] == '/': remote_dir = remote_dir[0:-1] # 獲取當前指定目錄下的所有目錄及文件,包含屬性值 files = sftp.listdir_attr(remote_dir) for x in files: # remote_dir目錄中每一個文件或目錄的完整路徑 filename = remote_dir + '/' + x.filename # 如果是目錄,則遞歸處理該目錄,這里用到了stat庫中的S_ISDIR方法,與linux中的宏的名字完全一致 if S_ISDIR(x.st_mode): all_files.extend(self.__get_all_files_in_remote_dir(sftp, filename)) else: #matchObj = re.match('.*\.py$', filename, re.M|re.I) matchObj = re.match(pattern, filename, re.M|re.I) if matchObj: all_files.append(filename) return all_files def sftp_get_dir(self): t = paramiko.Transport(sock=(self.ip, 22)) t.connect(username=self.username, password=self.password) sftp = paramiko.SFTPClient.from_transport(t) # 獲取遠端linux主機上指定目錄及其子目錄下的所有文件 all_files = self.__get_all_files_in_remote_dir(sftp, self.remote_dir, self.pattern) # 依次get每一個文件 for x in all_files: filename = x.split('/')[-1] local_filename = os.path.join(self.local_dir, filename) logging2.debug('正在下載遠程文件:[%s],本地文件:[%s]', x, local_filename) sftp.get(x, local_filename) if self.isRmRemoteFile == 1 and os.path.exists(local_filename) == True: logging2.debug('已下載本地[%s],刪除遠程文件[%s]', local_filename, x) sftp.remove(x) t.close() def run(self): # 將遠端remote_path目錄中的所有文件get到本地local_path目錄 Linux.sftp_get_dir(self) if __name__ == '__main__': logging2.init("collect", "DEBUG") remote_path1 = r'/public/jxbill/shell/python/data' local_path1 = r'/home/zxl/public/code/python/sftp/data' pattern1 = r'.*' host1 = Linux(1, 'thread_27', '192.168.161.27', 'user', 'password', remote_path1, local_path1, pattern1) remote_path2 = r'/public/ahbill/shell/python/data' local_path2 = r'/home/zxl/public/code/python/sftp/data' pattern2 = r'.*\.py$' host2 = Linux(1, 'thread_26', '192.168.161.26', 'user', 'password', remote_path2, local_path2, pattern2, 1) host1.start() host2.start() host1.join() host2.join()
日志類logging2.py
import os import threading import queue import time import datetime import logging from logging.handlers import RotatingFileHandler class logging2(threading.Thread): AQueue = queue.Queue(100000) nPID = os.getpid() Adt = datetime.datetime.now().strftime('%Y%m%d') nCount = 1 def __init__(self, threadID, name, module, logLevel): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.module = module print("set loglevel: [%s]" % (logLevel) ) formatter = logging.Formatter('%(asctime)s|%(name)s|%(process)d|%(levelname)s|%(message)s') logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(logging2.Adt) + ".log" self.logger = logging.getLogger(__name__) self.rHandler = RotatingFileHandler(logfile, maxBytes = 10*1024*1024, backupCount = 10) self.rHandler.setFormatter(formatter) self.console = logging.StreamHandler() self.console.setFormatter(formatter) if logLevel == 'DEBUG' : self.logger.setLevel(level = logging.DEBUG) self.rHandler.setLevel(logging.DEBUG) self.console.setLevel(logging.DEBUG) elif logLevel == 'INFO' : self.logger.setLevel(level = logging.INFO) self.rHandler.setLevel(logging.INFO) self.console.setLevel(logging.INFO) elif logLevel == 'WARNING' : self.logger.setLevel(level = logging.WARN) self.rHandler.setLevel(logging.WARN) self.console.setLevel(logging.WARN) elif logLevel == 'ERROR' : self.logger.setLevel(level = logging.ERROR) self.rHandler.setLevel(logging.ERROR) self.console.setLevel(logging.ERROR) self.logger.addHandler(self.rHandler) self.logger.addHandler(self.console) #如果跨天了,則重新生成新的文件名 def reSetLog(self): AdtTemp = datetime.datetime.now().strftime('%Y%m%d') #比較新的時間 if AdtTemp == logging2.Adt: return(True) logging2.Adt = AdtTemp logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(AdtTemp) + ".log" self.rHandler = RotatingFileHandler(logfile, maxBytes = 1*1024, backupCount = 10) self.logger.addHandler(self.rHandler) self.logger.addHandler(self.console) logging2.nCount += 1 def run(self): print ("開啟日志線程:" + self.name) i = 0 while True: #data = "queue test data" #debug(data) #print("Queuesize: %s" % (logging2.AQueue.qsize())) self.reSetLog() if logging2.AQueue.empty() == False: #從隊列獲取日志消息 data = logging2.AQueue.get() #解析日志消息,格式:日志級別,內容 level = list(data.keys())[0] content = data.get(level) #把內容按分隔符|解析成list傳入參數 lstContent = list(content.split('|')) if level == 'DEBUG' : self.logger.debug(*lstContent) elif level == 'INFO' : self.logger.info(*lstContent) elif level == 'WARNING' : self.logger.warn(*lstContent) elif level == 'ERROR' : self.logger.error(*lstContent) else: time.sleep(0.5) print ("退出線程:" + self.name) def debug(*content): logMsg = "" #傳入多個參數用豎線分隔符分開 for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'DEBUG':logMsg}) def info(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'INFO':logMsg}) def warn(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'WARNING':logMsg}) def error(*content): logMsg = "" for i in range(len(content)): if i == len(content)-1: logMsg += content[i] else: logMsg += content[i]+"|" logging2.AQueue.put({'ERROR':logMsg}) def init(module, level): # 創建新線程 thread1 = logging2(1, "Thread-log", module, level) # 開啟新線程 thread1.start() # thread1.join()
結果顯示:
日志文件:
-rw-rw-r--. 1 zxl zxl 2684 6月 28 15:20 log_collect_17744_20200628.log
日志內容:
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ah_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ah.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_jx_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_jx.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ln_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ln.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_xz_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz_5G.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_xz.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/logserv.py],本地文件:[/home/zxl/public/code/python/sftp/data/logserv.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/monitorSys.py],本地文件:[/home/zxl/public/code/python/sftp/data/monitorSys.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/MvSvnLibToNew.py],本地文件:[/home/zxl/public/code/python/sftp/data/MvSvnLibToNew.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/teleCMD.py],本地文件:[/home/zxl/public/code/python/sftp/data/teleCMD.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/1.a],本地文件:[/home/zxl/public/code/python/sftp/data/1.a]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/2.py],本地文件:[/home/zxl/public/code/python/sftp/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/2.py],刪除遠程文件[/public/ahbill/shell/python/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/1.py],本地文件:[/home/zxl/public/code/python/sftp/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/1.py],刪除遠程文件[/public/ahbill/shell/python/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/3.py],本地文件:[/home/zxl/public/code/python/sftp/data/3.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/3.py],刪除遠程文件[/public/ahbill/shell/python/data/3.py]