python10作業思路及源碼:類Fabric主機管理程序開發(僅供參考)


 

類Fabric主機管理程序開發

一,作業要求

1, 運行程序列出主機組或者主機列表(已完成)

2,選擇指定主機或主機組(已完成)

3,選擇主機或主機組傳送文件(上傳/下載)(已完成)

4,充分使用多線程或多進程(已完成)

5,不同主機的用戶名,密碼,端口可以不同(已完成)

6,可向主機或主機組批量發布命令(已完成)

7,可一次性執行多條操作命令(已完成)

二,程序文件清單

 

三,程序流程簡圖

 

 

四,程序測試樣圖

 

 

 

 

 

 五,程序核心源碼

#!usr/bin/env python
# -*- coding:utf-8 -*-
# auther:Mr.chen
# 描述:只能運行在MacOS或linux系統下


import os,sys,pickle,time
import paramiko,threading



sys.path.append('..')
PATH = os.path.dirname(os.path.abspath(__file__))
PUT_PATH = PATH.replace('core','Folder/')
GET_PATH = PATH.replace('core','download/')
CONF_PATH = PATH.replace('core','conf/')
semaphore = threading.BoundedSemaphore(1) # 進程鎖

def new_Host():
    while True:
        hostname = raw_input("請輸入服務器的主機名(輸入n=返回上級):")
        if os.path.exists(CONF_PATH+hostname+'_conf'):
            print ("主機名已存在,請重新輸入!")
            continue
        if hostname =='n':
            return
        port = raw_input("請輸入服務器的ssh端口號(輸入n=返回上級):")
        if port == 'n':
            return
        username = raw_input("請輸入登陸的用戶名(輸入n=返回上級):")
        if username == 'n':
            return
        password = raw_input("請輸入用戶的密碼(輸入n=返回上級):")
        if password == 'n':
            return
        dic = {
            'hostname':hostname,    # 主機名
            'port':port,            # 端口
            'username':username,    # 用戶名
            'password':password,    # 密碼
            'status':0              # 狀態(0:未激活 1:已激活 2:激活失敗)
        }
        if os.path.isdir(GET_PATH + hostname) == False:
            command = 'mkdir ' + GET_PATH + hostname
            os.system(command)
        re = hostmessage_Write(dic)
        if re == True:
            return
        else:
            print ("主機信息存儲失敗,請檢查原因!")



def delete_Host():
    List = Traverse_folder()
    while True:
        dic = {}
        num = 0
        print ("已存在的主機列表如下:")
        for i in List:
            print ("{0},主機名:{1}".format(str(num+1),i))
            dic[str(num+1)] = i
            num += 1
        choose = raw_input("請輸入你想刪除的主機索引(輸入n=返回上級):")
        if choose == 'n':
            return
        elif choose in dic:
            hostname = dic[choose]
            command = 'rm -f '+CONF_PATH+hostname+'_conf'
            os.system(command)
            print ("刪除成功!")
            break
        else:
            print ("您的輸入有誤!")



def auto_activeHost():
    text = """
            警告!程序准備開啟多線程模式激活主機,請確保:
            1,遠程服務器處於開啟狀態
            2,DNS或本地hosts映射能夠解析遠程服務器主機名
    """
    while True:
        print (text)
        choose = raw_input("是否確定開始激活遠程主機(y/n)?:")
        if choose == 'n':
            return
        elif choose == 'y':
            break
        else:
            print ("你的輸入有誤!")
    print ("程序開始自動激活遠程主機,請稍后...")
    List = Traverse_folder()
    if len(List) == 0:
        print ("請先創建主機!")
        return
    for i in List:
        dic = hostmessage_Read(i)
        t = threading.Thread(target=auto_Active,args=(dic,)) # 創建多線程對象
        t.setDaemon(True)  # 將對象設置為守護線程
        t.start() # 線程開啟
    while threading.activeCount() != 1:    # 當前活躍的線程數
        pass
    else:
        print ("所有主機激活完畢!")
        time.sleep(2)


def auto_Active(dic):
    ssh = paramiko.SSHClient()  # 創建ssh對象
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #允許連接不在know_hosts文件中的主機
    try:
        ssh.connect(hostname=dic['hostname'],port=int(dic['port']),username=dic['username'],password=dic['password'],timeout=10)
    except Exception,e:
        print ("主機名:{0}的主機激活失敗!失敗原因:{1}".format(dic['hostname'],e))
        dic['status'] = 2
        hostmessage_Write(dic)
    else:
        print ("主機名:{0}的主機激活成功!".format(dic['hostname']))
        dic['status'] = 1
        hostmessage_Write(dic)
    finally:
        ssh.close()


def remote_Host():
    List = Traverse_folder(key='status',value=1)
    if len(List) == 0:
        print ("請先激活主機!")
        return
    while True:
        dic = {}
        print ("已激活主機如下:")
        num = 0
        for i in List:
            print ("{0},主機名:{1}".format(str(num+1),i))
            dic[str(num+1)] = i
            num += 1
        choose = raw_input("請輸入你想操控的主機的索引(可多選,n=返回上級):")
        if choose == 'n':
            return
        choose = list(set(choose)) #去重復
        if set(choose) & set(dic.keys()) == set(choose):
            LIST = []
            for i in choose:
                LIST.append(dic[i])
            remote_Host_control(LIST)
        else:
            print ("您的輸入有誤!")


def remote_Host_control(List):
    help = """
                    help幫助提示:
            1.程序的Folder目錄是本地文件目錄
            2,輸入put向遠程主機上傳文件
            3,輸入get向遠程主機下載文件
            4,輸入其他直接向遠程主機發布命令
    """
    while True:
        print ("正在操控{0}台主機,如下:".format(len(List)))
        for i in List:
            print ("主機名:{0}".format(i))
        command = raw_input("請輸入你想執行的命令(輸入n=返回上級,輸入help獲取幫助):>>")
        if command == 'n':
            return
        elif command == 'help':
            print help
        elif command == 'get':
            print ("程序准備下載文件...")
            remote_path = raw_input("請輸入想要下載的遠程服務器文件絕對路徑(例如:/etc/hosts):")
            LIST = remote_path.split('/')
            filename = LIST[len(LIST)-1]
            for i in List:
                local_path = GET_PATH + i + '/'+filename
                t = threading.Thread(target=get_Method,args=(i,[remote_path,local_path]))
                t.setDaemon(True)
                t.start()
            while threading.activeCount() != 1:
                pass
            else:
                print ("命令執行完畢!")
        elif command == 'put':
            print ("程序准備上傳文件...")
            while True:
                filename = raw_input("請輸入想上傳的文件的文件名:")
                if os.path.exists(PUT_PATH+filename) == False:
                    print ("文件沒有找到,請重新輸入!")
                    continue
                local_path= PUT_PATH+filename
                remote_path = raw_input("你想將文件保存到遠程服務器的哪里?(例如:/etc/):")
                remote_path = remote_path + '/' + filename
                for i in List:
                    t = threading.Thread(target=put_Method, args=(i, [local_path,remote_path]))
                    t.setDaemon(True)
                    t.start()
                while threading.activeCount() != 1:
                    pass
                else:
                    print ("命令執行完畢!")
                    break
        else:
            for i in List:
                t = threading.Thread(target=execute_Command,args=(i,command))
                t.setDaemon(True)
                t.start()
            while threading.activeCount() != 1:
                pass
            else:
                print ("命令執行完畢!")

def put_Method(hostname,Path):
    dic = hostmessage_Read(hostname)
    transport = paramiko.Transport((hostname, int(dic['port'])))
    try:
        transport.connect(username=dic['username'], password=dic['password'])
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.put(Path[0], Path[1])
    except Exception, e:
        print ("主機名:{0},上傳失敗!錯誤原因:{1}".format(hostname, e))
    else:
        pass
    finally:
        transport.close()


def get_Method(hostname,Path):
    dic = hostmessage_Read(hostname)
    transport = paramiko.Transport((hostname,int(dic['port'])))
    try:
        transport.connect(username=dic['username'],password=dic['password'])
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.get(Path[0],Path[1])
    except Exception,e:
        print ("主機名:{0},下載失敗!錯誤原因:{1}".format(hostname,e))
    else:
        pass
    finally:
        transport.close()

def execute_Command(hostname,command):
    dic = hostmessage_Read(hostname)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname,port=int(dic['port']),username=dic['username'],password=dic['password'])
        command_list = command.strip().split(';')
    except Exception,e:
        semaphore.acquire()     # 線程鎖開啟
        print ("主機:{0}連接出現異常:{1}".format(hostname,e))
        semaphore.release()     # 線程鎖釋放
        return
    for com in command_list:
        stdin, stdout, stderr = ssh.exec_command(com)
        error = stderr.read()
        output = stdout.read()
        semaphore.acquire()     # 線程鎖開啟
        if len(error) != 0:
            print ("主機:{0} 執行{1}命令時出錯:{2}".format(hostname,com, error))
        if len(output) != 0:
            print ("主機:{0},執行{1}命令的結果如下:".format(hostname,com))
            print (output)
        semaphore.release()     # 線程鎖釋放




def Traverse_folder(key = None,value = None):
    '''
    根據條件遍歷某文件夾里的全部文件內容,
    找出符合條件的文件返回包含主機名的列表
    如果無條件,則返回包含所有主機名的列表
    :return:LIST
    '''
    LIST = []
    List = os.listdir(CONF_PATH)
    for i in List:
        if i == '__init__.py' or i == '__init__.pyc':
            continue
        else:
            with open(CONF_PATH+i,'r') as f:
                dic = pickle.load(f)
            if key != None:
                if dic[key] == value:
                    LIST.append(dic['hostname'])
            else:
                LIST.append(dic['hostname'])
    return LIST



def hostmessage_Write(dic):
    with open(CONF_PATH+dic['hostname']+'_conf','w') as f:
        pickle.dump(dic,f)
        return True


def hostmessage_Read(hostname):
    if os.path.exists(CONF_PATH+hostname+'_conf'):
        with open(CONF_PATH+hostname+'_conf','r') as f:
            dic = pickle.load(f)
            return dic



def Main():
    text = """
            歡迎來到Fabric主機管理界面
                1,創建主機
                2,刪除主機
                3,自動激活所有主機
                4,開始遠程操控
                5,退出程序
    """
    while True:
        print text
        choose = raw_input("請輸入你的選擇:")
        dic = {'1':new_Host,'2':delete_Host,'3':auto_activeHost,'4':remote_Host,'5':Exit}
        if choose in dic:
            dic[choose]()
        else:
            print ("你的輸入有誤!")



def Exit():
    print ("程序退出!")
    exit()







if __name__ == "__main__":
    Main()

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM