aws cloudwatch監控怎么通過釘釘機器人報警


最近在完善海外業務在aws服務的CloudWatchh監控,發現CloudWatch報警通知要通過aws的sns服務,直接支持的通道有短信和郵件,但是我們想推到釘釘群里面的群機器人里面這個就要借助aws的Lambda函數服務

然后選擇用sns來觸發

 

python腳本內容變量event是sns傳過來的消息內容,有點坑的是CloudWatch的報警信息很亂還要稍微優化一下

 

# -*- coding: utf-8 -*-
import json
import os
import re
from botocore.vendored import requests


def size_b_to_other(size):
    """用於轉換容量單位"""
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    # 處理異常
    if size <= 0:
        return False

    # 遍歷單位的位置並用取整除賦值
    for unit in units:
        if size >= 1024:
            size //= 1024
        else:
            size_h = '{} {}'.format(size, unit)
            return size_h

    size_h = '{} {}'.format(size, unit)
    return size_h
    
    
def lambda_handler(event, context):
    token = os.getenv('token')
    url = "https://oapi.dingtalk.com/robot/send?access_token="
    headers = {'Content-Type': 'application/json'}
    # 解析要使用的字段
    Sns = event['Records'][0]['Sns']
    Subject = Sns['Subject']
    if "ALARM" in Subject:
        title = "======報警信息======"
    elif "OK" in Subject:
        title = "======恢復======"
    Timestamp = Sns['Timestamp']
    Message = Sns['Message']
    Message = json.loads(Message)
    try:
        NewStateReason = Message['NewStateReason']
        AlarmDescription = Message['AlarmDescription']
        # 轉換cloudwatch單位為友好單位
        datapoint = re.findall(r'[[](.*?)[]]', NewStateReason)
        threshold = re.findall(r'[(](.*?)[)]', NewStateReason)
        count = (datapoint[0].count(","))
        if count == 0:
            datapoint = float(str.split(datapoint[0])[0])
            threshold = float(str.split(threshold[1])[0])
            if threshold > 1000:
                datapoint = size_b_to_other(datapoint)
                threshold = size_b_to_other(threshold)
        else:
            i = threshold[len(threshold) - 1]
            pattern = re.compile(r'^[0-9]+\.[0-9]')
            if pattern.search(i):
                threshold = threshold[len(threshold) - 1]
            else:
                threshold = threshold[len(threshold) - 2] + threshold[len(threshold) - 1]
            
        # 定義消息內容
        content = title + "\n報警主題:" + "" + Subject + "" \
                "\n報警時間:" + "" + Timestamp + "" \
                "\n報警原因:" + "" + NewStateReason + "" \
                "\n友好信息:" + "" + "當前值=" + str(datapoint) + " 連續" + str(count + 1) + "次達到 " + "閥值=" + str(threshold) + "" \
                "\n備注信息:" + "" + str(AlarmDescription) + ""
    
    except:
        Message = json.dumps(Message, sort_keys=True, indent=2)
        content = title + "\n報警主題:" + "" + Subject + "" \
            "\n詳細信息:" + "" + Message + "" \
            "\n備注信息:【消息解析異常】"

    data = {
        "msgtype": "text",
        "text": {
            "content": content
        }
    }
    
    data = json.dumps(data)
    request = requests.post(url + token, data, headers=headers)
    return request.text

 測試一下

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM