nginx-ingress-controller

时间:Oct. 12, 2020 分类:

目录:

原理

启动的时候根据ingress资源生成nginx配置文件,upstream等调用127.0.0.1:10246/configuration接口同步数据,数据通过ngx.shared共享内存进行存储

通过watch的方式获取资源的变化

  • 如果bankends和certs发生了变化,upstream等调用127.0.0.1:10246/configuration接口同步数据
  • 如果非上边两种资源,重新生成nginx.conf,并进行reload

Ingress进行watch

controller代码

/internal/ingress/controller/nginx.go

注册的watch方法

/internal/ingress/controller/store/store.go

注册了

  • ingEventHandler
  • secrEventHandler
  • epEventHandler
  • cmEventHandler
  • serviceHandler

对于不同的资源变化都会写入updataCh中,但是有的会有额外操作

  • 对于不同的资源的不同方法,直接使用event类型不同,ConfigurationEvent类型会单独调用nginx的configuration接口同步
  • 对于ingress的AddFunc和Update方法,secret的Update和Delete方法,会调用store.syncIngress(ing)(用于ingress的标签解析为配置的值)和store.syncSecrets(ing)(同步证书到本地目录共nginx使用)

启动服务的时候会消费updataCh

        case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }

            if evt, ok := event.(store.Event); ok {
                klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    // TODO: is this necessary? Consider removing this special case
                    n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                    continue
                }

                n.syncQueue.EnqueueSkippableTask(evt.Obj)
            } else {
                klog.Warningf("Unexpected event type received %T", event)
            }

EnqueueTask和EnqueueSkippableTask统一调用的enqueue方法

// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
    t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
    t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
    if t.IsShuttingDown() {
        klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
        return
    }

    ts := time.Now().UnixNano()
    if !skippable {
        // make sure the timestamp is bigger than lastSync
        ts = time.Now().Add(24 * time.Hour).UnixNano()
    }
    klog.V(3).InfoS("queuing", "item", obj)
    key, err := t.fn(obj)
    if err != nil {
        klog.ErrorS(err, "creating object key", "item", obj)
        return
    }
    t.queue.Add(Element{
        Key:       key,
        Timestamp: ts,
    })
}
  • fn为internal/ingress/controller/controller.go的是使用cache.DeletionHandlingMetaNamespaceKeyFunc
  • Add使用的kubernetes的workqueue.RateLimitingInterface限速队列

worker从限速队列获取,并执行

// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        key, quit := t.queue.Get()
        if quit {
            if !isClosed(t.workerDone) {
                close(t.workerDone)
            }
            return
        }
        ts := time.Now().UnixNano()

        item := key.(Element)
        if t.lastSync > item.Timestamp {
            klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
            t.queue.Forget(key)
            t.queue.Done(key)
            continue
        }

        klog.V(3).InfoS("syncing", "key", item.Key)
        if err := t.sync(key); err != nil {
            klog.ErrorS(err, "requeuing", "key", item.Key)
            t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: time.Now().UnixNano(),
            })
        } else {
            t.queue.Forget(key)
            t.lastSync = ts
        }

        t.queue.Done(key)
    }
}

sync为syncIngress

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept()

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    ings := n.store.ListIngresses()
    hosts, servers, pcfg := n.getConfiguration(ings)

    n.metricCollector.SetSSLExpireTime(servers)

    if n.runningConfig.Equal(pcfg) {
        klog.V(3).Infof("No configuration change detected, skipping backend reload")
        return nil
    }

    n.metricCollector.SetHosts(hosts)
    # 判断是否能动态加载
    if !n.IsDynamicConfigurationEnough(pcfg) {
        klog.InfoS("Configuration changes detected, backend reload required")

        hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
            TagName: "json",
        })

        pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
        # 更新nginx配置并reload
        err := n.OnUpdate(*pcfg)
        if err != nil {
            n.metricCollector.IncReloadErrorCount()
            n.metricCollector.ConfigSuccess(hash, false)
            klog.Errorf("Unexpected failure reloading the backend:\n%v", err)
            n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
            return err
        }

        klog.InfoS("Backend successfully reloaded")
        n.metricCollector.ConfigSuccess(hash, true)
        n.metricCollector.IncReloadCount()

        n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
    }

    isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
    if isFirstSync {
        // For the initial sync it always takes some time for NGINX to start listening
        // For large configurations it might take a while so we loop and back off
        klog.InfoS("Initial sync, sleeping for 1 second")
        time.Sleep(1 * time.Second)
    }

    retry := wait.Backoff{
        Steps:    15,
        Duration: 1 * time.Second,
        Factor:   0.8,
        Jitter:   0.1,
    }

    # 动态加载
    err := wait.ExponentialBackoff(retry, func() (bool, error) {
        err := n.configureDynamically(pcfg)
        if err == nil {
            klog.V(2).Infof("Dynamic reconfiguration succeeded.")
            return true, nil
        }

        klog.Warningf("Dynamic reconfiguration failed: %v", err)
        return false, err
    })
    if err != nil {
        klog.Errorf("Unexpected failure reconfiguring NGINX:\n%v", err)
        return err
    }

    ri := getRemovedIngresses(n.runningConfig, pcfg)
    re := getRemovedHosts(n.runningConfig, pcfg)
    n.metricCollector.RemoveMetrics(ri, re)

    n.runningConfig = pcfg

    return nil
}

动态调整nginx配置

// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
    backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
    if backendsChanged {
        err := configureBackends(pcfg.Backends)
        if err != nil {
            return err
        }
    }

    streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
    if streamConfigurationChanged {
        err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
        if err != nil {
            return err
        }
    }

    serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
    if serversChanged {
        err := configureCertificates(pcfg.Servers)
        if err != nil {
            return err
        }
    }

    return nil
}

nginx端用于接收动态配置调整的接口

nginx配置

        server {
                listen 127.0.0.1:10246;
                set $proxy_upstream_name "internal";
                ...
                location /configuration {
                        client_max_body_size                    21m;
                        client_body_buffer_size                 21m;
                        proxy_buffering                         off;

                        content_by_lua_block {
                                configuration.call()
                        }
                }
        }

对应接收方法

function _M.call()
  if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.print("Only POST and GET requests are allowed!")
    return
  end

  if ngx.var.request_uri == "/configuration/servers" then
    handle_servers()
    return
  end

  if ngx.var.request_uri == "/configuration/general" then
    handle_general()
    return
  end

  if ngx.var.uri == "/configuration/certs" then
    handle_certs()
    return
  end

  if ngx.var.request_uri ~= "/configuration/backends" then
    ngx.status = ngx.HTTP_NOT_FOUND
    ngx.print("Not found!")
    return
  end

  if ngx.var.request_method == "GET" then
    ngx.status = ngx.HTTP_OK
    ngx.print(_M.get_backends_data())
    return
  end

  local backends = fetch_request_body()
  if not backends then
    ngx.log(ngx.ERR, "dynamic-configuration: unable to read valid request body")
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end
 local success, err = configuration_data:set("backends", backends)
  if not success then
    ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  ngx.status = ngx.HTTP_CREATED
end

用于同步配置的代码

function _M.init_worker()
  sync_backends() -- when worker starts, sync backends without delay
  local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if err then
    ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ",
            tostring(err))
  end
end


local function sync_backends()
  local backends_data = configuration.get_backends_data()
  if not backends_data then
    balancers = {}
    return
  end

  local new_backends, err = cjson.decode(backends_data)
  if not new_backends then
    ngx.log(ngx.ERR, "could not parse backends data: ", err)
    return
  end

  local balancers_to_keep = {}
  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
    balancers_to_keep[new_backend.name] = balancers[new_backend.name]
  end

  for backend_name, _ in pairs(balancers) do
    if not balancers_to_keep[backend_name] then
      balancers[backend_name] = nil
    end
  end
end

bankend数据

通过kubectl插件,安装方式参考官网

kubectl ingress-nginx backends -n inf 

对于普通的service

  {
    "name": "default-flaskapp1-80",
    "service": {
      "metadata": {
        "creationTimestamp": null
      },
      "spec": {
        "ports": [
          {
            "name": "http",
            "protocol": "TCP",
            "port": 80,
            "targetPort": 80
          }
        ],
        "selector": {
          "app.kubernetes.io/name": "flaskapp"
        },
        "clusterIP": "110.193.132.145",
        "type": "ClusterIP",
        "sessionAffinity": "None"
      },
      "status": {
        "loadBalancer": {}
      }
    },
    "port": 80,
    "sslPassthrough": false,
    "endpoints": [
      {
        "address": "110.243.13.169",
        "port": "80"
      }
    ],
    "sessionAffinityConfig": {
      "name": "",
      "mode": "",
      "cookieSessionAffinity": {
        "name": ""
      }
    },
    "upstreamHashByConfig": {
      "upstream-hash-by-subset-size": 3
    },
    "noServer": false,
    "trafficShapingPolicy": {
      "weight": 0,
      "header": "",
      "headerValue": "",
      "cookie": ""
    }
  },

对于跨namespace,就必须要使用ExternalName

apiVersion: v1
kind: Service
metadata:
  name: flaskapp
  namespace: www
spec:
  selector:
    app.kubernetes.io/name: flaskapp
  ports:
    - name: http
      port: 80
---
apiVersion: v1
kind: Service
metadata:
  name: flaskapp2
spec:
  ports:
    - name: http
      port: 80
  type: ExternalName
  externalName: flaskapp.www.svc.cluster.local

获取到的bankend为

    "endpoints": [
      {
        "address": "flaskapp.www.svc.cluster.local",
        "port": "80"
      }
    ],

tips

修改nginx配置

支持三种方式

  • configmap,全局配置
  • ingress的annotations,作用于单个location
  • emplate/nginx.tmpl配置模板

configmap配置会直接在http配置中,也有直接对全体server和location生效的,ingress的annotations配置后可以对location进行覆盖

分组监听

配置参考

# Ingress controller 1
apiVersion: extensions/v1beta1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - args:
            - /nginx-ingress-controller
            - --ingress-class=class-1
            - ...

# Ingress controller 2
apiVersion: extensions/v1beta1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - args:
            - /nginx-ingress-controller
            - --ingress-class=class-2
            - ...

# This Ingress resource will be managed by controller 1
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: class-1
spec:
  rules: ...

# This Ingress resource will be managed by controller 2
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: class-2
spec:
  rules: ...