VictoriaMetrics源碼及相關API解析


flock.lock

vmStorage在啟動時會根據路徑以及給定的最大保存時間創建Storage對象,然后還會根據cache路徑(path + /cache)下是否存在/reset_cache_on_startup這一路徑在選擇進行cache目錄下的清空(reset),然后是創建flock.lock文件,這是一個文件鎖文件,調用的底層API是

if err := unix.Flock(int(flockF.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
	return nil, fmt.Errorf("cannot acquire lock on file %q: %w", flockFile, err)
}

調用unix.Flock(int fd, int operation)對創建的flock.lock文件上文件鎖,第二個參數的含義是指上鎖的鎖類型,有LOCK_SHLOCK_EXLOCK_NBLOCK_UN四種,含義如下

  • LOCK_SH表示共享鎖
  • LOCK_EX表示排它鎖
  • LOCK_NB讓想要嘗試給文件上排它鎖的進程不進入阻塞狀態(默認會進入阻塞狀態)而直接返回
  • LOCK_UN用於解除文件的文件鎖

而flock()添加的鎖是建議性鎖,也就是使用flock()對文件上鎖后,其他進程仍然可以對該文件進行操作,而只有通過flock()調用去檢測才可以知道文件是否處於被鎖定的狀態,也就是flock()只是起到一個通知的作用,沒有強制性的阻止文件修改的動作。

atomic.Value

vmStorage在啟動時會需要指定每小時以及每天的最大保存進storage的數量,這個數量由一個限制器實現:

type Limiter struct {
	maxItems int
	v        atomic.Value

	wg     sync.WaitGroup
	stopCh chan struct{}
}

其中,atomic.Value類型類似於一個容器,可以將任意類型的讀寫操作分裝成原子性操作(讓中間狀態對外不可見),atomic.Value類型對外暴露的方法只有兩個

  • v.Store(c) - 寫操作,將原始的變量c存放到一個atomic.Value類型的v里
  • c = v.Load() - 讀操作,從線程安全的v中讀取上一步存放的內容

Request.FormValue

Golang中net/http包下Request.FormValue方法可以獲取 url 中 ? 后面的請求參數

TSID

vmStorage在啟動時還需要獲取到分配到的內存大小,然后根據一定的比例創建cache區域,有MetricName->TSIDMetricID->TSIDMetricID->MetricName以供后續插入數據建立索引使用,其中有關與TSID的創建是至關重要的,如下圖所示,VictoriaMetrics在接收到寫入請求時,會對請求中包含的時序數據做轉換處理,首先根據包含metric和labels的MetricName生成一個唯一表示TSID,然后metric + labels + TSID作為索引index, TSID + timestamp + value作為數據data,最后索引index和數據data分別進行存儲和檢索。

TSID的結構

// storage/tsid.go
type TSID struct {
	// metricName, 指標名對應的id
	MetricGroupID uint64

	JobID              uint32
	InstanceID       uint32

        // metricNameRaw, 指標名+tags對應的id
	MetricID          uint64
}

TSID一共有四個字段,其中JobIDInstanceID是為了兼容Prometheus的協議而添加的,而且只有MetricID是必須的,其他三個字段都是可選的。TSID的初始化函數如下,這一函數會在VmStorage啟動的main函數里面的go srv.RunVMInsert()go srv.RunVMSelect()中被調用到

func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) error {
	// Search the TSID in the external storage.
	// This is usually the db from the previous period.
	var err error
	if db.doExtDB(func(extDB *indexDB) {
		err = extDB.getTSIDByNameNoCreate(dst, metricName)
	}) {
		if err == nil {
			// The TSID has been found in the external storage.
			return nil
		}
		if err != io.EOF {
			return fmt.Errorf("external search failed: %w", err)
		}
	}

	// The TSID wasn't found in the external storage.
	// Generate it locally.
	dst.AccountID = mn.AccountID
	dst.ProjectID = mn.ProjectID
	dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
	if len(mn.Tags) > 0 {
		dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
	}
	if len(mn.Tags) > 1 {
		dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
	}
	dst.MetricID = generateUniqueMetricID()
	return nil
}

其中,JobID和InstanceID是為了兼容Prometheus協議而添加的,並且是根據tag字段的第一個跟第二個進行生成,因為VictoriaMetrics會將接收到的這兩個ID放在前兩位。MetricGroupID是對指標名稱做hash算法;而MetricID是進程當前啟動的納秒時間戳,並且在隨后的每次新的TSID的生成都會在此基礎上自增1

倒排索引

VictoriaMetrics在創建完TSID后就會建立一系列的索引以供在查找時使用,並且由於TSID中除了MetricID外,其他字段都是可供選擇的,所以可供使用的有效信息只有MetricID,因為VictoriaMetrics在構建tags到TSID的字典的過程中,就是在構建tag->MetricID的字典。
對於接收到的一個時序指標,VictoriaMetrics會生成以下幾種索引,分別是

  • MetricName -> TSID
  • MetricID -> MetricName
  • MetricID -> TSID
  • Tags -> MetricID

例如以http_requests_total{status="200", method="GET"}為例,則MetricName為http_requests_total{status="200", method="GET"}, 假設生成的TSID為{metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286},那么生成的索引如下:

  • metricName -> TSID, 即http_requests_total{status="200", method="GET"} -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
  • metricID -> metricName,即51106185174286 -> http_requests_total{status="200", method="GET"}
  • metricID -> TSID,即51106185174286 -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
  • tag -> metricID,即 status="200" -> 51106185174286, method="GET" -> 51106185174286, "" = http_requests_total -> 51106185174286

有了上面這些索引的item之后,就可以進行基於tag的多維查詢,VictoriaMetrics會首先根據tag尋找到所對應的MetricID列表,然后求出所有tag的MetricID列表的交集,然后再根據這一MetricID檢索出TSID去到數據文件查詢數據以及根據MetricID到索引文件檢索原始的MetricName

VmSelect

VmSelect在接受到查詢請求后,會將查詢任務拆分成多個任務分發給VMStorage,再將返回的結果聚合返回,接受的url格式為/{prefix}/{authToken}/{suddix},含義分別如下:

  • prefix:操作類型,VmSelect支持的是select和delete
  • authToken:賬號類型,格式為accountID[:projectID],用於租戶隔離,projectID可選
  • suffix:prometheus的查詢api

從VMSelect的api接收到查詢的query到查詢VmStorage,中間主要是一些解析查詢語句的過程,然后會通過下面這一函數對所有的VmStorage進行遍歷查詢(無差別對待)

func startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest {
	resultsCh := make(chan interface{}, len(storageNodes))
	for idx, sn := range storageNodes {
		go func(idx int, sn *storageNode) {
			result := f(idx, sn)
			resultsCh <- result
		}(idx, sn)
	}
	return &storageNodesRequest{
		denyPartialResponse: denyPartialResponse,
		resultsCh:           resultsCh,
	}
}

其中,storageNodes是在VmSelect初始化的時候通過func InitStorageNodes(addrs []string)便初始化好的一個節點,這就意味着當我們使用增加storageNodes的方式擴容VictoriaMetrics的時候,就需要修改VmSelect和VmInsert的參數並重新啟動,並且如果有一個VmStorage宕機的時候,查詢出來的數據也是不完整的。

VmInsert


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM