任務:通過python執行遠程Linux或者Windows系統中的腳本,並且接收其終端打印的返回值;
展示效果:兩個button按鈕:1.執行腳本; 2.查詢執行結果;
工具:1.針對Linux系統,使用paramiko模塊,通過ssh協議,不需要在服務器上安裝任何服務; 2.針對Windows系統,使用pywinrm模塊,通過winrm服務,需要在windows上開始winrm服務(默認是關閉的);
1.Linux系統
import paramiko import sys class ServerByPara(object): def __init__(self, cmd, host, user, password, system_name): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password def exec_linux_cmd(self): data = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) for i in stdout.readlines(): data += i return data def run(self): self.exec_linux_cmd() print(data) if __name__ == '__main__': # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]) server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.1.5", "username", "password") server_obj.run()
2.Windows系統
2.1 在以管理員身份運行的powershell中執行命令,winrm quickconfig或winrm qc 查看winrm是否開啟成功,如果提示未開始,輸入enter執行提示操作開始winrm;
2.2 將網絡設置為專用網絡;
2.3 執行一下兩個命令,允許其他機器鏈接:
winrm set winrm/config/service/auth '@{Basic="true"}'
winrm set winrm/config/service '@{AllowUnencrypted="true"}'
2.4 代碼:
import winrm s = winrm.Session("192.168.100.153", auth=("username", "password")) r = s.run_cmd(r"C:\Users\Administrator\Desktop\test.bat") print(r.std_out) data_init = "" for num, data in enumerate(r.std_out.decode()): if num != 0: data_init+=str(data) print(data_init)
3.綜合腳本 remote.py
""" paramiko, pywinrm實現windows/linux腳本調用 """ import paramiko import winrm import json class ServerByPara(object): def __init__(self, cmd, host, user, password, system_choice): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password self.system_choice = system_choice def exec_linux_cmd(self): data_init = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) if stderr.readlines(): exec_tag = 1 for data in stdout.readlines(): data_init += data else: exec_tag = 0 for data in stdout.readlines(): data_init += data return json.dumps({ "exec_tag": exec_tag, "data": data_init, }, ensure_ascii=False) def exec_win_cmd(self): data_init = "" s = winrm.Session(self.host, auth=(self.user, self.pwd)) ret = s.run_cmd(self.cmd) if ret.std_err.decode(): exec_tag = 1 for data in ret.std_err.decode().split("\r\n"): data_init += data else: exec_tag = 0 for data in ret.std_out.decode().split("\r\n"): data_init += data return json.dumps({ "exec_tag": exec_tag, "data": data_init, }, ensure_ascii=False) def run(self): if self.system_choice == "Linux": result = self.exec_linux_cmd() else: result = self.exec_win_cmd() print(result) with open(r"script_info.txt", "w") as f: f.write(result) # if __name__ == '__main__': # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]) # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.12.149", "a", # "password", "Windows") # server_obj = ServerByPara(r"/root/Desktop/test.sh >> log.txt", "192.168.109.132", "root", # "password", "Linux") # server_obj.run()
4.views視圖中調用remote.py中的類,並且實例化,執行run()方法
import remote cmd = r"C:\Users\a\Desktop\test.bat param01 param02" ip = "192.168.12.149" username = "a" password = "password" system_choice = "Windows" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run()
5.使用分布式任務隊列celery或者django第三方工具djcelery,避免腳本長久執行導致的頁面不刷新
這里使用djcelery;
執行腳本可以將返回值寫入日志文件或者其他任何操作,以后自己讀取;
下面講述djcelery的配置,實現定時任務,實現分布式任務,開啟與關閉任務:
5.1 djcelery的配置
項目目錄下新建py文件: celery.py

from __future__ import absolute_import import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_test.settings') app = Celery('my_test') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
項目目錄下__init__.py文件中寫入一下代碼:
其中:from __future__ import absolute_import 可以避免celery.py與第三方模塊沖突;

from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
settings.py文件中做以下主要配置(使用djcelery自帶的隊列,如果使用redis,rabbitmq等其他隊列,需做其他配置):
下面使用django數據庫表來充當broker,如果在生產階段,可能需要使用flower來監控任務,則不能使用數據庫,只能使用其他隊列:BROKER_URL = 'redis://127.0.0.1:6379/0'
運行flower:python manage.py celery -A TSDRM worker flower
查看任務情況,訪問:127.0.0.1:5555

import djcelery djcelery.setup_loader() BROKER_URL = 'django://' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
下面是celery做的消息隊列配置:
celery中隊列分配CELERY_DEFAULT_QUEUE = "default_dongwm" # 默認的隊列,如果一個消息不符合其他的隊列就會放在默認隊列里面 CELERY_QUEUES = { "default_dongwm": { # 這是上面指定的默認隊列 "exchange": "default_dongwm", "exchange_type": "direct", "routing_key": "default_dongwm" }, "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列 "routing_key": "topictest.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "test2": { # test和test2是2個fanout隊列,注意他們的exchange相同 "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks", }, "test": { "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks2", }, } class MyRouter(object): def route_for_task(self, task, args=None, kwargs=None): if task.startswith('topictest'): return { 'queue': 'topicqueue', } # 我的dongwm.tasks文件里面有2個任務都是test開頭 elif task.startswith('dongwm.tasks.test'): return { "exchange": "broadcast_tasks", } # 剩下的其實就會被放到默認隊列 else: return None # CELERY_ROUTES本來也可以用一個大的含有多個字典的字典,但是不如直接對它做一個名稱統配 CELERY_ROUTES = (MyRouter(), )
消息隊列及celery時區/數據格式# BROKER_URL = 'redis://127.0.0.1:6379/0' # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BROKER_URL = 'amqp://root:password@localhost:5672/myvhost' CELERY_TIMEZONE = 'Asia/Shanghai' # 時區 CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
5.2 在admin創建定時(間隔 / 每)時間
1.數據庫表遷移出djcelery相關的表: python3 manage.py migrate;
2.在Crontabs表中添加每分鍾,每天等定時任務,或者在Intervals表中添加間隔時間;
3.在Periodic tasks中添加任務,指定已經注冊的task,並選擇intervals或者crontabs作為定時形式;
5.1 tasks.py 中添加執行腳本的任務
from cloud import remote import json from djcelery import models as celery_models # 通過修改表PeriodicTask中enabled字段來開啟關閉定時任務 from celery import share_task # task裝飾器在后面版本被啟用
@share_task def exec_script(): cmd = r"/root/Desktop/test.sh" ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run() @shared_task # @share_task裝飾器表示添加定時任務,有些博客中使用的是@task在后面的版本中被啟用了。 def get_script_info(): with open(r"script_info.txt", "r") as f: ret = f.read() print(ret) if ret != "": try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = False task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e) ret = json.loads(ret) # 將執行結果寫入數據庫中 print(ret["exec_tag"]) if int(ret["exec_tag"]) == 0: with open('result_info.txt', "w") as f: f.write(ret["data"]) print("腳本執行成功!") print("data: ", ret["data"]) else: print("腳本執行失敗!請重新執行!") print("data: ", ret["data"]) # 清空文件 with open(r"script_info.txt", "w") as f: f.write("")
5.3 視圖中調用任務/關閉定時任務 views.py
from cloudDR.celery import exec_script from djcelery import models as celery_models def index(request): if request.user.is_authenticated(): exec_script.delay() # celery.py 中 ip等參數可以通過delay傳入; # 開啟定時任務 try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = True task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e)
5.4 開啟celery,開啟beat監控
celery,beat一直開啟,只需要通過代碼開啟關閉任務;
如果將info改成debug反應就比較慢,可能卡主不動;
$ celery -A project_name worker -l info $ celery -A project_name beat -l info--max-interval=5 # 表示每2秒查看/刷新一下任務列表,添加新的,剔除舊的
6.實際項目
要求:異步任務完成多個腳本執行,每個腳本執行完之后根據返回信息執行第二個腳本,並且將當前腳本順序寫入數據庫;
當刷新前端頁面時,查詢數據庫,顯示當前腳本執行所在進度;
# views.py from django.shortcuts import render from django.http import JsonResponse from .tasks import exec_script from . import models def index(request): # 查詢數據庫 try: query_obj = models.Tags.objects.filter(id=1)[0] tag = query_obj.name print("tag", tag) except: tag = "before" return render(request, "index.html", {"tag": tag}) def change_button(request): # 循環執行多次腳本,接受上一次腳本的返回值 cmd_list = [r"/root/Desktop/test01.sh", r"/root/Desktop/test02.sh", r"/root/Desktop/test03.sh"] ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" exec_script.delay(cmd_list, ip, username, password, system_choice) return JsonResponse({"result": "腳本跑起來啦!請刷新頁面查看進度!"})
<!-- index.html --> {% load staticfiles %} <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>首頁</title> <script src='{% static "plugins\jquery.min.js"%}'></script> <link rel="stylesheet" href="{% static 'plugins\bootstrap\css\bootstrap.min.css' %}"> <script src="{% static 'plugins\bootstrap\js\bootstrap.min.js' %}"></script> <style> .container { display: table; height: 100%; } .row { display: table-cell; vertical-align: middle; } </style> </head> <body> <div class="container"> <div class="row" align="center"> <button id="exec_script">執行腳本</button> <br> {% if not tag == "before" and not tag == "done"%} <div class="progress" style="width:300px;"> <span>流程進度</span> <input id="tag" type="hidden" value="{{ tag }}"> <div class="progress-bar" role="progressbar" aria-valuenow="60" aria-valuemin="0" aria-valuemax="100" style="width: 10%;"> <span class="sr-only">40% 完成</span> </div> </div> {% endif %} </div> </div> {% csrf_token %} <script> $("#exec_script").on("click", function () { var csrfToken = $("[name='csrfmiddlewaretoken']").val(); $.ajax({ url: "/change_button/", type: "post", data: {"name": name, "csrfmiddlewaretoken": csrfToken}, success: function (data) { alert(data["result"]); $("#exec_script").attr("disabled", true).attr("style", "color:red;") } }) }); $(".progress-bar").attr("style", "width: 10%;".replace("10", $("#tag").val() * 10)); // during execute if ($("#tag").val() != "before" && $("#tag").val() != "done" ) { $("#exec_script").attr("disabled", true).attr("style", "color:red;"); } // before execute if ($("#tag").val() == undefined) { $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;"); } // after execute // if ($("#tag").val() == "done"){ // alert("流程執行完成!"); // // 將數據庫中tag改成before // $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;"); // } </script> </body> </html>
# task.py from __future__ import absolute_import from celery import task from .scripts import remote from . import models import time @task # @task裝飾器表示添加分布式任務 def exec_script(cmd_list, ip, username, password, system_choice): for num, cmd in enumerate(cmd_list): print("當前執行腳本位置:", num+1) obj = remote.ServerByPara(cmd, ip, username, password, system_choice) result = obj.run() if result["exec_tag"] == 0: # 表示當前腳本運行成功,在數據庫中寫入當前所在流程 location_tag = num+1 try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = location_tag query_obj.save()
# 完成所有腳本執行 if num == len(cmd_list)-1: try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = "done" query_obj.save()
# 模擬腳本所執行時長 time.sleep(5)

""" paramiko, pywinrm實現windows/linux腳本調用 """ import paramiko import sys import winrm class ServerByPara(object): def __init__(self, cmd, host, user, password, system_choice): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password self.system_choice = system_choice def exec_linux_cmd(self): data_init = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) for i in stdout.readlines(): data_init += i return data_init def exec_win_cmd(self): s = winrm.Session(self.host, auth=(self.user, self.pwd)) ret = s.run_cmd(self.cmd) data_init = "" for num, data in enumerate(ret.std_out.decode().split("\r\n")): if ">" not in data: data_init += data return data_init def run(self): if self.system_choice == "Linux": result = self.exec_linux_cmd() else: result = self.exec_win_cmd() print(result) return result if __name__ == '__main__': pass # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]) # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat a b", "192.168.12.149", "a", "password", "Windows") # server_obj.run()
7.shell/bat腳本接收命令返回值存至變量

@echo off for /f "delims=" %%i in ('C:\User\Administrator\Desktop\python_exe.py') do (set a=%%i) echo %a%

python_result=`/root/Desktop/python_exe.py`
echo $python_result