轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com
源碼版本是1.19
概述
k8s的Event事件是一種資源對象,用於展示集群內發生的情況,k8s系統中的各個組件會將運行時發生的各種事件上報給apiserver 。可以通過kubectl get event 或 kubectl describe pod podName 命令顯示事件,查看k8s集群中發生了哪些事件。
apiserver 會將Event事件存在etcd集群中,為避免磁盤空間被填滿,故強制執行保留策略:在最后一次的事件發生后,刪除1小時之前發生的事件。
如:
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 19s default-scheduler Successfully assigned default/hpatest-bbb44c476-8d45v to 192.168.13.130
Normal Pulled 15s kubelet, 192.168.13.130 Container image "nginx" already present on machine
Normal Created 15s kubelet, 192.168.13.130 Created container hpatest
Normal Started 13s kubelet, 192.168.13.130 Started container hpatest
當集群中的 node 或 pod 異常時,大部分用戶會使用 kubectl 查看對應的 events,我們通過前面章節的代碼分析可以看到這樣的代碼:
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
通過查找也可以確認基本上與node 或 pod相關的模塊都會涉及到事件,如:controller-manage、kube-proxy、kube-scheduler、kubelet 等。
Event事件管理機制主要有三部分組成:
- EventRecorder:是事件生成者,k8s組件通過調用它的方法來生成事件;
- EventBroadcaster:事件廣播器,負責消費EventRecorder產生的事件,然后分發給broadcasterWatcher;
- broadcasterWatcher:用於定義事件的處理方式,如上報apiserver;
整個事件管理機制的流程大致如圖:
下面我們以kubelet 中的Event事件來作為分析的例子進行講解。
源碼分析
kubelet 在初始化的時候會調用makeEventRecorder進行Event初始化。
makeEventRecorder
文件位置:cmd/kubelet/app/server.go
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
if kubeDeps.Recorder != nil {
return
}
// 初始化 EventBroadcaster
eventBroadcaster := record.NewBroadcaster()
// 初始化 EventRecorder
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
//記錄Event到log
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).Infof("Sending events to api server.")
//上報Event到apiserver並存儲至etcd集群
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.Warning("No api server defined - no events will be sent to API server.")
}
}
這個方法創建了一個EventBroadcaster,這是一個事件廣播器,會消費EventRecorder記錄的事件並通過StartStructuredLogging和StartRecordingToSink分別將event發送給log和apiserver;EventRecorder,用作事件記錄器,k8s系統組件通過它記錄關鍵性事件;
EventRecorder記錄事件
type EventRecorder interface {
Event(object runtime.Object, eventtype, reason, message string)
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
EventRecorder接口非常的簡單,就3個方法。其中Event是可以用來記錄剛發生的事件;Eventf通過使用fmt.Sprintf格式化輸出事件的格式;AnnotatedEventf功能和Eventf一致,但是附加了注釋字段。
我們記錄事件的時候上面也提到了,一般如下記錄:
recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
Eventf會調用到EventRecorder的實現類recorderImpl中去,最后調用到generateEvent方法中:
Event
文件位置:client-go/tools/record/event.go
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}
generateEvent
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
...
//實例化Event
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
//異步調用Action方法將事件寫入到incoming中
go func() {
// NOTE: events should be a non-blocking operation
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
generateEvent方法會異步的調用Action方法,將事件寫入到incoming中:
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}
調用步驟如下:
EventBroadcaster事件廣播
EventBroadcaster初始化的時候會調用NewBroadcaster方法:
文件位置:client-go/tools/record/event.go
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
sleepDuration: defaultSleepDuration,
}
}
這里會創建一個eventBroadcasterImpl實例,並設置兩個字段Broadcaster和sleepDuration。Broadcaster是這個方法的核心,我們下面接着看:
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
//開啟事件循環
go m.loop()
return m
}
在這里初始化Broadcaster的時候,會初始化一個broadcasterWatcher,用於定義事件處理方式,如上報apiserver等;初始化incoming,用於EventBroadcaster和EventRecorder進行事件傳輸。
loop
文件位置:k8s.io/apimachinery/pkg/watch/mux.go
func (m *Broadcaster) loop() {
//獲取m.incoming管道中的數據
for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
event.Object.(functionFakeRuntimeObject)()
continue
}
//進行事件分發
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}
這個方法會一直后台等待獲取m.incoming管道中的數據,然后調用distribute方法進行事件分發給broadcasterWatcher。incoming管道中的數據是EventRecorder調用Event方法傳入的。
distribute
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
//如果是非阻塞,那么使用DropIfChannelFull標識
if m.fullChannelBehavior == DropIfChannelFull {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
如果是非阻塞,那么使用DropIfChannelFull標識,在w.result管道滿了之后,事件會丟失。如果沒有default關鍵字,那么,當w.result管道滿了之后,分發過程會阻塞並等待。
這里之所以需要丟失事件,是因為隨着k8s集群越來越大,上報事件也隨之增多,那么每次上報都要對etcd進行讀寫,這樣會給etcd集群帶來壓力。但是事件丟失並不會影響集群的正常工作,所以非阻塞分發機制下事件會丟失。
recordToSink事件的處理
調用StartRecordingToSink方法會將數據上報到apiserver。
StartRecordingToSink
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
eventCorrelator := NewEventCorrelatorWithOptions(e.options)
return e.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, e.sleepDuration)
})
}
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := e.Watch()
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
continue
}
//回調傳入的方法
eventHandler(event)
}
}()
return watcher
}
StartRecordingToSink會調用StartEventWatcher,StartEventWatcher方法里面會異步的調用 watcher.ResultChan()方法獲取到broadcasterWatcher的result管道,result管道里面的數據就是Broadcaster的distribute方法進行分發的。
最后會回調到傳入的方法recordToSink中。
recordToSink
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
eventCopy := *event
event = &eventCopy
//對事件做預處理,聚合相同的事件
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
utilruntime.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
// 把事件發送到 apiserver
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
recordToSink方法首先會調用EventCorrelate方法對event做預處理,聚合相同的事件,避免產生的事件過多,增加 etcd 和 apiserver 的壓力,如果傳入的Event太多了,那么result.Skip 就會返回false;
接下來會調用recordEvent方法把事件發送到 apiserver,它會重試很多次(默認是 12 次),並且每次重試都有一定時間間隔(默認是 10 秒鍾)。
下面我們分別來看看EventCorrelate方法和recordEvent方法。
EventCorrelate
文件位置:client-go/tools/record/events_cache.go
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
if newEvent == nil {
return nil, fmt.Errorf("event is nil")
}
aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
if c.filterFunc(observedEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
EventCorrelate方法會調用EventAggregate、eventObserve進行聚合,調用filterFunc會調用到spamFilter.Filter方法進行過濾。
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
now := metav1.NewTime(e.clock.Now())
var record aggregateRecord
eventKey := getEventKey(newEvent)
aggregateKey, localKey := e.keyFunc(newEvent)
e.Lock()
defer e.Unlock()
// 查找緩存里面是否也存在這樣的記錄
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}
// maxIntervalInSeconds默認時間是600s,這里校驗緩存里面的記錄是否太老了
// 如果是那么就創建一個新的
// 如果record在緩存里面找不到,那么lastTimestamp是零,那么也創建一個新的
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
record.localKeys.Insert(localKey)
record.lastTimestamp = now
// 重新加入到LRU緩存中
e.cache.Add(aggregateKey, record)
// 如果沒有達到閾值,那么不進行聚合
if uint(record.localKeys.Len()) < e.maxEvents {
return newEvent, eventKey
}
record.localKeys.PopAny()
eventCopy := &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
// 將Message進行聚合
Message: e.messageFunc(newEvent),
Type: newEvent.Type,
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, aggregateKey
}
EventAggregate方法也考慮了很多,首先是去緩存里面查找有沒有相同的聚合記錄aggregateRecord,如果沒有的話,那么會在校驗時間間隔的時候順便創建聚合記錄aggregateRecord;
由於緩存時lru緩存,所以再將聚合記錄重新Add到緩存的頭部;
接下來會判斷緩存是否已經超過了閾值,如果沒有達到閾值,那么直接返回不進行聚合;
如果達到閾值了,那么會重新copy傳入的Event,並調用messageFunc方法聚合Message;
eventObserve
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
var (
patch []byte
err error
)
eventCopy := *newEvent
event := &eventCopy
e.Lock()
defer e.Unlock()
// 檢查是否在緩存中
lastObservation := e.lastEventObservationFromCache(key)
// 如果大於0說明存在,並且對Count進行自增
if lastObservation.count > 0 {
event.Name = lastObservation.name
event.ResourceVersion = lastObservation.resourceVersion
event.FirstTimestamp = lastObservation.firstTimestamp
event.Count = int32(lastObservation.count) + 1
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
eventCopy2.Message = ""
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
}
// 最后重新更新緩存記錄
e.cache.Add(
key,
eventLog{
count: uint(event.Count),
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
return event, patch, err
}
eventObserve方法里面會去查找緩存中的記錄,然后對count進行自增后更新到緩存中。
Filter
文件位置:client-go/tools/record/events_cache.go
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
var record spamRecord
eventKey := getSpamKey(event)
f.Lock()
defer f.Unlock()
value, found := f.cache.Get(eventKey)
if found {
record = value.(spamRecord)
}
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
}
// 使用令牌桶進行過濾
filter := !record.rateLimiter.TryAccept()
// update the cache
f.cache.Add(eventKey, record)
return filter
}
Filter主要時起到了一個限速的作用,通過令牌桶來進行過濾操作。
recordEvent
文件位置:client-go/tools/record/event.go
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *v1.Event
var err error
// 更新已經存在的事件
if updateExistingEvent {
newEvent, err = sink.Patch(event, patch)
}
// 創建一個新的事件
if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
}
if err == nil {
eventCorrelator.UpdateState(newEvent)
return true
}
// 如果是已知錯誤,就不要再重試了;否則,返回 false,讓上層進行重試
switch err.(type) {
case *restclient.RequestConstructionError:
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
}
return true
case *errors.UnexpectedObjectError:
default:
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false
}
recordEvent方法會根據eventCorrelator返回的結果來決定是新建一個事件還是更新已經存在的事件,並根據請求的結果決定是否需要重試。
整個recordToSink調用比較繞,這里我把圖畫一下:
到這里整個方法算時講解完畢了。
總結
了解完 events 的整個處理流程后,再梳理一下整個流程:
- 首先是初始化 EventBroadcaster 對象,同時會初始化一個 Broadcaster 對象,並開啟一個loop循環接收所有的 events 並進行廣播;
- 然后通過 EventBroadcaster 對象的 NewRecorder() 方法初始化 EventRecorder 對象,EventRecorder 對象會生成 events 並通過 Action() 方法發送 events 到 Broadcaster 的 channel 隊列中;
- EventBroadcaster 會調用StartStructuredLogging、StartRecordingToSink方法調用封裝好的StartEventWatcher方法,並執行自己的邏輯;
- StartRecordingToSink封裝的StartEventWatcher方法里面會將所有的 events 廣播給每一個 watcher,並調用recordToSink方法對收到 events 后會進行緩存、過濾、聚合而后發送到 apiserver,apiserver 會將 events 保存到 etcd 中。
Reference
https://www.bluematador.com/blog/kubernetes-events-explained
https://kubernetes.io/docs/tasks/debug-application-cluster/debug-application-introspection/