远程异步执行Linux/Windows平台上的可执行脚本 | paramiko/pywinrm | djcelery | Python


 

任务:通过python执行远程Linux或者Windows系统中的脚本,并且接收其终端打印的返回值;
展示效果:两个button按钮:1.执行脚本; 2.查询执行结果; 
工具:1.针对Linux系统,使用paramiko模块,通过ssh协议,不需要在服务器上安装任何服务; 2.针对Windows系统,使用pywinrm模块,通过winrm服务,需要在windows上开始winrm服务(默认是关闭的);

 

1.Linux系统

import paramiko
import sys


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_name):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password

    def exec_linux_cmd(self):
        data = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        for i in stdout.readlines():
            data += i
        return data

    def run(self):
        self.exec_linux_cmd()
        print(data)



if __name__ == '__main__':
    # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
    server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.1.5", "username", "password")
    server_obj.run()

 

2.Windows系统

  2.1 在以管理员身份运行的powershell中执行命令,winrm quickconfig或winrm qc 查看winrm是否开启成功,如果提示未开始,输入enter执行提示操作开始winrm;

  2.2 将网络设置为专用网络;

  2.3 执行一下两个命令,允许其他机器链接:

    winrm set winrm/config/service/auth '@{Basic="true"}'

    winrm set winrm/config/service '@{AllowUnencrypted="true"}'

  2.4 代码:

import winrm


s = winrm.Session("192.168.100.153", auth=("username", "password"))
r = s.run_cmd(r"C:\Users\Administrator\Desktop\test.bat")
print(r.std_out)
data_init = ""
for num, data in enumerate(r.std_out.decode()):
    if num != 0:
        data_init+=str(data)
print(data_init)

 

3.综合脚本 remote.py

 

"""
paramiko, pywinrm实现windows/linux脚本调用
"""
import paramiko
import winrm
import json


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_choice):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password
        self.system_choice = system_choice

    def exec_linux_cmd(self):
        data_init = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        if stderr.readlines():
            exec_tag = 1
            for data in stdout.readlines():
                data_init += data
        else:
            exec_tag = 0
            for data in stdout.readlines():
                data_init += data
        return json.dumps({
            "exec_tag": exec_tag,
            "data": data_init,
        }, ensure_ascii=False)

    def exec_win_cmd(self):
        data_init = ""
        s = winrm.Session(self.host, auth=(self.user, self.pwd))
        ret = s.run_cmd(self.cmd)
        if ret.std_err.decode():
            exec_tag = 1
            for data in ret.std_err.decode().split("\r\n"):
                data_init += data
        else:
            exec_tag = 0
            for data in ret.std_out.decode().split("\r\n"):
                data_init += data
        return json.dumps({
            "exec_tag": exec_tag,
            "data": data_init,
        }, ensure_ascii=False)

    def run(self):
        if self.system_choice == "Linux":
            result = self.exec_linux_cmd()
        else:
            result = self.exec_win_cmd()
        print(result)
        with open(r"script_info.txt", "w") as f:
            f.write(result)

# if __name__ == '__main__':
# server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
# server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.12.149", "a",
#                           "password", "Windows")
# server_obj = ServerByPara(r"/root/Desktop/test.sh >> log.txt", "192.168.109.132", "root",
#                           "password", "Linux")
# server_obj.run()

 

 

4.views视图中调用remote.py中的类,并且实例化,执行run()方法

import remote


cmd = r"C:\Users\a\Desktop\test.bat param01 param02"
ip = "192.168.12.149"
username = "a"
password = "password"
system_choice = "Windows"
obj = remote.ServerByPara(cmd, ip, username, password, system_choice)
obj.run()

 

5.使用分布式任务队列celery或者django第三方工具djcelery,避免脚本长久执行导致的页面不刷新

这里使用djcelery;

执行脚本可以将返回值写入日志文件或者其他任何操作,以后自己读取;

下面讲述djcelery的配置,实现定时任务,实现分布式任务,开启与关闭任务:

  5.1 djcelery的配置

  项目目录下新建py文件: celery.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_test.settings')

app = Celery('my_test')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
celery.py

  项目目录下__init__.py文件中写入一下代码:

  其中:from __future__ import absolute_import  可以避免celery.py与第三方模块冲突;

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
__init__.py

  settings.py文件中做以下主要配置(使用djcelery自带的队列,如果使用redis,rabbitmq等其他队列,需做其他配置):

  下面使用django数据库表来充当broker,如果在生产阶段,可能需要使用flower来监控任务,则不能使用数据库,只能使用其他队列:BROKER_URL = 'redis://127.0.0.1:6379/0'

  运行flower:python manage.py celery -A TSDRM worker flower

  查看任务情况,访问:127.0.0.1:5555

import djcelery

djcelery.setup_loader()
BROKER_URL = 'django://'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
settings.py

   下面是celery做的消息队列配置:

CELERY_DEFAULT_QUEUE = "default_dongwm" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面

CELERY_QUEUES = {
    "default_dongwm": { # 这是上面指定的默认队列
        "exchange": "default_dongwm",
        "exchange_type": "direct",
        "routing_key": "default_dongwm"
    },
    "topicqueue": { # 这是一个topic队列 凡是topictest开头的routing key都会被放到这个队列
        "routing_key": "topictest.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "test2": { # test和test2是2个fanout队列,注意他们的exchange相同
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks",
    },
    "test": {
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks2",
    },
}

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):

        if task.startswith('topictest'):
            return {
                'queue': 'topicqueue',
            }
        # 我的dongwm.tasks文件里面有2个任务都是test开头
        elif task.startswith('dongwm.tasks.test'):
            return {
                "exchange": "broadcast_tasks",
            }
        # 剩下的其实就会被放到默认队列
        else:
            return None

# CELERY_ROUTES本来也可以用一个大的含有多个字典的字典,但是不如直接对它做一个名称统配
CELERY_ROUTES = (MyRouter(), )
celery中队列分配
# BROKER_URL = 'redis://127.0.0.1:6379/0'
# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# BROKER_URL = 'amqp://root:password@localhost:5672/myvhost'

CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
消息队列及celery时区/数据格式

 

 

  5.2 在admin创建定时(间隔 / 每)时间

1.数据库表迁移出djcelery相关的表: python3 manage.py migrate;

2.在Crontabs表中添加每分钟,每天等定时任务,或者在Intervals表中添加间隔时间;

3.在Periodic tasks中添加任务,指定已经注册的task,并选择intervals或者crontabs作为定时形式;

       

  5.1 tasks.py 中添加执行脚本的任务

from cloud import remote
import json
from djcelery import models as celery_models  # 通过修改表PeriodicTask中enabled字段来开启关闭定时任务
from celery import share_task  # task装饰器在后面版本被启用
@share_task
def exec_script(): cmd = r"/root/Desktop/test.sh" ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run() @shared_task # @share_task装饰器表示添加定时任务,有些博客中使用的是@task在后面的版本中被启用了。 def get_script_info(): with open(r"script_info.txt", "r") as f: ret = f.read() print(ret) if ret != "": try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = False task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e) ret = json.loads(ret) # 将执行结果写入数据库中 print(ret["exec_tag"]) if int(ret["exec_tag"]) == 0: with open('result_info.txt', "w") as f: f.write(ret["data"]) print("脚本执行成功!") print("data: ", ret["data"]) else: print("脚本执行失败!请重新执行!") print("data: ", ret["data"]) # 清空文件 with open(r"script_info.txt", "w") as f: f.write("")

 

  5.3 视图中调用任务/关闭定时任务  views.py

from cloudDR.celery import exec_script
from djcelery import models as celery_models

def index(request): 
    if request.user.is_authenticated(): 
        exec_script.delay()  # celery.py 中 ip等参数可以通过delay传入;
        # 开启定时任务
        try:
            task = celery_models.PeriodicTask.objects.get(name="get_script_info")
            task.enabled = True
            task.save()
        except celery_models.PeriodicTask.DoesNotExist as e:
            print(e)

 

  5.4 开启celery,开启beat监控

celery,beat一直开启,只需要通过代码开启关闭任务;

如果将info改成debug反应就比较慢,可能卡主不动;

$ celery -A project_name worker -l info
$ celery -A project_name beat -l info--max-interval=5  # 表示每2秒查看/刷新一下任务列表,添加新的,剔除旧的

 


 

6.实际项目

   

要求:异步任务完成多个脚本执行,每个脚本执行完之后根据返回信息执行第二个脚本,并且将当前脚本顺序写入数据库;

    当刷新前端页面时,查询数据库,显示当前脚本执行所在进度;

# views.py
from django.shortcuts import render
from django.http import JsonResponse
from .tasks import exec_script
from . import models


def index(request):
    # 查询数据库
    try:
        query_obj = models.Tags.objects.filter(id=1)[0]
        tag = query_obj.name
        print("tag", tag)
    except:
        tag = "before"
    return render(request, "index.html", {"tag": tag})


def change_button(request):
    # 循环执行多次脚本,接受上一次脚本的返回值
    cmd_list = [r"/root/Desktop/test01.sh", r"/root/Desktop/test02.sh", r"/root/Desktop/test03.sh"]
    ip = "192.168.109.132"
    username = "root"
    password = "password"
    system_choice = "Linux"
    exec_script.delay(cmd_list, ip, username, password, system_choice)

    return JsonResponse({"result": "脚本跑起来啦!请刷新页面查看进度!"})
<!-- index.html -->
{% load staticfiles %}
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>首页</title>
    <script src='{% static "plugins\jquery.min.js"%}'></script>
    <link rel="stylesheet" href="{% static 'plugins\bootstrap\css\bootstrap.min.css' %}">
    <script src="{% static 'plugins\bootstrap\js\bootstrap.min.js' %}"></script>
    <style>
        .container {
            display: table;
            height: 100%;
        }

        .row {
            display: table-cell;
            vertical-align: middle;
        }
    </style>
</head>
<body>
<div class="container">
    <div class="row" align="center">
        <button id="exec_script">执行脚本</button>
        <br>

        {% if not tag == "before" and not tag == "done"%}
        <div class="progress" style="width:300px;">
            <span>流程进度</span>
            <input id="tag" type="hidden" value="{{ tag }}">
            <div class="progress-bar" role="progressbar" aria-valuenow="60"
                 aria-valuemin="0" aria-valuemax="100" style="width: 10%;">
                <span class="sr-only">40% 完成</span>
            </div>
        </div>
        {% endif %}
    </div>
</div>
{% csrf_token %}

<script>
    $("#exec_script").on("click", function () {
        var csrfToken = $("[name='csrfmiddlewaretoken']").val();
        $.ajax({
            url: "/change_button/",
            type: "post",
            data: {"name": name, "csrfmiddlewaretoken": csrfToken},
            success: function (data) {
                alert(data["result"]);
                $("#exec_script").attr("disabled", true).attr("style", "color:red;")
            }
        })
    });
    $(".progress-bar").attr("style", "width: 10%;".replace("10", $("#tag").val() * 10));
    // during execute
    if ($("#tag").val() != "before" && $("#tag").val() != "done" ) {
        $("#exec_script").attr("disabled", true).attr("style", "color:red;");
    }
    // before execute
    if ($("#tag").val() == undefined) {
        $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;");
    }
    // after execute
    // if ($("#tag").val() == "done"){
    //     alert("流程执行完成!");
    //     // 将数据库中tag改成before
    //     $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;");
    // }

</script>

</body>
</html>
# task.py
from __future__ import absolute_import
from celery import task
from .scripts import remote
from . import models
import time


@task  # @task装饰器表示添加分布式任务
def exec_script(cmd_list, ip, username, password, system_choice):
    for num, cmd in enumerate(cmd_list):
        print("当前执行脚本位置:", num+1)
        obj = remote.ServerByPara(cmd, ip, username, password, system_choice)
        result = obj.run()
        if result["exec_tag"] == 0:
            # 表示当前脚本运行成功,在数据库中写入当前所在流程
            location_tag = num+1
            try:
                query_obj = models.Tags.objects.filter(id=1)[0]
            except:
                query_obj = models.Tags()
            query_obj.name = location_tag
            query_obj.save()
# 完成所有脚本执行
if num == len(cmd_list)-1: try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = "done" query_obj.save()
# 模拟脚本所执行时长 time.sleep(
5)
"""
paramiko, pywinrm实现windows/linux脚本调用
"""
import paramiko
import sys
import winrm


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_choice):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password
        self.system_choice = system_choice 

    def exec_linux_cmd(self):
        data_init = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        for i in stdout.readlines():
            data_init += i
        return data_init

    def exec_win_cmd(self):
        s = winrm.Session(self.host, auth=(self.user, self.pwd))
        ret = s.run_cmd(self.cmd)
        data_init = ""
        for num, data in enumerate(ret.std_out.decode().split("\r\n")):
            if ">" not in data:
                data_init += data
        return data_init

    def run(self):
        if self.system_choice == "Linux":
            result = self.exec_linux_cmd()
        else:
            result = self.exec_win_cmd()
        print(result)
        return result


if __name__ == '__main__':
    pass
    # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])
    # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat a b", "192.168.12.149", "a", "password", "Windows")
    # server_obj.run()
remote.py

 

7.shell/bat脚本接收命令返回值存至变量

@echo off
for /f "delims=" %%i in ('C:\User\Administrator\Desktop\python_exe.py') do (set a=%%i)
echo %a%
bat
python_result=`/root/Desktop/python_exe.py`
echo $python_result
shell

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM