nginx-ingress-controller source code analysis

Date: Sept. 1, 2021  Category:

Table of contents:

Principle

At startup, the nginx configuration file is generated from the Ingress resources; upstreams and the like are synced by calling the 127.0.0.1:10246/configuration endpoint, and that data is stored in ngx.shared shared memory.

Resource changes are then picked up via watches:

  1. If backends or certificates change, upstreams and the like are synced by calling the 127.0.0.1:10246/configuration endpoint (see the sketch after this list).
  2. For any other resource, nginx.conf is regenerated and nginx is reloaded.
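For orientation, here is a minimal standalone sketch (not controller code) of talking to that internal endpoint from inside the ingress-nginx pod; the /configuration/backends path and its GET handler appear later in this post, and the server only listens on 127.0.0.1:10246.

package main

import (
    "fmt"
    "io"
    "net/http"
)

// Minimal sketch: read back the backends the Lua side currently holds in
// ngx.shared. This must run inside the ingress-nginx pod, because the
// internal server only listens on 127.0.0.1:10246.
func main() {
    resp, err := http.Get("http://127.0.0.1:10246/configuration/backends")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(body))
}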

Watching Ingress resources

Controller code: /internal/ingress/controller/nginx.go

The watch handlers are registered in /internal/ingress/controller/store/store.go

The registered handlers are:

  • ingEventHandler
  • secrEventHandler
  • epEventHandler
  • cmEventHandler
  • serviceHandler

Every resource change is written to updateCh (a simplified sketch of this wiring follows the list):

  1. Different resources and operations are distinguished simply by the event type; a ConfigurationEvent separately triggers a sync through the nginx configuration endpoint.
  2. The Ingress AddFunc/UpdateFunc and the Secret UpdateFunc/DeleteFunc additionally call store.syncIngress(ing) (which resolves Ingress annotations into configuration values) and store.syncSecrets(ing) (which syncs certificates to a local directory for nginx to use).
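A simplified sketch of how such a handler feeds the update channel. It is illustrative only: the real store.go registers these handlers on shared informers via AddEventHandler, uses a ring channel from github.com/eapache/channels rather than a plain Go channel, and filters by ingress class; the Event/EventType names follow the store package.

package sketch

import (
    "k8s.io/client-go/tools/cache"
)

// EventType and Event mirror the types used by the controller's store package.
type EventType string

const (
    CreateEvent EventType = "CREATE"
    UpdateEvent EventType = "UPDATE"
    DeleteEvent EventType = "DELETE"
)

type Event struct {
    Type EventType
    Obj  interface{}
}

// newIngressEventHandler returns handlers that push every change into
// updateCh; the real ingEventHandler additionally calls syncIngress/syncSecrets
// and skips ingresses belonging to other classes.
func newIngressEventHandler(updateCh chan<- Event) cache.ResourceEventHandlerFuncs {
    return cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            updateCh <- Event{Type: CreateEvent, Obj: obj}
        },
        UpdateFunc: func(old, cur interface{}) {
            updateCh <- Event{Type: UpdateEvent, Obj: cur}
        },
        DeleteFunc: func(obj interface{}) {
            updateCh <- Event{Type: DeleteEvent, Obj: obj}
        },
    }
}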

The service consumes updateCh when it starts:

case event := <-n.updateCh.Out():
            if n.isShuttingDown {
                break
            }

            if evt, ok := event.(store.Event); ok {
                klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
                if evt.Type == store.ConfigurationEvent {
                    // TODO: is this necessary? Consider removing this special case
                    n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
                    continue
                }

                n.syncQueue.EnqueueSkippableTask(evt.Obj)
            } else {
                klog.Warningf("Unexpected event type received %T", event)
            }

// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
    t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
    t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
    if t.IsShuttingDown() {
        klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
        return
    }

    ts := time.Now().UnixNano()
    if !skippable {
        // make sure the timestamp is bigger than lastSync
        ts = time.Now().Add(24 * time.Hour).UnixNano()
    }
    klog.V(3).InfoS("queuing", "item", obj)
    key, err := t.fn(obj)
    if err != nil {
        klog.ErrorS(err, "creating object key", "item", obj)
        return
    }
    t.queue.Add(Element{
        Key:       key,
        Timestamp: ts,
    })
}

Here fn is cache.DeletionHandlingMetaNamespaceKeyFunc (wired up from internal/ingress/controller/controller.go), and Add goes into Kubernetes' workqueue.RateLimitingInterface rate-limited queue.
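For context, a small standalone sketch of those two building blocks outside the controller: cache.DeletionHandlingMetaNamespaceKeyFunc turns an API object into its ns/name key, and workqueue provides the rate-limited queue that task.Queue wraps (the queue construction here is illustrative, not the controller's exact call).

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // ns/name key for an API object; the "DeletionHandling" variant also
    // copes with cache.DeletedFinalStateUnknown tombstones on delete.
    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web-0"}}
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // default/web-0

    // A rate-limited work queue of the kind used underneath task.Queue.
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    q.Add(key)

    item, shutdown := q.Get()
    if !shutdown {
        fmt.Println("processing", item)
        q.Forget(item)
        q.Done(item)
    }
    q.ShutDown()
}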

A worker pulls items from the rate-limited queue and processes them. Note the timestamps: non-skippable tasks are enqueued with a timestamp 24 hours in the future, so the lastSync > Timestamp check below never skips them, while skippable tasks enqueued before the last successful sync are simply dropped.

// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        key, quit := t.queue.Get()
        if quit {
            if !isClosed(t.workerDone) {
                close(t.workerDone)
            }
            return
        }
        ts := time.Now().UnixNano()

        item := key.(Element)
        if t.lastSync > item.Timestamp {
            klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
            t.queue.Forget(key)
            t.queue.Done(key)
            continue
        }

        klog.V(3).InfoS("syncing", "key", item.Key)
        if err := t.sync(key); err != nil {
            klog.ErrorS(err, "requeuing", "key", item.Key)
            t.queue.AddRateLimited(Element{
                Key:       item.Key,
                Timestamp: time.Now().UnixNano(),
            })
        } else {
            t.queue.Forget(key)
            t.lastSync = ts
        }

        t.queue.Done(key)
    }
}


The sync function is n.syncIngress:

// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
    n.syncRateLimiter.Accept()

    if n.syncQueue.IsShuttingDown() {
        return nil
    }

    ings := n.store.ListIngresses()
    hosts, servers, pcfg := n.getConfiguration(ings)

    n.metricCollector.SetSSLExpireTime(servers)

    if n.runningConfig.Equal(pcfg) {
        klog.V(3).Infof("No configuration change detected, skipping backend reload")
        return nil
    }

    n.metricCollector.SetHosts(hosts)
    // check whether dynamic reconfiguration alone is enough
    if !n.IsDynamicConfigurationEnough(pcfg) {
        klog.InfoS("Configuration changes detected, backend reload required")

        hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
            TagName: "json",
        })

        pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
        // regenerate the nginx configuration and reload
        err := n.OnUpdate(*pcfg)
        if err != nil {
            n.metricCollector.IncReloadErrorCount()
            n.metricCollector.ConfigSuccess(hash, false)
            klog.Errorf("Unexpected failure reloading the backend:
%v", err)
            n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
            return err
        }

        klog.InfoS("Backend successfully reloaded")
        n.metricCollector.ConfigSuccess(hash, true)
        n.metricCollector.IncReloadCount()

        n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
    }

    isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
    if isFirstSync {
        // For the initial sync it always takes some time for NGINX to start listening
        // For large configurations it might take a while so we loop and back off
        klog.InfoS("Initial sync, sleeping for 1 second")
        time.Sleep(1 * time.Second)
    }

    retry := wait.Backoff{
        Steps:    15,
        Duration: 1 * time.Second,
        Factor:   0.8,
        Jitter:   0.1,
    }

    // dynamic reconfiguration
    err := wait.ExponentialBackoff(retry, func() (bool, error) {
        err := n.configureDynamically(pcfg)
        if err == nil {
            klog.V(2).Infof("Dynamic reconfiguration succeeded.")
            return true, nil
        }

        klog.Warningf("Dynamic reconfiguration failed: %v", err)
        return false, err
    })
    if err != nil {
        klog.Errorf("Unexpected failure reconfiguring NGINX:
%v", err)
        return err
    }

    ri := getRemovedIngresses(n.runningConfig, pcfg)
    re := getRemovedHosts(n.runningConfig, pcfg)
    n.metricCollector.RemoveMetrics(ri, re)

    n.runningConfig = pcfg

    return nil
}

Dynamically adjusting the nginx configuration; a sketch of the HTTP call this ends up making follows the function.

// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
    backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
    if backendsChanged {
        err := configureBackends(pcfg.Backends)
        if err != nil {
            return err
        }
    }

    streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
    if streamConfigurationChanged {
        err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
        if err != nil {
            return err
        }
    }

    serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
    if serversChanged {
        err := configureCertificates(pcfg.Servers)
        if err != nil {
            return err
        }
    }

    return nil
}
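configureBackends, updateStreamConfiguration and configureCertificates are not reproduced here; per the doc comment above, they JSON-encode their piece of the configuration and POST it to the internal endpoint served by Lua. A hedged sketch of that idea (the helper name, payload shape and error handling are illustrative, not the actual implementation; the 201 status matches the Lua handler shown further below):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// postToLua is an illustrative helper: JSON-encode a payload and POST it to
// the internal endpoint handled by the Lua configuration module.
func postToLua(path string, payload interface{}) error {
    buf, err := json.Marshal(payload)
    if err != nil {
        return err
    }

    resp, err := http.Post("http://127.0.0.1:10246"+path, "application/json", bytes.NewReader(buf))
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // The Lua handler answers 201 Created once the shared dict is updated.
    if resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("unexpected status %d posting to %s", resp.StatusCode, path)
    }
    return nil
}

func main() {
    // e.g. push the current backends, which is conceptually what
    // configureBackends does with pcfg.Backends.
    backends := []map[string]interface{}{
        {
            "name": "default-flaskapp1-80",
            "endpoints": []map[string]string{
                {"address": "10.243.3.69", "port": "80"},
            },
        },
    }
    if err := postToLua("/configuration/backends", backends); err != nil {
        fmt.Println("dynamic reconfiguration failed:", err)
    }
}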

Obtaining the upstream

nginx configuration:

upstream upstream_balancer {
                ### Attention!!!
                #
                # We no longer create "upstream" section for every backend.
                # Backends are handled dynamically using Lua. If you would like to debug
                # and see what backends ingress-nginx has in its memory you can
                # install our kubectl plugin https://kubernetes.github.io/ingress-nginx/kubectl-plugin.
                # Once you have the plugin you can use "kubectl ingress-nginx backends" command to
                # inspect current backends.
                #
                ###

                server 0.0.0.1; # placeholder

                balancer_by_lua_block {
                        balancer.balance()
                }

                keepalive 32;

                keepalive_timeout  60s;
                keepalive_requests 100;

        }

Lua code:

function _M.balance()
  local balancer = get_balancer()
  if not balancer then
    return
  end

  local peer = balancer:balance()
  if not peer then
    ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
    return
  end
  -- retry once (is the backend service idempotent? can this be configured?)
  ngx_balancer.set_more_tries(1)
  -- ngx.balancer.set_current_peer sets the server address the upstream resolves to
  local ok, err = ngx_balancer.set_current_peer(peer)
  if not ok then
    ngx.log(ngx.ERR, "error while setting current upstream peer ", peer,
            ": ", err)
  end
end



local function get_balancer()
  if ngx.ctx.balancer then
    return ngx.ctx.balancer
  end

  local backend_name = ngx.var.proxy_upstream_name

  local balancer = balancers[backend_name]
  if not balancer then
    return
  end

  if route_to_alternative_balancer(balancer) then
    local alternative_backend_name = balancer.alternative_backends[1]
    ngx.var.proxy_alternative_upstream_name = alternative_backend_name

    balancer = balancers[alternative_backend_name]
  end

  ngx.ctx.balancer = balancer

  return balancer
end

The nginx endpoint used for dynamic updates

nginx configuration:

server {
                listen 127.0.0.1:10246;
                set $proxy_upstream_name "internal";
                ...
                location /configuration {
                        client_max_body_size                    21m;
                        client_body_buffer_size                 21m;
                        proxy_buffering                         off;

                        content_by_lua_block {
                                configuration.call()
                        }
                }
        }

The corresponding handler:

function _M.call()
  if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.print("Only POST and GET requests are allowed!")
    return
  end

  if ngx.var.request_uri == "/configuration/servers" then
    handle_servers()
    return
  end

  if ngx.var.request_uri == "/configuration/general" then
    handle_general()
    return
  end

  if ngx.var.uri == "/configuration/certs" then
    handle_certs()
    return
  end

  if ngx.var.request_uri ~= "/configuration/backends" then
    ngx.status = ngx.HTTP_NOT_FOUND
    ngx.print("Not found!")
    return
  end

  if ngx.var.request_method == "GET" then
    ngx.status = ngx.HTTP_OK
    ngx.print(_M.get_backends_data())
    return
  end

  local backends = fetch_request_body()
  if not backends then
    ngx.log(ngx.ERR, "dynamic-configuration: unable to read valid request body")
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  local success, err = configuration_data:set("backends", backends)
  if not success then
    ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  ngx.status = ngx.HTTP_CREATED
end

The Lua code that periodically syncs the backends:

function _M.init_worker()
  sync_backends() -- when worker starts, sync backends without delay
  local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if err then
    ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ",
            tostring(err))
  end
end


local function sync_backends()
  local backends_data = configuration.get_backends_data()
  if not backends_data then
    balancers = {}
    return
  end

  local new_backends, err = cjson.decode(backends_data)
  if not new_backends then
    ngx.log(ngx.ERR, "could not parse backends data: ", err)
    return
  end

  local balancers_to_keep = {}
  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
    balancers_to_keep[new_backend.name] = balancers[new_backend.name]
  end

  for backend_name, _ in pairs(balancers) do
    if not balancers_to_keep[backend_name] then
      balancers[backend_name] = nil
    end
  end
end

Backend data obtained via the kubectl plugin:

{
    "name": "default-flaskapp1-80",
    "service": {
      "metadata": {
        "creationTimestamp": null
      },
      "spec": {
        "ports": [
          {
            "name": "http",
            "protocol": "TCP",
            "port": 80,
            "targetPort": 80
          }
        ],
        "selector": {
          "app.kubernetes.io/name": "flaskapp"
        },
        "clusterIP": "10.93.32.45",
        "type": "ClusterIP",
        "sessionAffinity": "None"
      },
      "status": {
        "loadBalancer": {}
      }
    },
    "port": 80,
    "sslPassthrough": false,
    "endpoints": [
      {
        "address": "10.243.3.69",
        "port": "80"
      }
    ],
    "sessionAffinityConfig": {
      "name": "",
      "mode": "",
      "cookieSessionAffinity": {
        "name": ""
      }
    },
    "upstreamHashByConfig": {
      "upstream-hash-by-subset-size": 3
    },
    "noServer": false,
    "trafficShapingPolicy": {
      "weight": 0,
      "header": "",
      "headerValue": "",
      "cookie": ""
    }
  },

For a cross-namespace service, the endpoints look like this:

"endpoints": [
      {
        "address": "flaskapp.inf.svc.cluster.local",
        "port": "80"
      }
    ],
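A minimal sketch of parsing just the endpoints slice shown in these dumps (the struct covers only the two fields visible above, not the full backend type); in the cross-namespace case the address is a DNS name rather than a pod IP, but the shape is the same:

package main

import (
    "encoding/json"
    "fmt"
)

// Endpoint mirrors only the two fields visible in the dumps above.
type Endpoint struct {
    Address string `json:"address"`
    Port    string `json:"port"`
}

func main() {
    data := []byte(`[{"address": "flaskapp.inf.svc.cluster.local", "port": "80"}]`)

    var eps []Endpoint
    if err := json.Unmarshal(data, &eps); err != nil {
        panic(err)
    }
    for _, ep := range eps {
        fmt.Printf("%s:%s\n", ep.Address, ep.Port)
    }
}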