kube-eventer
目录:
event实质
kubernetes的异常处理机制
event产生
当node和pod异常的时候会产生大量的events,这些event是由各个组件上报给apiserver的
可以在 k8s.io/kubernetes/cmd 目录下直接用 grep 搜索 EventRecorder,找出哪些组件会上报 event:
$ grep -R -n -i "EventRecorder" k8s.io/kubernetes/cmd
event定义
位于k8s.io/api/core/v1/types.go
// Event describes something that happened to an object in the cluster.
// This is an excerpt of the core/v1 Event type (k8s.io/api/core/v1/types.go).
type Event struct {
	metav1.TypeMeta `json:",inline"`
	// Standard object metadata (name, namespace, annotations).
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// InvolvedObject is the object this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
	// Reason and Message describe why the event was emitted.
	Reason  string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
	// Source identifies the reporting component and host.
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// A freshly created event has FirstTimestamp == LastTimestamp and Count 1;
	// Count > 1 marks an event that aggregates repeated occurrences.
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
	LastTimestamp  metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
	Count          int32       `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type is the event type: Normal or Warning.
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
	Series    *EventSeries     `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
	Action    string           `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
	Related   *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// ReportingController is serialized under the "reportingComponent" key.
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
	// ReportingInstance is declared exactly once; a struct with two fields of
	// the same name does not compile.
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
- InvolvedObject:与该 event 关联的对象
- Source:事件源,即上报该 event 的组件及其所在主机
- 其他字段(如 Type、Reason、Message)可以通过 kubectl 直接看到;kubectl 输出中的 Age、From 列并不是独立字段,而是分别由时间戳和 Source 得出。event 的 Type 只有 Normal 和 Warning 两种
event生成
各组件启动时都会创建 EventBroadcaster,以 kubelet 为例:
// makeEventRecorder wires up kubelet's event pipeline. It creates an
// EventBroadcaster and derives kubeDeps.Recorder from it, stamping events with
// the kubelet component and node name. Every event is logged at glog
// verbosity 3. When an EventClient is configured, events are also forwarded to
// the API server. It does nothing if kubeDeps.Recorder is already set.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}

	broadcaster := record.NewBroadcaster()
	source := v1.EventSource{Component: componentKubelet, Host: string(nodeName)}
	kubeDeps.Recorder = broadcaster.NewRecorder(legacyscheme.Scheme, source)

	// Mirror every event into the local log.
	broadcaster.StartLogging(glog.V(3).Infof)

	if kubeDeps.EventClient == nil {
		glog.Warning("No api server defined - no events will be sent to API server.")
		return
	}

	glog.V(4).Infof("Sending events to api server.")
	// Forward events to the API server via the Events("") client.
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
}
EventBroadcaster用于对接收到的event做一些处理(保存或者上报)
// EventBroadcaster receives the events produced by its recorders and hands
// each one to the registered handlers (logging, a sink, custom watchers).
type EventBroadcaster interface {
	// StartEventWatcher calls eventHandler for every received event and
	// returns the underlying watch so the caller can stop it.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
	// StartRecordingToSink correlates received events and writes them to sink,
	// with retries.
	StartRecordingToSink(sink EventSink) watch.Interface
	// StartLogging writes every received event through logf.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface
	// NewRecorder returns an EventRecorder whose events presumably carry
	// source as their Source (generateEvent sets event.Source from the
	// recorder) — confirm against the implementation.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}
EventBroadcaster接口类型的四个方法
- StartEventWatcher 接收模块产生的event,参数为处理event的方法,可对event进行指定方法的处理
- StartRecordingToSink 调用 StartEventWatcher 接收 event,对其进行缓存、过滤、聚合后再发送到 apiserver
- StartLogging 调用StartEventWatcher接收event,保存到日志
- NewRecorder 创建一个绑定指定 EventSource 的 EventRecorder
EventRecorder主要的功能是生成指定格式的event
// EventRecorder builds events about an object in the expected format and
// hands them to the broadcaster.
type EventRecorder interface {
	// Event records an event of eventtype about object with a fixed reason
	// and message.
	Event(object runtime.Object, eventtype, reason, message string)
	// Eventf is like Event but the message comes from messageFmt and args
	// (presumably Sprintf-style).
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
	// PastEventf is like Eventf for an event that occurred at timestamp.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
	// AnnotatedEventf is like Eventf and also attaches annotations to the event.
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
kubelet 的各个模块通过 EventRecorder 的方法(Event、Eventf 等)上报事件,这些方法最终都会调用 generateEvent 来生成 event
// generateEvent resolves a reference to object, validates eventtype, builds
// the event and dispatches it to the broadcaster on a new goroutine so the
// caller never blocks. If the reference cannot be built or the type is
// unsupported, it logs an error and drops the event. In this excerpt the
// timestamp parameter is accepted but not used.
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	objRef, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}
	if valid := validateEventType(eventtype); !valid {
		glog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	ev := recorder.makeEvent(objRef, annotations, eventtype, reason, message)
	ev.Source = recorder.source

	// Recording an event must be non-blocking for the caller, so the hand-off
	// to the broadcaster happens on its own goroutine.
	go func() {
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, ev)
	}()
}
// makeEvent builds a new Event about ref:
//   - name: "<ref.Name>.<hex UnixNano of now>"
//   - namespace: ref's namespace, or metav1.NamespaceDefault if that is empty
//   - FirstTimestamp and LastTimestamp: the recorder clock's current time
//   - Count: 1
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	now := metav1.Time{Time: recorder.clock.Now()}

	ns := ref.Namespace
	if len(ns) == 0 {
		ns = metav1.NamespaceDefault
	}

	meta := metav1.ObjectMeta{
		Name:        fmt.Sprintf("%v.%x", ref.Name, now.UnixNano()),
		Namespace:   ns,
		Annotations: annotations,
	}
	ev := &v1.Event{
		ObjectMeta:     meta,
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: now,
		LastTimestamp:  now,
		Count:          1,
		Type:           eventtype,
	}
	return ev
}
生成 event 后,调用 recorder.Action() 将其发送到 Broadcaster 的接收队列(incoming channel)
// Action wraps obj with action and enqueues it on the incoming channel. The
// send blocks if incoming cannot accept the value.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	ev := Event{Type: action, Object: obj}
	m.incoming <- ev
}
event广播
EventBroadcaster 初始化时会创建一个 Broadcaster。Broadcaster 在后台循环中将 event 分发给注册在 map 中的每个 watcher,每个 watcher 从自己的 channel 中读取 event 进行消费
// loop drains incoming until the channel is closed. Marker events run their
// wrapped function inline; all other events are distributed to the watchers.
// On exit it closes every watcher and signals distributing.
func (m *Broadcaster) loop() {
	for ev := range m.incoming {
		switch ev.Type {
		case internalRunFunctionMarker:
			ev.Object.(functionFakeRuntimeObject)()
		default:
			m.distribute(ev)
		}
	}
	m.closeAll()
	m.distributing.Done()
}
// distribute sends event to every watcher while holding m.lock. A stopped
// watcher is skipped. Under DropIfChannelFull, a watcher whose buffer is full
// misses the event. Otherwise the send blocks until the watcher accepts the
// event or is stopped.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()

	dropIfFull := m.fullChannelBehavior == DropIfChannelFull
	for _, w := range m.watchers {
		if !dropIfFull {
			// Blocking delivery.
			select {
			case w.result <- event:
			case <-w.stopped:
			}
			continue
		}
		select {
		case w.result <- event:
		case <-w.stopped:
		default: // Don't block if the event can't be queued.
		}
	}
}
event处理
StartEventWatcher实例化一个watcher,并加入到Broadcaster的watcher列表
// StartEventWatcher registers a new watcher and starts a goroutine that calls
// eventHandler for every *v1.Event it delivers, until the watcher's result
// channel is closed. The watcher is returned so the caller can stop it.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := eventBroadcaster.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for we := range watcher.ResultChan() {
			// Anything that is not a *v1.Event is skipped; everything here
			// is produced locally, so that should never happen.
			if ev, ok := we.Object.(*v1.Event); ok {
				eventHandler(ev)
			}
		}
	}()
	return watcher
}
其他接口都是向 StartEventWatcher 传入不同的处理函数。例如 StartLogging 传入的函数会把 event 写入日志:
// StartLogging registers a watcher that writes every received event through
// logf: the involved object, type, reason and message.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	logEvent := func(e *v1.Event) {
		logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
	}
	return eventBroadcaster.StartEventWatcher(logEvent)
}
StartRecordingToSink 则把 event 发送到 apiserver:
// StartRecordingToSink registers a watcher that correlates each received event
// and writes it to sink via recordToSink. This includes the retry/backoff
// behaviour.
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	// Each call gets its own *rand.Rand, seeded from the current time. Only
	// this watcher's handler uses it.
	seed := time.Now().UnixNano()
	randGen := rand.New(rand.NewSource(seed))
	correlator := NewEventCorrelator(clock.RealClock{})

	sendToSink := func(event *v1.Event) {
		recordToSink(sink, event, correlator, randGen, eventBroadcaster.sleepDuration)
	}
	return eventBroadcaster.StartEventWatcher(sendToSink)
}
// recordToSink correlates a copy of event and writes the result to sink,
// unless the correlator says to skip it. It makes at most maxTriesPerEvent
// attempts.
//
// The first retry waits a random fraction of sleepDuration, so that many
// senders do not retry in lockstep (for example after an apiserver restart).
// Later retries wait the full sleepDuration.
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
	// Work on a copy so the caller's event is never modified.
	eventCopy := *event
	event = &eventCopy

	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}

	for attempt := 1; !recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator); attempt++ {
		if attempt >= maxTriesPerEvent {
			glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			return
		}
		wait := sleepDuration
		if attempt == 1 {
			// Jitter only the first retry.
			wait = time.Duration(float64(sleepDuration) * randGen.Float64())
		}
		time.Sleep(wait)
	}
}
event简单实现
event包含的功能
- 事件的产生
- 事件的发送
- 事件广播
- 事件缓存
- 事件过滤和聚合
package main
import (
"fmt"
"sync"
"time"
)
// queueLength is the watcher queue length that NewEventBroadcaster passes to
// NewBroadcaster.
const queueLength int64 = 1
// Events is the payload carried through the Broadcaster. It holds a single
// event: its type, reason, message and source, plus an occurrence count and a
// time.
type Events struct {
	Reason, Message, Source, Type string
	Count                         int64
	Timestamp                     time.Time
}
// EventBroadcaster is the event API of this simplified implementation.
type EventBroadcaster interface {
	// Event creates an event from etype, reason and message and hands it to
	// the broadcaster.
	Event(etype, reason, message string)
	// StartLogging registers a watcher that prints received events and
	// returns it.
	StartLogging() Interface
	// Stop shuts the underlying broadcaster down.
	Stop()
}
// eventBroadcasterImpl implements EventBroadcaster on top of an embedded
// *Broadcaster. It calls the promoted Action, Watch and Shutdown methods
// directly.
type eventBroadcasterImpl struct {
	*Broadcaster
}
// NewEventBroadcaster returns an EventBroadcaster backed by a new Broadcaster
// configured with queueLength. The Broadcaster's dispatch goroutine starts
// immediately.
func NewEventBroadcaster() EventBroadcaster {
	b := NewBroadcaster(queueLength)
	return &eventBroadcasterImpl{Broadcaster: b}
}
// Stop shuts down the embedded Broadcaster. It closes the incoming channel
// and waits until the dispatch loop has closed all watchers. Call it at most
// once: closing an already-closed channel panics.
func (eventBroadcaster *eventBroadcasterImpl) Stop() {
	eventBroadcaster.Broadcaster.Shutdown()
}
// Event creates an event of type etype with the given reason and message and
// sends it to the broadcaster. Like a freshly created Kubernetes event, it is
// stamped with Count 1 and the current time; previously both fields were left
// at their zero values. The send blocks while the broadcaster's incoming
// buffer is full.
func (eventBroadcaster *eventBroadcasterImpl) Event(etype, reason, message string) {
	events := &Events{
		Type:      etype,
		Reason:    reason,
		Message:   message,
		Count:     1,
		Timestamp: time.Now(),
	}
	// Send the event to the broadcaster.
	eventBroadcaster.Action(events)
}
// StartLogging registers a watcher and prints every event it receives, one
// per line with %v, until the watcher's result channel is closed. This demo
// stops the watcher automatically after 4 seconds. The watcher is also
// returned, so the caller may stop it earlier; Stop is guarded by sync.Once.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging() Interface {
	watcher := eventBroadcaster.Watch()

	// Printer goroutine.
	go func(events <-chan Events) {
		for ev := range events {
			fmt.Printf("%v\n", ev)
		}
	}(watcher.ResultChan())

	// Auto-stop after 4 seconds.
	time.AfterFunc(4*time.Second, watcher.Stop)

	return watcher
}
// --------------------
// Broadcaster definition and implementation

// incomingQueuLength is the buffer size of the Broadcaster's incoming event
// channel.
const (
	incomingQueuLength = 100
)
// Broadcaster fans events received on incoming out to every registered
// watcher from a single dispatch goroutine (loop).
type Broadcaster struct {
	// lock protects watchers; closeAll and stopWatching take it.
	lock sync.Mutex
	// incoming buffers events passed to Action. It is created with
	// incomingQueuLength capacity and closed by Shutdown.
	incoming chan Events
	// watchers holds the registered watchers, keyed by an int64 id.
	watchers map[int64]*broadcasterWatcher
	// watchersQueue is used by Watch when registering a watcher.
	watchersQueue int64
	// watchQueueLength is initialised from NewBroadcaster's queueLength.
	watchQueueLength int64
	// distributing is marked Done when loop exits; Shutdown waits on it.
	distributing sync.WaitGroup
}
// NewBroadcaster creates a Broadcaster and starts its dispatch goroutine.
// The incoming buffer holds incomingQueuLength events, and queueLength is
// stored as the watcher queue length. Call Shutdown to stop the Broadcaster.
func NewBroadcaster(queueLength int64) *Broadcaster {
	b := new(Broadcaster)
	b.incoming = make(chan Events, incomingQueuLength)
	b.watchers = make(map[int64]*broadcasterWatcher)
	b.watchQueueLength = queueLength

	// Balanced by the Done at the end of loop.
	b.distributing.Add(1)
	// Broadcast events in the background.
	go b.loop()
	return b
}
// Action queues a copy of *event for broadcasting. It blocks while the
// incoming buffer is full and panics if called after Shutdown, because
// incoming is then closed.
func (m *Broadcaster) Action(event *Events) {
	ev := *event
	m.incoming <- ev
}
// loop is the Broadcaster's dispatch goroutine. It reads events from incoming
// until the channel is closed and offers each one to every watcher without
// blocking: a watcher whose buffer is full, or that has been stopped, misses
// the event. On exit it closes all watchers and marks distributing done.
//
// The watchers map is iterated under m.lock. Watch and stopWatching modify the
// map, and stopWatching closes result channels, from other goroutines. Without
// the lock this is a data race and can panic with "send on closed channel".
func (m *Broadcaster) loop() {
	for event := range m.incoming {
		m.lock.Lock()
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Drop rather than stall every other watcher.
			}
		}
		m.lock.Unlock()
	}
	m.closeAll()
	m.distributing.Done()
}
// Shutdown closes incoming and then waits for loop to drain it and close every
// watcher. Call it at most once: a second call would close an already-closed
// channel and panic.
func (m *Broadcaster) Shutdown() {
	defer m.distributing.Wait()
	close(m.incoming)
}
// closeAll closes the result channel of every registered watcher and replaces
// the watcher map with an empty one. loop calls it after incoming is closed.
func (m *Broadcaster) closeAll() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for id := range m.watchers {
		close(m.watchers[id].result)
	}
	m.watchers = make(map[int64]*broadcasterWatcher)
}
// stopWatching unregisters the watcher stored under id and closes its result
// channel, which ends its consumer's range loop. Unknown ids are ignored.
func (m *Broadcaster) stopWatching(id int64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if w, found := m.watchers[id]; found {
		delete(m.watchers, id)
		close(w.result)
	}
}
// Watch registers a new watcher and returns it.
//
// Each watcher gets a unique id, taken from watchersQueue, which is then
// advanced. That id is both its key in m.watchers and the id Stop passes to
// stopWatching, so stopping a watcher really unregisters it and closes its
// result channel. Previously every watcher was stored under key 0 with a
// different id, so Stop never found it and its consumer goroutine leaked.
//
// The result channel is buffered to watchQueueLength, the queue length
// configured in NewBroadcaster. The map is modified under m.lock because loop,
// closeAll and stopWatching use it from other goroutines.
func (m *Broadcaster) Watch() Interface {
	m.lock.Lock()
	defer m.lock.Unlock()

	id := m.watchersQueue
	m.watchersQueue++

	watcher := &broadcasterWatcher{
		result:  make(chan Events, m.watchQueueLength),
		stopped: make(chan struct{}),
		id:      id,
		m:       m,
	}
	m.watchers[id] = watcher
	return watcher
}
// watcher implementation

// Interface is the handle returned by Watch. It is used to consume broadcast
// events and to unsubscribe.
type Interface interface {
	// Stop unsubscribes the watcher. broadcasterWatcher's implementation only
	// acts on the first call.
	Stop()
	// ResultChan returns the channel on which broadcast events are delivered.
	ResultChan() <-chan Events
}
// broadcasterWatcher is the Broadcaster's implementation of Interface.
type broadcasterWatcher struct {
	// result receives events from loop. It is closed by stopWatching or
	// closeAll.
	result chan Events
	// stopped is closed by Stop; loop's select checks it when delivering.
	stopped chan struct{}
	// stop ensures Stop's work runs only once.
	stop sync.Once
	// id is what Stop passes to stopWatching.
	id int64
	// m is the owning Broadcaster.
	m *Broadcaster
}
// ResultChan exposes the receive-only side of the watcher's result channel.
// Consumers range over it to read broadcast events.
func (b *broadcasterWatcher) ResultChan() <-chan Events {
	var ch <-chan Events = b.result
	return ch
}
// Stop unsubscribes the watcher. Only the first call has an effect. It closes
// stopped, which loop's select watches, and then asks the Broadcaster to
// unregister this watcher by id.
func (b *broadcasterWatcher) Stop() {
	shutdown := func() {
		close(b.stopped)
		b.m.stopWatching(b.id)
	}
	b.stop.Do(shutdown)
}
// --------------------
// main demonstrates the simplified broadcaster. A producer goroutine emits
// three events at roughly 1s, 3s and 6s, while a logging watcher prints what
// it receives. The watcher stops itself after 4 seconds, so the third event
// arrives after it has been stopped. main returns once the producer is done.
func main() {
	eventBroadcast := NewEventBroadcaster()

	type step struct {
		delay   time.Duration
		message string
	}
	steps := []step{
		{delay: time.Second, message: "1"},
		{delay: time.Second * 2, message: "2"},
		{delay: time.Second * 3, message: "3"},
	}

	var wg sync.WaitGroup
	wg.Add(1)
	// Producer.
	go func() {
		defer wg.Done()
		for _, s := range steps {
			time.Sleep(s.delay)
			eventBroadcast.Event("add", "test", s.message)
		}
		//eventBroadcast.Stop()
	}()

	eventBroadcast.StartLogging()
	wg.Wait()
}
安装
https://github.com/AliyunContainerService/kube-eventer