最近在完善海外業務在aws服務的CloudWatchh監控,發現CloudWatch報警通知要通過aws的sns服務,直接支持的通道有短信和郵件,但是我們想推到釘釘群里面的群機器人里面這個就要借助aws的Lambda函數服務
然后選擇用sns來觸發
python腳本內容變量event是sns傳過來的消息內容,有點坑的是CloudWatch的報警信息很亂還要稍微優化一下
# -*- coding: utf-8 -*- import json import os import re from botocore.vendored import requests def size_b_to_other(size): """用於轉換容量單位""" units = ['B', 'KB', 'MB', 'GB', 'TB'] # 處理異常 if size <= 0: return False # 遍歷單位的位置並用取整除賦值 for unit in units: if size >= 1024: size //= 1024 else: size_h = '{} {}'.format(size, unit) return size_h size_h = '{} {}'.format(size, unit) return size_h def lambda_handler(event, context): token = os.getenv('token') url = "https://oapi.dingtalk.com/robot/send?access_token=" headers = {'Content-Type': 'application/json'} # 解析要使用的字段 Sns = event['Records'][0]['Sns'] Subject = Sns['Subject'] if "ALARM" in Subject: title = "======報警信息======" elif "OK" in Subject: title = "======恢復======" Timestamp = Sns['Timestamp'] Message = Sns['Message'] Message = json.loads(Message) try: NewStateReason = Message['NewStateReason'] AlarmDescription = Message['AlarmDescription'] # 轉換cloudwatch單位為友好單位 datapoint = re.findall(r'[[](.*?)[]]', NewStateReason) threshold = re.findall(r'[(](.*?)[)]', NewStateReason) count = (datapoint[0].count(",")) if count == 0: datapoint = float(str.split(datapoint[0])[0]) threshold = float(str.split(threshold[1])[0]) if threshold > 1000: datapoint = size_b_to_other(datapoint) threshold = size_b_to_other(threshold) else: i = threshold[len(threshold) - 1] pattern = re.compile(r'^[0-9]+\.[0-9]') if pattern.search(i): threshold = threshold[len(threshold) - 1] else: threshold = threshold[len(threshold) - 2] + threshold[len(threshold) - 1] # 定義消息內容 content = title + "\n報警主題:" + "【" + Subject + "】" \ "\n報警時間:" + "【" + Timestamp + "】" \ "\n報警原因:" + "【" + NewStateReason + "】" \ "\n友好信息:" + "【" + "當前值=" + str(datapoint) + " 連續" + str(count + 1) + "次達到 " + "閥值=" + str(threshold) + "】" \ "\n備注信息:" + "【" + str(AlarmDescription) + "】" except: Message = json.dumps(Message, sort_keys=True, indent=2) content = title + "\n報警主題:" + "【" + Subject + "】" \ "\n詳細信息:" + "【" + Message + "】" \ "\n備注信息:【消息解析異常】" data = { "msgtype": "text", "text": { "content": content } } data = json.dumps(data) request = requests.post(url + token, data, headers=headers) return request.text
測試一下