遠程異步執行Linux/Windows平台上的可執行腳本 | paramiko/pywinrm | djcelery | Python


 

任務:通過python執行遠程Linux或者Windows系統中的腳本,並且接收其終端打印的返回值;
展示效果:兩個button按鈕:1.執行腳本; 2.查詢執行結果; 
工具:1.針對Linux系統,使用paramiko模塊,通過ssh協議,不需要在服務器上安裝任何服務; 2.針對Windows系統,使用pywinrm模塊,通過winrm服務,需要在windows上開始winrm服務(默認是關閉的);

 

1.Linux系統

import paramiko
import sys


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_name):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password

    def exec_linux_cmd(self):
        data = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        for i in stdout.readlines():
            data += i
        return data

    def run(self):
        self.exec_linux_cmd()
        print(data)



if __name__ == '__main__':
    # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
    server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.1.5", "username", "password")
    server_obj.run()

 

2.Windows系統

  2.1 在以管理員身份運行的powershell中執行命令,winrm quickconfig或winrm qc 查看winrm是否開啟成功,如果提示未開始,輸入enter執行提示操作開始winrm;

  2.2 將網絡設置為專用網絡;

  2.3 執行一下兩個命令,允許其他機器鏈接:

    winrm set winrm/config/service/auth '@{Basic="true"}'

    winrm set winrm/config/service '@{AllowUnencrypted="true"}'

  2.4 代碼:

import winrm


s = winrm.Session("192.168.100.153", auth=("username", "password"))
r = s.run_cmd(r"C:\Users\Administrator\Desktop\test.bat")
print(r.std_out)
data_init = ""
for num, data in enumerate(r.std_out.decode()):
    if num != 0:
        data_init+=str(data)
print(data_init)

 

3.綜合腳本 remote.py

 

"""
paramiko, pywinrm實現windows/linux腳本調用
"""
import paramiko
import winrm
import json


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_choice):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password
        self.system_choice = system_choice

    def exec_linux_cmd(self):
        data_init = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        if stderr.readlines():
            exec_tag = 1
            for data in stdout.readlines():
                data_init += data
        else:
            exec_tag = 0
            for data in stdout.readlines():
                data_init += data
        return json.dumps({
            "exec_tag": exec_tag,
            "data": data_init,
        }, ensure_ascii=False)

    def exec_win_cmd(self):
        data_init = ""
        s = winrm.Session(self.host, auth=(self.user, self.pwd))
        ret = s.run_cmd(self.cmd)
        if ret.std_err.decode():
            exec_tag = 1
            for data in ret.std_err.decode().split("\r\n"):
                data_init += data
        else:
            exec_tag = 0
            for data in ret.std_out.decode().split("\r\n"):
                data_init += data
        return json.dumps({
            "exec_tag": exec_tag,
            "data": data_init,
        }, ensure_ascii=False)

    def run(self):
        if self.system_choice == "Linux":
            result = self.exec_linux_cmd()
        else:
            result = self.exec_win_cmd()
        print(result)
        with open(r"script_info.txt", "w") as f:
            f.write(result)

# if __name__ == '__main__':
# server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])
# server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat", "192.168.12.149", "a",
#                           "password", "Windows")
# server_obj = ServerByPara(r"/root/Desktop/test.sh >> log.txt", "192.168.109.132", "root",
#                           "password", "Linux")
# server_obj.run()

 

 

4.views視圖中調用remote.py中的類,並且實例化,執行run()方法

import remote


cmd = r"C:\Users\a\Desktop\test.bat param01 param02"
ip = "192.168.12.149"
username = "a"
password = "password"
system_choice = "Windows"
obj = remote.ServerByPara(cmd, ip, username, password, system_choice)
obj.run()

 

5.使用分布式任務隊列celery或者django第三方工具djcelery,避免腳本長久執行導致的頁面不刷新

這里使用djcelery;

執行腳本可以將返回值寫入日志文件或者其他任何操作,以后自己讀取;

下面講述djcelery的配置,實現定時任務,實現分布式任務,開啟與關閉任務:

  5.1 djcelery的配置

  項目目錄下新建py文件: celery.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_test.settings')

app = Celery('my_test')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
celery.py

  項目目錄下__init__.py文件中寫入一下代碼:

  其中:from __future__ import absolute_import  可以避免celery.py與第三方模塊沖突;

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
__init__.py

  settings.py文件中做以下主要配置(使用djcelery自帶的隊列,如果使用redis,rabbitmq等其他隊列,需做其他配置):

  下面使用django數據庫表來充當broker,如果在生產階段,可能需要使用flower來監控任務,則不能使用數據庫,只能使用其他隊列:BROKER_URL = 'redis://127.0.0.1:6379/0'

  運行flower:python manage.py celery -A TSDRM worker flower

  查看任務情況,訪問:127.0.0.1:5555

import djcelery

djcelery.setup_loader()
BROKER_URL = 'django://'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
settings.py

   下面是celery做的消息隊列配置:

CELERY_DEFAULT_QUEUE = "default_dongwm" # 默認的隊列,如果一個消息不符合其他的隊列就會放在默認隊列里面

CELERY_QUEUES = {
    "default_dongwm": { # 這是上面指定的默認隊列
        "exchange": "default_dongwm",
        "exchange_type": "direct",
        "routing_key": "default_dongwm"
    },
    "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
        "routing_key": "topictest.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "test2": { # test和test2是2個fanout隊列,注意他們的exchange相同
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks",
    },
    "test": {
        "exchange": "broadcast_tasks",
        "exchange_type": "fanout",
        "binding_key": "broadcast_tasks2",
    },
}

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):

        if task.startswith('topictest'):
            return {
                'queue': 'topicqueue',
            }
        # 我的dongwm.tasks文件里面有2個任務都是test開頭
        elif task.startswith('dongwm.tasks.test'):
            return {
                "exchange": "broadcast_tasks",
            }
        # 剩下的其實就會被放到默認隊列
        else:
            return None

# CELERY_ROUTES本來也可以用一個大的含有多個字典的字典,但是不如直接對它做一個名稱統配
CELERY_ROUTES = (MyRouter(), )
celery中隊列分配
# BROKER_URL = 'redis://127.0.0.1:6379/0'
# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# BROKER_URL = 'amqp://root:password@localhost:5672/myvhost'

CELERY_TIMEZONE = 'Asia/Shanghai'  # 時區
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
消息隊列及celery時區/數據格式

 

 

  5.2 在admin創建定時(間隔 / 每)時間

1.數據庫表遷移出djcelery相關的表: python3 manage.py migrate;

2.在Crontabs表中添加每分鍾,每天等定時任務,或者在Intervals表中添加間隔時間;

3.在Periodic tasks中添加任務,指定已經注冊的task,並選擇intervals或者crontabs作為定時形式;

       

  5.1 tasks.py 中添加執行腳本的任務

from cloud import remote
import json
from djcelery import models as celery_models  # 通過修改表PeriodicTask中enabled字段來開啟關閉定時任務
from celery import share_task  # task裝飾器在后面版本被啟用
@share_task
def exec_script(): cmd = r"/root/Desktop/test.sh" ip = "192.168.109.132" username = "root" password = "password" system_choice = "Linux" obj = remote.ServerByPara(cmd, ip, username, password, system_choice) obj.run() @shared_task # @share_task裝飾器表示添加定時任務,有些博客中使用的是@task在后面的版本中被啟用了。 def get_script_info(): with open(r"script_info.txt", "r") as f: ret = f.read() print(ret) if ret != "": try: task = celery_models.PeriodicTask.objects.get(name="get_script_info") task.enabled = False task.save() except celery_models.PeriodicTask.DoesNotExist as e: print(e) ret = json.loads(ret) # 將執行結果寫入數據庫中 print(ret["exec_tag"]) if int(ret["exec_tag"]) == 0: with open('result_info.txt', "w") as f: f.write(ret["data"]) print("腳本執行成功!") print("data: ", ret["data"]) else: print("腳本執行失敗!請重新執行!") print("data: ", ret["data"]) # 清空文件 with open(r"script_info.txt", "w") as f: f.write("")

 

  5.3 視圖中調用任務/關閉定時任務  views.py

from cloudDR.celery import exec_script
from djcelery import models as celery_models

def index(request): 
    if request.user.is_authenticated(): 
        exec_script.delay()  # celery.py 中 ip等參數可以通過delay傳入;
        # 開啟定時任務
        try:
            task = celery_models.PeriodicTask.objects.get(name="get_script_info")
            task.enabled = True
            task.save()
        except celery_models.PeriodicTask.DoesNotExist as e:
            print(e)

 

  5.4 開啟celery,開啟beat監控

celery,beat一直開啟,只需要通過代碼開啟關閉任務;

如果將info改成debug反應就比較慢,可能卡主不動;

$ celery -A project_name worker -l info
$ celery -A project_name beat -l info--max-interval=5  # 表示每2秒查看/刷新一下任務列表,添加新的,剔除舊的

 


 

6.實際項目

   

要求:異步任務完成多個腳本執行,每個腳本執行完之后根據返回信息執行第二個腳本,並且將當前腳本順序寫入數據庫;

    當刷新前端頁面時,查詢數據庫,顯示當前腳本執行所在進度;

# views.py
from django.shortcuts import render
from django.http import JsonResponse
from .tasks import exec_script
from . import models


def index(request):
    # 查詢數據庫
    try:
        query_obj = models.Tags.objects.filter(id=1)[0]
        tag = query_obj.name
        print("tag", tag)
    except:
        tag = "before"
    return render(request, "index.html", {"tag": tag})


def change_button(request):
    # 循環執行多次腳本,接受上一次腳本的返回值
    cmd_list = [r"/root/Desktop/test01.sh", r"/root/Desktop/test02.sh", r"/root/Desktop/test03.sh"]
    ip = "192.168.109.132"
    username = "root"
    password = "password"
    system_choice = "Linux"
    exec_script.delay(cmd_list, ip, username, password, system_choice)

    return JsonResponse({"result": "腳本跑起來啦!請刷新頁面查看進度!"})
<!-- index.html -->
{% load staticfiles %}
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>首頁</title>
    <script src='{% static "plugins\jquery.min.js"%}'></script>
    <link rel="stylesheet" href="{% static 'plugins\bootstrap\css\bootstrap.min.css' %}">
    <script src="{% static 'plugins\bootstrap\js\bootstrap.min.js' %}"></script>
    <style>
        .container {
            display: table;
            height: 100%;
        }

        .row {
            display: table-cell;
            vertical-align: middle;
        }
    </style>
</head>
<body>
<div class="container">
    <div class="row" align="center">
        <button id="exec_script">執行腳本</button>
        <br>

        {% if not tag == "before" and not tag == "done"%}
        <div class="progress" style="width:300px;">
            <span>流程進度</span>
            <input id="tag" type="hidden" value="{{ tag }}">
            <div class="progress-bar" role="progressbar" aria-valuenow="60"
                 aria-valuemin="0" aria-valuemax="100" style="width: 10%;">
                <span class="sr-only">40% 完成</span>
            </div>
        </div>
        {% endif %}
    </div>
</div>
{% csrf_token %}

<script>
    $("#exec_script").on("click", function () {
        var csrfToken = $("[name='csrfmiddlewaretoken']").val();
        $.ajax({
            url: "/change_button/",
            type: "post",
            data: {"name": name, "csrfmiddlewaretoken": csrfToken},
            success: function (data) {
                alert(data["result"]);
                $("#exec_script").attr("disabled", true).attr("style", "color:red;")
            }
        })
    });
    $(".progress-bar").attr("style", "width: 10%;".replace("10", $("#tag").val() * 10));
    // during execute
    if ($("#tag").val() != "before" && $("#tag").val() != "done" ) {
        $("#exec_script").attr("disabled", true).attr("style", "color:red;");
    }
    // before execute
    if ($("#tag").val() == undefined) {
        $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;");
    }
    // after execute
    // if ($("#tag").val() == "done"){
    //     alert("流程執行完成!");
    //     // 將數據庫中tag改成before
    //     $("#exec_script").attr("disabled", false).removeAttr("style", "color:red;");
    // }

</script>

</body>
</html>
# task.py
from __future__ import absolute_import
from celery import task
from .scripts import remote
from . import models
import time


@task  # @task裝飾器表示添加分布式任務
def exec_script(cmd_list, ip, username, password, system_choice):
    for num, cmd in enumerate(cmd_list):
        print("當前執行腳本位置:", num+1)
        obj = remote.ServerByPara(cmd, ip, username, password, system_choice)
        result = obj.run()
        if result["exec_tag"] == 0:
            # 表示當前腳本運行成功,在數據庫中寫入當前所在流程
            location_tag = num+1
            try:
                query_obj = models.Tags.objects.filter(id=1)[0]
            except:
                query_obj = models.Tags()
            query_obj.name = location_tag
            query_obj.save()
# 完成所有腳本執行
if num == len(cmd_list)-1: try: query_obj = models.Tags.objects.filter(id=1)[0] except: query_obj = models.Tags() query_obj.name = "done" query_obj.save()
# 模擬腳本所執行時長 time.sleep(
5)
"""
paramiko, pywinrm實現windows/linux腳本調用
"""
import paramiko
import sys
import winrm


class ServerByPara(object):
    def __init__(self, cmd, host, user, password, system_choice):
        self.cmd = cmd
        self.client = paramiko.SSHClient()
        self.host = host
        self.user = user
        self.pwd = password
        self.system_choice = system_choice 

    def exec_linux_cmd(self):
        data_init = ''
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(hostname=self.host, username=self.user, password=self.pwd)
        stdin, stdout, stderr = self.client.exec_command(self.cmd, get_pty=True)
        for i in stdout.readlines():
            data_init += i
        return data_init

    def exec_win_cmd(self):
        s = winrm.Session(self.host, auth=(self.user, self.pwd))
        ret = s.run_cmd(self.cmd)
        data_init = ""
        for num, data in enumerate(ret.std_out.decode().split("\r\n")):
            if ">" not in data:
                data_init += data
        return data_init

    def run(self):
        if self.system_choice == "Linux":
            result = self.exec_linux_cmd()
        else:
            result = self.exec_win_cmd()
        print(result)
        return result


if __name__ == '__main__':
    pass
    # server_obj = ServerByPara(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])
    # server_obj = ServerByPara(r"C:\Users\a\Desktop\test.bat a b", "192.168.12.149", "a", "password", "Windows")
    # server_obj.run()
remote.py

 

7.shell/bat腳本接收命令返回值存至變量

@echo off
for /f "delims=" %%i in ('C:\User\Administrator\Desktop\python_exe.py') do (set a=%%i)
echo %a%
bat
python_result=`/root/Desktop/python_exe.py`
echo $python_result
shell

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM