nginx-ingress-controller代码解析
目录:
原理
启动的时候根据ingress资源生成nginx配置文件,upstream等调用127.0.0.1:10246/ configuration接口同步数据,数据通过ngx.shared共享内存进行存储
通过watch的方式获取资源的变化
- 如果bankends和certs发生了变化,upstream等调用127.0.0.1:10246/ configuration接口同步数据
- 如果非上边两种资源,重新生成nginx.conf,并进行reload
Ingress进行watch
controller代码 /internal/ingress/controller/nginx.go
注册的watch方法 /internal/ingress/controller/store/store.go
注册了
- ingEventHandler
- secrEventHandler
- epEventHandler
- cmEventHandler
- serviceHandler
对于不同的资源变化都会写入updataCh中
- 对于不同的资源的不同方法,直接使用event类型不同,ConfigurationEvent类型会单独调用nginx的configuration接口同步
- 对于ingress的AddFunc和Update方法,secret的Update和Delete方法,会调用store.syncIngress(ing)(用于ingress的标签解析为配置的值)和store.syncSecrets(ing)(同步证书到本地目录共nginx使用)
启动服务的时候会消费 updataCh
case event := <-n.updateCh.Out():
if n.isShuttingDown {
break
}
if evt, ok := event.(store.Event); ok {
klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
if evt.Type == store.ConfigurationEvent {
// TODO: is this necessary? Consider removing this special case
n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
continue
}
n.syncQueue.EnqueueSkippableTask(evt.Obj)
} else {
klog.Warningf("Unexpected event type received %T", event)
}
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
t.enqueue(obj, false)
}
// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
t.enqueue(obj, true)
}
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
if t.IsShuttingDown() {
klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
return
}
ts := time.Now().UnixNano()
if !skippable {
// make sure the timestamp is bigger than lastSync
ts = time.Now().Add(24 * time.Hour).UnixNano()
}
klog.V(3).InfoS("queuing", "item", obj)
key, err := t.fn(obj)
if err != nil {
klog.ErrorS(err, "creating object key", "item", obj)
return
}
t.queue.Add(Element{
Key: key,
Timestamp: ts,
})
}
这里fn为internal/ingress/controller/controller.go的 是使用cache.DeletionHandlingMetaNamespaceKeyFunc,Add使用的kubernetes的workqueue.RateLimitingInterface限速队列
worker从限速队列获取,并执行
// worker processes work in the queue through sync.
func (t *Queue) worker() {
for {
key, quit := t.queue.Get()
if quit {
if !isClosed(t.workerDone) {
close(t.workerDone)
}
return
}
ts := time.Now().UnixNano()
item := key.(Element)
if t.lastSync > item.Timestamp {
klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
t.queue.Forget(key)
t.queue.Done(key)
continue
}
klog.V(3).InfoS("syncing", "key", item.Key)
if err := t.sync(key); err != nil {
klog.ErrorS(err, "requeuing", "key", item.Key)
t.queue.AddRateLimited(Element{
Key: item.Key,
Timestamp: time.Now().UnixNano(),
})
} else {
t.queue.Forget(key)
t.lastSync = ts
}
t.queue.Done(key)
}
}
sync为syncIngress
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
n.syncRateLimiter.Accept()
if n.syncQueue.IsShuttingDown() {
return nil
}
ings := n.store.ListIngresses()
hosts, servers, pcfg := n.getConfiguration(ings)
n.metricCollector.SetSSLExpireTime(servers)
if n.runningConfig.Equal(pcfg) {
klog.V(3).Infof("No configuration change detected, skipping backend reload")
return nil
}
n.metricCollector.SetHosts(hosts)
# 判断是否能动态加载
if !n.IsDynamicConfigurationEnough(pcfg) {
klog.InfoS("Configuration changes detected, backend reload required")
hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
TagName: "json",
})
pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)
# 更新nginx配置并reload
err := n.OnUpdate(*pcfg)
if err != nil {
n.metricCollector.IncReloadErrorCount()
n.metricCollector.ConfigSuccess(hash, false)
klog.Errorf("Unexpected failure reloading the backend:
%v", err)
n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
return err
}
klog.InfoS("Backend successfully reloaded")
n.metricCollector.ConfigSuccess(hash, true)
n.metricCollector.IncReloadCount()
n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
}
isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
if isFirstSync {
// For the initial sync it always takes some time for NGINX to start listening
// For large configurations it might take a while so we loop and back off
klog.InfoS("Initial sync, sleeping for 1 second")
time.Sleep(1 * time.Second)
}
retry := wait.Backoff{
Steps: 15,
Duration: 1 * time.Second,
Factor: 0.8,
Jitter: 0.1,
}
# 动态加载
err := wait.ExponentialBackoff(retry, func() (bool, error) {
err := n.configureDynamically(pcfg)
if err == nil {
klog.V(2).Infof("Dynamic reconfiguration succeeded.")
return true, nil
}
klog.Warningf("Dynamic reconfiguration failed: %v", err)
return false, err
})
if err != nil {
klog.Errorf("Unexpected failure reconfiguring NGINX:
%v", err)
return err
}
ri := getRemovedIngresses(n.runningConfig, pcfg)
re := getRemovedHosts(n.runningConfig, pcfg)
n.metricCollector.RemoveMetrics(ri, re)
n.runningConfig = pcfg
return nil
}
动态调整nginx的配置
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
if backendsChanged {
err := configureBackends(pcfg.Backends)
if err != nil {
return err
}
}
streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
if streamConfigurationChanged {
err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
if err != nil {
return err
}
}
serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
if serversChanged {
err := configureCertificates(pcfg.Servers)
if err != nil {
return err
}
}
return nil
}
获取upstream
nginx配置
upstream upstream_balancer {
### Attention!!!
#
# We no longer create "upstream" section for every backend.
# Backends are handled dynamically using Lua. If you would like to debug
# and see what backends ingress-nginx has in its memory you can
# install our kubectl plugin https://kubernetes.github.io/ingress-nginx/kubectl-plugin.
# Once you have the plugin you can use "kubectl ingress-nginx backends" command to
# inspect current backends.
#
###
server 0.0.0.1; # placeholder
balancer_by_lua_block {
balancer.balance()
}
keepalive 32;
keepalive_timeout 60s;
keepalive_requests 100;
}
lua代码
function _M.balance()
local balancer = get_balancer()
if not balancer then
return
end
local peer = balancer:balance()
if not peer then
ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
return
end
# 重试一次(后端业务是否幂等?是否能配置?)
ngx_balancer.set_more_tries(1)
# ngx.balancer.set_current_peer设置upstream返回的server地址
local ok, err = ngx_balancer.set_current_peer(peer)
if not ok then
ngx.log(ngx.ERR, "error while setting current upstream peer ", peer,
": ", err)
end
end
local function get_balancer()
if ngx.ctx.balancer then
return ngx.ctx.balancer
end
local backend_name = ngx.var.proxy_upstream_name
local balancer = balancers[backend_name]
if not balancer then
return
end
if route_to_alternative_balancer(balancer) then
local alternative_backend_name = balancer.alternative_backends[1]
ngx.var.proxy_alternative_upstream_name = alternative_backend_name
balancer = balancers[alternative_backend_name]
end
ngx.ctx.balancer = balancer
return balancer
end
nginx用于动态更新
nginx配置
server {
listen 127.0.0.1:10246;
set $proxy_upstream_name "internal";
...
location /configuration {
client_max_body_size 21m;
client_body_buffer_size 21m;
proxy_buffering off;
content_by_lua_block {
configuration.call()
}
}
}
对应接收方法
function _M.call()
if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
ngx.status = ngx.HTTP_BAD_REQUEST
ngx.print("Only POST and GET requests are allowed!")
return
end
if ngx.var.request_uri == "/configuration/servers" then
handle_servers()
return
end
if ngx.var.request_uri == "/configuration/general" then
handle_general()
return
end
if ngx.var.uri == "/configuration/certs" then
handle_certs()
return
end
if ngx.var.request_uri ~= "/configuration/backends" then
ngx.status = ngx.HTTP_NOT_FOUND
ngx.print("Not found!")
return
end
if ngx.var.request_method == "GET" then
ngx.status = ngx.HTTP_OK
ngx.print(_M.get_backends_data())
return
end
local backends = fetch_request_body()
if not backends then
ngx.log(ngx.ERR, "dynamic-configuration: unable to read valid request body")
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
local success, err = configuration_data:set("backends", backends)
if not success then
ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
ngx.status = ngx.HTTP_CREATED
end
用于同步代码
function _M.init_worker()
sync_backends() -- when worker starts, sync backends without delay
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if err then
ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ",
tostring(err))
end
end
local function sync_backends()
local backends_data = configuration.get_backends_data()
if not backends_data then
balancers = {}
return
end
local new_backends, err = cjson.decode(backends_data)
if not new_backends then
ngx.log(ngx.ERR, "could not parse backends data: ", err)
return
end
local balancers_to_keep = {}
for _, new_backend in ipairs(new_backends) do
sync_backend(new_backend)
balancers_to_keep[new_backend.name] = balancers[new_backend.name]
end
for backend_name, _ in pairs(balancers) do
if not balancers_to_keep[backend_name] then
balancers[backend_name] = nil
end
end
end
通过插件获取的bankend数据
{
"name": "default-flaskapp1-80",
"service": {
"metadata": {
"creationTimestamp": null
},
"spec": {
"ports": [
{
"name": "http",
"protocol": "TCP",
"port": 80,
"targetPort": 80
}
],
"selector": {
"app.kubernetes.io/name": "flaskapp"
},
"clusterIP": "10.93.32.45",
"type": "ClusterIP",
"sessionAffinity": "None"
},
"status": {
"loadBalancer": {}
}
},
"port": 80,
"sslPassthrough": false,
"endpoints": [
{
"address": "10.243.3.69",
"port": "80"
}
],
"sessionAffinityConfig": {
"name": "",
"mode": "",
"cookieSessionAffinity": {
"name": ""
}
},
"upstreamHashByConfig": {
"upstream-hash-by-subset-size": 3
},
"noServer": false,
"trafficShapingPolicy": {
"weight": 0,
"header": "",
"headerValue": "",
"cookie": ""
}
},
对于跨namespace
"endpoints": [
{
"address": "flaskapp.inf.svc.cluster.local",
"port": "80"
}
],