cAdvisor是一款強大的 Docker Container 監控工具,方便容器用戶,對運行中的容器進行資源使用和性能分析。用於收集、聚合、處理和導出運行中容器的信息。cAdvisor提供了對Docker容器的原生支持,並且應該支持任何其他容器類型。
Kubelet內置了對cAdvisor的支持,用戶可以直接通過Kubelet組件獲取給節點上容器相關監控指標。
本系列文章cAdvisor代碼,以v0.37.5代碼為例。
cAdvisor主函數分析
以下是main函數代碼,會對代碼進行簡單注解,並對代碼進行一定程度上的精簡,其代碼路徑為:/cadvisor/cmd/cadvisor.go
根據以下代碼可以總結cAdvisor主要完成了以下幾個工作:
-
提供API給外部使用,包括一般API接口和prometheus接口
-
可實現第三方數據存儲,支持 bigquery、es、influxdb、kafka、redis、statsd、stdout
-
收集數據包括 container、process、machine、Go runtime
func main() {
klog.InitFlags(nil)
defer klog.Flush()
flag.Parse()
if *versionFlag {
fmt.Printf("cAdvisor version %s (%s)\n", version.Info["version"], version.Info["revision"])
os.Exit(0)
}
// 拿到所有需要收集的metrics類型,即從全量的metrics類型中,排除掉flag.disable_metrics,剩余的metrics集
// 返回的值大概為container.MetricSet{
// CpuUsageMetrics: struct{}{}, //cpu
// ProcessSchedulerMetrics: struct{}{}, //sched
// PerCpuUsageMetrics: struct{}{}, //precpu
// ....}
includedMetrics := toIncludedMetrics(ignoreMetrics.MetricSet)
// 利用cpu個數或是flag.max_procs,設置最大可執行的cpu個數
setMaxProcs()
//1. 初始化本地內存
//2. 初始化存儲介質,可初始化多個,支持:bigquery,es,influxdb,kafka,redis,statsd,stdout【用flag.storage_driver】
//3. 定時將數據存入存儲介質中【flag.storage_duration】 ??
memoryStorage, err := NewMemoryStorage()
if err != nil {
klog.Fatalf("Failed to initialize storage driver: %s", err)
}
// 系統fs對象
sysFs := sysfs.NewRealSysFs()
// 利用證書,創建http 的 client
collectorHttpClient := createCollectorHttpClient(*collectorCert, *collectorKey)
// 創建resourceManager
resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
if err != nil {
klog.Fatalf("Failed to create a manager: %s", err)
}
mux := http.NewServeMux()
if *enableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
// Register all HTTP handlers.
err = cadvisorhttp.RegisterHandlers(mux, resourceManager, *httpAuthFile, *httpAuthRealm, *httpDigestFile, *httpDigestRealm, *urlBasePrefix)
if err != nil {
klog.Fatalf("Failed to register HTTP handlers: %v", err)
}
containerLabelFunc := metrics.DefaultContainerLabels
if !*storeContainerLabels {
whitelistedLabels := strings.Split(*whitelistedContainerLabels, ",")
containerLabelFunc = metrics.BaseContainerLabels(whitelistedLabels)
}
// Register Prometheus collector to gather information about containers, Go runtime, processes, and machine
cadvisorhttp.RegisterPrometheusHandler(mux, resourceManager, *prometheusEndpoint, containerLabelFunc, includedMetrics)
// Start the manager.
if err := resourceManager.Start(); err != nil {
klog.Fatalf("Failed to start manager: %v", err)
}
// Install signal handler.
installSignalHandler(resourceManager)
klog.V(1).Infof("Starting cAdvisor version: %s-%s on port %d", version.Info["version"], version.Info["revision"], *argPort)
rootMux := http.NewServeMux()
rootMux.Handle(*urlBasePrefix+"/", http.StripPrefix(*urlBasePrefix, mux))
addr := fmt.Sprintf("%s:%d", *argIp, *argPort)
klog.Fatal(http.ListenAndServe(addr, rootMux))
}
其中resourceManager類型是manager,粗略瀏覽下manager結構的字段以及相關功能
type manager struct {
// 當前受到監控的容器存在一個map中 containerData結構中包括了對容器的各種具體操作方式和相關信息
containers map[namespacedContainerName]*containerData
// 對map中數據存取時采用的Lock
containersLock sync.RWMutex
// 緩存在內存中的數據 主要是容器的相關信息
memoryCache *memory.InMemoryCache
// host上的實際文件系統的相關信息
fsInfo fs.FsInfo
// 系統fs對象,里面有一些查詢系統文件的方法
sysFs sysfs.SysFs
machineMu sync.RWMutex // protects machineInfo
// machine的相關信息 cpu memory network system信息等等
machineInfo info.MachineInfo
// 用於存放退出信號的channel manager關閉的時候會給其中的channel發送退出信號
quitChannels []chan error
//cadvisor本身所運行的那個容器(如果cadvisor運行在容器中)
cadvisorContainer string
// 是否在hostnamespace中?
inHostNamespace bool
// 對event相關操作進行的封裝
eventHandler events.EventManager
// manager的啟動時間
startupTime time.Time
// 在內存中保留數據的時間 也就是下次開始搜集容器相關信息並且更新內存信息的時間
maxHousekeepingInterval time.Duration
// 是否允許動態設置dynamic housekeeping
allowDynamicHousekeeping bool
includedMetrics container.MetricSet
containerWatchers []watcher.ContainerWatcher
eventsChannel chan watcher.ContainerEvent
collectorHTTPClient *http.Client
nvidiaManager stats.Manager
perfManager stats.Manager
resctrlManager stats.Manager
// List of raw container cgroup path prefix whitelist.
rawContainerCgroupPathPrefixWhiteList []string
}
cAdvisor數據采集分析
cAdvisor的數據采集分為兩個部分machineInfo和containerInfo。以下就詳細介紹這兩部分數據采集的過程。對數據采集需要用到resourceManager,這是對數據采集的抽象,其結構體內容的具體介紹見。其數據采集開始代碼是 /cmd/cadvisor.go -> main.go
中的代碼:
resourceManager, err := manager.New(memoryStorage, sysFs, housekeepingConfig, includedMetrics, &collectorHttpClient, strings.Split(*rawCgroupPrefixWhiteList, ","), *perfEvents)
if err != nil {
klog.Fatalf("Failed to create a manager: %s", err)
}
// Start the manager.
if err := resourceManager.Start(); err != nil {
klog.Fatalf("Failed to start manager: %v", err)
}
數據采集結構圖
machineInfo
machineInfo的數據采集具體代碼,主要是 /machine/info.go -> Info()
,在new manager
的時候會去調用一次這個方法,主要是讀取系統文件(具體文件見上面的“整體結構圖”),將數據放入到m.MachineInfo中,后續在Start方法中,起一個協程,定時調用該方法,更新本地cache。相關代碼地址如下:
/machine/info.go
/machine/machine.go
/machine/operatingsystem_unix.go
/machine/operatingsystem_windows.go
containerInfo
整體的containerInfo的數據采集,由 /manager/manager.go -> Start() 開始,起整體流程圖如下:
整體的流程的可以概括為兩各部分:
- 利用inotify去watch cgroupPath,監控該目錄下的變更,拿到該目錄下的增刪改查事件,也就知道container的變更,從而動態更新cache中的數據
- 定時check,cache中的m.containers和主動去拿獲取目前存在的container,對整體做一個diff,從而更新cache中的數據
創建Container
其代碼路徑為 /manager/manager.go -> CreateContainer , 具體代碼如下,詳細解析可以看代碼中的注釋(代碼有做一些刪減)。
// Create a container.
func (m *manager) createContainer(containerName string, watchSource watcher.ContainerWatchSource) error {
m.containersLock.Lock()
defer m.containersLock.Unlock()
return m.createContainerLocked(containerName, watchSource)
}
func (m *manager) createContainerLocked(containerName string, watchSource watcher.ContainerWatchSource) error {
namespacedName := namespacedContainerName{
Name: containerName,
}
// 查看該container是否以及存在,如果已存在,則直接return
if _, ok := m.containers[namespacedName]; ok {
return nil
}
// for (factories) 判斷是否能創建handler,如果可以則創建handler。
// 該handler實現了 ContainerHandler的interface,里面有GetSpec()、GetStats()、ListContainers等方法
handler, accept, err := container.NewContainerHandler(containerName, watchSource, m.inHostNamespace)
if err != nil {
return err
}
if !accept {
// ignoring this container.
klog.V(4).Infof("ignoring container %q", containerName)
return nil
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
// 創建 containerData struct{}結構體的對象
cont, err := newContainerData(containerName, m.memoryCache, handler, logUsage, collectorManager, m.maxHousekeepingInterval, m.allowDynamicHousekeeping, clock.RealClock{})
if err != nil {
return err
}
......
// 將該container及其所有的aliases,放入到m.containers中
m.containers[namespacedName] = cont
for _, alias := range cont.info.Aliases {
m.containers[namespacedContainerName{
Namespace: cont.info.Namespace,
Name: alias,
}] = cont
}
klog.V(3).Infof("Added container: %q (aliases: %v, namespace: %q)", containerName, cont.info.Aliases, cont.info.Namespace)
......
// 構建event,找到到合適的m.eventHandler.watchers的*[]watchers,放入到*[]watchers的EventChannel.channel中
newEvent := &info.Event{
ContainerName: contRef.Name,
Timestamp: contSpec.CreationTime,
EventType: info.EventContainerCreation,
}
err = m.eventHandler.AddEvent(newEvent)
if err != nil {
return err
}
// Start the container's housekeeping.
// 開啟一個housekeeping的協程,定時調用updateStats(),即更新cont的數據
return cont.Start()
}
其中 m.eventHandler.AddEvent(newEvent) ,其邏輯是找到到合適的 m.eventHandler.watchers 的[]watchers,再將newEvent分別放入到[]watchers中,其中根據條件匹配到合適的[]watchers邏輯如下:
-
watcher.request.SndTime< newEvent.timestamp< watcher.request.EndTime
-
newEvent.EventType在watcher.request.EventType中有
-
newEvent.ContainerName的前綴是watcher.request.ContainerName
檢測子容器
其代碼路徑為 /manager/manager.go -> detectSubcontainers() , 主要是拿到containerName=“/”下的所有container 和 m.containers做diff,獲取新增的容器added 和 已刪除的容器removed
- Added:對於added的容器調用m.CreateContainer()(具體可參考:4.1 創建container)
- Removed:對於removed的容器調用m.destroyContainer(),將該容器及其aliases在cache中的記錄全部刪除掉
其diff具體邏輯如下圖:
Watch
其代碼路徑為 /manager/manager.go -> watchForNewContainers(quit chan error)
用的是 k8s.io/utils/inotify 中的watch功能,即watch一個目錄,從而拿到該目錄下的所有變更。所以這里利用的是inotify來watch cgroupPath,從而watch到container的變更
-
調用m.containerWatchers中watch的start(),watch cgroupPaths中的變化,獲取該目錄變更event,並將得到的event,放入條件匹配的watch的EventChannel.channel中
-
調用 detectSubContainers(“/”) (具體可參考:4.2 檢測子容器)
-
go func{}處理以上的到的event,對於add事件調用 m.CreateContainer() ,對於delete事件調用 m.destroyContainer() ,收到quit信號,則退出協程
全局更新
其代碼路徑為 /manager/manager.go -> globalHousekeeping(quit chan error) ,主要是定時調用m.detectSubcontainers("/"),具體邏輯可參考檢測子容器
。間隔時間:globalHousekeepingInterval
func (m *manager) globalHousekeeping(quit chan error) {
// longHousekeeping := min(100ms,*globalHousekeepingInterval / 2)
longHousekeeping := 100 * time.Millisecond
if *globalHousekeepingInterval/2 < longHousekeeping {
longHousekeeping = *globalHousekeepingInterval / 2
}
// 定時,間隔時間 *globalHousekeepingInterval
ticker := time.NewTicker(*globalHousekeepingInterval)
for {
select {
case t := <-ticker.C:
start := time.Now()
// Check for new containers.
err := m.detectSubcontainers("/")
if err != nil {
klog.Errorf("Failed to detect containers: %s", err)
}
// housekeeping 耗時超過longHousekeeping,則打印一條日志
duration := time.Since(start)
if duration >= longHousekeeping {
klog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
}
case <-quit:
// Quit if asked to do so.
quit <- nil
klog.Infof("Exiting global housekeeping thread")
return
}
}
}
cAdvisor數據存儲分析
cAdvisor不僅會在本地存儲,用於prometheus拉取,而且還支持將數據存入第三方存儲介質,用於數據的持久化,其邏輯相對簡單,但是卻很重要。
存儲核心代碼
cadvisor/cmd/storagedriver.go
// 主要用於返回,各第三方init時放入map中的client,以及和storage相關的flag
storage/*
// 主要是一些創建storage的client,AddStats,Close方法
cadvisor/cmd/storage/bigquery/*
cadvisor/cmd/storage/elasticsearch/*
cadvisor/cmd/storage/influxdb/*
cadvisor/cmd/storage/kafka/*
cadvisor/cmd/storage/redis/*
cadvisor/cmd/storage/statsd/*
cadvisor/cmd/storage/stdout/*
// 本地cache相關操作
utils/timed_store.go
代碼入口
在真正執行add之前,需要初始化存儲對象。數據存儲最重要的結構體是InMamoryCache,其具體結構如下,具體邏輯看注釋:
type InMemoryCache struct {
// 讀寫鎖
lock sync.RWMutex
// container本地cache
containerCacheMap map[string]*containerCache
// 最大存活時間,這個在下面的存儲過程中會用到
maxAge time.Duration
// 不同第三方存儲實現的interface
backend []storage.StorageDriver
}
初始調用在main函數中的
func main() {
....
memoryStorage, err := NewMemoryStorage()
if err != nil {
klog.Fatalf("Failed to initialize storage driver: %s", err)
}
....
}
其中NewMemoryStorage() 函數在cmd/storagedriver.go中,具體代碼如下:
// NewMemoryStorage creates a memory storage with an optional backend storage option.
func NewMemoryStorage() (*memory.InMemoryCache, error) {
backendStorages := []storage.StorageDriver{}
// storageDriver: flag.storage_driver啟動時輸入,多個存儲介質用逗號分割(默認為空)
for _, driver := range strings.Split(*storageDriver, ",") {
if driver == "" {
continue
}
// 返回的是各第三方存儲的StorageDriver,以elasticsearch為例,就是
// /cmd/internal/storage/elasticsearch/elasticsearch.go -> func new() (storage.StorageDriver, error)
storage, err := storage.New(driver)
if err != nil {
return nil, err
}
backendStorages = append(backendStorages, storage)
klog.V(1).Infof("Using backend storage type %q", driver)
}
klog.V(1).Infof("Caching stats in memory for %v", *storageDuration)
// *InMemoryCache,其中maxAge就是flag.storage_duration啟動輸入的值(默認2m)
return memory.New(*storageDuration, backendStorages), nil
}
數據存儲過程
真正數據的存儲過程分為兩個部分:本地存儲和第三方介質存儲
-
本地存儲:
-
- InMemoryCache.containerCacheMap是一個map,其具體結構為map[string]*ContainerCache,其key是container的name,value是ContainerCache,先判斷該map中是否有containerName的數據,如果沒有則新增
- 對containerName相對應的ContainerCache插入數據,插入數據的步驟分三步:
-
-
- 將數據根據timestamp插入到相應位置
- 將TimeStore.buffer中timestamp < 剛插入數據的timestamp - age 的數據remove掉
-
-
-
- 查看buffer中數據個數 > maxItems,將timestamp小的數據remove掉
-
-
第三方介質存儲:for bankend中的方法,調用各介質的AddStats方法,將數據存入
具體調用過程可參考以下圖:
若有收獲,就點個贊吧