Go:通过kubernetes的client-go来watch数据变化

时间:May 9, 2020 分类:

目录:

环境准备

kubernetes的api请求权限

这边直接使用clusterrole的admin

# ServiceAccount used by the in-cluster Go client to authenticate to the API server.
apiVersion: v1
kind: ServiceAccount
metadata:
  namespace: pre
  name: admin2goclient
---
# Bind the ServiceAccount to the built-in "admin" ClusterRole.
# NOTE(review): a ClusterRoleBinding grants these permissions cluster-wide,
# not only in the "pre" namespace — confirm this broad grant is intended.
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: admin2goclient
subjects:
- kind: ServiceAccount
  name: admin2goclient
  namespace: pre
roleRef:
  kind: ClusterRole
  name: admin
  apiGroup: rbac.authorization.k8s.io

在集群内测试安装服务

# Test Deployment: an idle centos container with the ServiceAccount token
# mounted, used to exec in and try the client from inside the cluster.
# NOTE(review): extensions/v1beta1 Deployment is deprecated (removed in
# Kubernetes 1.16); apps/v1 (which requires spec.selector) is the replacement.
# It matches the old client-go pin in the go.mod at the end of the article.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: goclient
  namespace: pre
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: goclient
    spec:
      containers:
      - image: centos
        name: goclient
        # Keep the pod alive forever so we can kubectl exec into it.
        command: ["sh", "-c"]
        args:
        - while true; do
            sleep 5;
          done;
        volumeMounts:
        - {mountPath: /home/logs, name: hostpath}
      # ServiceAccount bound to ClusterRole "admin" above; its token is
      # auto-mounted at /run/secrets/kubernetes.io/serviceaccount.
      serviceAccountName: admin2goclient
      volumes:
      - hostPath: {path: /data/logs, type: ''}
        name: hostpath

直接kubectl -n pre exec -it goclient-dd487966c-vvwnn /bin/bash进入pod即可,可以看到挂载的/run/secrets/kubernetes.io/serviceaccount

使用kubernetes client

获取client代码示例

/*
    Client setup
 */

// NewClientset builds a *kubernetes.Clientset from the in-cluster
// configuration, i.e. the ServiceAccount token and CA that Kubernetes
// mounts into the pod at /run/secrets/kubernetes.io/serviceaccount.
// It returns an error if not running inside a cluster or if the client
// cannot be constructed from the config.
func NewClientset() (*kubernetes.Clientset, error) {
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    kubeclient, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }
    return kubeclient, nil
}

watch流程代码

package main

import (
    "errors"
    "fmt"
    "github.com/golang/glog"
    "k8s.io/api/extensions/v1beta1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "os"
    "os/signal"
    "reflect"
    "syscall"
    "time"
)

/*
    Event handler types
*/

type Action = int
type ResourceType = int

// event is the message published onto the shared channel by the informer
// callbacks and consumed by EventHandler.
type event struct {
    Action       Action
    ResourceType ResourceType
    OldObj       interface{} // previous object state (set on UPDATE only)
    NewObj       interface{} // current object state
}

// EventHandlerFunc adapts informer callbacks into events on a shared channel.
type EventHandlerFunc struct {
    ev           chan event   // channel every handler publishes to
    resourceType ResourceType // resource kind this handler is registered for
}

// Actions. Fix: these are fixed enum values, so declare them as constants
// instead of package-level vars that could be reassigned at runtime.
const (
    ADD    Action = 1
    DELETE Action = 2
    UPDATE Action = 3
)

// Watched resource kinds.
const (
    Deployment  ResourceType = 1
    Statefulset ResourceType = 2
    Daemonset   ResourceType = 3
)

// discardEvent is a sentinel marking an event that should be dropped.
// Fix: the original errors.New("") carried an empty message, which would
// produce unreadable logs wherever the sentinel surfaces.
var discardEvent = errors.New("event discarded")

// OnAdd forwards an informer "add" callback onto the shared event channel.
func (c *EventHandlerFunc) OnAdd(obj interface{}) {
    e := event{
        Action:       ADD,
        ResourceType: c.resourceType,
        NewObj:       obj,
    }
    c.ev <- e
}

// OnDel forwards an informer "delete" callback onto the shared event channel.
// NOTE(review): the deleted object is stored in NewObj, mirroring OnAdd;
// if consumers expect the prior state in OldObj, confirm before changing —
// the DELETE branch of EventHandler is currently empty, so it is unobserved.
func (c *EventHandlerFunc) OnDel(obj interface{}) {
    e := event{
        Action:       DELETE,
        ResourceType: c.resourceType,
        NewObj:       obj,
    }
    c.ev <- e
}

// OnUpdate forwards an informer "update" callback, carrying both the
// previous (OldObj) and current (NewObj) object state.
func (c *EventHandlerFunc) OnUpdate(oldObj, newObj interface{}) {
    e := event{
        Action:       UPDATE,
        ResourceType: c.resourceType,
        OldObj:       oldObj,
        NewObj:       newObj,
    }
    c.ev <- e
}

/*
    Controller
*/
// Controller wires per-namespace Kubernetes informer factories to the
// registered event handlers.
type Controller struct {
    // Kubernetes API client
    clientset *kubernetes.Clientset
    // namespace -> informer factory for that namespace
    factors map[string]informers.SharedInformerFactory
    // resource type -> handler map used when building the factories
    eventHandlerFunc map[ResourceType]*EventHandlerFunc
}

// BuildFactors creates one SharedInformerFactory per namespace and attaches
// the registered event handlers to the Deployment / DaemonSet / StatefulSet
// informers. An informer is only instantiated for a resource type that has a
// handler in eventHandlerFunc (same behavior as the original per-type
// if-blocks). It always returns nil; the error result is kept so the
// signature stays stable for callers.
func (c *Controller) BuildFactors(namespaces []string) error {
    for _, ns := range namespaces {
        factory := informers.NewSharedInformerFactoryWithOptions(c.clientset, 10*time.Minute, informers.WithNamespace(ns))
        // register wires the handler for rt onto the informer produced by
        // get. get is lazy so no informer is created when no handler is
        // registered for that resource type.
        register := func(rt ResourceType, get func() cache.SharedIndexInformer) {
            h, ok := c.eventHandlerFunc[rt]
            if !ok {
                return
            }
            get().AddEventHandler(cache.ResourceEventHandlerFuncs{
                AddFunc:    h.OnAdd,
                DeleteFunc: h.OnDel,
                UpdateFunc: h.OnUpdate,
            })
        }
        register(Deployment, func() cache.SharedIndexInformer {
            return factory.Extensions().V1beta1().Deployments().Informer()
        })
        register(Daemonset, func() cache.SharedIndexInformer {
            return factory.Extensions().V1beta1().DaemonSets().Informer()
        })
        register(Statefulset, func() cache.SharedIndexInformer {
            return factory.Apps().V1beta1().StatefulSets().Informer()
        })
        c.factors[ns] = factory
    }
    return nil
}

// Run starts every informer factory, blocks until all caches have synced,
// then waits for stopCh to close before shutting down. It returns an error
// if any cache fails to sync before stopCh is closed.
func (c *Controller) Run(stopCh <-chan struct{}) error {
    // Fix: the original logged "start kafak_to_file controller", a
    // copy/paste leftover from an unrelated project.
    glog.Info("start watch controller")
    for _, factor := range c.factors {
        factor.Start(stopCh)
    }

    // Wait for the initial cache sync of every informer in every namespace.
    for ns, factor := range c.factors {
        for t, ok := range factor.WaitForCacheSync(stopCh) {
            if !ok {
                return errors.New("failed to wait for caches to sync")
            }
            glog.V(11).Infof("sync cache %s/%s success", ns, t.String())
        }
    }
    glog.Info("sync cache successful")
    glog.Info("Started workers")
    <-stopCh
    glog.Info("Shutting down workers")
    return nil
}

/*
    Event consumption
*/

// EventHandler drains the event channel and dispatches on the action type.
// Only ADD events for Deployments are currently handled; the DELETE and
// UPDATE branches are placeholders.
func EventHandler(ev chan event) {
    for e := range ev {
        glog.V(25).Infof("new event:%#v", e)
        glog.V(20).Infof("new event type %d", e.Action)
        switch e.Action {
        case ADD:
            if e.ResourceType == Deployment {
                dep, ok := e.NewObj.(*v1beta1.Deployment)
                if !ok {
                    // Fix: report the actual runtime type of the payload.
                    // The original printed reflect.TypeOf of the variable
                    // produced by the FAILED assertion, which is always the
                    // expected type (*v1beta1.Deployment), never the type
                    // actually received. Also avoids shadowing builtin `new`
                    // and the redundant Println(Sprintf(...)).
                    fmt.Printf("reflect deployment error type %s\n", reflect.TypeOf(e.NewObj))
                } else {
                    fmt.Println(dep.Name)
                }
            }
        case DELETE:
            // placeholder: deletes are ignored for now
        case UPDATE:
            // placeholder: updates are ignored for now
        }
    }
}

/*
   Create a new controller
*/

// NewControllers builds a Controller that watches the hard-coded namespaces
// and registers the supplied per-resource-type handlers on its informers.
// NOTE(review): the namespace list is hard-coded; consider making it
// configurable.
func NewControllers(clientset *kubernetes.Clientset, eventHandlerFunc map[ResourceType]*EventHandlerFunc) *Controller {
    namespaces := []string{"pre", "www"}
    glog.Infof("watch namespace:%#v", namespaces)
    c := &Controller{
        clientset:        clientset,
        factors:          make(map[string]informers.SharedInformerFactory),
        eventHandlerFunc: eventHandlerFunc,
    }
    // Fix: the original discarded this error with `_ =`. BuildFactors
    // currently always returns nil, but log the error instead of silently
    // swallowing it in case that changes.
    if err := c.BuildFactors(namespaces); err != nil {
        glog.Errorf("build informer factories: %s", err.Error())
    }
    return c
}

/*
    Client setup
*/

// NewClientset builds a Kubernetes clientset from the in-cluster
// ServiceAccount configuration mounted into the pod.
func NewClientset() (*kubernetes.Clientset, error) {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    // NewForConfig already returns (nil, err) on failure, so its result
    // can be returned directly.
    return kubernetes.NewForConfig(cfg)
}

// shutdownSignals are the OS signals that trigger a graceful stop.
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

// SetupSignalHandler returns a channel that is closed when the first
// SIGINT/SIGTERM arrives. A second signal terminates the process
// immediately with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
    quit := make(chan struct{})
    sigCh := make(chan os.Signal, 2)
    signal.Notify(sigCh, shutdownSignals...)
    go func() {
        <-sigCh // first signal: request graceful shutdown
        close(quit)
        <-sigCh // second signal: exit directly
        os.Exit(1)
    }()
    return quit
}

// main wires the client, handlers, and controller together, starts the
// event-consuming goroutine, and runs until a shutdown signal arrives.
func main() {
    kubeclient, err := NewClientset()
    if err != nil {
        panic(err.Error())
    }

    // Buffered channel shared by all handlers.
    evCh := make(chan event, 100000)

    // One handler per watched resource type, all publishing to evCh.
    handlers := make(map[ResourceType]*EventHandlerFunc)
    for _, rt := range []ResourceType{Deployment, Daemonset, Statefulset} {
        handlers[rt] = &EventHandlerFunc{
            ev:           evCh,
            resourceType: rt,
        }
    }

    controller := NewControllers(kubeclient, handlers)

    // Consume events in the background.
    go EventHandler(evCh)
    stopCh := SetupSignalHandler()

    if err = controller.Run(stopCh); err != nil {
        glog.Errorf("Error running controller: %s", err.Error())
    }

}

go.mod参考

// Reference go.mod for the example above. The k8s.io pins are pseudo-versions
// dated January 2019 (pre-semver client-go), which is why the code uses the
// extensions/v1beta1 informers.
module new

go 1.13

require (
    github.com/Shopify/sarama v1.23.1
    github.com/bsm/sarama-cluster v2.1.15+incompatible
    github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
    github.com/googleapis/gnostic v0.3.1 // indirect
    github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
    github.com/hashicorp/consul/api v1.4.0
    github.com/hashicorp/golang-lru v0.5.3 // indirect
    github.com/lylezxl/boulle v0.0.0-20191008021431-b62c52214195
    github.com/lylezxl/cobratools v0.0.0-20190723070549-09c65febb5b8
    github.com/marlonfan/go-library v0.0.0-20180504100944-3ba7339839ad
    github.com/onsi/ginkgo v1.10.2 // indirect
    github.com/onsi/gomega v1.7.0 // indirect
    github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
    github.com/prometheus/client_golang v1.1.0
    github.com/spf13/cobra v0.0.5
    github.com/spf13/pflag v1.0.3
    github.com/spf13/viper v1.4.0
    github.com/tidwall/gjson v1.3.4
    golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
    gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
    k8s.io/api v0.0.0-20190118113203-912cbe2bfef3
    k8s.io/apimachinery v0.0.0-20190116203031-d49e237a2683
    k8s.io/client-go v0.0.0-20190117233410-4022682532b3
)