python ssh 執行shell命令


# -*- coding: utf-8 -*-

import paramiko import threading def run(host_ip, username, password, command): ssh = paramiko.SSHClient() try: ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(host_ip, 22, username, password) print('===================exec on [%s]=====================' % host_ip) stdin, stdout, stderr = ssh.exec_command(command, timeout=300) out = stdout.readlines()  for o in out: print (o.strip('\n')) except Exception as ex: print('error, host is [%s], msg is [%s]' % (host_ip, ex.message)) finally: ssh.close() if __name__ == '__main__': # 將需要批量執行命令的host ip地址填到這里
    # eg: host_ip_list = ['IP1', 'IP2']
 host_ip_list = ['147.116.20.19'] for _host_ip in host_ip_list: # 用戶名,密碼,執行的命令填到這里
        run(_host_ip, 'tzgame', 'tzgame@1234', 'df -h') run(_host_ip, 'tzgame', 'tzgame@1234', 'ping -c 5 220.181.38.148')

 

pycrypto,由於 paramiko 模塊內部依賴pycrypto,所以先下載安裝pycrypto pip3 install pycrypto pip3 install paramiko (1)基於用戶名和密碼的連接 import paramiko # 創建SSH對象
ssh = paramiko.SSHClient() # 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 連接服務器
ssh.connect(hostname='c1.salt.com', port=22, username='GSuser', password='123') # 執行命令
stdin, stdout, stderr = ssh.exec_command('ls') # 獲取命令結果
result = stdout.read() # 關閉連接
ssh.close() (2)基於公鑰秘鑰連接 import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
# 創建SSH對象 ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
# 執行命令 stdin, stdout, stderr = ssh.exec_command('df')
# 獲取命令結果 result = stdout.read()
# 關閉連接 ssh.close() SFTPClient:   用於連接遠程服務器並進行上傳下載功能。 (1)基於用戶名密碼上傳下載 import paramiko transport = paramiko.Transport(('hostname',22)) transport.connect(username='GSuser',password='123') sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close() (2)基於公鑰秘鑰上傳下載 import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') transport = paramiko.Transport(('hostname', 22)) transport.connect(username='GSuser', pkey=private_key ) sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py sftp.put('/tmp/location.py', '/tmp/test.py')
# 將remove_path 下載到本地 local_path sftp.get('remove_path', 'local_path') transport.close()

 

 

 

 

 

 

 

下面是多線程執行版本

#!/usr/bin/python #coding:utf-8
import threading import subprocess import os import sys sshport = 13131 log_path = 'update_log' output = {} def execute(s, ip, cmd, log_path_today): with s: cmd = '''ssh -p%s root@%s -n "%s" ''' % (sshport, ip, cmd) ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) output[ip] = ret.stdout.readlines() if __name__ == "__main__": if len(sys.argv) != 3: print "Usage: %s config.ini cmd" % sys.argv[0] sys.exit(1) if not os.path.isfile(sys.argv[1]): print "Usage: %s is not file!" % sys.argv[1] sys.exit(1) cmd = sys.argv[2] f = open(sys.argv[1],'r') list = f.readlines() f.close() today = datetime.date.today() log_path_today = '%s/%s' % (log_path,today) if not os.path.isdir(log_path_today): os.makedirs(log_path_today) threading_num = 100
    if threading_num > len(list): threading_num = len(list) s = threading.Semaphore(threading_num) for line in list: ip = line.strip() t = threading.Thread(target=execute,args=(s, ip,cmd,log_path_today)) t.setDaemon(True) t.start() main_thread = threading.currentThread() for t in threading.enumerate(): if t is main_thread: continue t.join() for ip,result in output.items(): print "%s: " % ip for line in result: print " %s" % line.strip() print "Done!"

 

以上腳本讀取兩個參數,第一個為存放IP的文本,第二個為shell命令

執行效果如下

 

 

# -*- coding:utf-8 -*-
import requests from requests.exceptions import RequestException import os, time import re from lxml import etree import threading lock = threading.Lock() def get_html(url):  response = requests.get(url, timeout=10) # print(response.status_code)
    try: if response.status_code == 200: # print(response.text)
            return response.text else: return None except RequestException: print("請求失敗") # return None


def parse_html(html_text):  html = etree.HTML(html_text) if len(html) > 0: img_src = html.xpath("//img[@class='photothumb lazy']/@data-original")  # 元素提取方法
        # print(img_src)
        return img_src else: print("解析頁面元素失敗") def get_image_pages(url):  html_text = get_html(url)  # 獲取搜索url響應內容
    # print(html_text)
    if html_text is not None: html = etree.HTML(html_text)  # 生成XPath解析對象
        last_page = html.xpath("//div[@class='pages']//a[last()]/@href")  # 提取最后一頁所在href鏈接
        print(last_page) if last_page: max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group()  # 使用正則表達式提取鏈接中的頁碼數字
            print(max_page) print(type(max_page)) return int(max_page)  # 將字符串頁碼轉為整數並返回
        else: print("暫無數據") return None else: print("查詢結果失敗") def get_all_image_url(page_number):  base_url = 'https://imgbin.com/free-png/naruto/' image_urls = [] x = 1  # 定義一個標識,用於給每個圖片url編號,從1遞增
    for i in range(1, page_number): url = base_url + str(i)  # 根據頁碼遍歷請求url
        try: html = get_html(url)  # 解析每個頁面的內容
            if html: data = parse_html(html)  # 提取頁面中的圖片url
                # print(data)
                # time.sleep(3)
                if data: for j in data: image_urls.append({ 'name': x, 'value': j }) x += 1  # 每提取一個圖片url,標識x增加1
        except RequestException as f: print("遇到錯誤:", f) continue
    # print(image_urls)
    return image_urls def get_image_content(url):     try: r = requests.get(url, timeout=15) if r.status_code == 200: return r.content return None except RequestException: return None def main(url, image_name):  semaphore.acquire() # 加鎖,限制線程數
    print('當前子線程: {}'.format(threading.current_thread().name)) save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try: file_path = '{0}/{1}.jpg'.format(save_path, image_name) if not os.path.exists(file_path):  # 判斷是否存在文件,不存在則爬取
            with open(file_path, 'wb') as f: f.write(get_image_content(url)) f.close() print('第{}個文件保存成功'.format(image_name)) else: print("第{}個文件已存在".format(image_name)) semaphore.release() # 解鎖imgbin-多線程-重寫run方法.py

    except FileNotFoundError as f: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", f) raise

    except TypeError as e: print("第{}個文件下載時遇到錯誤,url為:{}:".format(image_name, url)) print("報錯:", e) class MyThread(threading.Thread): """繼承Thread類重寫run方法創建新進程"""
    def __init__(self, func, args): """ :param func: run方法中要調用的函數名 :param args: func函數所需的參數 """ threading.Thread.__init__(self) self.func = func self.args = args def run(self): print('當前子線程: {}'.format(threading.current_thread().name)) self.func(self.args[0], self.args[1]) # 調用func函數
        # 因為這里的func函數其實是上述的main()函數,它需要2個參數;args傳入的是個參數元組,拆解開來傳入


if __name__ == '__main__': start = time.time() print('這是主線程:{}'.format(threading.current_thread().name)) urls = get_all_image_url(5)  # 獲取所有圖片url列表
    thread_list = []  # 定義一個列表,向里面追加線程
    semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
    for t in urls: # print(i)
 m = MyThread(main, (t["value"], t["name"]))  # 調用MyThread類,得到一個實例
 thread_list.append(m) for m in thread_list: m.start() # 調用start()方法,開始執行

    for m in thread_list: m.join() # 子線程調用join()方法,使主線程等待子線程運行完畢之后才退出
 end = time.time() print(end-start) # get_image_pages("https://imgbin.com/free-png/Naruto")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM