python實現sftp功能


主類

import os
import re
import threading
import paramiko
from stat import S_ISDIR

import logging2

# 定義一個類,表示一台遠端linux主機
class Linux(threading.Thread):
    # 通過IP, 用戶名,密碼,超時時間初始化一個遠程Linux主機
    def __init__(self, thread_id, thread_name, ip, username, password, remote_dir, local_dir, pattern, isRmRemoteFile = 0, timeout=30):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.thread_name = thread_name
        self.ip = ip
        self.username = username
        self.password = password
        self.timeout = timeout
        self.remote_dir = remote_dir
        self.local_dir = local_dir
        self.pattern = pattern
        #是否sftp成功后刪除遠程文件,1-刪除,0-不刪除
        self.isRmRemoteFile = isRmRemoteFile
        # transport和chanel
        self.t = ''
        self.chan = ''
        # 鏈接失敗的重試次數
        self.try_times = 3
    
    # 調用該方法連接遠程主機
    def connect(self):
        pass
    
    # 斷開連接
    def close(self):
        pass
    
    # 發送要執行的命令
    def send(self, cmd):
        pass
    
    # get單個文件
    def sftp_get(self, remotefile, localfile):
        t = paramiko.Transport(sock=(self.ip, 22))
        t.connect(username=self.username, password=self.password)
        sftp = paramiko.SFTPClient.from_transport(t)
        sftp.get(remotefile, localfile)
        t.close()
    
    # put單個文件
    def sftp_put(self, localfile, remotefile):
        t = paramiko.Transport(sock=(self.ip, 22))
        t.connect(username=self.username, password=self.password)
        sftp = paramiko.SFTPClient.from_transport(t)
        sftp.put(localfile, remotefile)
        t.close()
        
    #----獲取遠端linux主機上指定目錄及其子目錄下的所有文件------
    def __get_all_files_in_remote_dir(self, sftp, remote_dir, pattern = '.*'):
        # 保存所有文件的列表
        all_files = list()
        
        # 去掉路徑字符串最后的字符'/',如果有的話
        if remote_dir[-1] == '/':
            remote_dir = remote_dir[0:-1]
        
        # 獲取當前指定目錄下的所有目錄及文件,包含屬性值
        files = sftp.listdir_attr(remote_dir)
        for x in files:
            # remote_dir目錄中每一個文件或目錄的完整路徑
            filename = remote_dir + '/' + x.filename
            # 如果是目錄,則遞歸處理該目錄,這里用到了stat庫中的S_ISDIR方法,與linux中的宏的名字完全一致
            if S_ISDIR(x.st_mode):
                all_files.extend(self.__get_all_files_in_remote_dir(sftp, filename))
            else:
                #matchObj = re.match('.*\.py$', filename, re.M|re.I)
                matchObj = re.match(pattern, filename, re.M|re.I)
                if matchObj:
                    all_files.append(filename)
        return all_files
        
    def sftp_get_dir(self):
        t = paramiko.Transport(sock=(self.ip, 22))
        t.connect(username=self.username, password=self.password)
        sftp = paramiko.SFTPClient.from_transport(t)
        
        # 獲取遠端linux主機上指定目錄及其子目錄下的所有文件
        all_files = self.__get_all_files_in_remote_dir(sftp, self.remote_dir, self.pattern)
        # 依次get每一個文件
        for x in all_files:
            filename = x.split('/')[-1]
            local_filename = os.path.join(self.local_dir, filename)
            logging2.debug('正在下載遠程文件:[%s],本地文件:[%s]', x, local_filename)
            sftp.get(x, local_filename)
            if self.isRmRemoteFile == 1 and os.path.exists(local_filename) == True:
                logging2.debug('已下載本地[%s],刪除遠程文件[%s]', local_filename, x)
                sftp.remove(x)
        t.close()

    def run(self):
        # 將遠端remote_path目錄中的所有文件get到本地local_path目錄
        Linux.sftp_get_dir(self)
            
if __name__ == '__main__':
    logging2.init("collect", "DEBUG")
    
    remote_path1 = r'/public/jxbill/shell/python/data'
    local_path1 = r'/home/zxl/public/code/python/sftp/data'
    pattern1 = r'.*'
    host1 = Linux(1, 'thread_27', '192.168.161.27', 'user', 'password', remote_path1, local_path1, pattern1)

    remote_path2 = r'/public/ahbill/shell/python/data'
    local_path2 = r'/home/zxl/public/code/python/sftp/data'    
    pattern2 = r'.*\.py$'
    host2 = Linux(1, 'thread_26', '192.168.161.26', 'user', 'password', remote_path2, local_path2, pattern2, 1)

    host1.start()
    host2.start()
    host1.join()
    host2.join()

日志類logging2.py

import os
import threading
import queue
import time
import datetime
import logging
from logging.handlers import RotatingFileHandler

class logging2(threading.Thread):
    AQueue = queue.Queue(100000)
    nPID = os.getpid()
    Adt = datetime.datetime.now().strftime('%Y%m%d')
    nCount = 1
    
    def __init__(self, threadID, name, module, logLevel):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.module = module
        
        print("set loglevel: [%s]" % (logLevel) )
        formatter = logging.Formatter('%(asctime)s|%(name)s|%(process)d|%(levelname)s|%(message)s')
        logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(logging2.Adt) + ".log"
        self.logger = logging.getLogger(__name__)
        
        self.rHandler = RotatingFileHandler(logfile, maxBytes = 10*1024*1024, backupCount = 10)
        self.rHandler.setFormatter(formatter)
        
        self.console = logging.StreamHandler()    
        self.console.setFormatter(formatter)
        
        if logLevel == 'DEBUG' :
            self.logger.setLevel(level = logging.DEBUG)
            self.rHandler.setLevel(logging.DEBUG)
            self.console.setLevel(logging.DEBUG)
        elif logLevel == 'INFO' :
            self.logger.setLevel(level = logging.INFO)
            self.rHandler.setLevel(logging.INFO)
            self.console.setLevel(logging.INFO)
        elif logLevel == 'WARNING' :
            self.logger.setLevel(level = logging.WARN)
            self.rHandler.setLevel(logging.WARN)
            self.console.setLevel(logging.WARN)
        elif logLevel == 'ERROR' :
            self.logger.setLevel(level = logging.ERROR)
            self.rHandler.setLevel(logging.ERROR)
            self.console.setLevel(logging.ERROR)        

        self.logger.addHandler(self.rHandler)
        self.logger.addHandler(self.console)        

    #如果跨天了,則重新生成新的文件名
    def reSetLog(self):
        AdtTemp = datetime.datetime.now().strftime('%Y%m%d')
        #比較新的時間
        if AdtTemp == logging2.Adt:
            return(True)
            
        logging2.Adt = AdtTemp
        logfile = "log_" + self.module + "_" + str(logging2.nPID) + "_" + str(AdtTemp) + ".log"
        self.rHandler = RotatingFileHandler(logfile, maxBytes = 1*1024, backupCount = 10)
        
        self.logger.addHandler(self.rHandler)
        self.logger.addHandler(self.console)    
        logging2.nCount += 1
        
    def run(self):
        print ("開啟日志線程:" + self.name)
        i = 0
        while True:
            #data = "queue test data"
            #debug(data)
            #print("Queuesize: %s" % (logging2.AQueue.qsize()))
            self.reSetLog()
            if logging2.AQueue.empty() == False:
                #從隊列獲取日志消息
                data = logging2.AQueue.get()
                #解析日志消息,格式:日志級別,內容
                level = list(data.keys())[0]
                content = data.get(level)
                #把內容按分隔符|解析成list傳入參數
                lstContent = list(content.split('|'))
                if level == 'DEBUG' :
                    self.logger.debug(*lstContent)
                elif level == 'INFO' :
                    self.logger.info(*lstContent)
                elif level == 'WARNING' :
                    self.logger.warn(*lstContent)
                elif level == 'ERROR' :
                    self.logger.error(*lstContent)
            else:
                time.sleep(0.5)

        print ("退出線程:" + self.name)    
    
def debug(*content):    
    logMsg = ""
    #傳入多個參數用豎線分隔符分開
    for i in range(len(content)):
        if i == len(content)-1:
            logMsg += content[i]
        else:
            logMsg += content[i]+"|"
    logging2.AQueue.put({'DEBUG':logMsg})
            
def info(*content):
    logMsg = ""
    for i in range(len(content)):
        if i == len(content)-1:
            logMsg += content[i]
        else:
            logMsg += content[i]+"|"
    logging2.AQueue.put({'INFO':logMsg})
                
def warn(*content):
    logMsg = ""
    for i in range(len(content)):
        if i == len(content)-1:
            logMsg += content[i]
        else:
            logMsg += content[i]+"|"
    logging2.AQueue.put({'WARNING':logMsg})
        
def error(*content):
    logMsg = ""
    for i in range(len(content)):
        if i == len(content)-1:
            logMsg += content[i]
        else:
            logMsg += content[i]+"|"
    logging2.AQueue.put({'ERROR':logMsg})
                    
def init(module, level):
    # 創建新線程
    thread1 = logging2(1, "Thread-log", module, level)
    # 開啟新線程
    thread1.start()
#    thread1.join()

 

 

結果顯示:

日志文件:

-rw-rw-r--. 1 zxl zxl 2684 6月  28 15:20 log_collect_17744_20200628.log

 

日志內容:

2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ah_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ah.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ah.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_jx_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_jx.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_jx.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ln_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln_5G.py]
2020-06-28 15:32:44,740|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_ln.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_ln.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_xz_5G.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz_5G.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/cppcheck_xz.py],本地文件:[/home/zxl/public/code/python/sftp/data/cppcheck_xz.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/logserv.py],本地文件:[/home/zxl/public/code/python/sftp/data/logserv.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/monitorSys.py],本地文件:[/home/zxl/public/code/python/sftp/data/monitorSys.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/MvSvnLibToNew.py],本地文件:[/home/zxl/public/code/python/sftp/data/MvSvnLibToNew.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/teleCMD.py],本地文件:[/home/zxl/public/code/python/sftp/data/teleCMD.py]
2020-06-28 15:32:44,741|logging2|18058|DEBUG|正在下載遠程文件:[/public/jxbill/shell/python/data/1.a],本地文件:[/home/zxl/public/code/python/sftp/data/1.a]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/2.py],本地文件:[/home/zxl/public/code/python/sftp/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/2.py],刪除遠程文件[/public/ahbill/shell/python/data/2.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/1.py],本地文件:[/home/zxl/public/code/python/sftp/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/1.py],刪除遠程文件[/public/ahbill/shell/python/data/1.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|正在下載遠程文件:[/public/ahbill/shell/python/data/3.py],本地文件:[/home/zxl/public/code/python/sftp/data/3.py]
2020-06-28 15:32:44,742|logging2|18058|DEBUG|已下載本地[/home/zxl/public/code/python/sftp/data/3.py],刪除遠程文件[/public/ahbill/shell/python/data/3.py]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM