traefik源码解读
目录:
入口文件
main.go
入口方法main()
# 获取配置结构 cmd/configuration.go
traefikConfiguration
# 获取后端配置 cmd/configuration.go 不同后端对应的配置
traefikPointersConfiguration
# 初始化命令结构 调用traefikConfiguration使用的https://github.com/containous/flaeg/blob/master/flaeg.go
traefikCmd
# 再获取配置结构
storeConfigCmd
# 初始化启动服务
f := flaeg.New(traefikCmd, os.Args[1:])
# 添加指令到f.customParsers
f.AddParser
# 添加对应命令的调用方法 f.commands = append(f.commands, command)
f.AddCommand
# 获取命令行参数
usedCmd, err := f.GetCommand()
# 合并配置文件
s := staert.NewStaert(traefikCmd)
# 初始化TOML格式源文件
toml := staert.NewTomlSource("traefik", []string{traefikConfiguration.ConfigFile, "/etc/traefik/", "$HOME/.traefik/", "."})
# 合并命令行参数,文件配置
s.LoadConfig()
# 启动服务command.run=runCmd
s.Run()
启动方法runCmd()
# 设置日志文件flag、日志级别、日志格式json、日志文件(没有创建)
configureLogging
# log打印读取配置toml文件
# 让默认的HTTP Transport从环境变量(HTTP_PROXY、HTTPS_PROXY、NO_PROXY)读取代理设置,并不是启动一个代理
http.DefaultTransport.(*http.Transport).Proxy = http.ProxyFromEnvironment
# 设置轮询(roundrobin)的默认权重为0,失败时err报Could not set roundrobin;roundrobin来自负载均衡库github.com/vulcand/oxy/roundrobin
roundrobin.SetDefaultWeight(0)
# 设置有效配置
staticConfiguration.SetEffectiveConfiguration(configFile)
# 验证配置
staticConfiguration.ValidateConfiguration()
# log打印版本相关
# 配置转化为json err报不能加载
jsonConf, err := json.Marshal(staticConfiguration)
# 如果存在staticConfiguration.API.Dashboard,设置staticConfiguration.API.DashboardAssets
if staticConfiguration.API != nil && staticConfiguration.API.Dashboard { staticConfiguration.API.DashboardAssets=... }
# 还有检查新版本的操作,这么骚的吗
if staticConfiguration.Global.CheckNewVersion { checkNewVersion()}
# 未知
stats(staticConfiguration)
# 提供程序集合
providerAggregator := configuration.NewProviderAggregator(globalConfiguration)
# 初始化acmeProvider
acmeProvider = staticConfiguration.InitACMEProvider()
# 如果acmeProvider不为空则加入集合
if acmeProvider != nil { providerAggregator.AddProvider(acmeProvider) }
# 创建TCP代理入口
serverEntryPointsTCP := make(server.TCPEntryPoints)
# 创建后端代理点,没有的话会被SetEffectiveConfiguration初始化为一个80端口的endpoint
for entryPointName, config := range staticConfiguration.EntryPoints {
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
serverEntryPointsTCP[entryPointName], err = server.NewTCPEntryPoint(ctx, config)
if err != nil {
return fmt.Errorf("error while building entryPoint %s: %v", entryPointName, err)
}
serverEntryPointsTCP[entryPointName].RouteAppenderFactory = router.NewRouteAppenderFactory(*staticConfiguration, entryPointName, acmeProvider)
}
# tls相关
tlsManager := traefiktls.NewManager()
# 如果acmeProvider不为空则为其设置tls,http,dns等
if acmeProvider != nil { acmeProvider.SetTLSManager(tlsManager) }
# 创建server
svr := server.NewServer(*staticConfiguration, providerAggregator, serverEntryPointsTCP, tlsManager)
#
if acmeProvider != nil && acmeProvider.OnHostRule {
acmeProvider.SetConfigListenerChan(make(chan config.Configuration))
svr.AddListener(acmeProvider.ListenConfiguration)
}
#
ctx := cmd.ContextWithSignal(context.Background())
#
if staticConfiguration.Ping != nil { staticConfiguration.Ping.WithContext(ctx) }
# 调用启动,StartWithContext调用Start
svr.StartWithContext(ctx)
# 保证进行关闭
defer svr.Close()
#
sent, err := daemon.SdNotify(false, "READY=1")
#
t, err := daemon.SdWatchdogEnabled(false)
#
safe.Go(for range tick { healthcheck.Do(*staticConfiguration) }
#
svr.Wait()
#
logrus.Exit(0)
服务详细逻辑
server/server.go
整体的启动和关闭流程 StartWithContext()
main.go的runCmd中调用了StartWithContext所以是执行了启动服务
// StartWithContext starts the server and arranges a graceful shutdown for
// when ctx is done.
//
// A background goroutine blocks until ctx is done, optionally sleeps for
// LifeCycle.RequestAcceptGraceTimeout (when it is positive) and then calls
// Stop. Close is deferred inside that goroutine, so it runs after Stop
// returns. Start itself is invoked right away on the calling goroutine.
func (s *Server) StartWithContext(ctx context.Context) {
	shutdownOnDone := func() {
		defer s.Close()

		<-ctx.Done()
		log.Info("I have to go...")

		// The grace period is read only after the shutdown was requested.
		if grace := time.Duration(s.globalConfiguration.LifeCycle.RequestAcceptGraceTimeout); grace > 0 {
			log.Infof("Waiting %s for incoming requests to cease", grace)
			time.Sleep(grace)
		}

		log.Info("Stopping server gracefully")
		s.Stop()
	}

	go shutdownOnDone()
	s.Start()
}
先启动一个匿名协程:它会一直等待ctx结束,然后根据配置等待一段时间,再优雅地停止服务;主流程不会等待该协程,而是立即调用Start方法启动服务
启动服务 Start()
// Start starts the server.
//
// The steps run in a fixed order: the HTTP servers and the leadership are
// started synchronously, the provider and configuration listeners are handed
// to the routines pool, the provider is started, and finally the signal
// listener is handed to the routines pool.
func (s *Server) Start() {
	s.startHTTPServers()
	s.startLeadership()

	// The listen* methods already have the func(stop chan bool) shape the
	// pool expects, so they are passed as method values.
	s.routinesPool.Go(s.listenProviders)
	s.routinesPool.Go(s.listenConfigurations)

	s.startProvider()

	s.routinesPool.Go(s.listenSignals)
}
可以看到就调用了几个方法
- startHTTPServers
- startLeadership
- listenProviders
- listenConfigurations
- startProvider
- listenSignals
启动HTTP监听 startHTTPServers()
func (s *Server) startHTTPServers() {
# 构造map
s.serverEntryPoints = s.buildServerEntryPoints()
for newServerEntryPointName, newServerEntryPoint := range s.serverEntryPoints {
serverEntryPoint := s.setupServerEntryPoint(newServerEntryPointName, newServerEntryPoint)
# 创建启动服务的协程
go s.startServer(serverEntryPoint)
}
}
整体流程
- 构建serverEntryPoints的map
- 为每个serverEntryPoint进行初始化
- 创建启动每个serverEntryPoint的协程
buildServerEntryPoints位于server/server_configuration.go
func (s *Server) buildServerEntryPoints() map[string]*serverEntryPoint {
serverEntryPoints := make(map[string]*serverEntryPoint)
for entryPointName, entryPoint := range s.entryPoints {
serverEntryPoints[entryPointName] = &serverEntryPoint{
# 路由中间件
httpRouter: middlewares.NewHandlerSwitcher(s.buildDefaultHTTPRouter()),
onDemandListener: entryPoint.OnDemandListener,
tlsALPNGetter: entryPoint.TLSALPNGetter,
}
...省略TLS部分
}
}
其实就是为每个EntryPoint准备对应的监听端口和自己的NewHandlerSwitcher,EntryPoint就是每个对应的监听端口
setupServerEntryPoint位于server/server.go,用于初始化EntryPoint
func (s *Server) setupServerEntryPoint(newServerEntryPointName string, newServerEntryPoint *serverEntryPoint) *serverEntryPoint {
# 添加中间件对应的操作
serverMiddlewares, err := s.buildServerEntryPointMiddlewares(newServerEntryPointName, newServerEntryPoint)
if err != nil {
log.Fatal("Error preparing server: ", err)
}
# 使用negroni加载中间件
newSrv, listener, err := s.prepareServer(newServerEntryPointName, s.entryPoints[newServerEntryPointName].Configuration, newServerEntryPoint.httpRouter, serverMiddlewares)
if err != nil {
log.Fatal("Error preparing server: ", err)
}
# 把创建好的httpServer和listener保存到对应的serverEntryPoint上
serverEntryPoint := s.serverEntryPoints[newServerEntryPointName]
serverEntryPoint.httpServer = newSrv
serverEntryPoint.listener = listener
# 为ServerEntryPoint分配ConnectionTracker
serverEntryPoint.hijackConnectionTracker = newHijackConnectionTracker()
# 为ServerEntryPoint定义增减Conn方法
serverEntryPoint.httpServer.ConnState = func(conn net.Conn, state http.ConnState) {
switch state {
case http.StateHijacked:
serverEntryPoint.hijackConnectionTracker.AddHijackedConnection(conn)
case http.StateClosed:
serverEntryPoint.hijackConnectionTracker.RemoveHijackedConnection(conn)
}
}
return serverEntryPoint
}
buildServerEntryPointMiddlewares位于server/server_middlewares.go,用于添加中间件trace,log,metrics,api, redirect(重定向),auth,compress(压缩),ipWhitelist
func (s *Server) buildServerEntryPointMiddlewares(serverEntryPointName string, serverEntryPoint *serverEntryPoint) ([]negroni.Handler, error) {
serverMiddlewares := []negroni.Handler{middlewares.NegroniRecoverHandler()}
if s.tracingMiddleware.IsEnabled() {
serverMiddlewares = append(serverMiddlewares, s.tracingMiddleware.NewEntryPoint(serverEntryPointName))
}
if s.accessLoggerMiddleware != nil {
serverMiddlewares = append(serverMiddlewares, s.accessLoggerMiddleware)
}
...
return serverMiddlewares, nil
}
prepareServer位于server/server.go,用于监听服务
func (s *Server) prepareServer(entryPointName string, entryPoint *configuration.EntryPoint, router *middlewares.HandlerSwitcher, middlewares []negroni.Handler) (*h2c.Server, net.Listener, error) {
# 超时时间
readTimeout, writeTimeout, idleTimeout := buildServerTimeouts(s.globalConfiguration)
log.Infof("Preparing server %s %+v with readTimeout=%s writeTimeout=%s idleTimeout=%s", entryPointName, entryPoint, readTimeout, writeTimeout, idleTimeout)
# 加载中间件 github.com/urfave/negroni
n := negroni.New()
for _, middleware := range middlewares {
n.Use(middleware)
}
# 加载路由,也就是s.buildDefaultHTTPRouter
n.UseHandler(router)
# 构建该entryPoint的内部路由,未命中内部路由的请求交给上面配置好的negroni处理
internalMuxRouter := s.buildInternalRouter(entryPointName)
internalMuxRouter.NotFoundHandler = n
tlsConfig, err := s.createTLSConfig(entryPointName, entryPoint.TLS, router)
if err != nil {
return nil, nil, fmt.Errorf("error creating TLS config: %v", err)
}
# 监听entryPoint配置的TCP地址,监听失败(例如端口已被占用)时返回错误
listener, err := net.Listen("tcp", entryPoint.Address)
if err != nil {
return nil, nil, fmt.Errorf("error opening listener: %v", err)
}
# 将监听器包装为tcpKeepAliveListener
listener = tcpKeepAliveListener{listener.(*net.TCPListener)}
if entryPoint.ProxyProtocol != nil {
# 添加白名单等规则
listener, err = buildProxyProtocolListener(entryPoint, listener)
if err != nil {
return nil, nil, err
}
}
return &h2c.Server{
Server: &http.Server{
Addr: entryPoint.Address,
Handler: internalMuxRouter,
TLSConfig: tlsConfig,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
IdleTimeout: idleTimeout,
ErrorLog: httpServerLogger,
},
},
listener,
nil
}
startServer位于server/server.go,用于启动serverEntryPoint
对于没有配置tls的直接执行的serverEntryPoint.httpServer.Serve(serverEntryPoint.listener)
startLeadership()
只有以集群方式运行(s.leadership不为nil)时才会执行这一步
// startLeadership registers the server's routines pool with the leadership
// component. It is a no-op when no leadership is configured (s.leadership is
// nil).
func (s *Server) startLeadership() {
	if s.leadership == nil {
		return
	}
	s.leadership.Participate(s.routinesPool)
}
Participate在cluster/leadership.go,此次不过多解析
listenProviders()
监听后端,就是用于获取backends和frontends的配置的变化
// listenProviders consumes the messages sent on s.configurationChan and
// passes every message that carries a configuration to preLoadConfiguration.
//
// The loop ends when stop fires or when s.configurationChan is closed.
// Messages whose Configuration is nil are logged at debug level and skipped.
func (s *Server) listenProviders(stop chan bool) {
	for {
		select {
		case <-stop:
			return
		case msg, open := <-s.configurationChan:
			if !open {
				return
			}
			if msg.Configuration == nil {
				log.Debugf("Received nil configuration from provider %q, skipping.", msg.ProviderName)
				continue
			}
			s.preLoadConfiguration(msg)
		}
	}
}
preLoadConfiguration位于server/server_configuration.go
func (s *Server) preLoadConfiguration(configMsg types.ConfigMessage) {
# 获取provider配置重载的节流间隔(ProvidersThrottleDuration)
providersThrottleDuration := time.Duration(s.globalConfiguration.ProvidersThrottleDuration)
s.defaultConfigurationValues(configMsg.Configuration)
currentConfigurations := s.currentConfigurations.Get().(types.Configurations)
if log.GetLevel() == logrus.DebugLevel {
jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
}
# 配置文件为空返回
if configMsg.Configuration == nil || configMsg.Configuration.Backends == nil && configMsg.Configuration.Frontends == nil && configMsg.Configuration.TLS == nil {
log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
return
}
# 反射检测两者是否一样,如果一样则返回
if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
return
}
#
providerConfigUpdateCh, ok := s.providerConfigUpdateMap[configMsg.ProviderName]
# map中还没有该provider对应的channel时,新建providerConfigUpdateCh并存入map,再启动throttleProviderConfigReload把它节流后转发到configurationValidatedChan
if !ok {
providerConfigUpdateCh = make(chan types.ConfigMessage)
s.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
# routinesPool=safe.NewPool
s.routinesPool.Go(func(stop chan bool) {
s.throttleProviderConfigReload(providersThrottleDuration, s.configurationValidatedChan, providerConfigUpdateCh, stop)
})
}
providerConfigUpdateCh <- configMsg
}
s.routinesPool.Go的Go方法位于./safe/routine.go,就是异步地启动一个可恢复(recoverable)的协程,并且可以通过stop channel将其停止
// Go starts a recoverable goroutine, and can be stopped with stop chan.
//
// The routine is recorded in p.routines together with its own buffered stop
// channel, and the pool's wait group is incremented before the goroutine is
// launched. The pool lock is held for the whole registration.
func (p *Pool) Go(goroutine func(stop chan bool)) {
	p.lock.Lock()
	defer p.lock.Unlock()

	newRoutine := routine{
		goroutine: goroutine,
		stop:      make(chan bool, 1),
	}
	p.routines = append(p.routines, newRoutine)
	p.waitGroup.Add(1)
	Go(func() {
		// Deferred so the wait group is always released. Go is documented
		// here as starting a recoverable goroutine; if it recovers a panic
		// raised by goroutine, a trailing (non-deferred) Done would be
		// skipped and anything waiting on the wait group would block forever.
		defer p.waitGroup.Done()
		goroutine(newRoutine.stop)
	})
}
throttleProviderConfigReload位于server/server_configuration.go
// throttleProviderConfigReload throttles the configuration reload speed for a single provider.
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
// it will publish the last of the newly received configurations.
//
// Incoming messages from in are pushed into a ring channel of size 1; a
// second routine started on the routines pool drains that ring, publishes
// each message on publish and then sleeps for throttle. The function returns
// when stop fires, which also closes the ring through the deferred Close.
func (s *Server) throttleProviderConfigReload(throttle time.Duration, publish chan<- types.ConfigMessage, in <-chan types.ConfigMessage, stop chan bool) {
	ring := channels.NewRingChannel(1)
	defer ring.Close()

	s.routinesPool.Go(func(stop chan bool) {
		for {
			select {
			case <-stop:
				return
			case nextConfig, open := <-ring.Out():
				if !open {
					// The ring's output is closed once the ring has been
					// closed by the outer function (assumes the channels
					// package closes Out after Close — TODO confirm). A
					// closed channel is always ready to receive, so staying
					// in the loop would spin until stop fires.
					return
				}
				if config, ok := nextConfig.(types.ConfigMessage); ok {
					publish <- config
					time.Sleep(throttle)
				}
			}
		}
	})

	for {
		select {
		case <-stop:
			return
		case nextConfig := <-in:
			ring.In() <- nextConfig
		}
	}
}
listenConfigurations()
监听和配置解析
server/server_configuration.go
// listenConfigurations consumes the messages sent on
// s.configurationValidatedChan and applies each of them with
// loadConfiguration.
//
// The loop ends when stop fires, when the channel is closed, or when a
// message with a nil Configuration is received.
func (s *Server) listenConfigurations(stop chan bool) {
	for {
		select {
		case <-stop:
			return
		case msg, open := <-s.configurationValidatedChan:
			if !open {
				return
			}
			if msg.Configuration == nil {
				return
			}
			s.loadConfiguration(msg)
		}
	}
}
loadConfiguration也在server/server_configuration.go
如注释所说,loadConfiguration用于动态管理frontends、backends和TLS配置
// loadConfiguration manages dynamically frontends, backends and TLS configurations
func (s *Server) loadConfiguration(configMsg types.ConfigMessage) {
currentConfigurations := s.currentConfigurations.Get().(types.Configurations)
# 拷贝出新的配置文件
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(types.Configurations)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
# 可以看到是ProviderName,数据源来自Provider
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
# 加载配置次数+1
s.metricsRegistry.ConfigReloadsCounter().Add(1)
# 加载的配置
newServerEntryPoints := s.loadConfig(newConfigurations, s.globalConfiguration)
s.metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
# 更新router
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
s.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
if s.entryPoints[newServerEntryPointName].Configuration.TLS == nil {
if newServerEntryPoint.certs.ContainsCertificates() {
log.Debugf("Certificates not added to non-TLS entryPoint %s.", newServerEntryPointName)
}
} else {
s.serverEntryPoints[newServerEntryPointName].certs.DynamicCerts.Set(newServerEntryPoint.certs.DynamicCerts.Get())
s.serverEntryPoints[newServerEntryPointName].certs.ResetCache()
}
log.Infof("Server configuration reloaded on %s", s.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
# 配置文件设置为新的配置
s.currentConfigurations.Set(newConfigurations)
for _, listener := range s.configurationListeners {
listener(*configMsg.Configuration)
}
# 推送加载的配置
s.postLoadConfiguration()
}
loadConfig也在server/server_configuration.go
loadConfig中重新配置了负载均衡、反向代理以及很多middleware,并为每个entryPoint返回新构建的serverEntryPoint(其中包含新的路由);路由转发由mux库完成。
// loadConfig returns a new gorilla.mux Route from the specified global configuration and the dynamic
// provider configurations.
func (s *Server) loadConfig(configurations types.Configurations, globalConfiguration configuration.GlobalConfiguration) map[string]*serverEntryPoint {
serverEntryPoints := s.buildServerEntryPoints()
backendsHandlers := map[string]http.Handler{}
backendsHealthCheck := map[string]*healthcheck.BackendConfig{}
var postConfigs []handlerPostConfig
# 可以看到providerName
for providerName, config := range configurations {
frontendNames := sortedFrontendNamesForConfig(config)
for _, frontendName := range frontendNames {
# 这里使用loadFrontendConfig
frontendPostConfigs, err := s.loadFrontendConfig(providerName, frontendName, config,
serverEntryPoints,
backendsHandlers, backendsHealthCheck)
if err != nil {
log.Errorf("%v. Skipping frontend %s...", err, frontendName)
}
if len(frontendPostConfigs) > 0 {
postConfigs = append(postConfigs, frontendPostConfigs...)
}
}
}
...
}
最后是通过 wireFrontendBackend 方法最终更新 http handler ,让配置生效
loadFrontendConfig也在server/server_configuration.go
func (s *Server) loadFrontendConfig(
providerName string, frontendName string, config *types.Configuration,
serverEntryPoints map[string]*serverEntryPoint,
backendsHandlers map[string]http.Handler, backendsHealthCheck map[string]*healthcheck.BackendConfig,
) ([]handlerPostConfig, error) {
frontend := config.Frontends[frontendName]
hostResolver := buildHostResolver(s.globalConfiguration)
if len(frontend.EntryPoints) == 0 {
return nil, fmt.Errorf("no entrypoint defined for frontend %s", frontendName)
}
backend := config.Backends[frontend.Backend]
if backend == nil {
return nil, fmt.Errorf("undefined backend '%s' for frontend %s", frontend.Backend, frontendName)
}
frontendHash, err := frontend.Hash()
if err != nil {
return nil, fmt.Errorf("error calculating hash value for frontend %s: %v", frontendName, err)
}
var postConfigs []handlerPostConfig
for _, entryPointName := range frontend.EntryPoints {
log.Debugf("Wiring frontend %s to entryPoint %s", frontendName, entryPointName)
entryPoint := s.entryPoints[entryPointName].Configuration
if backendsHandlers[entryPointName+providerName+frontendHash] == nil {
log.Debugf("Creating backend %s", frontend.Backend)
handlers, responseModifier, postConfig, err := s.buildMiddlewares(frontendName, frontend, config.Backends, entryPointName, entryPoint, providerName)
if err != nil {
return nil, err
}
if postConfig != nil {
postConfigs = append(postConfigs, postConfig)
}
fwd, err := s.buildForwarder(entryPointName, entryPoint, frontendName, frontend, responseModifier, backend)
if err != nil {
return nil, fmt.Errorf("failed to create the forwarder for frontend %s: %v", frontendName, err)
}
lb, healthCheckConfig, err := s.buildBalancerMiddlewares(frontendName, frontend, backend, fwd)
if err != nil {
return nil, err
}
// Handler used by error pages
if backendsHandlers[entryPointName+providerName+frontend.Backend] == nil {
backendsHandlers[entryPointName+providerName+frontend.Backend] = lb
}
if healthCheckConfig != nil {
backendsHealthCheck[entryPointName+providerName+frontendHash] = healthCheckConfig
}
n := negroni.New()
for _, handler := range handlers {
n.Use(handler)
}
n.UseHandler(lb)
backendsHandlers[entryPointName+providerName+frontendHash] = n
} else {
log.Debugf("Reusing backend %s [%s - %s - %s - %s]",
frontend.Backend, entryPointName, providerName, frontendName, frontendHash)
}
serverRoute, err := buildServerRoute(serverEntryPoints[entryPointName], frontendName, frontend, hostResolver)
if err != nil {
return nil, err
}
handler := buildMatcherMiddlewares(serverRoute, backendsHandlers[entryPointName+providerName+frontendHash])
serverRoute.Route.Handler(handler)
err = serverRoute.Route.GetError()
if err != nil {
// FIXME error management
log.Errorf("Error building route: %s", err)
}
}
return postConfigs, nil
postLoadConfiguration也是在server/server_configuration.go;其中metrics的更新只要启用了metrics就会执行,之后的部分只有配置了ACME且当前节点是集群leader时才会执行
func (s *Server) postLoadConfiguration() {
if s.metricsRegistry.IsEnabled() {
activeConfig := s.currentConfigurations.Get().(types.Configurations)
metrics.OnConfigurationUpdate(activeConfig)
}
if s.globalConfiguration.ACME == nil || s.leadership == nil || !s.leadership.IsLeader() {
return
}
...
}
startProvider()
反向代理、负载均衡动态更新模块
// startProvider logs the provider configuration as JSON and then runs the
// provider's Provide method in a separate goroutine, handing it
// s.configurationChan and the routines pool.
//
// A marshalling failure is only logged at debug level; the provider is
// started regardless. An error returned by Provide is logged.
func (s *Server) startProvider() {
	// start providers
	jsonConf, err := json.Marshal(s.provider)
	if err != nil {
		log.Debugf("Unable to marshal provider conf %T with error: %v", s.provider, err)
	}
	log.Infof("Starting provider %T %s", s.provider, jsonConf)

	currentProvider := s.provider
	safe.Go(func() {
		if err := currentProvider.Provide(s.configurationChan, s.routinesPool); err != nil {
			// Report the provider that was actually started, not whatever
			// s.provider holds by the time the goroutine fails.
			log.Errorf("Error starting provider %T: %s", currentProvider, err)
		}
	})
}
启动了协程,读取Provider的配置信息,并开始监听Provider,server.provider属性的Provide方法
./configuration/provider_aggregator.go
// Provide calls the Provide method of every aggregated provider.
//
// Each provider's configuration is logged as JSON (a marshalling failure is
// only logged at debug level) and its Provide method is run in its own
// goroutine with configurationChan and pool. Errors returned by a provider
// are logged; Provide itself always returns nil.
func (p ProviderAggregator) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
	// The loop variable is deliberately not named p so that it does not
	// shadow the receiver.
	for _, prd := range p.providers {
		jsonConf, err := json.Marshal(prd)
		if err != nil {
			log.Debugf("Unable to marshal provider conf %T with error: %v", prd, err)
		}
		log.Infof("Starting provider %T %s", prd, jsonConf)

		currentProvider := prd
		safe.Go(func() {
			if err := currentProvider.Provide(configurationChan, pool); err != nil {
				// Log the per-iteration copy: with pre-Go 1.22 loop
				// semantics the loop variable is shared, so by the time
				// this runs it may already refer to another provider.
				log.Errorf("Error starting provider %T: %v", currentProvider, err)
			}
		})
	}
	return nil
}
listenSignals()
看起来只对SIGUSR1做了处理
// listenSignals reacts to the signals received on s.signals until stop
// fires.
//
// Only SIGUSR1 is acted upon: the access log (when an access logger
// middleware is configured) and the traefik log file are rotated, and
// rotation errors are logged. Every other signal is ignored.
func (s *Server) listenSignals(stop chan bool) {
	for {
		select {
		case <-stop:
			return
		case received := <-s.signals:
			if received != syscall.SIGUSR1 {
				continue
			}

			log.Infof("Closing and re-opening log files for rotation: %+v", received)

			if s.accessLoggerMiddleware != nil {
				if err := s.accessLoggerMiddleware.Rotate(); err != nil {
					log.Errorf("Error rotating access log: %v", err)
				}
			}

			if err := log.RotateFile(); err != nil {
				log.Errorf("Error rotating traefik log: %v", err)
			}
		}
	}
}