1.產生原因
項目需求獲取服務器一段時間之內的cpu使用率,內存使用率等信息.由於數據會被多人訪問,所以采用python元類的方式創建單例模式,每個請求訪問的都是同一個對象.同時通過linux的sar命令獲取相關信息,加載在內存中,其中我限制了返回數據的列表長度200(自己調整就行了),同時這個東西其實在寫入數據的時候最好使用異步任務celery獲者python apscheduler模塊.
2.代碼實現
import os, re, datetime
import threading
class SingletonType(type):
_instance_lock = threading.Lock()
def __init__(self, *args, **kwargs):
super(SingletonType, self).__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
with cls._instance_lock:
if not hasattr(cls, "_instance"):
cls._instance = super(SingletonType, cls).__call__(*args, **kwargs)
return cls._instance
class ServerInfo(metaclass=SingletonType):
def __init__(self):
self.time_list = []
self.dick_read_list = []
self.dick_write_list = []
self.net_read_list = []
self.net_write_list = []
self.cpu_use_list = []
self.mem_use_list = []
def get_data(self):
self.set_server_info()
data = self.get_server_info()
return data
def get_server_info(self):
self.check_list_length()
data = {"cpu_use": self.cpu_use_list,
"mem_use": self.mem_use_list,
"disk_read": self.dick_read_list,
"dick_write": self.dick_write_list,
"net_read": self.net_read_list,
"net_write": self.net_write_list,
"time": self.time_list}
return data
def set_server_info(self):
# 磁盤io情況
rev = os.popen("sar -b 1 1").readlines()
i_rev = re.findall('(\S+)', rev[-1])[-3]
o_rev = re.findall('(\S+)', rev[-1])[-4]
self.dick_read_list.insert(0, o_rev)
self.dick_write_list.insert(0, i_rev)
# 網絡io情況
i_net = 0
o_net = 0
nets = os.popen("sar -n DEV 1 1").readlines()
for net in nets:
if re.search('lo', net) or re.search('IFACE', net):
pass
elif re.search('Average', net):
i_net += float(re.findall('(\S+)', net)[-5])
o_net += float(re.findall('(\S+)', net)[-4])
self.net_read_list.insert(0, o_net)
self.net_write_list.insert(0, i_net)
# cpu使用情況
cpu = os.popen("sar -u 1 1").readlines()
cpu_use = 100 - float(re.findall('(\S+)', cpu[-1])[-1])
self.cpu_use_list.insert(0, cpu_use)
# 內存使用情況
mem = os.popen('LANG=en_US.UTF-8;sar -r 1 1').readlines()
data = re.findall('(\S+)', mem[-1])[3]
self.mem_use_list.insert(0, data)
# 時間信息
t1 = datetime.datetime.now()
now_time = t1.strftime("%Y-%m-%d %H:%M:%S")
self.time_list.insert(0, now_time)
def slice_list(self, target_list: list):
if len(target_list) > 200:
target_list = target_list[0:200]
return target_list
def check_list_length(self):
self.slice_list(self.time_list)
self.slice_list(self.dick_read_list)
self.slice_list(self.dick_write_list)
self.slice_list(self.net_read_list)
self.slice_list(self.net_write_list)
self.slice_list(self.cpu_use_list)
self.slice_list(self.mem_use_list)
if __name__ == '__main__':
obj1 = ServerInfo()
obj2 = ServerInfo()
# print(obj1 is obj2)
# thread_list = []
# for i in range(10):
# obj1 = ServerInfo()
# t = threading.Thread(target=obj1.set_server_info)
# thread_list.append(t)
#
# t = None
# for t in thread_list:
# time.sleep(1)
# t.start()
# t.join()
# obj2 = obj1 = ServerInfo()
# print(obj2.get_server_info())