nginx-ingress-controller
How it works
On startup, the controller renders the nginx configuration file from the Ingress resources; dynamic data such as upstreams is pushed by calling the 127.0.0.1:10246/configuration endpoint and is stored in ngx.shared shared memory.
Resource changes are picked up through watches:
- If only backends or certs changed, the data is synced again through the 127.0.0.1:10246/configuration endpoint (see the sketch below)
- For any other kind of change, nginx.conf is regenerated and nginx is reloaded
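As a quick way to see the dynamic side in action, the data the controller pushes can be read back from the same endpoint. A minimal sketch (it has to run inside the controller pod, since the endpoint only listens on loopback; port and path come from the nginx config shown later in this note):

// Read the backend data currently stored in ngx.shared via the Lua endpoint.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:10246/configuration/backends")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON identical to what the controller last pushed
}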
How resources are watched
Controller code
/internal/ingress/controller/nginx.go
Registered watch handlers
/internal/ingress/controller/store/store.go
The registered event handlers are:
- ingEventHandler
- secrEventHandler
- epEventHandler
- cmEventHandler
- serviceHandler
Changes to any of these resources are written to updateCh, but some get extra handling (a simplified handler sketch follows this list):
- The different handler methods on the different resources mostly just emit different event types; the ConfigurationEvent type is special-cased in the event loop below and enqueued as a non-skippable task
- The Ingress AddFunc/UpdateFunc handlers and the Secret UpdateFunc/DeleteFunc handlers additionally call store.syncIngress(ing) (parses the Ingress annotations into configuration values) and store.syncSecrets(ing) (syncs certificates to a local directory for nginx to use)
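The sketch below shows the shape of such a handler. The real store registers cache.ResourceEventHandlerFuncs on client-go informers and pushes events onto a ring channel; here the Event type is mirrored from the controller, but a plain buffered channel stands in for illustration.

// Simplified sketch of how a resource event handler feeds updateCh.
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// EventType and Event mirror the store.Event type used by the controller.
type EventType string

const CreateEvent EventType = "CREATE"

type Event struct {
	Type EventType
	Obj  interface{}
}

func main() {
	updateCh := make(chan Event, 16)

	// A handler only wraps the changed object in an Event and pushes it onto
	// the channel; the event loop decides how the object gets synced.
	ingEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh <- Event{Type: CreateEvent, Obj: obj}
		},
	}

	ingEventHandler.AddFunc("demo-ingress") // simulate an Add notification
	fmt.Printf("event received: %+v\n", <-updateCh)
}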
When the controller starts, it consumes updateCh:
case event := <-n.updateCh.Out():
	if n.isShuttingDown {
		break
	}

	if evt, ok := event.(store.Event); ok {
		klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
		if evt.Type == store.ConfigurationEvent {
			// TODO: is this necessary? Consider removing this special case
			n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
			continue
		}

		n.syncQueue.EnqueueSkippableTask(evt.Obj)
	} else {
		klog.Warningf("Unexpected event type received %T", event)
	}
Both EnqueueTask and EnqueueSkippableTask call the shared enqueue method:
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {
	t.enqueue(obj, false)
}

// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {
	t.enqueue(obj, true)
}

// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {
	if t.IsShuttingDown() {
		klog.ErrorS(nil, "queue has been shutdown, failed to enqueue", "key", obj)
		return
	}

	ts := time.Now().UnixNano()
	if !skippable {
		// make sure the timestamp is bigger than lastSync
		ts = time.Now().Add(24 * time.Hour).UnixNano()
	}
	klog.V(3).InfoS("queuing", "item", obj)
	key, err := t.fn(obj)
	if err != nil {
		klog.ErrorS(err, "creating object key", "item", obj)
		return
	}
	t.queue.Add(Element{
		Key:       key,
		Timestamp: ts,
	})
}
- fn is cache.DeletionHandlingMetaNamespaceKeyFunc (wired up in internal/ingress/controller/controller.go), which turns the object into a ns/name key
- Add goes into Kubernetes' workqueue.RateLimitingInterface rate-limited queue (a small usage sketch follows)
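Both pieces are standard client-go machinery. A minimal sketch of how they fit together, with an illustrative Ingress object rather than the controller's actual wiring:

// Derive a ns/name key and push it through a rate-limited workqueue,
// mirroring what the task queue does around its Element type.
package main

import (
	"fmt"

	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	ing := &networkingv1.Ingress{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"},
	}

	// DeletionHandlingMetaNamespaceKeyFunc returns "namespace/name" and also
	// copes with DeletedFinalStateUnknown tombstones coming from informers.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(ing)
	if err != nil {
		panic(err)
	}

	// Failed syncs are re-added with AddRateLimited, successful ones are
	// Forget+Done, exactly as in the worker loop shown below.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add(key)

	item, _ := queue.Get()
	fmt.Println("processing", item)
	queue.Forget(item)
	queue.Done(item)
}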
A worker pulls items from the rate-limited queue and executes them:
// worker processes work in the queue through sync.
func (t *Queue) worker() {
	for {
		key, quit := t.queue.Get()
		if quit {
			if !isClosed(t.workerDone) {
				close(t.workerDone)
			}
			return
		}
		ts := time.Now().UnixNano()

		item := key.(Element)
		if t.lastSync > item.Timestamp {
			klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp)
			t.queue.Forget(key)
			t.queue.Done(key)
			continue
		}

		klog.V(3).InfoS("syncing", "key", item.Key)
		if err := t.sync(key); err != nil {
			klog.ErrorS(err, "requeuing", "key", item.Key)
			t.queue.AddRateLimited(Element{
				Key:       item.Key,
				Timestamp: time.Now().UnixNano(),
			})
		} else {
			t.queue.Forget(key)
			t.lastSync = ts
		}

		t.queue.Done(key)
	}
}
The sync function is syncIngress:
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
	n.syncRateLimiter.Accept()

	if n.syncQueue.IsShuttingDown() {
		return nil
	}

	ings := n.store.ListIngresses()
	hosts, servers, pcfg := n.getConfiguration(ings)

	n.metricCollector.SetSSLExpireTime(servers)

	if n.runningConfig.Equal(pcfg) {
		klog.V(3).Infof("No configuration change detected, skipping backend reload")
		return nil
	}

	n.metricCollector.SetHosts(hosts)

	// Check whether a dynamic (Lua) update is enough
	if !n.IsDynamicConfigurationEnough(pcfg) {
		klog.InfoS("Configuration changes detected, backend reload required")

		hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{
			TagName: "json",
		})

		pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)

		// Rewrite the nginx configuration and reload
		err := n.OnUpdate(*pcfg)
		if err != nil {
			n.metricCollector.IncReloadErrorCount()
			n.metricCollector.ConfigSuccess(hash, false)
			klog.Errorf("Unexpected failure reloading the backend:\n%v", err)
			n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeWarning, "RELOAD", fmt.Sprintf("Error reloading NGINX: %v", err))
			return err
		}

		klog.InfoS("Backend successfully reloaded")
		n.metricCollector.ConfigSuccess(hash, true)
		n.metricCollector.IncReloadCount()

		n.recorder.Eventf(k8s.IngressNGINXPod, core.EventTypeNormal, "RELOAD", "NGINX reload triggered due to a change in configuration")
	}

	isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})
	if isFirstSync {
		// For the initial sync it always takes some time for NGINX to start listening
		// For large configurations it might take a while so we loop and back off
		klog.InfoS("Initial sync, sleeping for 1 second")
		time.Sleep(1 * time.Second)
	}

	retry := wait.Backoff{
		Steps:    15,
		Duration: 1 * time.Second,
		Factor:   0.8,
		Jitter:   0.1,
	}

	// Dynamic reconfiguration
	err := wait.ExponentialBackoff(retry, func() (bool, error) {
		err := n.configureDynamically(pcfg)
		if err == nil {
			klog.V(2).Infof("Dynamic reconfiguration succeeded.")
			return true, nil
		}

		klog.Warningf("Dynamic reconfiguration failed: %v", err)
		return false, err
	})
	if err != nil {
		klog.Errorf("Unexpected failure reconfiguring NGINX:\n%v", err)
		return err
	}

	ri := getRemovedIngresses(n.runningConfig, pcfg)
	re := getRemovedHosts(n.runningConfig, pcfg)
	n.metricCollector.RemoveMetrics(ri, re)

	n.runningConfig = pcfg

	return nil
}
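The IsDynamicConfigurationEnough check above decides whether the Lua endpoint can absorb the change without a reload. A minimal sketch of the idea, assuming the ingress.Configuration fields used elsewhere in this note; this is illustrative only (it would live inside the controller's module) and is not the upstream function verbatim:

package controller

import "k8s.io/ingress-nginx/internal/ingress"

// isDynamicConfigurationEnoughSketch: if the running and the pending
// configuration only differ in parts that configureDynamically can push to
// Lua (backends, TCP/UDP endpoints; the real code also strips certificates
// out of Servers), then no nginx reload is required.
func isDynamicConfigurationEnoughSketch(running, pending *ingress.Configuration) bool {
	runningCopy := *running
	pendingCopy := *pending

	// Blank out everything the dynamic path can handle ...
	runningCopy.Backends, pendingCopy.Backends = nil, nil
	runningCopy.TCPEndpoints, pendingCopy.TCPEndpoints = nil, nil
	runningCopy.UDPEndpoints, pendingCopy.UDPEndpoints = nil, nil

	// ... and compare what is left; if it is identical, skip the reload.
	return runningCopy.Equal(&pendingCopy)
}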
Dynamically adjusting the nginx configuration
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {
	backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)
	if backendsChanged {
		err := configureBackends(pcfg.Backends)
		if err != nil {
			return err
		}
	}

	streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)
	if streamConfigurationChanged {
		err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)
		if err != nil {
			return err
		}
	}

	serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)
	if serversChanged {
		err := configureCertificates(pcfg.Servers)
		if err != nil {
			return err
		}
	}

	return nil
}
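configureBackends (and its siblings updateStreamConfiguration and configureCertificates) boil down to JSON-encoding the data and POSTing it to the local Lua endpoint. A minimal sketch of that step; the helper name and payload are mine, while the endpoint address and expected 201 status come from the nginx config and Lua handler shown below:

// Sketch of the POST half of the dynamic path: serialize the backends and
// push them to the Lua /configuration/backends endpoint on the status port.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// backends stands in for the []*ingress.Backend slice the controller holds.
func postBackends(backends interface{}) error {
	buf, err := json.Marshal(backends)
	if err != nil {
		return err
	}

	resp, err := http.Post("http://127.0.0.1:10246/configuration/backends",
		"application/json", bytes.NewReader(buf))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// The Lua handler stores the payload in the configuration_data shared
	// dict and answers 201 Created.
	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}
	return nil
}

func main() {
	// Illustrative payload, shaped like the backend data shown later in this note.
	payload := []map[string]interface{}{
		{
			"name": "default-flaskapp1-80",
			"endpoints": []map[string]string{
				{"address": "110.243.13.169", "port": "80"},
			},
		},
	}
	if err := postBackends(payload); err != nil {
		fmt.Println("dynamic update failed:", err)
	}
}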
The nginx-side endpoint that receives dynamic configuration updates
nginx configuration:
server {
    listen 127.0.0.1:10246;
    set $proxy_upstream_name "internal";
    ...
    location /configuration {
        client_max_body_size       21m;
        client_body_buffer_size    21m;
        proxy_buffering            off;
        content_by_lua_block {
            configuration.call()
        }
    }
}
The corresponding Lua handler:
function _M.call()
  if ngx.var.request_method ~= "POST" and ngx.var.request_method ~= "GET" then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.print("Only POST and GET requests are allowed!")
    return
  end

  if ngx.var.request_uri == "/configuration/servers" then
    handle_servers()
    return
  end

  if ngx.var.request_uri == "/configuration/general" then
    handle_general()
    return
  end

  if ngx.var.uri == "/configuration/certs" then
    handle_certs()
    return
  end

  if ngx.var.request_uri ~= "/configuration/backends" then
    ngx.status = ngx.HTTP_NOT_FOUND
    ngx.print("Not found!")
    return
  end

  if ngx.var.request_method == "GET" then
    ngx.status = ngx.HTTP_OK
    ngx.print(_M.get_backends_data())
    return
  end

  local backends = fetch_request_body()
  if not backends then
    ngx.log(ngx.ERR, "dynamic-configuration: unable to read valid request body")
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  local success, err = configuration_data:set("backends", backends)
  if not success then
    ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err))
    ngx.status = ngx.HTTP_BAD_REQUEST
    return
  end

  ngx.status = ngx.HTTP_CREATED
end
The code that syncs the stored configuration into the balancers:
function _M.init_worker()
  sync_backends() -- when worker starts, sync backends without delay
  local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
  if err then
    ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: ",
            tostring(err))
  end
end

local function sync_backends()
  local backends_data = configuration.get_backends_data()
  if not backends_data then
    balancers = {}
    return
  end

  local new_backends, err = cjson.decode(backends_data)
  if not new_backends then
    ngx.log(ngx.ERR, "could not parse backends data: ", err)
    return
  end

  local balancers_to_keep = {}
  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
    balancers_to_keep[new_backend.name] = balancers[new_backend.name]
  end

  for backend_name, _ in pairs(balancers) do
    if not balancers_to_keep[backend_name] then
      balancers[backend_name] = nil
    end
  end
end
Backend data
The backend data can be dumped with the kubectl plugin (see the official docs for installation):
kubectl ingress-nginx backends -n inf
For a regular Service:
{
  "name": "default-flaskapp1-80",
  "service": {
    "metadata": {
      "creationTimestamp": null
    },
    "spec": {
      "ports": [
        {
          "name": "http",
          "protocol": "TCP",
          "port": 80,
          "targetPort": 80
        }
      ],
      "selector": {
        "app.kubernetes.io/name": "flaskapp"
      },
      "clusterIP": "110.193.132.145",
      "type": "ClusterIP",
      "sessionAffinity": "None"
    },
    "status": {
      "loadBalancer": {}
    }
  },
  "port": 80,
  "sslPassthrough": false,
  "endpoints": [
    {
      "address": "110.243.13.169",
      "port": "80"
    }
  ],
  "sessionAffinityConfig": {
    "name": "",
    "mode": "",
    "cookieSessionAffinity": {
      "name": ""
    }
  },
  "upstreamHashByConfig": {
    "upstream-hash-by-subset-size": 3
  },
  "noServer": false,
  "trafficShapingPolicy": {
    "weight": 0,
    "header": "",
    "headerValue": "",
    "cookie": ""
  }
},
To route to a Service in another namespace, an ExternalName Service must be used:
apiVersion: v1
kind: Service
metadata:
  name: flaskapp
  namespace: www
spec:
  selector:
    app.kubernetes.io/name: flaskapp
  ports:
    - name: http
      port: 80
---
apiVersion: v1
kind: Service
metadata:
  name: flaskapp2
spec:
  ports:
    - name: http
      port: 80
  type: ExternalName
  externalName: flaskapp.www.svc.cluster.local
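An Ingress in flaskapp2's namespace can then reference it like any local Service (hypothetical example; the host name is made up):

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: flaskapp2
spec:
  rules:
    - host: flaskapp.example.com
      http:
        paths:
          - path: /
            backend:
              serviceName: flaskapp2
              servicePort: 80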
The backend obtained is:
"endpoints": [
{
"address": "flaskapp.www.svc.cluster.local",
"port": "80"
}
],
tips
Modifying the nginx configuration
Three mechanisms are supported:
- a ConfigMap, for global configuration
- Ingress annotations, which apply to a single location
- the template/nginx.tmpl configuration template
ConfigMap settings end up directly in the http block (some also apply to every server and location); values set via Ingress annotations then override them per location (see the hypothetical example below).
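A hypothetical example of the first two mechanisms (the ConfigMap name and namespace depend on how the controller was deployed; proxy-body-size and its annotation counterpart are standard ingress-nginx options):

# Global default via the controller's ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: ingress-nginx-controller
  namespace: ingress-nginx
data:
  proxy-body-size: "8m"
---
# Per-location override via an Ingress annotation
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: flaskapp
  annotations:
    nginx.ingress.kubernetes.io/proxy-body-size: "32m"
spec:
  rules: ...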
Splitting watches by ingress class
Example configuration:
# Ingress controller 1
apiVersion: extensions/v1beta1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - args:
            - /nginx-ingress-controller
            - --ingress-class=class-1
            - ...
# Ingress controller 2
apiVersion: extensions/v1beta1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - args:
            - /nginx-ingress-controller
            - --ingress-class=class-2
            - ...
# This Ingress resource will be managed by controller 1
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: class-1
spec:
  rules: ...
# This Ingress resource will be managed by controller 2
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: class-2
spec:
  rules: ...