Python 運用Paramiko實現批量巡檢


通過封裝Paramiko這個SSH模塊,我們可以實現遠程批量管理Linux主機,在此基礎上配合釘釘API接口可實現自動告警機制,定期自動檢查設備狀態,並推送到釘釘群內。

首先需要配置雙網卡模式,我們將無線網卡配置路由讓其走外網與釘釘連接,有線網口則負責與內部服務器相連接,只需要配置路由即可實現。

網絡目標        網絡掩碼          網關       接口   躍點數
0.0.0.0          0.0.0.0      132.35.93.1     132.35.93.11     21
0.0.0.0          0.0.0.0    192.168.191.1    192.168.191.3     25 

C:\Windows\system32> route delete 0.0.0.0

# 所有的外網訪問走無線網卡,從網關 192.168.191.1 出去
C:\Windows\system32>route -p add 0.0.0.0 mask 0.0.0.0 192.168.191.1

# 如果是132網段,則走內部,網關為:132.35.93.1
C:\Windows\system32>route -p add 132.35.0.0 mask 255.255.0.0 132.35.93.1

封裝釘釘接口: 接口的調用需要傳入需要通知特定人的手機號,這個模塊命名為Ding.py 代碼如下。

import requests
import urllib.parse
import datetime,time,hmac,hashlib,base64,json

class DingToken():
    def __init__(self,atAll,atMobiles):
        self.atAll = atAll
        self.atMobiles = atMobiles

    def send_message(self,message):
        timestamp = str(round(time.time() * 1000))
        secret = 'SEC1018485caf7339e38530b'
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote(base64.b64encode(hmac_code))

        headers={'Content-Type': 'application/json'}
        webhook = 'https://oapi.dingtalk.com/robot/send?access_token=0fe10f&timestamp=' + timestamp + "&sign=" + sign
        data = {
            "msgtype": "text",
            "text": {"content": message },
            "at": {
                "atMobiles": [ self.atMobiles ],
                "isAtAll": self.atAll
                }
            }
        requests.post(webhook, data=json.dumps(data), headers=headers)

    # 發送警告信息
    def send_warning(self,platform,person,group,address,send_date,type,message):
        self.send_message(
                    "------------------------------------------------------- \n"
                    "\t\t\t\t\t {0} \n"
                    "------------------------------------------------------- \n"
                    "維護人員: \t {1} \n"
                    "所在分組: \t {2} \n"
                    "系統地址: \t {3} \n"
                    "告警日期: \t {4} \n"
                    "告警類型: \t {5} \n"
                    "------------------------------------------------------- \n"
                    "{6} \n"
                    "-------------------------------------------------------".
                        format(platform,person,group,address,send_date,type,message) )

    # 發送Ping連通性報告
    def send_ping(self,platform,send_date,success_len,error_len,error_list):
        self.send_message(
            "------------------------------------------------------- \n"
            "\t\t\t\t\t {0} \n"
            "------------------------------------------------------- \n"
            "[*] 日期: \t {1} \n[+] 連通主機數: \t {2} \n[-] 失敗主機數: \t {3} \n"
            "------------------------------------------------------- \n"
            "失敗主機列表: \n {4} \n".format(platform,send_date,success_len,error_len,error_list)
        )

if __name__ == "__main__":
    ding = DingToken(False,"15646596977")
    ding.send_warning("總部客服","王瑞","CTI服務組","192.168.1.1","2021:01:01","磁盤異常","C:// \t 100% \n")


封裝好的MySSH模塊: 接着就是封裝一個拉取數據到本地的SSH模塊,這個模塊命名為 MySSH.py 代碼如下。

import paramiko, math,json

class MySSH:
    def __init__(self, address, username, password, default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                                 allow_agent=False, look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
        except Exception:
            return False

    def BatchCMD(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\\n", "\n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None

    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass

    def GetSystemVersion(self):
        return self.BatchCMD("uname")

    # 測試主機連通率
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                print("{} 已連通.".format(self.address))
                return True
            else:
                return False
        except Exception:
            return False

    # 拉取磁盤數據到本地,並返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
                    "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            print("利用率字典: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 拉取內存數據到本地。
    def GetAllMemSpace(self):
        cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
                    "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    print("利用百分比: {}  \t 總內存: {}  \t 剩余內存: {}".format(percentage,mem_total,mem_free))
                    return str(percentage) + " %"
        except Exception:
            return False

    # 獲取CPU利用率數據
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
                    "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]}
            print("CPU利用率數據: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 獲取到系統負載利用率 也就是一分鍾負載五分鍾負載十五分鍾負載
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'",
                    "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            print("負載利用率: {}".format(ref_dict))
                            return ref_dict
            return False
        except Exception:
            return False

    # 檢測指定進程是否存活
    def CheckProcessStatus(self,processname):
        cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),
                    "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag != "0":
                        return True
            return False
        except Exception:
            return "None"

    # 檢測指定端口是否存活
    def CheckPortStatus(self,port):
        cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}")
            ,       "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
                        return True
            return False
        except Exception:
            return False

定義巡檢過程: 這個模塊是最重要的一個模塊,主要負責解析JSON文件並巡檢,該模塊我們就命名為system.py,代碼如下:

from Ding import DingToken
from MySSH import MySSH
import os,sys,time,datetime,json

# --------------------------------------------------------------------------------------------------
# 對所有主機設備進行
def switch_ping():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        success,error = [],[]
        try:
            for system_list in system:
                for k,v in system_list.items():
                    for item in v:
                        now = datetime.datetime.now()
                        print("[+] 監控范圍: Ping測試 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0]))
                        #InspectCPU(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit"))
                        ssh = MySSH(item[0],item[1],item[2],ptr.get("default-port"))
                        ssh.Init()
                        ref = ssh.GetPing()
                        if ref == True:
                            success.append(item[0])
                        else:
                            error.append(item[0])
            ding = DingToken(False, ptr.get("telephone"))
            ding.send_ping(ptr.get("platform"),datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),len(success),len(error),error)
        except Exception:
            pass

# --------------------------------------------------------------------------------------------------
# 磁盤告警流程
# 手機號 地址 賬號 密碼 端口 系統名稱 姓名 分組 磁盤最大值
def InspectDisk(telephone,address,username,password,port,platform,person,group,disk_limit):
    ding = DingToken(False, telephone)
    ssh = MySSH(address, username, password, port)
    ssh.Init()
    ret = ssh.GetAllDiskSpace()
    ssh.CloseSSH()
    if ret != False:
        for k,v in ret.items():
            try:
                space = eval(v.split("%")[0])
                if space >= int(disk_limit):
                    now = datetime.datetime.now()
                    strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')
                    ding.send_warning(platform, person, group, address, strnow, "磁盤過載", "[ 分區: {} \t | \t 負載率: {} ]\n".format(k,v))
            except Exception:
                pass

def switch_inspect_disk():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            # 循環所有字典
            for k,v in system_list.items():
                # 循環每個分組內的主機
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 監控范圍: 磁盤檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0]))
                    # InspectDisk("15646596977", "192.168.191.4", "root", "1233", "22", "總部客服(呼叫中心平台)", "王瑞", "CTI 3.4",80)
                    InspectDisk(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("disk_limit"))

# --------------------------------------------------------------------------------------------------
# 內存告警流程
# InspectMemory("15646596977","192.168.191.3","root","1233",22,"總部客服系統","王瑞","CTI組",內存閾值)
def InspectMemory(telephone,address,username,password,port,platform,person,group,memory_limit):
    ding = DingToken(False, telephone)
    ssh = MySSH(address, username, password, port)
    ssh.Init()
    ret = ssh.GetAllMemSpace()
    ssh.CloseSSH()
    if ret != False:
        try:
            space = eval(ret.split("%")[0])
            if space >= int(memory_limit):
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')
                ding.send_warning(platform, person, group, address, strnow, "內存過載", "[ 負載率: \t | \t {} ]\n".format(ret))
        except Exception:
            pass

def switch_inspect_memory():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            for k,v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 監控范圍: 內存檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0]))
                    InspectMemory(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("memory_limit"))


# --------------------------------------------------------------------------------------------------
# 巡檢CPU利用率
def InspectCPU(telephone,address,username,password,port,platform,person,group,cpu_limit):
    ding = DingToken(False, telephone)
    ssh = MySSH(address, username, password, port)
    ssh.Init()
    ret = ssh.GetCPUPercentage()
    ssh.CloseSSH()
    if ret != False:
        try:
            space = int(100 - int(ret.get("idea")))
            us = ret.get("us") + "%"
            sys = ret.get("sys") + "%"
            if space >= int(cpu_limit):
                print("CPU 總利用率: {}".format(space))
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')
                ding.send_warning(platform, person, group, address, strnow, "CPU 利用率過載","[ 用戶態: \t | \t {} ]\n[ 內核態: \t | \t {} ]\n[ 總利用率: \t | \t {}% ]\n".format(us,sys,space))
        except Exception:
            pass

def switch_inspect_cpu():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            for k,v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 監控范圍: CPU檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0]))
                    InspectCPU(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,ptr.get("cpu_limit"))

# --------------------------------------------------------------------------------------------------
# 巡檢系統平均負載
def InspectLoadAvg(telephone,address,username,password,port,platform,person,group,load_avg_limit):
    ding = DingToken(False, telephone)
    ssh = MySSH(address, username, password, port)
    ssh.Init()
    ret = ssh.GetLoadAVG()
    ssh.CloseSSH()
    if ret != False:
        try:
            if float(ret.get("1avg")) >= float(load_avg_limit[0]) or float(ret.get("5avg")) >= float(load_avg_limit[1]) or float(ret.get("15avg")) >= float(load_avg_limit[2]):
                now = datetime.datetime.now()
                strnow = datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S')
                ding.send_warning(platform, person, group, address, strnow, "LoadAvg 過載","[ 一分鍾負載: \t | \t {} ]\n[ 五分鍾負載: \t | \t {} ]\n[ 十五分鍾負載: \t | \t {} ]\n".
                                  format(ret.get("1avg"), ret.get("5avg"),ret.get("15avg")))
        except Exception:
            pass

def switch_inspect_avg():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        system = ptr.get("system")
        for system_list in system:
            for k,v in system_list.items():
                for item in v:
                    now = datetime.datetime.now()
                    print("[+] 監控范圍: 系統負載檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,datetime.datetime.strftime(now, '%Y-%m-%d %H:%M:%S'),item[0]))
                    InspectLoadAvg(ptr.get("telephone"),item[0],item[1],item[2],ptr.get("default-port"),ptr.get("platform"),ptr.get("person"),k,list(ptr.get("load_avg_limit")))

# --------------------------------------------------------------------------------------------------
# 巡檢系統端口是否開放
# 首先傳入一個IP地址,解析出其用戶名密碼表
def GetAddressAsPasswd(address):
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        for each_list in ptr.get("system"):
            for k,v in each_list.items():
                for ser in v:
                    if ser[0] == address:
                        return ser
    return False

# 拼接字符串數據
def append_string(src_str,append_str):
    for x in range(len(append_str)):
        src_str += append_str[x]
    return src_str

# 檢查端口是否被關閉了,如果關閉了則提示關閉
def InspectPort(telephone,platform,person):
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        port = ptr.get("port")
        default_port = ptr.get("default-port")
        for each_list in port:
            for k,v in each_list.items():
                for item in v:
                    this_user_passwd = GetAddressAsPasswd(item[0])
                    if this_user_passwd != False:
                        try:
                            now = datetime.datetime.now()
                            print("[+] 監控范圍: 端口狀態檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,
                                                                                                  datetime.datetime.strftime(
                                                                                                      now,
                                                                                                      '%Y-%m-%d %H:%M:%S'),
                                                                                                  item[0]))
                            ssh = MySSH(this_user_passwd[0], this_user_passwd[1], this_user_passwd[2], default_port)
                            ssh.Init()

                            close_port_list = []
                            for check_port in range(1,len(item)):
                                ref = ssh.CheckPortStatus(item[check_port])
                                print("端口ID: {} \t 狀態位: {} 存活端口: {}".format(check_port, ref, item[check_port]))
                                if ref == False or ref == "None":
                                    close_port_list.append(item[check_port])
                                    print("端口ID: {} \t 狀態位: {} 關閉端口: {}".format(check_port, ref, item[check_port]))

                            ding = DingToken(False,telephone)
                            send = ""
                            for i in close_port_list:
                                im = "[ 端口: {0} \t\t | \t 狀態: 已關閉 ]\n".format(i)
                                send = append_string(send,im)

                            if(send != ""):
                                ding.send_warning(platform,person,k,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'),"端口關閉",send)
                            ssh.CloseSSH()
                        except Exception:
                            pass

def switch_inspect_port():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        telephone = ptr.get("telephone")
        platform = ptr.get("platform")
        person = ptr.get("person")
        InspectPort(telephone, platform, person)


# --------------------------------------------------------------------------------------------------
# 巡檢系統是否開放指定進程
def InspectProcess(telephone,platform,person):
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        process = ptr.get("process")
        default_port = ptr.get("default-port")
        for each_list in process:
            for k,v in each_list.items():
                for item in v:
                    this_user_passwd = GetAddressAsPasswd(item[0])
                    if this_user_passwd != False:
                        try:
                            now = datetime.datetime.now()
                            print("[+] 監控范圍: 進程狀態檢測 --> 監測組: {} --> 檢測日期: {} --> 檢測地址: {}".format(k,
                                                                                                  datetime.datetime.strftime(
                                                                                                      now,
                                                                                                      '%Y-%m-%d %H:%M:%S'),
                                                                                                  item[0]))
                            ssh = MySSH(this_user_passwd[0], this_user_passwd[1], this_user_passwd[2], default_port)
                            ssh.Init()

                            close_process_list = []
                            for check_process in range(1,len(item)):
                                ref = ssh.CheckProcessStatus(item[check_process])
                                print("進程ID: {} \t 狀態位: {} 存活進程: {}".format(check_process, ref, item[check_process]))
                                if ref == False or ref == "None":
                                    close_process_list.append(item[check_process])
                                    print("進程ID: {} \t 狀態位: {} 關閉進程: {}".format(check_process, ref, item[check_process]))

                            ding = DingToken(False,telephone)
                            send = ""
                            for i in close_process_list:
                                im = "[ 進程: {0} \t\t | \t 狀態: 已關閉 ]\n".format(i)
                                send = append_string(send,im)

                            if(send != ""):
                                ding.send_warning(platform,person,k,this_user_passwd[0],datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'),"進程退出",send)
                            ssh.CloseSSH()
                        except Exception:
                            pass

def switch_inspect_process():
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())
        telephone = ptr.get("telephone")
        platform = ptr.get("platform")
        person = ptr.get("person")
        InspectProcess(telephone, platform, person)

以磁盤為例,當超過了我們定義好的閾值時,則會通過釘釘提示用戶告警,告警提示如下。

定義配置文件: 配置文件則是巡檢時需要解析的內容,我們需要依次寫入賬號密碼等信息。

{
    "system_config":
    [
        {"ping_": "True"},
        {"disk_": "True"},
        {"cpu_": "False"},
        {"loadavg_": "False"},
        {"memory_": "False"},
        {"process_": "False"},
        {"port_": "False"}
    ],

    "platform": "總部客服系統(呼叫中心)",
    "person": "王瑞",
    "telephone": "18264825669",
    "e-mail": "admin@lyshark.com",
    "default-port": "22",
    "set-timeout": "3600",

    "disk_limit": 75,
    "memory_limit": 95,
    "cpu_limit": 90,
    "load_avg_limit": ["5.0","8.0","10.0"],

    "system":
    [
        {
        "Centos 服務器組":
            [
                ["192.168.1.1","root","1233"],
                ["192.168.1.1","root","1233"],
                ["192.168.1.1","root","1233"]
            ],
            "AIX 服務器組":
            [
                ["192.168.1.1","root","1233"],
                ["192.168.1.1","root","1233"]
            ]
        }
    ],

    "process":
    [
        {
        "CTI進程組":
            [
                ["192.168.1.1","icdcomm","aplogic","ctilink","cnfgsvr","mcp"],
                ["192.168.1.1","aplogic","ctilink","cnfgsvr","mcp","ccsapp","nis","oas","ivr","ctiserver"]
            ]
        }
    ],

    "port":
    [
        {
        "CTI端口檢測":
            [
                ["192.168.1.1","111","631","8888","19001","25","2556","5150","5600","34902","10000","34901","10005"],
                ["192.168.1.1","111","8080","10000","10004","10005","8888","19001","2556","5600","60661"]
            ]
        }
    ]
}

定義main入口代碼: 入口代碼主要負責解析參數與巡檢,由於怕影響服務器性能,所有沒加多線程支持,不過也夠用了。

import system
import json,time

if __name__ == "__main__":
    with open("./config.json", "r", encoding="utf-8") as read_config_ptr:
        ptr = json.loads(read_config_ptr.read())

        timeout = ptr.get("set-timeout")
        configure = ptr.get("system_config")

        # 默認開關全部關閉
        disk_ = "False"
        cpu_ = "False"
        memory_ = "False"
        loadavg_ = "False"
        process_ = "False"
        port_ = "False"
        ping_ = "False"

        # 判斷配置文件開關是否開啟,如果開啟了則將內存的開關開啟
        for dic in configure:
            for k,v in dic.items():
                if k == "ping_" and v == "True":
                    ping_ = "True"
                if k == "disk_" and v == "True":
                    disk_ = "True"
                if k == "cpu_" and v == "True":
                    cpu_ = "True"
                if k == "memory_" and v == "True":
                    memory_ = "True"
                if k == "loadavg_" and v == "True":
                    loadavg_ = "True"
                if k == "process_" and v == "True":
                    process_ = "True"
                if k == "port_" and v == "True":
                    port_ = "True"

        while True:
            if ping_ == "True":
                system.switch_ping()
            if disk_ == "True":
                system.switch_inspect_disk()
            if cpu_ == "True":
                system.switch_inspect_cpu()
            if memory_ == "True":
                system.switch_inspect_memory()
            if loadavg_ == "True":
                system.switch_inspect_avg()
            if process_ == "True":
                system.switch_inspect_process()
            if port_ == "True":
                system.switch_inspect_port()
            time.sleep(int(timeout))

實現正則解析: 前面的配置無法實現交互式問答,智能機器人推送數據,很被動,我們需要開啟交互機器人,自己實現業務邏輯。

import os,sys,re

def participle(text):
    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)
    address = ".".join(ref[0])
    select = re.findall('查詢', text)

    if(len(address) !=0 and len(select) != 0):
        if len (re.findall('內存',text)) != 0:
            return [address,"內存"]
        elif len (re.findall('磁盤',text)) != 0:
            return [address,"磁盤"]
        elif len (re.findall('負載',text)) != 0:
            return [address,"負載"]

if __name__ == "__main__":
    text = "幫我查詢192.168.1.1主機的內存負載情況"
    text2 = "幫我查詢 192.168.1.200 這 台設 備的 磁盤 使用率"
    text3 = "需要查詢192.168.1.200 設備中負載使用情況,請反饋結果"
    text4 = "發現負載出現告警,請立即查詢192.168.1.200這台設備的負載使用率,並立即反饋"

    print(participle(text))
    print(participle(text2))
    print(participle(text3))
    print(participle(text4))

    txt = '''
    主編單位:總部客服系統
    '''
    addr = re.findall('(主編單位:.*?)\s', txt)[0]
    print(addr)

這個案例,從文本中提取可用信息,並將其返回為列表格式。

接着配置釘釘開發者平台機器人,此處需要有公網地址,作用是,釘釘群有人at機器人時,機器人會將請求post發送到我們的django應用服務上。

使用django配置接收請求並處理即可,處理代碼如下所示。

from django.http import HttpResponse, JsonResponse
import json,hmac,hashlib,base64
import sys,os,re

# 機器人 app_secret
app_secret = "LcKEf0nlqe"

# 根據IP查詢操作使用
def select_participle(text):
    ref = re.findall(r'(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)',text)

    # 判斷如果地址不為空,則進行查詢等操作
    if len(ref) != 0:
        address = ".".join(ref[0])
        select = re.findall('查詢', text)
        if(len(address) !=0 and len(select) != 0):
            if len (re.findall('內存',text)) != 0:
                return [address,"內存"]
            elif len (re.findall('負載',text)) != 0:
                return [address,"負載"]
    else:
        if len(re.findall('主機列表', text)) != 0:
            return ["主機列表"]

def index(request):
    if request.method == "POST":
        HTTP_SIGN = request.META.get("HTTP_SIGN")
        HTTP_TIMESTAMP = request.META.get("HTTP_TIMESTAMP")
        res = json.loads(request.body)

        # 接收用戶的輸入並做處理
        sendnick = res.get("senderNick")
        is_admin = res.get("isAdmin")
        conver = res.get("conversationTitle")
        content = res.get("text").get("content")
        print("發送人: {} 是否管理員: {} 所在組: {} 接收數據: {}".format(sendnick,is_admin,conver,content))

        # 數據的加密解密
        string_to_sign = '{}\n{}'.format(HTTP_TIMESTAMP, app_secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(app_secret.encode("utf-8"), string_to_sign_enc, digestmod=hashlib.sha256).digest()
        sign = base64.b64encode(hmac_code).decode("utf-8")

        ref_select_text = select_participle(content)

        if len(ref_select_text) == 2:
            if sign == HTTP_SIGN:
                if "負載" in ref_select_text[1]:
                    return JsonResponse(
                    {"msgtype": "text",
                         "text": {
                             "content": "經過查詢,主機: {} 負載率為: 10% 當前為正常狀態".format(ref_select_text[0])
                         }
                    })

                if "內存" in ref_select_text[1]:
                    return JsonResponse\
                    (
                        {"msgtype": "text",
                         "text":
                            {
                             "content": "經過查詢,主機: {} 內存占用率為: 70% 當前為正常狀態".format(ref_select_text[0])
                            }
                         }
                    )
        elif len(ref_select_text) == 1:
            if "主機列表" in ref_select_text[0]:
                return JsonResponse \
                        (
                        {"msgtype": "text",
                         "text":
                             {
                                 "content": "目前您所管理的主機,總結: 205台"
                             }
                         }
                    )
        return JsonResponse({"error": "沒有權限"})
    if request.method == "GET":
        return HttpResponse("沒有權限")

django運行后,我們回到釘釘群內,問機器人一些問題看看吧,前提是這些問題已經定義了正則解析。

django 后台則能夠接收到,這些數據,並做出回應。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM