kubernetes官方案例sample-controller详解

时间:May 8, 2019 分类:

目录:

如何编写kubernetes的CRD

参考案例

github地址

$ tree sample-controller/
sample-controller/
├── artifacts
│   └── examples
│       ├── crd-status-subresource.yaml
│       ├── crd-validation.yaml
│       ├── crd.yaml # 创建CRD对象
│       └── example-foo.yaml # 创建Foo对象
├── code-of-conduct.md
├── CONTRIBUTING.md
├── controller.go
├── controller_test.go
├── docs
│   ├── controller-client-go.md
│   └── images
│       └── client-go-controller-interaction.jpeg
├── Godeps
│   ├── Godeps.json
│   ├── OWNERS
│   └── Readme
├── go.mod
├── go.sum
├── hack
│   ├── boilerplate.go.txt
│   ├── custom-boilerplate.go.txt
│   ├── tools.go
│   ├── update-codegen.sh
│   └── verify-codegen.sh
├── LICENSE
├── main.go
├── OWNERS
├── pkg
│   ├── apis
│   │   └── samplecontroller
│   │       ├── register.go # 放置后边需要的全局变量
│   │       └── v1alpha1
│   │           ├── doc.go # 生成注释,定义包api组名字
│   │           ├── register.go # 用于向服务端注册
│   │           ├── types.go # 用于定义提交的数据
│   │           └── zz_generated.deepcopy.go # 这个是自动生成的deepcopy相关代码
│   ├── generated
│   │   ├── clientset
│   │   │   └── versioned
│   │   │       ├── clientset.go
│   │   │       ├── doc.go
│   │   │       ├── fake
│   │   │       │   ├── clientset_generated.go
│   │   │       │   ├── doc.go
│   │   │       │   └── register.go
│   │   │       ├── scheme
│   │   │       │   ├── doc.go
│   │   │       │   └── register.go
│   │   │       └── typed
│   │   │           └── samplecontroller
│   │   │               └── v1alpha1
│   │   │                   ├── doc.go
│   │   │                   ├── fake
│   │   │                   │   ├── doc.go
│   │   │                   │   ├── fake_foo.go
│   │   │                   │   └── fake_samplecontroller_client.go
│   │   │                   ├── foo.go
│   │   │                   ├── generated_expansion.go
│   │   │                   └── samplecontroller_client.go
│   │   ├── informers
│   │   │   └── externalversions
│   │   │       ├── factory.go
│   │   │       ├── generic.go
│   │   │       ├── internalinterfaces
│   │   │       │   └── factory_interfaces.go
│   │   │       └── samplecontroller
│   │   │           ├── interface.go
│   │   │           └── v1alpha1
│   │   │               ├── foo.go
│   │   │               └── interface.go
│   │   └── listers
│   │       └── samplecontroller
│   │           └── v1alpha1
│   │               ├── expansion_generated.go
│   │               └── foo.go
│   └── signals
│       ├── signal.go
│       ├── signal_posix.go
│       └── signal_windows.go
├── README.md
└── SECURITY_CONTACTS

28 directories, 55 files

artifacts/examples/crd.yaml

artifacts/examples/crd.yaml用于创建CRD对象

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced
  • group: samplecontroller.k8s.io
  • version: v1alpha1
  • kind: Foo资源
  • plural: Foo的复数
  • scope: Namespaced代表定义的是一个属于Namespace的对象,类似Pod

定义了之后可以向foos.samplecontroller.k8s.io/v1/foos接口提交yaml文件了

官方文档也定义了更多的定义,包括对提交的数据的类型,正则匹配,数字范围进行了定义,参考官方文档custom-resource-definitions

artifacts/examples/example-foo.yaml

artifacts/examples/example-foo.yaml用于创建Foo对象

apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1

pkg/apis/samplecontroller/register.go

pkg/apis/samplecontroller/register.go用于放置后边需要的全局变量

package samplecontroller

const (
    GroupName = "samplecontroller.k8s.io"
)

是不是还应该有个Version = "v1alpha1"

pkg/apis/samplecontroller/v1alpha1/doc.go

// +k8s:deepcopy-gen=package
// +groupName=samplecontroller.k8s.io

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1

+<tag_name>[=value]格式的注释是kubernetes进行代码生成的Annotation风格注释

代码生成注释的意思是

  • +k8s:deepcopy-gen=package是为整个v1的包里所有类型定义自动生成DeepCopy方法
  • +groupName=samplecrd.k8s.io定义这个包对应的API组名字

这些起到的是全局代码生成控制的作用

pkg/apis/samplecontroller/v1alpha1/types.go

用于定义提交的数据

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource
type Foo struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   FooSpec   `json:"spec"`
    Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
    DeploymentName string `json:"deploymentName"`
    Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
    AvailableReplicas int32 `json:"availableReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// FooList is a list of Foo resources
type FooList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata"`

    Items []Foo `json:"items"`
}
  • Foo的type也是包含了TypeMeta(API元数据,包括kind和apiVersion)和ObjectMeta(对象元数据,包括metadata),Spec是是需要自定定制的部分 = FooSpec里定义了Cidr和Gateway两个字段,每个字段后边的代表这个字段被转化为json的名字,也就是yaml中需要定义的部分
  • FooList中定义一组Network对象需要包含的字段,这是为了在kubernetes中通过list方法获取所有对象的时候返回值是list类型

代码生成注释的意思是

  • +genclient代表为下边的API资源类型生成对应的Client代码
  • +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object代表在生成DeepCopy的时候实现kubernetes提供的runtime.Object接口,否则再某些kubernetes版本会出现一个编译错误,一个固定操作

如果没有Status字段需要加

  • +genclient:noStatus代表这个API没有Status字段,否则生成的Client会自带UpdateStatus方法,如果有Status需要单独添加Status FooStatus json:"status"

pkg/apis/samplecontroller/v1alpha1/register.go

注册一个Type给APIServer,使资源类型在服务器端的注册,chemeGroupVersion包括Group和Version,var

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"

    samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Foo{},
        &FooList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

代码生成

使用kubernets提供的工具k8s.io/code-generator,为定义的资源类型主动生成deep-copy方法,clientset,informer和lister

hack/update-codegen.sh

set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}

# generate the code with:
# --output-base    because this script should also be able to run inside the vendor dir of
#                  k8s.io/kubernetes. The output-base is needed for the generators to output into the vendor dir
#                  instead of the $GOPATH directly. For normal projects this can be dropped.
"${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
  k8s.io/sample-controller/pkg/generated k8s.io/sample-controller/pkg/apis \
  samplecontroller:v1alpha1 \
  --output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
  --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt

# To use your own boilerplate text append:
#   --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

进行代码生成

$ mv pkg/generated /tmp/generated
$ mv pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go /tmp/zz_generated.deepcopy.go.bak
$ go get k8s.io/code-generator
$ go get k8s.io/gengo
$ go get golang.org/x/tools/imports
$ ./hack/update-codegen.sh
./hack/update-codegen.sh 
Generating deepcopy funcs
Generating clientset for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/clientset
Generating listers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/listers
Generating informers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/client/informers
$ diff pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go /tmp/zz_generated.deepcopy.go.bak
$ for i in `find ./pkg/generated -name "*.go"`; do diff $i $(echo $i | sed s@./pkg/generated@/tmp/generated@);done
$ for i in `find ./pkg/generated -name "*.go"`; do echo "diff $i $(echo $i | sed s@./pkg/generated@/tmp/generated@)";done
diff ./pkg/generated/clientset/versioned/doc.go /tmp/generated/clientset/versioned/doc.go
diff ./pkg/generated/clientset/versioned/clientset.go /tmp/generated/clientset/versioned/clientset.go
diff ./pkg/generated/clientset/versioned/fake/register.go /tmp/generated/clientset/versioned/fake/register.go
diff ./pkg/generated/clientset/versioned/fake/doc.go /tmp/generated/clientset/versioned/fake/doc.go
diff ./pkg/generated/clientset/versioned/fake/clientset_generated.go /tmp/generated/clientset/versioned/fake/clientset_generated.go
diff ./pkg/generated/clientset/versioned/scheme/register.go /tmp/generated/clientset/versioned/scheme/register.go
diff ./pkg/generated/clientset/versioned/scheme/doc.go /tmp/generated/clientset/versioned/scheme/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/doc.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/doc.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/doc.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_samplecontroller_client.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_samplecontroller_client.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_foo.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/fake/fake_foo.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/generated_expansion.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/generated_expansion.go
diff ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/samplecontroller_client.go /tmp/generated/clientset/versioned/typed/samplecontroller/v1alpha1/samplecontroller_client.go
diff ./pkg/generated/listers/samplecontroller/v1alpha1/foo.go /tmp/generated/listers/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/listers/samplecontroller/v1alpha1/expansion_generated.go /tmp/generated/listers/samplecontroller/v1alpha1/expansion_generated.go
diff ./pkg/generated/informers/externalversions/generic.go /tmp/generated/informers/externalversions/generic.go
diff ./pkg/generated/informers/externalversions/factory.go /tmp/generated/informers/externalversions/factory.go
diff ./pkg/generated/informers/externalversions/samplecontroller/interface.go /tmp/generated/informers/externalversions/samplecontroller/interface.go
diff ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go /tmp/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go
diff ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/interface.go /tmp/generated/informers/externalversions/samplecontroller/v1alpha1/interface.go
diff ./pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go /tmp/generated/informers/externalversions/internalinterfaces/factory_interfaces.go

可以看到删除文件和生成的文件内容一样

pkg/apis/samplecontroller/v1alpha1下的zz_generated.deepcopy.go就是DeepCopy的代码文件,client目录下clientset、informers、listers就是自定义控制器的客户端了

直接编译和使用

$ go get k8s.io/sample-controller
$ cd $GOPATH/src/k8s.io/sample-controller
$ go build -o sample-controller .
$ ls
artifacts  code-of-conduct.md  CONTRIBUTING.md  controller.go  controller_test.go  docs  Godeps  go.mod  go.sum  hack  LICENSE  main.go  OWNERS  pkg  README.md  sample-controller  SECURITY_CONTACTS

直接编译可以看到编译到的sample-controller

老版本使用deployment的apiVersion是extensions/v1beta1,1.9版本后使用app/v1了

$ ./sample-controller -kubeconfig=$HOME/.kube/config
$ kubectl create -f artifacts/examples/crd.yaml
$ kubectl create -f artifacts/examples/example-foo.yaml
$ kubectl get deployments
$ kubectl delete crd foos.samplecontroller.k8s.io

main.go

package main

import (
    "flag"
    "time"

    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog"
    // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
    // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

    clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
    informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
    "k8s.io/sample-controller/pkg/signals"
)

var (
    masterURL  string
    kubeconfig string
)

func main() {
    flag.Parse()

    // set up signals so we handle the first shutdown signal gracefully
    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    exampleClient, err := clientset.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building example clientset: %s", err.Error())
    }

    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

    // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
    // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
    kubeInformerFactory.Start(stopCh)
    exampleInformerFactory.Start(stopCh)

    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
  1. 创建一个kubernetes的client和exampleClient(用于与apiserver进行连接,维护连接使用的Informer的reflector包,进行监听对象实例变化)
  2. 为network对象创建一个exampleInformerFactory,创建Informer对应的Controller
  3. 启动informer和controller

controller.go

package main

import (
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    appslisters "k8s.io/client-go/listers/apps/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"

    samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
    clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
    samplescheme "k8s.io/sample-controller/pkg/generated/clientset/versioned/scheme"
    informers "k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1"
    listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"
)

const controllerAgentName = "sample-controller"

const (
    // SuccessSynced is used as part of the Event 'reason' when a Foo is synced
    SuccessSynced = "Synced"
    // ErrResourceExists is used as part of the Event 'reason' when a Foo fails
    // to sync due to a Deployment of the same name already existing.
    ErrResourceExists = "ErrResourceExists"

    // MessageResourceExists is the message used for Events when a resource
    // fails to sync due to a Deployment already existing
    MessageResourceExists = "Resource %q already exists and is not managed by Foo"
    // MessageResourceSynced is the message used for an Event fired when a Foo
    // is synced successfully
    MessageResourceSynced = "Foo synced successfully"
)

// Controller is the controller implementation for Foo resources
type Controller struct {
    // kubeclientset is a standard kubernetes clientset
    kubeclientset kubernetes.Interface
    // sampleclientset is a clientset for our own API group
    sampleclientset clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    foosLister        listers.FooLister
    foosSynced        cache.InformerSynced

    // workqueue is a rate limited work queue. This is used to queue work to be
    // processed instead of performing it as soon as a change happens. This
    // means we can ensure we only process a fixed amount of resources at a
    // time, and makes it easy to ensure we are never processing the same item
    // simultaneously in two different workers.
    workqueue workqueue.RateLimitingInterface
    // recorder is an event recorder for recording Event resources to the
    // Kubernetes API.
    recorder record.EventRecorder
}

// NewController returns a new sample controller
func NewController(
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        recorder:          recorder,
    }

    klog.Info("Setting up event handlers")
    // Set up an event handler for when Foo resources change
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })
    // Set up an event handler for when Deployment resources change. This
    // handler will lookup the owner of the given Deployment, and if it is
    // owned by a Foo resource will enqueue that Foo resource for
    // processing. This way, we don't need to implement custom logic for
    // handling Deployment resources. More info on this pattern:
    // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                // Periodic resync will send update events for all known Deployments.
                // Two different versions of the same Deployment will always have different RVs.
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

    return controller
}


# 控制循环
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    // Start the informer factories to begin populating the informer caches
    klog.Info("Starting Foo controller")

    // Wait for the caches to be synced before starting workers
    klog.Info("Waiting for informer caches to sync")
    # 完成一次本地缓存的数据同步操作
    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    klog.Info("Starting workers")
    // Launch two workers to process Foo resources
    # 启动多个循环任务runWorker
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    klog.Info("Started workers")
    <-stopCh
    klog.Info("Shutting down workers")

    return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
    # 通过for无限循环
    for c.processNextWorkItem() {
    }
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
    # 从队列获取对象
    obj, shutdown := c.workqueue.Get()

    if shutdown {
        return false
    }

    // We wrap this block in a func so we can defer c.workqueue.Done.
    err := func(obj interface{}) error {
        // We call Done here so the workqueue knows we have finished
        // processing this item. We also must remember to call Forget if we
        // do not want this work item being re-queued. For example, we do
        // not call Forget if a transient error occurs, instead the item is
        // put back on the workqueue and attempted again after a back-off
        // period.
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        // We expect strings to come off the workqueue. These are of the
        // form namespace/name. We do this as the delayed nature of the
        // workqueue means the items in the informer cache may actually be
        // more up to date that when the item was initially put onto the
        // workqueue.
        if key, ok = obj.(string); !ok {
            // As the item in the workqueue is actually invalid, we call
            // Forget here else we'd go into a loop of attempting to
            // process a work item that is invalid.
            # 如果不调用Forget,会使用Done使obj会重新回到队列
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        // Run the syncHandler, passing it the namespace/name string of the
        // Foo resource to be synced.
        if err := c.syncHandler(key); err != nil {
            // Put the item back on the workqueue to handle any transient errors.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        // Finally, if no error occurs we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(obj)
        klog.Infof("Successfully synced '%s'", key)
        return nil
    }(obj)

    if err != nil {
        utilruntime.HandleError(err)
        return true
    }

    return true
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
    // Convert the namespace/name string into a distinct namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // Get the Foo resource with this namespace/name
    foo, err := c.foosLister.Foos(namespace).Get(name)
    # 这里应该是对应的DELETE操作,参考https://github.com/resouer/k8s-controller-custom-resource/blob/master/controller.go
    if err != nil {
        // The Foo resource may no longer exist, in which case we stop
        // processing.
        if errors.IsNotFound(err) {
            utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
            return nil
        }

        return err
    }

    # 获取foo定义的DeploymentName
    deploymentName := foo.Spec.DeploymentName
    if deploymentName == "" {
        // We choose to absorb the error here as the worker would requeue the
        // resource otherwise. Instead, the next time the resource is updated
        // the resource will be queued again.
        utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
        return nil
    }

    // Get the deployment with the name specified in Foo.spec
    deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    // If the resource doesn't exist, we'll create it
    # 如果不存在deployment则创建一下
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo)) # 这个newDeployment方法是自定义的,构建了一个Deployment对象
    }

    // If an error occurs during Get/Create, we'll requeue the item so we can
    // attempt processing again later. This could have been caused by a
    // temporary network failure, or any other transient reason.
    if err != nil {
        return err
    }

    # 判断对象是由哪个Controller对象创建
    // If the Deployment is not controlled by this Foo resource, we should log
    // a warning to the event recorder and ret
    if !metav1.IsControlledBy(deployment, foo) {
        msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
        c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf(msg)
    }

    // If this number of the replicas on the Foo resource is specified, and the
    // number does not equal the current desired replicas on the Deployment, we
    // should update the Deployment resource.
    if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
        klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo)) 
    }

    // If an error occurs during Update, we'll requeue the item so we can
    // attempt processing again later. THis could have been caused by a
    // temporary network failure, or any other transient reason.
    if err != nil {
        return err
    }

    // Finally, we update the status block of the Foo resource to reflect the
    // current state of the world
    err = c.updateFooStatus(foo, deployment)
    if err != nil {
        return err
    }

    c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
    // NEVER modify objects from the store. It's a read-only, local cache.
    // You can use DeepCopy() to make a deep copy of original object and modify this copy
    // Or create a copy manually for better performance
    fooCopy := foo.DeepCopy()
    fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
    // If the CustomResourceSubresources feature gate is not enabled,
    // we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
    // UpdateStatus will not allow changes to the Spec of the resource,
    // which is ideal for ensuring nothing other than resource status has been updated.
    _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
    return err
}

# 监听有增加放入workqueue
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
    var key string
    var err error
    # 更新本地缓存
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    # 将其放到队列
    c.workqueue.Add(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
    var object metav1.Object
    var ok bool
    if object, ok = obj.(metav1.Object); !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
            return
        }
        object, ok = tombstone.Obj.(metav1.Object)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
            return
        }
        klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
    }
    klog.V(4).Infof("Processing object: %s", object.GetName())
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        // If this object is not owned by a Foo, we should not do anything more
        // with it.
        if ownerRef.Kind != "Foo" {
            return
        }

        foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil {
            klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
            return
        }

        c.enqueueFoo(foo)
        return
    }
}

// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
    labels := map[string]string{
        "app":        "nginx",
        "controller": foo.Name,
    }
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      foo.Spec.DeploymentName,
            Namespace: foo.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: foo.Spec.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "nginx",
                            Image: "nginx:latest",
                        },
                    },
                },
            },
        },
    }
}