Go:通过kubernetes的client-go来watch数据变化
目录:
环境准备
kubernetes的api请求权限
这里为了方便,直接把ServiceAccount绑定到名为admin的ClusterRole:
# ServiceAccount used by the goclient Deployment (referenced there through
# serviceAccountName) to authenticate against the Kubernetes API.
apiVersion: v1
kind: ServiceAccount
metadata:
  namespace: pre
  name: admin2goclient
---
# Binds the admin2goclient ServiceAccount in namespace "pre" to the
# ClusterRole named "admin". Because this is a ClusterRoleBinding, the grant
# applies cluster-wide rather than to a single namespace.
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: admin2goclient
subjects:
  - kind: ServiceAccount
    name: admin2goclient
    namespace: pre
roleRef:
  kind: ClusterRole
  name: admin
  apiGroup: rbac.authorization.k8s.io
在集群内测试安装服务
# Long-running helper pod used to try the client from inside the cluster.
# The container only sleeps in a loop; exec into it to run the Go program.
# NOTE(review): extensions/v1beta1 Deployments are only served by older
# clusters; newer ones need apps/v1 plus spec.selector. Confirm the target
# cluster version before applying.
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: goclient
  namespace: pre
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: goclient
    spec:
      containers:
        - image: centos
          name: goclient
          command: ["sh", "-c"]
          # Keep the container alive so it can be exec'd into.
          args:
            - while true; do
              sleep 5;
              done;
          volumeMounts:
            - {mountPath: /home/logs, name: hostpath}
      # Run as the ServiceAccount that carries the API permissions.
      serviceAccountName: admin2goclient
      volumes:
        - hostPath: {path: /data/logs, type: ''}
          name: hostpath
直接执行 kubectl -n pre exec -it goclient-dd487966c-vvwnn /bin/bash 进入pod即可(pod名称以实际创建的为准)。
进入后可以看到ServiceAccount的凭据被挂载在/run/secrets/kubernetes.io/serviceaccount目录下
使用kubernetes client
获取client代码示例
/*
Client helpers
*/

// NewClientset builds a Kubernetes clientset from the configuration returned
// by rest.InClusterConfig.
//
// It returns a nil clientset and the underlying error when either the
// configuration cannot be loaded or the clientset cannot be created from it.
func NewClientset() (*kubernetes.Clientset, error) {
	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		return nil, cfgErr
	}

	clientset, clientErr := kubernetes.NewForConfig(cfg)
	if clientErr != nil {
		return nil, clientErr
	}

	return clientset, nil
}
watch流程代码
package main
import (
"errors"
"fmt"
"github.com/golang/glog"
"k8s.io/api/extensions/v1beta1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"os"
"os/signal"
"reflect"
"syscall"
"time"
)
/*
Handler types
*/

// Action and ResourceType are plain aliases of int. Action tags an event with
// what happened; ResourceType tags it with the kind of resource involved.
type (
	Action       = int
	ResourceType = int
)

// event is the message placed on the handler channel for every informer
// callback. OldObj is only populated by OnUpdate.
type event struct {
	Action         Action
	ResourceType   ResourceType
	OldObj, NewObj interface{}
}

// EventHandlerFunc turns informer callbacks for one resource type into event
// values and sends them on ev.
type EventHandlerFunc struct {
	// ev receives every event produced by this handler.
	ev chan event
	// resourceType is copied into each event's ResourceType field.
	resourceType ResourceType
}
// Event actions stored in event.Action. Declared as constants so they cannot
// be reassigned at runtime.
const (
	ADD    = Action(1)
	DELETE = Action(2)
	UPDATE = Action(3)
)

// Resource kinds stored in event.ResourceType and used as keys of the
// handler map.
const (
	Deployment  = ResourceType(1)
	Statefulset = ResourceType(2)
	Daemonset   = ResourceType(3)
)

// discardEvent is a sentinel error. It is not referenced by the rest of this
// file; presumably it marks an event that should be dropped (TODO confirm).
// It carries a message so that it is recognisable if it is ever logged.
var discardEvent = errors.New("discard event")
// OnAdd reports obj as an ADD event tagged with the handler's resource type.
// The object travels in NewObj; OldObj is left nil. The send blocks while the
// channel has no free capacity.
func (c *EventHandlerFunc) OnAdd(obj interface{}) {
	added := event{
		Action:       ADD,
		ResourceType: c.resourceType,
		NewObj:       obj,
	}
	c.ev <- added
}
// OnDel reports obj as a DELETE event tagged with the handler's resource
// type. The removed object travels in NewObj (not OldObj). The send blocks
// while the channel has no free capacity.
func (c *EventHandlerFunc) OnDel(obj interface{}) {
	removed := event{
		Action:       DELETE,
		ResourceType: c.resourceType,
		NewObj:       obj,
	}
	c.ev <- removed
}
// OnUpdate reports a change as an UPDATE event tagged with the handler's
// resource type, carrying both the previous (OldObj) and current (NewObj)
// versions. The send blocks while the channel has no free capacity.
func (c *EventHandlerFunc) OnUpdate(oldObj, newObj interface{}) {
	changed := event{
		Action:       UPDATE,
		ResourceType: c.resourceType,
		OldObj:       oldObj,
		NewObj:       newObj,
	}
	c.ev <- changed
}
/*
Controller types
*/

// Controller owns one shared informer factory per watched namespace and the
// per-resource handlers that are attached to those factories' informers.
type Controller struct {
	// clientset is the Kubernetes client handed to every informer factory.
	clientset *kubernetes.Clientset
	// factors maps a namespace name to the informer factory scoped to it.
	factors map[string]informers.SharedInformerFactory
	// eventHandlerFunc maps a resource type to its handler; BuildFactors
	// consults it to decide which informers to register.
	eventHandlerFunc map[ResourceType]*EventHandlerFunc
}
// BuildFactors creates one namespace-scoped shared informer factory (resync
// period 10 minutes) for every entry in namespaces and stores it in c.factors
// keyed by namespace.
//
// For each resource type that has a non-nil handler in c.eventHandlerFunc
// (Deployment, Daemonset, Statefulset) the matching informer is created and
// the handler's OnAdd/OnDel/OnUpdate callbacks are attached to it. Resource
// types without a handler get no informer.
//
// The returned error is currently always nil.
func (c *Controller) BuildFactors(namespaces []string) error {
	// A Controller that was not created through NewControllers may have a nil
	// map; writing to it would panic.
	if c.factors == nil {
		c.factors = make(map[string]informers.SharedInformerFactory, len(namespaces))
	}

	for _, ns := range namespaces {
		factory := informers.NewSharedInformerFactoryWithOptions(c.clientset, 10*time.Minute, informers.WithNamespace(ns))

		// A nil handler is skipped: binding its methods would only defer a
		// nil-pointer panic to the first delivered event.
		if h := c.eventHandlerFunc[Deployment]; h != nil {
			factory.Extensions().V1beta1().Deployments().Informer().AddEventHandler(resourceEventHandlers(h))
		}
		if h := c.eventHandlerFunc[Daemonset]; h != nil {
			factory.Extensions().V1beta1().DaemonSets().Informer().AddEventHandler(resourceEventHandlers(h))
		}
		if h := c.eventHandlerFunc[Statefulset]; h != nil {
			factory.Apps().V1beta1().StatefulSets().Informer().AddEventHandler(resourceEventHandlers(h))
		}

		c.factors[ns] = factory
	}
	return nil
}

// resourceEventHandlers adapts h's callbacks to the handler struct expected
// by an informer's AddEventHandler.
func resourceEventHandlers(h *EventHandlerFunc) cache.ResourceEventHandlerFuncs {
	return cache.ResourceEventHandlerFuncs{
		AddFunc:    h.OnAdd,
		DeleteFunc: h.OnDel,
		UpdateFunc: h.OnUpdate,
	}
}
// Run starts every informer factory in c.factors, waits for all of their
// caches to sync, and then blocks until stopCh is closed.
//
// It returns an error naming the namespace and informer type as soon as one
// cache reports that it did not sync, and nil after stopCh has been closed.
func (c *Controller) Run(stopCh <-chan struct{}) error {
	glog.Info("start kafak_to_file controller ")

	// Start all factories first so their caches fill in parallel.
	for _, factor := range c.factors {
		factor.Start(stopCh)
	}

	for ns, factor := range c.factors {
		for t, ok := range factor.WaitForCacheSync(stopCh) {
			if !ok {
				// Identify the failing informer so the caller's log is actionable.
				return errors.New("failed to wait for caches to sync: " + ns + "/" + t.String())
			}
			glog.V(11).Infof("sync cache %s/%s success", ns, t.String())
		}
	}
	glog.Info("sync cache successful")

	glog.Info("Started workers")
	<-stopCh
	glog.Info("Shutting down workers")
	return nil
}
/*
Event processing
*/

// EventHandler consumes events from ev until the channel is closed.
//
// Every event is logged through glog at verbosity 25 (full value) and 20
// (action only). For an ADD of a Deployment the object's name is printed to
// standard output; if the payload is not a *v1beta1.Deployment, the payload's
// actual dynamic type is printed instead. DELETE and UPDATE events are
// received but not acted on yet.
func EventHandler(ev chan event) {
	for e := range ev {
		glog.V(25).Infof("new event:%#v", e)
		glog.V(20).Infof("new event type %d", e.Action)

		switch e.Action {
		case ADD:
			if e.ResourceType != Deployment {
				continue
			}
			deployment, ok := e.NewObj.(*v1beta1.Deployment)
			if !ok {
				// Report the type of the received payload. The assertion
				// result is always a nil *v1beta1.Deployment here, so its type
				// says nothing. %v also copes with a nil payload, for which
				// reflect.TypeOf returns nil.
				fmt.Printf("reflect deployment error type %v\n", reflect.TypeOf(e.NewObj))
				continue
			}
			fmt.Println(deployment.Name)
		case DELETE:
			// Not handled yet.
		case UPDATE:
			// Not handled yet.
		}
	}
}
/*
Controller construction
*/

// NewControllers returns a Controller that uses clientset and has informer
// factories built for the watched namespaces.
//
// eventHandlerFunc selects which resource types are watched and where their
// events go. namespaces is optional: when no namespace is passed the
// controller watches "pre" and "www", which is what existing two-argument
// callers get.
//
// A failure while building the factories is logged and the Controller is
// still returned, because the signature has no error result.
func NewControllers(clientset *kubernetes.Clientset, eventHandlerFunc map[ResourceType]*EventHandlerFunc, namespaces ...string) *Controller {
	if len(namespaces) == 0 {
		// Default set of namespaces to watch.
		namespaces = []string{"pre", "www"}
	}
	glog.Infof("watch namespace:%#v", namespaces)

	c := &Controller{
		clientset:        clientset,
		factors:          make(map[string]informers.SharedInformerFactory, len(namespaces)),
		eventHandlerFunc: eventHandlerFunc,
	}
	if err := c.BuildFactors(namespaces); err != nil {
		glog.Errorf("build informer factories: %v", err)
	}
	return c
}
/*
Client helpers
*/

// NewClientset builds a Kubernetes clientset from the configuration returned
// by rest.InClusterConfig.
//
// It returns a nil clientset and the underlying error when either the
// configuration cannot be loaded or the clientset cannot be created from it.
func NewClientset() (*kubernetes.Clientset, error) {
	cfg, cfgErr := rest.InClusterConfig()
	if cfgErr != nil {
		return nil, cfgErr
	}

	clientset, clientErr := kubernetes.NewForConfig(cfg)
	if clientErr != nil {
		return nil, clientErr
	}

	return clientset, nil
}
// shutdownSignals lists the OS signals that SetupSignalHandler listens for.
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

// SetupSignalHandler registers for the signals in shutdownSignals and returns
// a channel that is closed when the first of them arrives. A second signal
// terminates the process immediately with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
	done := make(chan struct{})
	// Buffer of two so both the graceful and the forced signal can be queued.
	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, shutdownSignals...)

	waitForSignals := func() {
		<-sigs
		close(done) // first signal: request a graceful stop
		<-sigs
		os.Exit(1) // second signal. Exit directly.
	}
	go waitForSignals()

	return done
}
func main() {
// 获取k8s client
kubeclient, err := NewClientset()
if err != nil {
panic(err.Error())
}
// 创建管道
evCh := make(chan event, 100000)
// 构建map
EventHandlerFuncMap := make(map[ResourceType]*EventHandlerFunc)
EventHandlerFuncMap[Deployment] = &EventHandlerFunc{
ev: evCh,
resourceType: Deployment,
}
EventHandlerFuncMap[Daemonset] = &EventHandlerFunc{
ev: evCh,
resourceType: Daemonset,
}
EventHandlerFuncMap[Statefulset] = &EventHandlerFunc{
ev: evCh,
resourceType: Statefulset,
}
// 构建Handler对象
controller := NewControllers(
kubeclient,
EventHandlerFuncMap,
)
// 监听管道数据
go EventHandler(evCh)
stopCh := SetupSignalHandler()
if err = controller.Run(stopCh); err != nil {
glog.Errorf("Error running controller: %s", err.Error())
}
}
go.mod参考
// Reference go.mod for the example above. Of the requirements listed below,
// the watch program itself imports only github.com/golang/glog, k8s.io/api
// and k8s.io/client-go directly.
module new
go 1.13
require (
github.com/Shopify/sarama v1.23.1
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/consul/api v1.4.0
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/lylezxl/boulle v0.0.0-20191008021431-b62c52214195
github.com/lylezxl/cobratools v0.0.0-20190723070549-09c65febb5b8
github.com/marlonfan/go-library v0.0.0-20180504100944-3ba7339839ad
github.com/onsi/ginkgo v1.10.2 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/prometheus/client_golang v1.1.0
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.4.0
github.com/tidwall/gjson v1.3.4
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
// Kubernetes libraries, pinned to pseudo-versions dated January 2019.
k8s.io/api v0.0.0-20190118113203-912cbe2bfef3
k8s.io/apimachinery v0.0.0-20190116203031-d49e237a2683
k8s.io/client-go v0.0.0-20190117233410-4022682532b3
)