kube-eventer

时间:Nov. 30, 2020 分类:

目录:

event实质

kubernetes的异常处理机制

event产生

当node和pod异常的时候会产生大量的events,这些event是由各个组件上报给apiserver的

可以在k8s.io/kubernetes/cmd目录下进行暴力搜索

$ grep -R -n -i "EventRecorder" k8s.io/kubernetes/cmd

event定义

位于k8s.io/api/core/v1/types.go

// Event mirrors the core/v1 Event type (k8s.io/api/core/v1/types.go).
// Each field name/JSON key/protobuf tag must appear exactly once; the
// previous listing repeated ReportingInstance, which does not compile.
type Event struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
	// InvolvedObject is the object this event refers to.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
	// Source identifies the reporting component and host.
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
	// FirstTimestamp, LastTimestamp and Count track when and how often the
	// event was seen; a freshly made event has Count 1 and equal timestamps.
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
	// Type is the event type (Normal or Warning).
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
  • InvolvedObject:与该 event 关联的对象
  • Source:事件源,即上报该 event 的组件(Component)及其所在主机(Host)
  • 其他字段如 Type、Reason、Message 可以通过 kubectl 直接看到(kubectl describe 输出中的 Age 和 From 并不是独立字段,分别由时间戳和 Source 得出);Type 只有 Normal 和 Warning 两种取值

event生成

组件启动后会创建EventBroadcaster

// makeEventRecorder installs an EventRecorder on kubeDeps unless one is
// already present. Events are always written to the local log and, when an
// event client is configured, also reported to the apiserver.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	// A recorder supplied by the caller wins; nothing to build.
	if kubeDeps.Recorder != nil {
		return
	}

	// The broadcaster fans events out to every sink started below.
	broadcaster := record.NewBroadcaster()
	source := v1.EventSource{Component: componentKubelet, Host: string(nodeName)}
	kubeDeps.Recorder = broadcaster.NewRecorder(legacyscheme.Scheme, source)

	// Mirror events into the local log at verbosity 3.
	broadcaster.StartLogging(glog.V(3).Infof)

	if kubeDeps.EventClient == nil {
		glog.Warning("No api server defined - no events will be sent to API server.")
		return
	}
	glog.V(4).Infof("Sending events to api server.")
	// Report events to the apiserver.
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
}

EventBroadcaster用于对接收到的event做一些处理(保存或者上报)

// EventBroadcaster receives events and hands them to any number of
// consumers: custom handlers, an EventSink, or a log function.
type EventBroadcaster interface {
	// StartEventWatcher calls eventHandler for every received event and
	// returns the underlying watcher.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink forwards received events to sink.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging writes every received event through logf.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// NewRecorder returns an EventRecorder whose events carry source.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}

EventBroadcaster接口类型的四个方法

  • StartEventWatcher 接收模块产生的event,参数为处理event的方法,可对event进行指定方法的处理
  • StartRecordingToSink 调用 StartEventWatcher 接收 event,对其进行缓存、过滤、聚合后发送到 apiserver
  • StartLogging 调用StartEventWatcher接收event,保存到日志
  • NewRecorder 创建一个使用指定 EventSource 的 EventRecorder,由它生成的 event 都带有该事件源

EventRecorder主要的功能是生成指定格式的event

// EventRecorder creates events about a given object.
type EventRecorder interface {
	// Event records an event of type eventtype about object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is like Event, with the message built from messageFmt and args.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// PastEventf is like Eventf for an event that happened at timestamp.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is like Eventf and additionally attaches annotations.
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

kubelet的每个模块都会调用generateEvent来生成event

// generateEvent builds an event about object and hands it to the broadcaster
// on a separate goroutine. Events whose object cannot be referenced, or whose
// type fails validateEventType, are logged and dropped.
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	// Named objRef so the ref package is not shadowed.
	objRef, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}
	if valid := validateEventType(eventtype); !valid {
		glog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	ev := recorder.makeEvent(objRef, annotations, eventtype, reason, message)
	ev.Source = recorder.source

	// Delivery happens asynchronously so emitting an event never blocks the
	// caller; the goroutine is guarded by utilruntime.HandleCrash.
	go func() {
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, ev)
	}()
}

// makeEvent returns a new event about ref with Count 1 and both timestamps
// set to the recorder clock's current time. The event name is the object
// name followed by the creation time in hexadecimal nanoseconds.
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	now := metav1.Time{Time: recorder.clock.Now()}

	// Objects without a namespace get their events in the default namespace.
	ns := ref.Namespace
	if len(ns) == 0 {
		ns = metav1.NamespaceDefault
	}

	meta := metav1.ObjectMeta{
		Name:        fmt.Sprintf("%v.%x", ref.Name, now.UnixNano()),
		Namespace:   ns,
		Annotations: annotations,
	}
	return &v1.Event{
		ObjectMeta:     meta,
		InvolvedObject: *ref,
		Type:           eventtype,
		Reason:         reason,
		Message:        message,
		Count:          1,
		FirstTimestamp: now,
		LastTimestamp:  now,
	}
}

生成 event 后,调用 recorder.Action() 将其发送到 Broadcaster 的接收队列(incoming channel)

// Action queues obj, tagged with action, on the incoming channel for
// distribution to all watchers. It blocks until the channel accepts it.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	ev := Event{Type: action, Object: obj}
	m.incoming <- ev
}

event广播

EventBroadcaster 初始化时会创建一个 Broadcaster,由它把 event 分发给注册在 map 中的每个 watcher,watcher 再从各自的 channel 中读取 event 进行消费

// loop drains the incoming channel until it is closed, distributing each
// event to the watchers; afterwards it closes every watcher and signals that
// distribution is finished.
func (m *Broadcaster) loop() {
	for ev := range m.incoming {
		// Marker events carry a function to run on this goroutine rather
		// than a payload to distribute.
		if ev.Type != internalRunFunctionMarker {
			m.distribute(ev)
			continue
		}
		ev.Object.(functionFakeRuntimeObject)()
	}
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to every watcher while holding the lock.
// With DropIfChannelFull a watcher whose channel is full misses the event;
// otherwise the send blocks until the watcher has room or is stopped.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()

	dropIfFull := m.fullChannelBehavior == DropIfChannelFull
	for _, w := range m.watchers {
		if dropIfFull {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
			continue
		}
		select {
		case w.result <- event:
		case <-w.stopped:
		}
	}
}

event处理

StartEventWatcher实例化一个watcher,并加入到Broadcaster的watcher列表

// StartEventWatcher registers a watcher and, on its own goroutine, calls
// eventHandler for every *v1.Event delivered to it. It returns the watcher.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	w := eventBroadcaster.Watch()
	consume := func() {
		defer utilruntime.HandleCrash()
		for we := range w.ResultChan() {
			// Only *v1.Event payloads are produced locally; anything else
			// is skipped.
			if ev, isEvent := we.Object.(*v1.Event); isEvent {
				eventHandler(ev)
			}
		}
	}
	go consume()
	return w
}

其他接口只是向 StartEventWatcher 传入不同的处理函数,例如 StartLogging 将 event 写入日志

// StartLogging writes every received event through logf and returns the
// underlying watcher.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	logEvent := func(e *v1.Event) {
		logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
	}
	return eventBroadcaster.StartEventWatcher(logEvent)
}

StartRecordingToSink 则通过 recordToSink 将 event 发送到 apiserver

// StartRecordingToSink forwards every received event to sink via
// recordToSink, sharing one EventCorrelator across all of them.
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	// math/rand's package-level functions aren't treated as thread safe here,
	// so each call gets its own generator (used to jitter retries).
	seed := time.Now().UnixNano()
	randGen := rand.New(rand.NewSource(seed))
	correlator := NewEventCorrelator(clock.RealClock{})
	sendToSink := func(ev *v1.Event) {
		recordToSink(sink, ev, correlator, randGen, eventBroadcaster.sleepDuration)
	}
	return eventBroadcaster.StartEventWatcher(sendToSink)
}

// recordToSink correlates a shallow copy of event and, unless correlation
// fails or says to skip it, writes the result to sink. Writing is attempted
// up to maxTriesPerEvent times.
//
// The first retry sleeps a random fraction of sleepDuration so that clients
// that failed together (for example while the apiserver restarts) do not all
// retry at the same instant; later retries sleep the full sleepDuration.
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		// On error the result carries no usable value (it may be nil), so
		// report the error and drop the event instead of dereferencing it.
		utilruntime.HandleError(err)
		return
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			return
		}
		tries++
		if tries >= maxTriesPerEvent {
			glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			return
		}
		// Jitter only the first retry to avoid a thundering herd.
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}

event简单实现

event包含的功能

  1. 事件的产生
  2. 事件的发送
  3. 事件广播
  4. 事件缓存
  5. 事件过滤和聚合
package main

import (
  "fmt"
  "sync"
  "time"
)

// queueLength is the watcher queue length that NewEventBroadcaster passes to
// NewBroadcaster.
const queueLength int64 = 1

// Events is a single event record produced by the demo broadcaster and
// delivered to each watcher by value. Field order matters: the logger
// prints the struct with %v.
type Events struct {
	Reason, Message, Source, Type string
	Count                         int64
	Timestamp                     time.Time
}

// EventBroadcaster is the producer-facing API of the demo: it creates
// events, starts a logging consumer, and shuts everything down.
type EventBroadcaster interface {
	// Event creates an event from the given type, reason and message and
	// hands it to the broadcaster.
	Event(etype, reason, message string)
	// StartLogging registers a watcher that prints events and returns it.
	StartLogging() Interface
	// Stop shuts the broadcaster down.
	Stop()
}

// eventBroadcasterImpl implements EventBroadcaster on top of an embedded
// *Broadcaster, reusing its Action, Watch and Shutdown methods directly.
type eventBroadcasterImpl struct {
	*Broadcaster
}

// NewEventBroadcaster returns an EventBroadcaster backed by a new Broadcaster
// created with queueLength.
func NewEventBroadcaster() EventBroadcaster {
	b := NewBroadcaster(queueLength)
	return &eventBroadcasterImpl{Broadcaster: b}
}

// Stop shuts down the embedded Broadcaster.
func (eventBroadcaster *eventBroadcasterImpl) Stop() {
	eventBroadcaster.Broadcaster.Shutdown()
}

// Event builds an Events value from etype, reason and message and passes it
// to the broadcaster's Action for distribution.
func (eventBroadcaster *eventBroadcasterImpl) Event(etype, reason, message string) {
	eventBroadcaster.Action(&Events{
		Type:    etype,
		Reason:  reason,
		Message: message,
	})
}

// StartLogging registers a watcher that prints each event it receives to
// stdout, and returns that watcher. This demo version also stops the watcher
// automatically 4 seconds after registration.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging() Interface {
	w := eventBroadcaster.Watch()

	// Consumer: print events until the watcher's channel is closed.
	go func(results <-chan Events) {
		for ev := range results {
			fmt.Printf("%v\n", ev)
		}
	}(w.ResultChan())

	// Demo-only: stop watching after a fixed delay.
	go func() {
		time.Sleep(4 * time.Second)
		w.Stop()
	}()

	return w
}

// --------------------
// Broadcaster: definition and implementation.

// incomingQueueLength is the buffer size of a Broadcaster's incoming channel.
const incomingQueueLength = 100

// Broadcaster fans each event passed to Action out to every registered
// watcher. Delivery is non-blocking: a watcher whose result channel is full
// misses that event.
type Broadcaster struct {
	// lock guards watchers and nextWatcherID. Distribution holds it as well,
	// so a watcher's channel is never closed while an event is sent on it.
	lock sync.Mutex
	// incoming queues events until loop distributes them.
	incoming chan Events
	// watchers holds the registered watchers keyed by watcher ID.
	watchers map[int64]*broadcasterWatcher
	// nextWatcherID is the ID Watch assigns to the next watcher.
	nextWatcherID int64
	// watchQueueLength is the buffer size of each watcher's result channel.
	watchQueueLength int64
	// distributing is released once loop has exited and closed all watchers.
	distributing sync.WaitGroup
}

// NewBroadcaster returns a running Broadcaster whose watchers each buffer up
// to queueLength events (queueLength must not be negative). Call Shutdown to
// stop it.
func NewBroadcaster(queueLength int64) *Broadcaster {
	m := &Broadcaster{
		incoming:         make(chan Events, incomingQueueLength),
		watchers:         map[int64]*broadcasterWatcher{},
		watchQueueLength: queueLength,
	}
	m.distributing.Add(1)
	// Distribute events in the background until Shutdown closes incoming.
	go m.loop()
	return m
}

// Action queues event for distribution. It blocks while the incoming channel
// is full and panics if called after Shutdown (send on a closed channel).
func (m *Broadcaster) Action(event *Events) {
	m.incoming <- *event
}

// loop delivers each incoming event to every watcher until incoming is
// closed, then closes all remaining watchers.
func (m *Broadcaster) loop() {
	for event := range m.incoming {
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}

// distribute offers event to every watcher without blocking. Holding the
// lock keeps Watch/stopWatching from mutating the map mid-iteration and from
// closing a result channel while we send on it.
func (m *Broadcaster) distribute(event Events) {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		select {
		case w.result <- event:
		case <-w.stopped:
		default: // Drop the event for this watcher rather than block.
		}
	}
}

// Shutdown stops accepting events, waits until every queued event has been
// distributed, and closes the result channels of all remaining watchers.
func (m *Broadcaster) Shutdown() {
	close(m.incoming)
	m.distributing.Wait()
}

// closeAll closes the result channel of every remaining watcher and forgets
// them all.
func (m *Broadcaster) closeAll() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		close(w.result)
	}
	m.watchers = map[int64]*broadcasterWatcher{}
}

// stopWatching unregisters the watcher with the given id and closes its
// result channel. Unknown ids (for example ones already removed by closeAll)
// are ignored, so no channel is closed twice.
func (m *Broadcaster) stopWatching(id int64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	w, ok := m.watchers[id]
	if !ok {
		return
	}
	delete(m.watchers, id)
	close(w.result)
}

// Watch registers and returns a new watcher with a unique ID. It receives
// events distributed after this call; events are dropped for it while its
// channel is full.
func (m *Broadcaster) Watch() Interface {
	m.lock.Lock()
	defer m.lock.Unlock()
	id := m.nextWatcherID
	m.nextWatcherID++
	w := &broadcasterWatcher{
		result:  make(chan Events, m.watchQueueLength),
		stopped: make(chan struct{}),
		id:      id,
		m:       m,
	}
	m.watchers[id] = w
	return w
}

// Interface is the consumer side of a watcher.
type Interface interface {
	// Stop unregisters the watcher; it is safe to call more than once.
	Stop()
	// ResultChan returns the channel on which events are delivered.
	ResultChan() <-chan Events
}

// broadcasterWatcher is a single registration with a Broadcaster.
type broadcasterWatcher struct {
	result  chan Events   // events delivered to this watcher
	stopped chan struct{} // closed by Stop
	stop    sync.Once     // makes Stop idempotent
	id      int64         // key in m.watchers
	m       *Broadcaster  // owning broadcaster
}

// ResultChan returns the channel from which the watcher reads broadcast events.
func (b *broadcasterWatcher) ResultChan() <-chan Events {
	return b.result
}

// Stop marks the watcher as stopped and unregisters it from its Broadcaster,
// which closes the result channel. Subsequent calls do nothing.
func (b *broadcasterWatcher) Stop() {
	b.stop.Do(func() {
		close(b.stopped)
		b.m.stopWatching(b.id)
	})
}

// --------------------

// main starts a stdout logger and emits three demo events 1s, 3s and 6s
// after start, returning once the last one has been handed to the broadcaster.
func main() {
	eventBroadcast := NewEventBroadcaster()

	var producers sync.WaitGroup
	producers.Add(1)
	go func() {
		defer producers.Done()
		steps := []struct {
			wait time.Duration
			msg  string
		}{
			{time.Second, "1"},
			{2 * time.Second, "2"},
			{3 * time.Second, "3"},
		}
		for _, s := range steps {
			time.Sleep(s.wait)
			eventBroadcast.Event("add", "test", s.msg)
		}
	}()

	eventBroadcast.StartLogging()
	producers.Wait()
}

安装

https://github.com/AliyunContainerService/kube-eventer