容器監控cAdvisor原理分析


cAdvisor是一款強大的 Docker Container 監控工具,方便容器用戶,對運行中的容器進行資源使用和性能分析。用於收集、聚合、處理和導出運行中容器的信息。cAdvisor提供了對Docker容器的原生支持,並且應該支持任何其他容器類型。

Kubelet內置了對cAdvisor的支持,用戶可以直接通過Kubelet組件獲取給節點上容器相關監控指標。

本系列文章cAdvisor代碼,以v0.37.5代碼為例。

cAdvisor主函數分析

以下是main函數代碼,會對代碼進行簡單注解,並對代碼進行一定程度上的精簡,其代碼路徑為:/cadvisor/cmd/cadvisor.go

根據以下代碼可以總結cAdvisor主要完成了以下幾個工作:

  • 提供API給外部使用,包括一般API接口和prometheus接口

  • 可實現第三方數據存儲,支持 bigquery、es、influxdb、kafka、redis、statsd、stdout

  • 收集數據包括 container、process、machine、Go runtime

func main() {
    klog.InitFlags(nil)
    defer klog.Flush()
    flag.Parse()

    if *versionFlag {
        fmt.Printf("cAdvisor version %s (%s)\n", version.Info["version"], version.Info["revision"])
        os.Exit(0)
    }
    // 拿到所有需要收集的metrics類型,即從全量的metrics類型中,排除掉flag.disable_metrics,剩余的metrics集
    // 返回的值大概為container.MetricSet{
    //                     CpuUsageMetrics:                struct{}{}, //cpu
    //                    ProcessSchedulerMetrics:        struct{}{}, //sched
    //                    PerCpuUsageMetrics:             struct{}{}, //precpu
    //                    ....}
    includedMetrics := toIncludedMetrics(ignoreMetrics.MetricSet)

    // 利用cpu個數或是flag.max_procs,設置最大可執行的cpu個數
    setMaxProcs()

    //1. 初始化本地內存
    //2. 初始化存儲介質,可初始化多個,支持:bigquery,es,influxdb,kafka,redis,statsd,stdout【用flag.storage_driver】
    //3. 定時將數據存入存儲介質中【flag.storage_duration】 ??
    memoryStorage, err := NewMemoryStorage()
    if err != nil {
        klog.Fatalf("Failed to initialize storage driver: %s", err)
    }

    // 系統fs對象
    sysFs := sysfs.NewRealSysFs()

    // 利用證書,創建http 的 client
    collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)

    // 創建resourceManager
    resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
    if err != nil {
        klog.Fatalf("Failed to create a manager: %s", err)
    }

    mux := http.NewServeMux()

    if *enableProfiling {
        mux.HandleFunc("/debug/pprof/", pprof.Index)
        mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
        mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
        mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
    }

    // Register all HTTP handlers.
    err = cadvisorhttp.RegisterHandlers(mux, resourceManager, *httpAuthFile, *httpAuthRealm, *httpDigestFile, *httpDigestRealm, *urlBasePrefix)
    if err != nil {
        klog.Fatalf("Failed to register HTTP handlers: %v", err)
    }

    containerLabelFunc := metrics.DefaultContainerLabels
    if !*storeContainerLabels {
        whitelistedLabels := strings.Split(*whitelistedContainerLabels, ",")
        containerLabelFunc = metrics.BaseContainerLabels(whitelistedLabels)
    }

    // Register Prometheus collector to gather information about containers, Go runtime, processes, and machine
    cadvisorhttp.RegisterPrometheusHandler(mux, resourceManager, *prometheusEndpoint, containerLabelFunc, includedMetrics)

    // Start the manager.
    if err := resourceManager.Start(); err != nil {
        klog.Fatalf("Failed to start manager: %v", err)
    }

    // Install signal handler. 
    installSignalHandler(resourceManager)

    klog.V(1).Infof("Starting cAdvisor version: %s-%s on port %d", version.Info["version"], version.Info["revision"], *argPort)

    rootMux := http.NewServeMux()
    rootMux.Handle(*urlBasePrefix+"/", http.StripPrefix(*urlBasePrefix, mux))

    addr := fmt.Sprintf("%s:%d", *argIp, *argPort)
    klog.Fatal(http.ListenAndServe(addr, rootMux))
}

其中resourceManager類型是manager,粗略瀏覽下manager結構的字段以及相關功能

type manager struct {
    // 當前受到監控的容器存在一個map中 containerData結構中包括了對容器的各種具體操作方式和相關信息
    containers               map[namespacedContainerName]*containerData
    // 對map中數據存取時采用的Lock
    containersLock           sync.RWMutex
    // 緩存在內存中的數據 主要是容器的相關信息
    memoryCache              *memory.InMemoryCache
    // host上的實際文件系統的相關信息
    fsInfo                   fs.FsInfo
    // 系統fs對象,里面有一些查詢系統文件的方法
    sysFs                    sysfs.SysFs
    machineMu                sync.RWMutex // protects machineInfo
    // machine的相關信息 cpu memory network system信息等等
    machineInfo              info.MachineInfo
    // 用於存放退出信號的channel manager關閉的時候會給其中的channel發送退出信號
    quitChannels             []chan error
    //cadvisor本身所運行的那個容器(如果cadvisor運行在容器中)
    cadvisorContainer        string
    // 是否在hostnamespace中?
    inHostNamespace          bool
    // 對event相關操作進行的封裝
    eventHandler             events.EventManager
    // manager的啟動時間
    startupTime              time.Time
    // 在內存中保留數據的時間 也就是下次開始搜集容器相關信息並且更新內存信息的時間
    maxHousekeepingInterval  time.Duration
    // 是否允許動態設置dynamic housekeeping
    allowDynamicHousekeeping bool
    includedMetrics          container.MetricSet
    containerWatchers        []watcher.ContainerWatcher
    eventsChannel            chan watcher.ContainerEvent
    collectorHTTPClient      *http.Client
    nvidiaManager            stats.Manager
    perfManager              stats.Manager
    resctrlManager           stats.Manager
    // List of raw container cgroup path prefix whitelist.
    rawContainerCgroupPathPrefixWhiteList []string
}

cAdvisor數據采集分析

cAdvisor的數據采集分為兩個部分machineInfo和containerInfo。以下就詳細介紹這兩部分數據采集的過程。對數據采集需要用到resourceManager,這是對數據采集的抽象,其結構體內容的具體介紹見。其數據采集開始代碼是 /cmd/cadvisor.go -> main.go 中的代碼:

resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
if err != nil {
    klog.Fatalf("Failed to create a manager: %s", err)
}

// Start the manager.
if err := resourceManager.Start(); err != nil {
    klog.Fatalf("Failed to start manager: %v", err)
}

數據采集結構圖

img

machineInfo

machineInfo的數據采集具體代碼,主要是 /machine/info.go -> Info() ,在new manager的時候會去調用一次這個方法,主要是讀取系統文件(具體文件見上面的“整體結構圖”),將數據放入到m.MachineInfo中,后續在Start方法中,起一個協程,定時調用該方法,更新本地cache。相關代碼地址如下:

/machine/info.go
/machine/machine.go
/machine/operatingsystem_unix.go
/machine/operatingsystem_windows.go

containerInfo

整體的containerInfo的數據采集,由 /manager/manager.go -> Start() 開始,起整體流程圖如下:

img

整體的流程的可以概括為兩各部分:

  1. 利用inotify去watch cgroupPath,監控該目錄下的變更,拿到該目錄下的增刪改查事件,也就知道container的變更,從而動態更新cache中的數據
  2. 定時check,cache中的m.containers和主動去拿獲取目前存在的container,對整體做一個diff,從而更新cache中的數據

創建Container

其代碼路徑為 /manager/manager.go -> CreateContainer , 具體代碼如下,詳細解析可以看代碼中的注釋(代碼有做一些刪減)。

// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
    m.containersLock.Lock()
    defer m.containersLock.Unlock()

    return m.createContainerLocked(containerName, watchSource)
}

func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
    namespacedName := namespacedContainerName{
        Name: containerName,
    }

    // 查看該container是否以及存在,如果已存在,則直接return
    if _, ok := m.containers[namespacedName]; ok {
        return nil
    }

    // for (factories) 判斷是否能創建handler,如果可以則創建handler。
    // 該handler實現了 ContainerHandler的interface,里面有GetSpec()、GetStats()、ListContainers等方法
    handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
    if err != nil {
        return err
    }
    if !accept {
        // ignoring this container.
        klog.V(4).Infof("ignoring container %q", containerName)
        return nil
    }

    logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
    // 創建 containerData struct{}結構體的對象
    cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
    if err != nil {
        return err
    }
    ......

    // 將該container及其所有的aliases,放入到m.containers中
    m.containers[namespacedName] = cont
    for _, alias := range cont.info.Aliases {
        m.containers[namespacedContainerName{
            Namespace: cont.info.Namespace,
            Name:      alias,
        }] = cont
    }

    klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)
    ......
    
    // 構建event,找到到合適的m.eventHandler.watchers的*[]watchers,放入到*[]watchers的EventChannel.channel中
    newEvent := &info.Event{
        ContainerName: contRef.Name,
        Timestamp:     contSpec.CreationTime,
        EventType:     info.EventContainerCreation,
    }
    err = m.eventHandler.AddEvent(newEvent)
    if err != nil {
        return err
    }

    // Start the container's housekeeping.
    // 開啟一個housekeeping的協程,定時調用updateStats(),即更新cont的數據
    return cont.Start()
}

其中 m.eventHandler.AddEvent(newEvent) ,其邏輯是找到到合適的 m.eventHandler.watchers 的[]watchers,再將newEvent分別放入到[]watchers中,其中根據條件匹配到合適的[]watchers邏輯如下:

  • watcher.request.SndTime< newEvent.timestamp< watcher.request.EndTime

  • newEvent.EventType在watcher.request.EventType中有

  • newEvent.ContainerName的前綴是watcher.request.ContainerName

檢測子容器

其代碼路徑為 /manager/manager.go -> detectSubcontainers() , 主要是拿到containerName=“/”下的所有container 和 m.containers做diff,獲取新增的容器added 和 已刪除的容器removed

  • Added:對於added的容器調用m.CreateContainer()(具體可參考:4.1 創建container)
  • Removed:對於removed的容器調用m.destroyContainer(),將該容器及其aliases在cache中的記錄全部刪除掉

其diff具體邏輯如下圖:

img

Watch

其代碼路徑為 /manager/manager.go -> watchForNewContainers(quit chan error)
用的是 k8s.io/utils/inotify 中的watch功能,即watch一個目錄,從而拿到該目錄下的所有變更。所以這里利用的是inotify來watch cgroupPath,從而watch到container的變更

  • 調用m.containerWatchers中watch的start(),watch cgroupPaths中的變化,獲取該目錄變更event,並將得到的event,放入條件匹配的watch的EventChannel.channel中

  • 調用 detectSubContainers(“/”) (具體可參考:4.2 檢測子容器)

  • go func{}處理以上的到的event,對於add事件調用 m.CreateContainer() ,對於delete事件調用 m.destroyContainer() ,收到quit信號,則退出協程

全局更新

其代碼路徑為 /manager/manager.go -> globalHousekeeping(quit chan error) ,主要是定時調用m.detectSubcontainers("/"),具體邏輯可參考檢測子容器。間隔時間:globalHousekeepingInterval

func (m *manager) globalHousekeeping(quit chan error) {
    // longHousekeeping := min(100ms,*globalHousekeepingInterval / 2)
    longHousekeeping := 100 * time.Millisecond
    if *globalHousekeepingInterval/2 < longHousekeeping {
        longHousekeeping = *globalHousekeepingInterval / 2
    }

    // 定時,間隔時間 *globalHousekeepingInterval
    ticker := time.NewTicker(*globalHousekeepingInterval)
    for {
        select {
        case t := <-ticker.C:
            start := time.Now()

            // Check for new containers.
            err := m.detectSubcontainers("/")
            if err != nil {
                klog.Errorf("Failed to detect containers: %s", err)
            }

            // housekeeping 耗時超過longHousekeeping,則打印一條日志
            duration := time.Since(start)
            if duration >= longHousekeeping {
                klog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
            }
        case <-quit:
            // Quit if asked to do so.
            quit <- nil
            klog.Infof("Exiting global housekeeping thread")
            return
        }
    }
}

cAdvisor數據存儲分析

cAdvisor不僅會在本地存儲,用於prometheus拉取,而且還支持將數據存入第三方存儲介質,用於數據的持久化,其邏輯相對簡單,但是卻很重要。

存儲核心代碼

cadvisor/cmd/storagedriver.go

// 主要用於返回,各第三方init時放入map中的client,以及和storage相關的flag
storage/* 

// 主要是一些創建storage的client,AddStats,Close方法
cadvisor/cmd/storage/bigquery/*
cadvisor/cmd/storage/elasticsearch/*
cadvisor/cmd/storage/influxdb/*
cadvisor/cmd/storage/kafka/*
cadvisor/cmd/storage/redis/*
cadvisor/cmd/storage/statsd/*
cadvisor/cmd/storage/stdout/*

// 本地cache相關操作
utils/timed_store.go

代碼入口

在真正執行add之前,需要初始化存儲對象。數據存儲最重要的結構體是InMamoryCache,其具體結構如下,具體邏輯看注釋:

type InMemoryCache struct {
    // 讀寫鎖
    lock              sync.RWMutex
    // container本地cache
    containerCacheMap map[string]*containerCache
    // 最大存活時間,這個在下面的存儲過程中會用到
    maxAge            time.Duration
    // 不同第三方存儲實現的interface
    backend           []storage.StorageDriver
}

初始調用在main函數中的

func main() {    
    ....
    
    memoryStorage, err := NewMemoryStorage()
    if err != nil {
        klog.Fatalf("Failed to initialize storage driver: %s", err)
    }
    
    ....
}

其中NewMemoryStorage() 函數在cmd/storagedriver.go中,具體代碼如下:

// NewMemoryStorage creates a memory storage with an optional backend storage option.
func NewMemoryStorage() (*memory.InMemoryCache, error) {
    backendStorages := []storage.StorageDriver{}
    // storageDriver: flag.storage_driver啟動時輸入,多個存儲介質用逗號分割(默認為空)
    for _, driver := range strings.Split(*storageDriver, ",") {
        if driver == "" {
            continue
        }
        // 返回的是各第三方存儲的StorageDriver,以elasticsearch為例,就是
        // /cmd/internal/storage/elasticsearch/elasticsearch.go -> func new() (storage.StorageDriver, error)
        storage, err := storage.New(driver)
        if err != nil {
            return nil, err
        }
        backendStorages = append(backendStorages, storage)
        klog.V(1).Infof("Using backend storage type %q", driver)
    }
    klog.V(1).Infof("Caching stats in memory for %v", *storageDuration)
    
    // *InMemoryCache,其中maxAge就是flag.storage_duration啟動輸入的值(默認2m)
    return memory.New(*storageDuration, backendStorages), nil
}

數據存儲過程

真正數據的存儲過程分為兩個部分:本地存儲和第三方介質存儲

  • 本地存儲:

    • InMemoryCache.containerCacheMap是一個map,其具體結構為map[string]*ContainerCache,其key是container的name,value是ContainerCache,先判斷該map中是否有containerName的數據,如果沒有則新增
    • 對containerName相對應的ContainerCache插入數據,插入數據的步驟分三步:
      • 將數據根據timestamp插入到相應位置
      • 將TimeStore.buffer中timestamp < 剛插入數據的timestamp - age 的數據remove掉
      • 查看buffer中數據個數 > maxItems,將timestamp小的數據remove掉
  • 第三方介質存儲:for bankend中的方法,調用各介質的AddStats方法,將數據存入

具體調用過程可參考以下圖:

img

若有收獲,就點個贊吧


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM