任务:通过python执行远程Linux或者Windows系统中的脚本,并且接收其终端打印的返回值;
展示效果:两个button按钮:1.执行脚本; 2.查询执行结果;
工具:1.针对Linux系统,使用paramiko模块,通过ssh协议,不需要在服务器上安装任何服务; 2.针对Windows系统,使用pywinrm模块,通过winrm服务,需要在windows上开始winrm服务(默认是关闭的);
1.Linux系统
import paramiko import sys class ServerByPara(object): def __init__(self, cmd, host, user, password, system_name): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password def exec_linux_cmd(self): data = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) for i in stdout.readlines(): data += i return data def run(self): self.exec_linux_cmd() print(data) if __name__ == '__main__': # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]) server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.1.5", "username", "password") server_obj.run()
2.Windows系统
2.1 在以管理员身份运行的powershell中执行命令,winrm quickconfig或winrm qc 查看winrm是否开启成功,如果提示未开始,输入enter执行提示操作开始winrm;
2.2 将网络设置为专用网络;
2.3 执行一下两个命令,允许其他机器链接:
winrm set winrm/config/service/auth '@{Basic="true"}'
winrm set winrm/config/service '@{AllowUnencrypted="true"}'
2.4 代码:
import winrm s = winrm.Session("192.168.100.153", auth=("username", "password")) r = s.run_cmd(r"C:\Users\Administrator\Desktop\test.bat") print(r.std_out) data_init = "" for num, data in enumerate(r.std_out.decode()): if num != 0: data_init+=str(data) print(data_init)
3.综合脚本 remote.py
""" paramiko, pywinrm实现windows/linux脚本调用 """ import paramiko import winrm import json class ServerByPara(object): def __init__(self, cmd, host, user, password, system_choice): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password self.system_choice = system_choice def exec_linux_cmd(self): data_init = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) if stderr.readlines(): exec_tag = 1 for data in stdout.readlines(): data_init += data else: exec_tag = 0 for data in stdout.readlines(): data_init += data return json.dumps({ "exec_tag": exec_tag, "data": data_init, }, ensure_ascii=False) def exec_win_cmd(self): data_init = "" s = winrm.Session(self.host, auth=(self.user, self.pwd)) ret = s.run_cmd(self.cmd) if ret.std_err.decode(): exec_tag = 1 for data in ret.std_err.decode().split("\r\n"): data_init += data else: exec_tag = 0 for data in ret.std_out.decode().split("\r\n"): data_init += data return json.dumps({ "exec_tag": exec_tag, "data": data_init, }, ensure_ascii=False) def run(self): if self.system_choice == "Linux": result = self.exec_linux_cmd() else: result = self.exec_win_cmd() print(result) with open(r"script_info.txt", "w") as f: f.write(result) # if __name__ == '__main__': # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4]) # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.12.149", "a", # "password", "Windows") # server_obj = ServerByPara(r"/root/Desktop/test.sh >> log.txt", "192.168.109.132", "root", # "password", "Linux") # server_obj.run()
4.views视图中调用remote.py中的类,并且实例化,执行run()方法
import remote cmd = r"C:\Users\a\Desktop\test.bat param01 param02" ip = "192.168.12.149" username = "a" password = "password" system_choice = "Windows" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run()
5.使用分布式任务队列celery或者django第三方工具djcelery,避免脚本长久执行导致的页面不刷新
这里使用djcelery;
执行脚本可以将返回值写入日志文件或者其他任何操作,以后自己读取;
下面讲述djcelery的配置,实现定时任务,实现分布式任务,开启与关闭任务:
5.1 djcelery的配置
项目目录下新建py文件: celery.py

from __future__ import absolute_import import os from celery import Celery from django.conf import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_test.settings') app = Celery('my_test') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
项目目录下__init__.py文件中写入一下代码:
其中:from __future__ import absolute_import 可以避免celery.py与第三方模块冲突;

from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
settings.py文件中做以下主要配置(使用djcelery自带的队列,如果使用redis,rabbitmq等其他队列,需做其他配置):
下面使用django数据库表来充当broker,如果在生产阶段,可能需要使用flower来监控任务,则不能使用数据库,只能使用其他队列:BROKER_URL = 'redis://127.0.0.1:6379/0'
运行flower:python manage.py celery -A TSDRM worker flower
查看任务情况,访问:127.0.0.1:5555

import djcelery djcelery.setup_loader() BROKER_URL = 'django://' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
下面是celery做的消息队列配置:
celery中队列分配CELERY_DEFAULT_QUEUE = "default_dongwm" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面 CELERY_QUEUES = { "default_dongwm": { # 这是上面指定的默认队列 "exchange": "default_dongwm", "exchange_type": "direct", "routing_key": "default_dongwm" }, "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列 "routing_key": "topictest.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "test2": { # test和test2是2个fanout队列,注意他们的exchange相同 "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks", }, "test": { "exchange": "broadcast_tasks", "exchange_type": "fanout", "binding_key": "broadcast_tasks2", }, } class MyRouter(object): def route_for_task(self, task, args=None, kwargs=None): if task.startswith('topictest'): return { 'queue': 'topicqueue', } # 我的dongwm.tasks文件里面有2个任务都是test开头 elif task.startswith('dongwm.tasks.test'): return { "exchange": "broadcast_tasks", } # 剩下的其实就会被放到默认队列 else: return None # CELERY_ROUTES本来也可以用一个大的含有多个字典的字典,但是不如直接对它做一个名称统配 CELERY_ROUTES = (MyRouter(), )
消息队列及celery时区/数据格式# BROKER_URL = 'redis://127.0.0.1:6379/0' # CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # BROKER_URL = 'amqp://root:password@localhost:5672/myvhost' CELERY_TIMEZONE = 'Asia/Shanghai' # 时区 CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
5.2 在admin创建定时(间隔 / 每)时间
1.数据库表迁移出djcelery相关的表: python3 manage.py migrate;
2.在Crontabs表中添加每分钟,每天等定时任务,或者在Intervals表中添加间隔时间;
3.在Periodic tasks中添加任务,指定已经注册的task,并选择intervals或者crontabs作为定时形式;
5.1 tasks.py 中添加执行脚本的任务
from cloud import remote import json from djcelery import models as celery_models # 通过修改表PeriodicTask中enabled字段来开启关闭定时任务 from celery import share_task # task装饰器在后面版本被启用
@share_task def exec_script(): cmd = r"/root/Desktop/test.sh" ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run() @shared_task # @share_task装饰器表示添加定时任务,有些博客中使用的是@task在后面的版本中被启用了。 def get_script_info(): with open(r"script_info.txt", "r") as f: ret = f.read() print(ret) if ret != "": try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = False task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e) ret = json.loads(ret) # 将执行结果写入数据库中 print(ret["exec_tag"]) if int(ret["exec_tag"]) == 0: with open('result_info.txt', "w") as f: f.write(ret["data"]) print("脚本执行成功!") print("data: ", ret["data"]) else: print("脚本执行失败!请重新执行!") print("data: ", ret["data"]) # 清空文件 with open(r"script_info.txt", "w") as f: f.write("")
5.3 视图中调用任务/关闭定时任务 views.py
from cloudDR.celery import exec_script from djcelery import models as celery_models def index(request): if request.user.is_authenticated(): exec_script.delay() # celery.py 中 ip等参数可以通过delay传入; # 开启定时任务 try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = True task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e)
5.4 开启celery,开启beat监控
celery,beat一直开启,只需要通过代码开启关闭任务;
如果将info改成debug反应就比较慢,可能卡主不动;
$ celery -A project_name worker -l info $ celery -A project_name beat -l info--max-interval=5 # 表示每2秒查看/刷新一下任务列表,添加新的,剔除旧的
6.实际项目
要求:异步任务完成多个脚本执行,每个脚本执行完之后根据返回信息执行第二个脚本,并且将当前脚本顺序写入数据库;
当刷新前端页面时,查询数据库,显示当前脚本执行所在进度;
# views.py from django.shortcuts import render from django.http import JsonResponse from .tasks import exec_script from . import models def index(request): # 查询数据库 try: query_obj = models.Tags.objects.filter(id=1)[0] tag = query_obj.name print("tag", tag) except: tag = "before" return render(request, "index.html", {"tag": tag}) def change_button(request): # 循环执行多次脚本,接受上一次脚本的返回值 cmd_list = [r"/root/Desktop/test01.sh", r"/root/Desktop/test02.sh", r"/root/Desktop/test03.sh"] ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" exec_script.delay(cmd_list, ip, username, password, system_choice) return JsonResponse({"result": "脚本跑起来啦!请刷新页面查看进度!"})
<!-- index.html --> {% load staticfiles %} <html> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>首页</title> <script src='{% static "plugins\jquery.min.js"%}'></script> <link rel="stylesheet" href="{% static 'plugins\bootstrap\css\bootstrap.min.css' %}"> <script src="{% static 'plugins\bootstrap\js\bootstrap.min.js' %}"></script> <style> .container { display: table; height: 100%; } .row { display: table-cell; vertical-align: middle; } </style> </head> <body> <div class="container"> <div class="row" align="center"> <button id="exec_script">执行脚本</button> <br> {% if not tag == "before" and not tag == "done"%} <div class="progress" style="width:300px;"> <span>流程进度</span> <input id="tag" type="hidden" value="{{ tag }}"> <div class="progress-bar" role="progressbar" aria-valuenow="60" aria-valuemin="0" aria-valuemax="100" style="width: 10%;"> <span class="sr-only">40% 完成</span> </div> </div> {% endif %} </div> </div> {% csrf_token %} <script> $("#exec_script").on("click", function () { var csrfToken = $("[name='csrfmiddlewaretoken']").val(); $.ajax({ url: "/change_button/", type: "post", data: {"name": name, "csrfmiddlewaretoken": csrfToken}, success: function (data) { alert(data["result"]); $("#exec_script").attr("disabled", true).attr("style", "color:red;") } }) }); $(".progress-bar").attr("style", "width: 10%;".replace("10", $("#tag").val() * 10)); // during execute if ($("#tag").val() != "before" && $("#tag").val() != "done" ) { $("#exec_script").attr("disabled", true).attr("style", "color:red;"); } // before execute if ($("#tag").val() == undefined) { $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;"); } // after execute // if ($("#tag").val() == "done"){ // alert("流程执行完成!"); // // 将数据库中tag改成before // $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;"); // } </script> </body> </html>
# task.py from __future__ import absolute_import from celery import task from .scripts import remote from . import models import time @task # @task装饰器表示添加分布式任务 def exec_script(cmd_list, ip, username, password, system_choice): for num, cmd in enumerate(cmd_list): print("当前执行脚本位置:", num+1) obj = remote.ServerByPara(cmd, ip, username, password, system_choice) result = obj.run() if result["exec_tag"] == 0: # 表示当前脚本运行成功,在数据库中写入当前所在流程 location_tag = num+1 try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = location_tag query_obj.save()
# 完成所有脚本执行 if num == len(cmd_list)-1: try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = "done" query_obj.save()
# 模拟脚本所执行时长 time.sleep(5)

""" paramiko, pywinrm实现windows/linux脚本调用 """ import paramiko import sys import winrm class ServerByPara(object): def __init__(self, cmd, host, user, password, system_choice): self.cmd = cmd self.client = paramiko.SSHClient() self.host = host self.user = user self.pwd = password self.system_choice = system_choice def exec_linux_cmd(self): data_init = '' self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.client.connect(hostname=self.host, username=self.user, password=self.pwd) stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True) for i in stdout.readlines(): data_init += i return data_init def exec_win_cmd(self): s = winrm.Session(self.host, auth=(self.user, self.pwd)) ret = s.run_cmd(self.cmd) data_init = "" for num, data in enumerate(ret.std_out.decode().split("\r\n")): if ">" not in data: data_init += data return data_init def run(self): if self.system_choice == "Linux": result = self.exec_linux_cmd() else: result = self.exec_win_cmd() print(result) return result if __name__ == '__main__': pass # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]) # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat a b", "192.168.12.149", "a", "password", "Windows") # server_obj.run()
7.shell/bat脚本接收命令返回值存至变量

@echo off for /f "delims=" %%i in ('C:\User\Administrator\Desktop\python_exe.py') do (set a=%%i) echo %a%

python_result=`/root/Desktop/python_exe.py`
echo $python_result