115.python獲取服務器信息


1.產生原因

項目需求獲取服務器一段時間之內的cpu使用率,內存使用率等信息.由於數據會被多人訪問,所以采用python元類的方式創建單例模式,每個請求訪問的都是同一個對象.同時通過linux的sar命令獲取相關信息,加載在內存中,其中我限制了返回數據的列表長度200(自己調整就行了),同時這個東西其實在寫入數據的時候最好使用異步任務celery獲者python apscheduler模塊.

2.代碼實現

import os, re, datetime
import threading


class SingletonType(type):
    _instance_lock = threading.Lock()

    def __init__(self, *args, **kwargs):
        super(SingletonType, self).__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            with cls._instance_lock:
                if not hasattr(cls, "_instance"):
                    cls._instance = super(SingletonType, cls).__call__(*args, **kwargs)
        return cls._instance


class ServerInfo(metaclass=SingletonType):

    def __init__(self):
        self.time_list = []
        self.dick_read_list = []
        self.dick_write_list = []
        self.net_read_list = []
        self.net_write_list = []
        self.cpu_use_list = []
        self.mem_use_list = []

    def get_data(self):
        self.set_server_info()
        data = self.get_server_info()
        return data

    def get_server_info(self):
        self.check_list_length()
        data = {"cpu_use": self.cpu_use_list,
                "mem_use": self.mem_use_list,
                "disk_read": self.dick_read_list,
                "dick_write": self.dick_write_list,
                "net_read": self.net_read_list,
                "net_write": self.net_write_list,
                "time": self.time_list}
        return data

    def set_server_info(self):
        # 磁盤io情況
        rev = os.popen("sar -b 1 1").readlines()
        i_rev = re.findall('(\S+)', rev[-1])[-3]
        o_rev = re.findall('(\S+)', rev[-1])[-4]
        self.dick_read_list.insert(0, o_rev)
        self.dick_write_list.insert(0, i_rev)

        # 網絡io情況
        i_net = 0
        o_net = 0
        nets = os.popen("sar -n DEV 1 1").readlines()
        for net in nets:
            if re.search('lo', net) or re.search('IFACE', net):
                pass
            elif re.search('Average', net):
                i_net += float(re.findall('(\S+)', net)[-5])
                o_net += float(re.findall('(\S+)', net)[-4])

        self.net_read_list.insert(0, o_net)
        self.net_write_list.insert(0, i_net)

        # cpu使用情況
        cpu = os.popen("sar -u 1 1").readlines()
        cpu_use = 100 - float(re.findall('(\S+)', cpu[-1])[-1])
        self.cpu_use_list.insert(0, cpu_use)

        # 內存使用情況
        mem = os.popen('LANG=en_US.UTF-8;sar -r 1 1').readlines()
        data = re.findall('(\S+)', mem[-1])[3]

        self.mem_use_list.insert(0, data)

        # 時間信息
        t1 = datetime.datetime.now()
        now_time = t1.strftime("%Y-%m-%d %H:%M:%S")
        self.time_list.insert(0, now_time)

    def slice_list(self, target_list: list):
        if len(target_list) > 200:
            target_list = target_list[0:200]
        return target_list

    def check_list_length(self):
        self.slice_list(self.time_list)
        self.slice_list(self.dick_read_list)
        self.slice_list(self.dick_write_list)
        self.slice_list(self.net_read_list)
        self.slice_list(self.net_write_list)
        self.slice_list(self.cpu_use_list)
        self.slice_list(self.mem_use_list)


if __name__ == '__main__':
    obj1 = ServerInfo()
    obj2 = ServerInfo()
    # print(obj1 is obj2)
    # thread_list = []
    # for i in range(10):
    #     obj1 = ServerInfo()
    #     t = threading.Thread(target=obj1.set_server_info)
    #     thread_list.append(t)
    #
    # t = None
    # for t in thread_list:
    #     time.sleep(1)
    #     t.start()
    # t.join()
    # obj2 = obj1 = ServerInfo()
    # print(obj2.get_server_info())


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM