深入理解k8s中的Event機制


Kubernetes事件(Event)是一種資源對象,用於展示集群內發生的情況。Kubernetes系統中的各個組件會將運行時發生的各種事件(例如,調度器做了什么決定,某些Pod為什么被從節點中驅逐)上報給apiserver
apiserver將Event存儲在Etcd內,強制執行保留策略:在最后一次的事件發生后,刪除1小時之前發生的事件。
 
可以通過kubectl get event或kubectl describe pod <podname>命令顯示事件
這兩個命令均不會顯示Event的名字,通過kubectl get events看到的OBJECT也不是Events的真名,而是與該Event相關的資源的名稱(格式為pod/{Pod名}、node/{Node名}
Event名為{Pod名}.Unix時間戳、{Node名}.Unix時間戳
 
k8s.io/api/core/v1/types.go中定義的Event結構體:
type Event struct {
    metav1.TypeMeta `json:",inline"`         
    metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`    
    InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"` //和哪個資源對象有關
    Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`    //發生原因
    Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`    //詳細信息
    Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`   //來源,包括component、host
    FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp”`   
    LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp”`   
    Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count”`   //事件發生的次數
    Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type”`   
    EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime”`  
    Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
    Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action”`   //針對此事件已采取何種措施
    Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
    ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
    ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
定義了兩種event類型:
const (
     EventTypeNormal string = "Normal"
     EventTypeWarning string = "Warning"
)
《k8s源碼剖析》中的EventBroadcaster事件管理機制圖:
  • EventRecorder

在client-go中的tools/record/event.go中定義的EventRecorder接口,:

type EventRecorder interface {
    Event(object runtime.Object, eventtype, reason, message string)    
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})   //通過使用fmt.Sprintf格式化輸出事件的格式
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})    // 功能與Eventf一樣,但附加了注釋(Annotations)字段    
}
EventRecorder定義了記錄Event的三種方法,用以幫助k8s組件記錄Event。
結構體recorderImpl是其實現:
type recorderImpl struct {
    scheme *runtime.Scheme
    source v1.EventSource
    *watch.Broadcaster
    clock clock.Clock
}
recorderImpl結構體中包含apimachinery/pkg/watch/mux.go中的Broadcaster結構體對象地址,因此可以調用Broadcaster實現的方法
recorderImpl實現了EventRecorder接口定義的三個方法,以Event方法為例,調用鏈為:
recorderImpl.Event方法→ recorderImpl.generateEvent方法→Broadcaster.ActionOrDrop方法:
func (recorder *recorderImpl) Event(object runtime.Object,  eventtype, reason, message string) {
     recorder.generateEvent(object, nil, metav1.Now(),  eventtype, reason, message)
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
    ref, err := ref.GetReference(recorder.scheme, object)
    if err != nil {
        klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
        return
    }
    if !util.ValidateEventType(eventtype) {
        klog.Errorf("Unsupported event type: '%v'", eventtype)
        return
    }
    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source
    if sent := recorder.ActionOrDrop(watch.Added, event); !sent {
        klog.Errorf("unable to record event: too many queued events, dropped event %#v", event)
    }
}
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
    select {
    case m.incoming <- Event{action, obj}:
        return true
    default:
        return false
    }
}
ActionOrDrop方法將Event寫入m.incoming Chan中,完成事件生產過程。
  • EventBroadcaster
在client-go中的tools/record/event.go中定義了EventBroadcaster接口:
type EventBroadcaster interface {
 StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
    StartRecordingToSink(sink EventSink) watch.Interface
    StartLogging(logf func(format string, args ...interface{})) watch.Interface
    StartStructuredLogging(verbosity klog.Level) watch.Interface
    NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
    Shutdown()
}
EventBroadcaster作為Event消費者和事件廣播器,消費EventRecorder記錄的事件並將其分發給目前所有已連接的broadcasterWatcher。
結構體eventBroadcasterImpl是其實現:
type eventBroadcasterImpl struct {
    *watch.Broadcaster
    sleepDuration time.Duration
    options       CorrelatorOptions
}
eventBroadcasterImpl結構體中,同樣包含Broadcaster結構體對象地址,因此可以調用Broadcaster實現的方法
在apimachinery中的pkg/watch/mux.go中定義了Broadcaster結構體:
type Broadcaster struct {
    watchers     map[int64]*broadcasterWatcher
    nextWatcher  int64
    distributing sync.WaitGroup
    incoming chan Event
    stopped  chan struct{}
    watchQueueLength int
    fullChannelBehavior FullChannelBehavior
}
client-go的tools/record/event.go中,提供的實例化eventBroadcasterImpl的函數:
func NewBroadcaster() EventBroadcaster {
    return &eventBroadcasterImpl{
        Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
        sleepDuration: defaultSleepDuration,
    }
}
Broadcaster實際由apimachinery/pkg/watch/mux.go中的NewBroadcaster函數創建:
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers:            map[int64]*broadcasterWatcher{},
        incoming:            make(chan Event, incomingQueueLength),
        watchQueueLength:    queueLength,
        fullChannelBehavior: fullChannelBehavior,
    }
    m.distributing.Add(1)
    go m.loop()
    return m
}
創建時,會在內部啟動goroutine,通過m.loop方法監控m.incoming;同時將監控的事件通過m.distribute函數分發給所有已連接的BroadcasterWatcher:
func (m *Broadcaster) distribute(event Event) {
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            default:  // 隊列滿時,不阻塞
            }
        }
    } else {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}
分發過程有兩種機制,分別是非阻塞(Non-Blocking)分發機制和阻塞(Blocking)分發機制。
在非阻塞分發機制(默認)下使用DropIfChannelFull標識。DropIfChannelFull標識位於select多路復用中,使用default關鍵字做非阻塞分發,當w.result緩沖區滿的時候,事件會丟失。
在阻塞分發機制下使用WaitIfChannelFull標識。WaitIfChannelFull標識也位於select多路復用中,沒有default關鍵字,當w.result緩沖區滿的時候,分發過程會阻塞並等待。
eventBroadcasterImpl實現的兩種Event的處理方法:
(1)StartLogging:將事件寫入日志中。
func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    return e.StartEventWatcher(
        func(e *v1.Event) {
            logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
        })
}
(2)StartRecordingToSink:將事件存儲到相應的sink。
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
    eventCorrelator := NewEventCorrelatorWithOptions(e.options)
    return e.StartEventWatcher(
        func(event *v1.Event) {
            recordToSink(sink, event, eventCorrelator, e.sleepDuration)
        })
}
kubelet默認均使用,也就是說任何一個事件會同時發送給apiserver、打印到日志;用戶也可以編寫自己的事件處理邏輯。
它們均依賴於StartEventWatcher方法:
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := e.Watch()    //注冊了一個watcher到broadcaster里面
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}
該函數內部運行了一個goroutine,用於不斷監控EventBroadcaster來發現事件並調用傳入的eventHandler函數對事件進行處理。
StartLogging傳入的eventHandler,只是執行了被傳入的logf函數
StartRecordingToSink傳入的eventHandler,則根據被傳入的sink執行了recordToSink函數:
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
    eventCopy := *event   //處理前先做拷貝,因為可能有其它listener也需要這個Event
    event = &eventCopy
    result, err := eventCorrelator.EventCorrelate(event)   
    if err != nil {
        utilruntime.HandleError(err)
    }
    if result.Skip {
        return
    }
    tries := 0
    for {
        if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {  //最終把事件發送到apiserver
            break
        }
        tries++
        if tries >= maxTriesPerEvent {
            klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
            break
        }
        if tries == 1 {   //第一次同步要錯開
            time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
        } else {
            time.Sleep(sleepDuration)
        }
    }
}
recordToSink方法首先會調用tools/record/events_cache.go中的EventCorrelate方法對event做預處理,聚合相同的事件,避免產生的事件過多,增加sink的壓力,如果傳入的Event太多了,那么result.Skip處就會返回
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
接下來會調用recordEvent方法把事件發送到sink,它會重試很多次(默認是12次),並且每次重試都有一定時間間隔(默認是10秒鍾)。
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
    var newEvent *v1.Event
    var err error
    if updateExistingEvent {        //result.Event.Count > 1時,更新已經存在的事件
        newEvent, err = sink.Patch(event, patch)
    }
    // 創建一個新的事件
    if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
        event.ResourceVersion = ""
        newEvent, err = sink.Create(event)
    }
    if err == nil {
        eventCorrelator.UpdateState(newEvent)
        return true
    }
    // 如果是已知錯誤,就不要再重試了;否則,返回false,讓上層進行重試
    switch err.(type) {
    case *restclient.RequestConstructionError:
        klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
        return true
    case *errors.StatusError:
        if errors.IsAlreadyExists(err) {
            klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
        } else {
            klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
        }
        return true
    case *errors.UnexpectedObjectError:
    default:
    }
    klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
    return false
}
recordEvent方法會根據eventCorrelator返回的結果來決定是新建一個事件還是更新已經存在的事件,並根據請求的結果決定是否需要重試。
  • BroadcasterWatcher
client-go的tools/record/event.go中定義了EventSink接口:
type EventSink interface {
    Create(event *v1.Event) (*v1.Event, error)
    Update(event *v1.Event) (*v1.Event, error)
    Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}
EventSink相當於觀察者(Watcher),作為sink從EventBroadcaster接收事件,自定義事件的處理方式
client-go的tools/events/event_broadcaster.go中EventSinkImpl是其具體實現
type EventSinkImpl struct {
    Interface typedeventsv1.EventsV1Interface
}
Interface中包含了Restful client,可通過其與k8s apiserver交互
EventSinkImpl實現的這三種方法:
func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) {
    if event.Namespace == "" {
        return nil, fmt.Errorf("can't create an event with empty namespace")
    }
    return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
}
func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) {
    if event.Namespace == "" {
        return nil, fmt.Errorf("can't update an event with empty namespace")
    }
    return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{})
}
func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
    if event.Namespace == "" {
        return nil, fmt.Errorf("can't patch an event with empty namespace")
    }
    return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
}
會分別調用Restful client的Create、Update、Patch,將Event上報至apiserver


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM