RabbitMQ 为我们提供了丰富的管理 API,我们可以通过访问这些 API 来监控 MQ 的状态,例如:消息堆积情况、连接情况等。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import httplib
import base64
import socket
import time
import logging
import logging.config
import requests
# RabbitMQ servers.
# useServerList collects the hosts that getUsingMq() managed to connect to;
# prodServerList is the candidate list the main loop hands to getUsingMq().
useServerList = []
prodServerList = []
#betaServerList = []
# MQ connection settings: the port is used both for the TCP probe and for the
# HTTP API request; username/password are sent as HTTP basic-auth credentials.
port = 15672
username = "userName"
password = "yourPasswd"
# API path and HTTP method used by getConnNum()
path = "/api/overview"
method = "GET"
# Metrics endpoint that the main loop POSTs the collected points to.
# (The original note called this a "udp monitor url", but the value is an
# HTTP URL.)
HOST = "http://localhost:1988/metrics"
# Logging setup: file name, record layout and timestamp layout
LOG_FILENAME = "error.log"
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(filename)s[:%(lineno)d] - %(message)s"
DATE_FORMAT = "%m-%d-%Y %H:%M:%S %p"
# Configure the root logger at import time: DEBUG and above, appended to LOG_FILENAME.
logging.basicConfig(filename=LOG_FILENAME, level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT, filemode='a')
# Find the MQ hosts that are currently reachable.
def getUsingMq(serverList):
    """Probe every host in ``serverList`` and record the reachable ones.

    For each host a TCP connection to the module-level ``port`` is attempted
    with a 2-second timeout. Hosts that accept the connection are appended to
    the module-level ``useServerList``; hosts that do not are logged and
    skipped.

    :param serverList: iterable of host names or addresses to probe.
    :return: None. Results are appended to ``useServerList``.
    """
    for server in serverList:
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sk.settimeout(2)
        try:
            sk.connect((server, port))
            useServerList.append(server)
        except Exception:
            # Best-effort probe: one bad host must not stop the scan. The
            # host and port are included so the log says *which* server failed.
            logging.error(
                "\033[1;31;mServer port is not in use\033[0m (%s:%s)",
                server, port)
        finally:
            # Always release the socket, whatever happened above.
            sk.close()
# Fetch the overview document (connection and message totals) from one MQ host.
def getConnNum(mqServer, mqPort):
    """Request the module-level ``path`` from a broker's HTTP API.

    The request uses the module-level ``method`` and HTTP basic auth built
    from the module-level ``username`` and ``password``.

    :param mqServer: host name or address of the broker.
    :param mqPort: TCP port of the broker's HTTP API.
    :return: the raw response body as returned by ``read()``.
    :raises socket.error: when the request cannot be sent; the failure is
        logged and then re-raised so the caller sees the real cause.
    """
    # HTTP basic-auth token (Python 2: b64encode accepts a byte ``str``).
    credentials = base64.b64encode("%s:%s" % (username, password))
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + credentials,
    }
    conn = httplib.HTTPConnection(mqServer, mqPort)
    try:
        try:
            conn.request(method, path, "", headers)
        except socket.error:
            logging.error("UNKNOWN: Could not connect to %s:%s", mqServer, mqPort)
            # Do not fall through to getresponse(): no request was sent, so
            # it would only raise an unrelated httplib error and hide this one.
            raise
        return conn.getresponse().read()
    finally:
        # Close the connection on success and on every failure path.
        conn.close()
def buildPoints(server, overview, timestampMs=None):
    """Build the metrics payload for one broker from its overview document.

    :param server: broker host name. The text before the first ``-`` is used
        as the ``ezone`` tag, with ``"wg"`` mapped to ``"wg1"``.
    :param overview: decoded overview JSON; must contain
        ``object_totals.connections`` and
        ``queue_totals.messages_unacknowledged``.
    :param timestampMs: timestamp in milliseconds; defaults to the current time.
    :return: a one-element list holding the ``mqMonitor`` point.
    :raises KeyError: when an expected key is missing from ``overview``.
    """
    connectionsTotal = overview["object_totals"]["connections"]
    messagesTotal = overview["queue_totals"]["messages_unacknowledged"]
    prefix = server.split('-')[0]
    ezone = "wg1" if prefix == "wg" else prefix
    if timestampMs is None:
        timestampMs = int(1000 * time.time())
    return [{
        "name": "mqMonitor",
        "tags": {"ezone": ezone, "host": server},
        "timestamp": timestampMs,
        "fields": [
            {"name": "rabbitmq_connections_total", "type": "gauge", "value": connectionsTotal},
            {"name": "rabbitmq_queue_messages_total", "type": "gauge", "value": messagesTotal},
        ],
    }]


if __name__ == "__main__":
    # Poll forever: probe the candidate hosts, then collect and report the
    # metrics of every reachable one.
    while True:
        getUsingMq(prodServerList)
        for s in useServerList:
            try:
                djson = json.loads(getConnNum(s, port))
                points = buildPoints(s, djson)
                print(points)
                # Bounded wait so a stuck metrics endpoint cannot hang the
                # monitor; requests waits indefinitely when no timeout is given.
                r = requests.post(HOST, params={"key": "system"}, json=points, timeout=5)
                r.close()
            except Exception:
                # Top-level boundary of a long-running monitor: a failure for
                # one broker is logged with its traceback and the loop goes on.
                logging.exception("Failed to collect or report metrics for %s", s)
        time.sleep(5)
        # Start the next round with an empty list; getUsingMq() appends to it.
        useServerList = []