Python 实现 SFTP 功能


主类

import os
import re
import threading
import paramiko
from stat import S_ISDIR

import logging2

# 定义一个类,表示一台远端linux主机
class Linux(threading.Thread):
    """A remote Linux host whose files are downloaded over SFTP on a thread.

    Starting the thread (``start()``) runs :meth:`sftp_get_dir`, which copies
    every file under ``remote_dir`` (recursively) whose full remote path
    matches ``pattern`` into ``local_dir``.
    """

    def __init__(self, thread_id, thread_name, ip, username, password, remote_dir, local_dir, pattern, isRmRemoteFile = 0, timeout=30):
        """Store connection and transfer settings; no connection is made here.

        Args:
            thread_id: Caller-chosen identifier, stored as-is.
            thread_name: Caller-chosen label, stored as-is.
            ip: Address of the remote host (port 22 is used).
            username: Login user name.
            password: Login password.
            remote_dir: Remote directory to download from.
            local_dir: Local directory that receives the files.
            pattern: Regular expression matched against each full remote path.
            isRmRemoteFile: 1 to delete the remote file after a successful
                download, 0 to keep it.
            timeout: Stored on the instance; not used by the code in this class.
        """
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.thread_name = thread_name
        self.ip = ip
        self.username = username
        self.password = password
        self.timeout = timeout
        self.remote_dir = remote_dir
        self.local_dir = local_dir
        self.pattern = pattern
        # Delete the remote file after a successful transfer: 1 = yes, 0 = no.
        self.isRmRemoteFile = isRmRemoteFile
        # Placeholders for a transport and a channel (unused by the methods below).
        self.t = ''
        self.chan = ''
        # Number of connection retries (not consulted by the methods below).
        self.try_times = 3

    def connect(self):
        """Placeholder for connecting to the remote host; does nothing."""
        pass

    def close(self):
        """Placeholder for disconnecting; does nothing."""
        pass

    def send(self, cmd):
        """Placeholder for sending a command; does nothing."""
        pass

    def _open_transport(self):
        """Return an authenticated paramiko Transport to ``self.ip`` port 22.

        The transport is closed again if authentication fails, so a failed
        login does not leave a half-open connection behind.
        """
        t = paramiko.Transport(sock=(self.ip, 22))
        try:
            t.connect(username=self.username, password=self.password)
        except Exception:
            t.close()
            raise
        return t

    def sftp_get(self, remotefile, localfile):
        """Download a single remote file to ``localfile``."""
        t = self._open_transport()
        try:
            sftp = paramiko.SFTPClient.from_transport(t)
            sftp.get(remotefile, localfile)
        finally:
            # Always release the connection, even when the transfer fails.
            t.close()

    def sftp_put(self, localfile, remotefile):
        """Upload a single local file to ``remotefile``."""
        t = self._open_transport()
        try:
            sftp = paramiko.SFTPClient.from_transport(t)
            sftp.put(localfile, remotefile)
        finally:
            t.close()

    def __get_all_files_in_remote_dir(self, sftp, remote_dir, pattern = '.*'):
        """Recursively list files under ``remote_dir`` whose path matches ``pattern``.

        Args:
            sftp: Object providing ``listdir_attr(path)`` whose entries expose
                ``filename`` and ``st_mode``.
            remote_dir: Directory to walk; one trailing '/' is ignored.
            pattern: Regular expression applied with ``re.match`` (flags
                ``re.M | re.I``) to the full remote path of every file.

        Returns:
            List of full remote paths, in listing order.
        """
        all_files = list()

        # Drop one trailing '/' so the joined paths below have a single separator.
        # endswith() is also safe for an empty string, unlike indexing [-1].
        if remote_dir.endswith('/'):
            remote_dir = remote_dir[0:-1]

        for entry in sftp.listdir_attr(remote_dir):
            # Full path of this file or directory.
            filename = remote_dir + '/' + entry.filename
            if S_ISDIR(entry.st_mode):
                # Pass the pattern down so files in subdirectories are filtered
                # by the same rule as files at the top level.
                all_files.extend(self.__get_all_files_in_remote_dir(sftp, filename, pattern))
            elif re.match(pattern, filename, re.M|re.I):
                all_files.append(filename)
        return all_files

    def sftp_get_dir(self):
        """Download every matching file under ``self.remote_dir`` into ``self.local_dir``.

        When ``self.isRmRemoteFile == 1`` each remote file is removed once its
        local copy exists.
        """
        t = self._open_transport()
        try:
            sftp = paramiko.SFTPClient.from_transport(t)

            all_files = self.__get_all_files_in_remote_dir(sftp, self.remote_dir, self.pattern)
            for remote_file in all_files:
                # NOTE: only the base name is kept, so files with the same name
                # in different remote subdirectories land on the same local path.
                filename = remote_file.split('/')[-1]
                local_filename = os.path.join(self.local_dir, filename)
                logging2.debug('正在下载远程文件:[%s],本地文件:[%s]', remote_file, local_filename)
                sftp.get(remote_file, local_filename)
                if self.isRmRemoteFile == 1 and os.path.exists(local_filename):
                    logging2.debug('已下载本地[%s],删除远程文件[%s]', local_filename, remote_file)
                    sftp.remove(remote_file)
        finally:
            # Release the connection even if listing, download or removal fails.
            t.close()

    def run(self):
        """Thread entry point: fetch the configured remote directory."""
        self.sftp_get_dir()
            
if __name__ == '__main__':
    # Start the background logging thread before any worker logs a message.
    logging2.init("collect", "DEBUG")

    # One worker per remote host. The second host filters on *.py and deletes
    # each remote file after download (trailing argument 1).
    # NOTE(review): both workers are given thread_id 1 - confirm this is intended.
    workers = [
        Linux(1, 'thread_27', '192.168.161.27', 'user', 'password',
              r'/public/jxbill/shell/python/data',
              r'/home/zxl/public/code/python/sftp/data',
              r'.*'),
        Linux(1, 'thread_26', '192.168.161.26', 'user', 'password',
              r'/public/ahbill/shell/python/data',
              r'/home/zxl/public/code/python/sftp/data',
              r'.*\.py$', 1),
    ]

    # Launch every download, then wait for all of them to finish.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

日志类 logging2.py

import os
import threading
import queue
import time
import datetime
import logging
from logging.handlers import RotatingFileHandler

class logging2(threading.Thread):
    """Background thread that writes queued log messages to file and console.

    Producers put ``{LEVEL: 'fmt|arg1|arg2'}`` dicts on the shared class-level
    queue ``AQueue``; this thread takes them off, splits the text on '|' and
    passes the pieces to the matching ``logging.Logger`` method.  The log file
    is named ``log_<module>_<pid>_<YYYYMMDD>.log`` and is replaced by a new one
    when the date changes.
    """

    # Shared message queue (bounded to 100000 entries).
    AQueue = queue.Queue(100000)
    # Process id, captured when the class is defined; part of the file name.
    nPID = os.getpid()
    # Date (YYYYMMDD) of the log file currently in use.
    Adt = datetime.datetime.now().strftime('%Y%m%d')
    # Incremented each time the log file is switched to a new day.
    nCount = 1

    # Rotation settings shared by the initial and the per-day file handlers.
    _MAX_BYTES = 10*1024*1024
    _BACKUP_COUNT = 10
    # Accepted level names and the logging constants they select.
    _LEVELS = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARN,
        'ERROR': logging.ERROR,
    }

    def __init__(self, threadID, name, module, logLevel):
        """Create the logger, its rotating file handler and console handler.

        Args:
            threadID: Caller-chosen identifier, stored as-is.
            name: Thread name.
            module: Label embedded in the log file name.
            logLevel: One of 'DEBUG', 'INFO', 'WARNING', 'ERROR'; any other
                value leaves the logger and handler levels unchanged.
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.module = module

        print("set loglevel: [%s]" % (logLevel) )
        formatter = logging.Formatter('%(asctime)s|%(name)s|%(process)d|%(levelname)s|%(message)s')
        self.logger = logging.getLogger(__name__)

        self.rHandler = RotatingFileHandler(
            self._logfile_name(logging2.Adt),
            maxBytes = self._MAX_BYTES, backupCount = self._BACKUP_COUNT)
        self.rHandler.setFormatter(formatter)

        self.console = logging.StreamHandler()
        self.console.setFormatter(formatter)

        level = self._LEVELS.get(logLevel)
        if level is not None:
            self.logger.setLevel(level = level)
            self.rHandler.setLevel(level)
            self.console.setLevel(level)

        self.logger.addHandler(self.rHandler)
        self.logger.addHandler(self.console)

    def _logfile_name(self, day):
        """Return the log file name for the given YYYYMMDD string."""
        return "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(day) + ".log"

    def reSetLog(self):
        """Switch to a new log file if the date has changed.

        Returns:
            True when the date is unchanged and nothing was done, False when
            the file handler was replaced.
        """
        today = datetime.datetime.now().strftime('%Y%m%d')
        if today == logging2.Adt:
            return True

        logging2.Adt = today
        previous = self.rHandler
        replacement = RotatingFileHandler(
            self._logfile_name(today),
            maxBytes = self._MAX_BYTES, backupCount = self._BACKUP_COUNT)
        # Carry over formatter and level so the new file looks like the old one.
        replacement.setFormatter(previous.formatter)
        replacement.setLevel(previous.level)

        # Detach and close yesterday's handler; otherwise every record would
        # still be written to the old file and its descriptor would stay open.
        self.logger.removeHandler(previous)
        previous.close()

        self.rHandler = replacement
        self.logger.addHandler(self.rHandler)
        # addHandler ignores handlers that are already attached.
        self.logger.addHandler(self.console)
        logging2.nCount += 1
        return False

    def _producers_alive(self):
        """Return True while another non-daemon, non-logging thread is alive."""
        return any(
            t is not self
            and not isinstance(t, logging2)
            and not t.daemon
            and t.is_alive()
            for t in threading.enumerate()
        )

    def _emit(self, data):
        """Write one queued ``{LEVEL: 'fmt|arg|...'}`` message to the logger."""
        level, content = next(iter(data.items()))
        # The producer joined the format string and its arguments with '|'.
        parts = content.split('|')
        writers = {
            'DEBUG': self.logger.debug,
            'INFO': self.logger.info,
            # Logger.warn is a deprecated alias of Logger.warning.
            'WARNING': self.logger.warning,
            'ERROR': self.logger.error,
        }
        write = writers.get(level)
        if write is not None:
            write(*parts)

    def run(self):
        """Drain the queue until no producer thread is left and it is empty."""
        print ("开启日志线程:" + self.name)
        while True:
            self.reSetLog()
            try:
                # Wait up to half a second for the next message.
                data = logging2.AQueue.get(timeout=0.5)
            except queue.Empty:
                # Nothing queued: stop once every other non-daemon thread
                # (including the main thread) has finished, so the process
                # can exit instead of hanging on this thread forever.
                if not self._producers_alive():
                    break
                continue
            self._emit(data)

        print ("退出线程:" + self.name)
    
def debug(*content):
    """Queue a DEBUG message.

    The positional string arguments (format string first, then its
    arguments) are joined with '|' and put on ``logging2.AQueue`` under the
    key 'DEBUG'.
    """
    logging2.AQueue.put({'DEBUG': "|".join(content)})
            
def info(*content):
    """Queue an INFO message.

    The positional string arguments (format string first, then its
    arguments) are joined with '|' and put on ``logging2.AQueue`` under the
    key 'INFO'.
    """
    logging2.AQueue.put({'INFO': "|".join(content)})
                
def warn(*content):
    """Queue a WARNING message.

    The positional string arguments (format string first, then its
    arguments) are joined with '|' and put on ``logging2.AQueue`` under the
    key 'WARNING'.
    """
    logging2.AQueue.put({'WARNING': "|".join(content)})
        
def error(*content):
    """Queue an ERROR message.

    The positional string arguments (format string first, then its
    arguments) are joined with '|' and put on ``logging2.AQueue`` under the
    key 'ERROR'.
    """
    logging2.AQueue.put({'ERROR': "|".join(content)})
                    
def init(module, level):
    """Create the logging thread for ``module`` at ``level`` and start it.

    The thread is created with id 1 and name "Thread-log"; it is started
    but not joined.
    """
    log_thread = logging2(1, "Thread-log", module, level)
    log_thread.start()

 

 

结果显示:

日志文件:

-rw-rw-r--. 1 zxl zxl 2684 6月  28 15:20 log_collect_17744_20200628.log

 

日志内容:

2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ah_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ah.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_jx_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_jx.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ln_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_ln.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_xz_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz_5G.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/cppcheck_xz.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/logserv.py],本地文件:[/home/zxl/public/code/python/sftp/data/logserv.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/monitorSys.py],本地文件:[/home/zxl/public/code/python/sftp/data/monitorSys.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/MvSvnLibToNew.py],本地文件:[/home/zxl/public/code/python/sftp/data/MvSvnLibToNew.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/teleCMD.py],本地文件:[/home/zxl/public/code/python/sftp/data/teleCMD.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下载远程文件:[/public/jxbill/shell/python/data/1.a],本地文件:[/home/zxl/public/code/python/sftp/data/1.a]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/2.py],本地文件:[/home/zxl/public/code/python/sftp/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/2.py],删除远程文件[/public/ahbill/shell/python/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/1.py],本地文件:[/home/zxl/public/code/python/sftp/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/1.py],删除远程文件[/public/ahbill/shell/python/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下载远程文件:[/public/ahbill/shell/python/data/3.py],本地文件:[/home/zxl/public/code/python/sftp/data/3.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下载本地[/home/zxl/public/code/python/sftp/data/3.py],删除远程文件[/public/ahbill/shell/python/data/3.py]

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM