A walkthrough of the official Kubernetes sample-controller
Contents:
How to write a Kubernetes CRD
Reference example
$ tree sample-controller/
sample-controller/
├── artifacts
│   └── examples
│       ├── crd-status-subresource.yaml
│       ├── crd-validation.yaml
│       ├── crd.yaml              # creates the CRD object
│       └── example-foo.yaml      # creates a Foo object
├── code-of-conduct.md
├── CONTRIBUTING.md
├── controller.go
├── controller_test.go
├── docs
│   ├── controller-client-go.md
│   └── images
│       └── client-go-controller-interaction.jpeg
├── Godeps
│   ├── Godeps.json
│   ├── OWNERS
│   └── Readme
├── go.mod
├── go.sum
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── tools.go
│   ├── update-codegen.sh
│   └── verify-codegen.sh
├── LICENSE
├── main.go
├── OWNERS
├── pkg
│   ├── apis
│   │   └── samplecontroller
│   │       ├── register.go       # holds global variables needed later
│   │       └── v1alpha1
│   │           ├── doc.go        # code-generation annotations; defines the package's API group name
│   │           ├── register.go   # registers the types with the server
│   │           ├── types.go      # defines the data submitted to the API
│   │           └── zz_generated.deepcopy.go  # auto-generated deepcopy code
│   ├── generated
│   │   ├── clientset
│   │   │   └── versioned
│   │   │       ├── clientset.go
│   │   │       ├── doc.go
│   │   │       ├── fake
│   │   │       │   ├── clientset_generated.go
│   │   │       │   ├── doc.go
│   │   │       │   └── register.go
│   │   │       ├── scheme
│   │   │       │   ├── doc.go
│   │   │       │   └── register.go
│   │   │       └── typed
│   │   │           └── samplecontroller
│   │   │               └── v1alpha1
│   │   │                   ├── doc.go
│   │   │                   ├── fake
│   │   │                   │   ├── doc.go
│   │   │                   │   ├── fake_foo.go
│   │   │                   │   └── fake_samplecontroller_client.go
│   │   │                   ├── foo.go
│   │   │                   ├── generated_expansion.go
│   │   │                   └── samplecontroller_client.go
│   │   ├── informers
│   │   │   └── externalversions
│   │   │       ├── factory.go
│   │   │       ├── generic.go
│   │   │       ├── internalinterfaces
│   │   │       │   └── factory_interfaces.go
│   │   │       └── samplecontroller
│   │   │           ├── interface.go
│   │   │           └── v1alpha1
│   │   │               ├── foo.go
│   │   │               └── interface.go
│   │   └── listers
│   │       └── samplecontroller
│   │           └── v1alpha1
│   │               ├── expansion_generated.go
│   │               └── foo.go
│   └── signals
│       ├── signal.go
│       ├── signal_posix.go
│       └── signal_windows.go
├── README.md
└── SECURITY_CONTACTS
28 directories, 55 files
artifacts/examples/crd.yaml
artifacts/examples/crd.yaml creates the CRD object:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced
- group: samplecontroller.k8s.io is the API group
- version: v1alpha1 is the API version
- kind: Foo is the resource kind
- plural: foos is the plural name used in API URLs
- scope: Namespaced means this defines a namespaced object, like a Pod
Once the CRD is created, the API server exposes a new endpoint, /apis/samplecontroller.k8s.io/v1alpha1/namespaces/<namespace>/foos, to which Foo manifests can be submitted.
The official docs define many more options, including validation of the submitted data by type, regular expression, and numeric range; see the custom-resource-definitions documentation.
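For instance, the repo ships artifacts/examples/crd-validation.yaml, which constrains the Foo spec with an OpenAPI v3 schema. A sketch along those lines (the exact bounds and the deploymentName pattern here are illustrative assumptions, not copied from the file):

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced
  validation:
    openAPIV3Schema:
      properties:
        spec:
          properties:
            replicas:
              type: integer    # rejects non-integer values
              minimum: 1       # numeric range check
              maximum: 10
            deploymentName:
              type: string
              pattern: '^[a-z0-9]([-a-z0-9]*[a-z0-9])?$'  # regex check (DNS-1123 style)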
artifacts/examples/example-foo.yaml
artifacts/examples/example-foo.yaml creates a Foo object:
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1
pkg/apis/samplecontroller/register.go
pkg/apis/samplecontroller/register.go holds the global variables needed later:
package samplecontroller

const (
	GroupName = "samplecontroller.k8s.io"
)
Shouldn't there also be a Version = "v1alpha1" constant here?
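Some projects do; a sketch of that variant (hypothetical here, since the repo instead hard-codes "v1alpha1" in v1alpha1/register.go):

package samplecontroller

const (
	GroupName = "samplecontroller.k8s.io"
	// Version is a hypothetical addition; register.go could reference it
	// instead of the "v1alpha1" string literal.
	Version = "v1alpha1"
)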
pkg/apis/samplecontroller/v1alpha1/doc.go
// +k8s:deepcopy-gen=package
// +groupName=samplecontroller.k8s.io
// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1
Comments of the form
+<tag_name>[=value]
are annotation-style comments that drive Kubernetes code generation.
The code-generation annotations here mean:
+k8s:deepcopy-gen=package
generates DeepCopy methods for every type defined in the v1alpha1 package, and
+groupName=samplecontroller.k8s.io
defines the API group name this package corresponds to.
These annotations control code generation for the package as a whole.
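The package-wide default can be overridden per type; as a hedged illustration (this helper type is hypothetical, not in sample-controller), a type inside the package can opt out:

// +k8s:deepcopy-gen=false

// scratchState is a hypothetical helper type; the tag above excludes it from
// the package-wide DeepCopy generation enabled in doc.go.
type scratchState struct {
	lastSyncUnix int64
}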
pkg/apis/samplecontroller/v1alpha1/types.go
Defines the data submitted to the API:
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource
type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   FooSpec   `json:"spec"`
	Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// FooList is a list of Foo resources
type FooList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []Foo `json:"items"`
}
- The Foo type embeds TypeMeta (API metadata: kind and apiVersion) and ObjectMeta (object metadata: the metadata block); Spec is the part you define yourself. FooSpec declares the DeploymentName and Replicas fields, and the json tag after each field gives the name the field serializes to, i.e. the key you write in YAML.
- FooList defines the fields a collection of Foo objects must carry, so that fetching all objects through the Kubernetes list API returns a proper list type.
The code-generation annotations here mean:
+genclient
generates client code for the API type that follows.
+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
makes the generated DeepCopy code implement the runtime.Object interface provided by Kubernetes; without it some Kubernetes versions produce a compile error, so treat it as boilerplate.
If the type has no Status field, add
+genclient:noStatus
to tell the generator the API carries no Status; otherwise the generated client comes with an UpdateStatus method. If the type does have a status, declare it as Status FooStatus `json:"status"`.
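A sketch of the noStatus case (Bar is a hypothetical type in the same types.go, not part of sample-controller):

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Bar is a hypothetical resource without a Status block; +genclient:noStatus
// keeps the generated clientset from exposing an UpdateStatus method for it.
type Bar struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec BarSpec `json:"spec"`
}

// BarSpec is the spec for the hypothetical Bar resource
type BarSpec struct {
	Message string `json:"message"`
}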
pkg/apis/samplecontroller/v1alpha1/register.go
Registers the new types with the scheme, so the resource type is recognized on the server side; SchemeGroupVersion combines the Group and the Version:
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Foo{},
		&FooList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
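A hedged usage sketch (not from the repo): AddToScheme installs the Foo types into any runtime.Scheme, which is exactly what the generated clientset's scheme package does for you.

package main

import (
	"k8s.io/apimachinery/pkg/runtime"
	samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
)

// newFooScheme builds a fresh scheme that knows about Foo and FooList,
// e.g. for constructing codecs or fake clients in tests.
func newFooScheme() (*runtime.Scheme, error) {
	scheme := runtime.NewScheme()
	if err := samplev1alpha1.AddToScheme(scheme); err != nil {
		return nil, err
	}
	return scheme, nil
}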
Code generation
Use k8s.io/code-generator, the tool Kubernetes provides, to generate the deepcopy methods, clientset, informers, and listers for the types defined above.
hack/update-codegen.sh
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
# generate the code with:
# --output-base because this script should also be able to run inside the vendor dir of
# k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
# instead of the $GOPATH directly. For normal projects this can be dropped.
"${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
k8s.io/sample-controller/pkg/generated k8s.io/sample-controller/pkg/apis \
samplecontroller:v1alpha1 \
--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
# --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
Run the code generation:
$ mv pkg/generated /tmp/generated
$ mv pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go /tmp/zz_generated.deepcopy.go.bak
$ go get k8s.io/code-generator
$ go get k8s.io/gengo
$ go get golang.org/x/tools/imports
$ ./hack/update-codegen.sh
Generating deepcopy funcs
Generating clientset for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/clientset
Generating listers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/listers
Generating informers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/informers
$ diff pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go /tmp/zz_generated.deepcopy.go.bak
$ for i in `find ./pkg/generated -name "*.go"`; do diff $i $(echo $i | sed s@./pkg/generated@/tmp/generated@);done
$ for i in `find ./pkg/generated -name "*.go"`; do echo "diff $i $(echo $i | sed s@./pkg/generated@/tmp/generated@)";done
diff ./pkg/generated/clientset/versioned/doc.go /tmp/generated/clientset/versioned/doc.go
diff ./pkg/generated/clientset/versioned/clientset.go /tmp/generated/clientset/versioned/clientset.go
diff ./pkg/generated/clientset/versioned/fake/register.go /tmp/generated/clientset/versioned/fake/register.go
diff ./pkg/generated/clientset/versioned/fake/doc.go /tmp/generated/clientset/versioned/fake/doc.go
diff ./pkg/generated/clientset/versioned/fake/clientset_generated.go /tmp/generated/clientset/versioned/fake/clientset_generated.go
diff ./pkg/generated/clientset/versioned/scheme/register.go /tmp/generated/clientset/versioned/scheme/register.go
diff ./pkg/generated/clientset/versioned/scheme/doc.go /tmp/generated/clientset/versioned/scheme/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/doc.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/doc.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_samplecontroller_client.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_samplecontroller_client.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_foo.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_foo.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/generated_expansion.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/generated_expansion.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/samplecontroller_client.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/samplecontroller_client.go
diff ./pkg/generated/listers/samplecontroller/v1alpha1/foo.go /tmp/generated/listers/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/listers/samplecontroller/v1alpha1/expansion_generated.go /tmp/generated/listers/samplecontroller/v1alpha1/expansion_generated.go
diff ./pkg/generated/informers/externalversions/generic.go /tmp/generated/informers/externalversions/generic.go
diff ./pkg/generated/informers/externalversions/factory.go /tmp/generated/informers/externalversions/factory.go
diff ./pkg/generated/informers/externalversions/samplecontroller/interface.go /tmp/generated/informers/externalversions/samplecontroller/interface.go
diff ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go /tmp/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/interface.go /tmp/generated/informers/externalversions/samplecontroller/v1alpha1/interface.go
diff ./pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go /tmp/generated/informers/externalversions/internalinterfaces/factory_interfaces.go
The diffs show that the regenerated files are identical to the ones we moved away.
zz_generated.deepcopy.go under pkg/apis/samplecontroller/v1alpha1 holds the DeepCopy code, and the clientset, informers, and listers under pkg/generated are the client libraries for the custom controller.
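As a hedged usage sketch (assumed, not from the repo), the generated lister answers queries from the informer's local cache rather than the API server:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"
)

// printFoos lists every Foo in the default namespace from the lister's
// local cache; no request hits the API server.
func printFoos(lister listers.FooLister) error {
	foos, err := lister.Foos("default").List(labels.Everything())
	if err != nil {
		return err
	}
	for _, foo := range foos {
		fmt.Println(foo.Name, foo.Spec.DeploymentName)
	}
	return nil
}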
Building and using it directly
$ go get k8s.io/sample-controller
$ cd $GOPATH/src/k8s.io/sample-controller
$ go build -o sample-controller .
$ ls
artifacts code-of-conduct.md CONTRIBUTING.md controller.go controller_test.go docs Godeps go.mod go.sum hack LICENSE main.go OWNERS pkg README.md sample-controller SECURITY_CONTACTS
The build produces the sample-controller binary visible in the listing above.
Older versions used apiVersion extensions/v1beta1 for Deployments; since Kubernetes 1.9 it is apps/v1.
$ ./sample-controller -kubeconfig=$HOME/.kube/config
$ kubectl create -f artifacts/examples/crd.yaml
$ kubectl create -f artifacts/examples/example-foo.yaml
$ kubectl get deployments
$ kubectl delete crd foos.samplecontroller.k8s.io
main.go
package main

import (
	"flag"
	"time"

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog"
	// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
	// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

	clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
	informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
	"k8s.io/sample-controller/pkg/signals"
)

var (
	masterURL  string
	kubeconfig string
)

func main() {
	flag.Parse()

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}

func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
- Create a standard Kubernetes client and an exampleClient (these hold the connection to the apiserver; the Informer machinery's reflector uses it to watch object instances for changes)
- Create an exampleInformerFactory for Foo objects and build the Controller that the informers feed
- Start the informers, then run the controller
controller.go
package main

import (
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"

	samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
	clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
	samplescheme "k8s.io/sample-controller/pkg/generated/clientset/versioned/scheme"
	informers "k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1"
	listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"
)

const controllerAgentName = "sample-controller"

const (
	// SuccessSynced is used as part of the Event 'reason' when a Foo is synced
	SuccessSynced = "Synced"
	// ErrResourceExists is used as part of the Event 'reason' when a Foo fails
	// to sync due to a Deployment of the same name already existing.
	ErrResourceExists = "ErrResourceExists"

	// MessageResourceExists is the message used for Events when a resource
	// fails to sync due to a Deployment already existing
	MessageResourceExists = "Resource %q already exists and is not managed by Foo"
	// MessageResourceSynced is the message used for an Event fired when a Foo
	// is synced successfully
	MessageResourceSynced = "Foo synced successfully"
)

// Controller is the controller implementation for Foo resources
type Controller struct {
	// kubeclientset is a standard kubernetes clientset
	kubeclientset kubernetes.Interface
	// sampleclientset is a clientset for our own API group
	sampleclientset clientset.Interface

	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced
	foosLister        listers.FooLister
	foosSynced        cache.InformerSynced

	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
}

// NewController returns a new sample controller
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller {

	// Create event broadcaster
	// Add sample-controller types to the default Kubernetes Scheme so Events can be
	// logged for sample-controller types.
	utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

	controller := &Controller{
		kubeclientset:     kubeclientset,
		sampleclientset:   sampleclientset,
		deploymentsLister: deploymentInformer.Lister(),
		deploymentsSynced: deploymentInformer.Informer().HasSynced,
		foosLister:        fooInformer.Lister(),
		foosSynced:        fooInformer.Informer().HasSynced,
		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
		recorder:          recorder,
	}

	klog.Info("Setting up event handlers")
	// Set up an event handler for when Foo resources change
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueFoo(new)
		},
	})
	// Set up an event handler for when Deployment resources change. This
	// handler will lookup the owner of the given Deployment, and if it is
	// owned by a Foo resource will enqueue that Foo resource for
	// processing. This way, we don't need to implement custom logic for
	// handling Deployment resources. More info on this pattern:
	// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(old, new interface{}) {
			newDepl := new.(*appsv1.Deployment)
			oldDepl := old.(*appsv1.Deployment)
			if newDepl.ResourceVersion == oldDepl.ResourceVersion {
				// Periodic resync will send update events for all known Deployments.
				// Two different versions of the same Deployment will always have different RVs.
				return
			}
			controller.handleObject(new)
		},
		DeleteFunc: controller.handleObject,
	})

	return controller
}
The control loop:

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	// Start the informer factories to begin populating the informer caches
	klog.Info("Starting Foo controller")

	// Wait for the caches to be synced before starting workers
	klog.Info("Waiting for informer caches to sync")
	// Block until the local caches have completed one full sync
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	// (threadiness controls how many runWorker loops are started)
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	// Loop forever until the workqueue shuts down
	for c.processNextWorkItem() {
	}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	// Pop one object off the workqueue
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date that when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			// (Forget also clears the item's rate-limit history.)
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	// This branch effectively handles DELETE: the Foo is gone from the cache.
	// Compare https://github.com/resouer/k8s-controller-custom-resource/blob/master/controller.go
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}
		return err
	}

	// Read the DeploymentName defined in the Foo spec
	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it.
	// newDeployment (defined below) builds the desired Deployment object.
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
	}

	// If an error occurs during Get/Create, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Check which controller owns this object.
	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf(msg)
	}

	// If this number of the replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
	}

	// If an error occurs during Update, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
	// NEVER modify objects from the store. It's a read-only, local cache.
	// You can use DeepCopy() to make a deep copy of original object and modify this copy
	// Or create a copy manually for better performance
	fooCopy := foo.DeepCopy()
	fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
	// If the CustomResourceSubresources feature gate is not enabled,
	// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
	// UpdateStatus will not allow changes to the Spec of the resource,
	// which is ideal for ensuring nothing other than resource status has been updated.
	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
	return err
}
Watched additions and updates are converted to keys and put on the workqueue:

// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	// Compute the object's namespace/name key
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	// Put the key on the queue
	c.workqueue.Add(key)
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	klog.V(4).Infof("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not owned by a Foo, we should not do anything more
		// with it.
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}
// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:latest",
						},
					},
				},
			},
		},
	}
}
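The generated fake clientset (pkg/generated/clientset/versioned/fake) makes this logic testable without a cluster, which is what controller_test.go builds on. A minimal hedged sketch, assuming the pre-context client method signatures used throughout this code vintage:

package main

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
	"k8s.io/sample-controller/pkg/generated/clientset/versioned/fake"
)

// TestFakeFooClient creates a Foo through the in-memory fake clientset and
// reads it back; no API server is involved.
func TestFakeFooClient(t *testing.T) {
	client := fake.NewSimpleClientset()

	foo := &samplev1alpha1.Foo{
		ObjectMeta: metav1.ObjectMeta{Name: "example-foo", Namespace: "default"},
		Spec:       samplev1alpha1.FooSpec{DeploymentName: "example-foo"},
	}
	if _, err := client.SamplecontrollerV1alpha1().Foos("default").Create(foo); err != nil {
		t.Fatalf("creating Foo: %v", err)
	}

	got, err := client.SamplecontrollerV1alpha1().Foos("default").Get("example-foo", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("getting Foo: %v", err)
	}
	if got.Spec.DeploymentName != "example-foo" {
		t.Errorf("unexpected deploymentName: %q", got.Spec.DeploymentName)
	}
}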