flannel源码
目录:
flannel源码
main方法
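main.go 中的 main 方法主要做了以下几件事:
- 确定要使用的网卡:如果没有通过参数指定,则根据默认路由选择
- 创建子网管理器(subnet manager)并与 etcd 建立连接;注册网络后会把子网信息写入本地文件(subnet file)
- 注册信号处理(SIGINT、SIGTERM)
- 创建并激活网卡:vxlan 后端的网卡名为 flannel.<VNI>,udp 后端为 flannel.<数字>
- 创建 backend
- 运行 backend
- 对子网租约进行续约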
// main is the flanneld entry point. In order it:
//   - picks the external interface: the names in opts.iface first, then the
//     patterns in opts.ifaceRegex, and only when neither is set, the interface
//     LookupExtIface selects for opts.publicIP;
//   - validates flags, creates the subnet manager and installs SIGINT/SIGTERM
//     handling;
//   - fetches the network config, then creates and registers the backend;
//   - optionally installs iptables masquerade and FORWARD rules;
//   - writes the subnet file, runs the backend and, unless the kube subnet
//     manager is in use, monitors this node's lease until shutdown.
func main() {
	// Print the version and exit.
	if opts.version {
		fmt.Fprintln(os.Stderr, version.Version)
		os.Exit(0)
	}

	flagutil.SetFlagsFromEnv(flannelFlags, "FLANNELD")

	// Validate flags.
	// The subnet lease renew margin must lie strictly between 0 and 24*60
	// (presumably minutes, i.e. one day — confirm against the flag definition).
	if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 {
		log.Error("Invalid subnet-lease-renew-margin option, out of acceptable range")
		os.Exit(1)
	}

	// Work out which interface to use.
	var extIface *backend.ExternalInterface
	var err error
	// Check the default interface only if no interfaces are specified.
	if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
		extIface, err = LookupExtIface(opts.publicIP, "")
		if err != nil {
			log.Error("Failed to find any valid interface to use: ", err)
			os.Exit(1)
		}
	} else {
		// Check explicitly specified interfaces; the first one that resolves wins.
		for _, iface := range opts.iface {
			extIface, err = LookupExtIface(iface, "")
			if err != nil {
				log.Infof("Could not find valid interface matching %s: %s", iface, err)
			}
			if extIface != nil {
				break
			}
		}
		// Fall back to interfaces that match any specified regexes.
		if extIface == nil {
			for _, ifaceRegex := range opts.ifaceRegex {
				extIface, err = LookupExtIface("", ifaceRegex)
				if err != nil {
					log.Infof("Could not find valid interface matching %s: %s", ifaceRegex, err)
				}
				if extIface != nil {
					break
				}
			}
		}
		if extIface == nil {
			// Exit if none of the specified interfaces or regexes matched.
			log.Error("Failed to find interface to use that matches the interfaces and/or regexes provided")
			os.Exit(1)
		}
	}

	// This is the main context that everything should run in.
	// All spawned goroutines should exit when cancel is called on this context.
	// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
	// to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
	// blocking and returning only when cancel() is called.
	ctx, cancel := context.WithCancel(context.Background())

	// Create the subnet manager.
	sm, err := newSubnetManager(ctx)
	if err != nil {
		log.Error("Failed to create SubnetManager: ", err)
		os.Exit(1)
	}
	log.Infof("Created subnet manager: %s", sm.Name())

	// Register for SIGINT and SIGTERM. shutdownHandler receives the signal
	// channel and cancel (presumably cancelling ctx on a signal — confirm).
	log.Info("Installing signal handlers")
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	wg := sync.WaitGroup{}

	wg.Add(1)
	go func() {
		shutdownHandler(ctx, sigs, cancel)
		wg.Done()
	}()

	if opts.healthzPort > 0 {
		// It's not super easy to shutdown the HTTP server so don't attempt to stop it cleanly
		go mustRunHealthz()
	}

	// Fetch the network config (i.e. what backend to use etc..).
	config, err := getConfig(ctx, sm)
	if err == errCanceled {
		wg.Wait()
		os.Exit(0)
	}
	if err != nil {
		// Any other error would leave config nil and crash at config.BackendType below.
		log.Errorf("Error fetching network config: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	// Create a backend manager then use it to create the backend and register the network with it.
	bm := backend.NewManager(ctx, sm, extIface)
	be, err := bm.GetBackend(config.BackendType)
	if err != nil {
		log.Errorf("Error fetching backend: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	bn, err := be.RegisterNetwork(ctx, &wg, config)
	if err != nil {
		log.Errorf("Error registering network: %s", err)
		cancel()
		wg.Wait()
		os.Exit(1)
	}

	// Set up ipMasq if needed
	if opts.ipMasq {
		if err = recycleIPTables(config.Network, bn.Lease()); err != nil {
			log.Errorf("Failed to recycle IPTables rules, %v", err)
			cancel()
			wg.Wait()
			os.Exit(1)
		}
		log.Infof("Setting up masking rules")
		// Install the masquerade rules; this goroutine is not tracked by wg.
		go network.SetupAndEnsureIPTables(network.MasqRules(config.Network, bn.Lease()), opts.iptablesResyncSeconds)
	}

	// Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
	// In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
	// In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
	if opts.iptablesForwardRules {
		log.Infof("Changing default FORWARD chain policy to ACCEPT")
		go network.SetupAndEnsureIPTables(network.ForwardRules(config.Network.String()), opts.iptablesResyncSeconds)
	}

	if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
		// Continue, even though it failed.
		log.Warningf("Failed to write subnet file: %s", err)
	} else {
		log.Infof("Wrote subnet file to %s", opts.subnetFile)
	}

	// Start "Running" the backend network. This will block until the context is done so run in another goroutine.
	log.Info("Running backend.")
	wg.Add(1)
	go func() {
		bn.Run(ctx)
		wg.Done()
	}()

	daemon.SdNotify(false, "READY=1")

	// Kube subnet mgr doesn't lease the subnet for this node - it just uses the podCidr that's already assigned.
	if !opts.kubeSubnetMgr {
		// Monitor and renew this node's own lease.
		err = MonitorLease(ctx, sm, bn, &wg)
		if err == errInterrupted {
			// The lease was "revoked" - shut everything down
			cancel()
		}
	}

	log.Info("Waiting for all goroutines to exit")
	// Block waiting for all the goroutines to finish.
	wg.Wait()
	log.Info("Exiting cleanly...")
	os.Exit(0)
}
创建backend
backend/manager.go
// NewManager returns a backend Manager bound to ctx, the subnet manager sm
// and the external interface extIface. Its table of active backends starts
// empty; backends are created on demand by GetBackend.
func NewManager(ctx context.Context, sm subnet.Manager, extIface *ExternalInterface) Manager {
	m := new(manager)
	m.ctx = ctx
	m.sm = sm
	m.extIface = extIface
	m.active = make(map[string]Backend)
	return m
}
获取 backend 的方法 GetBackend(如果该类型的 backend 尚未创建,则先创建):
// GetBackend returns the backend for backendType (matched case-insensitively).
// An already-active backend of that type is reused. Otherwise the registered
// constructor is looked up in constructors, invoked with the manager's subnet
// manager and external interface, and the result is recorded as active. A
// goroutine tracked by bm.wg removes the entry again once bm.ctx is done.
// An unregistered type yields an "unknown backend type" error.
func (bm *manager) GetBackend(backendType string) (Backend, error) {
	bm.mux.Lock()
	defer bm.mux.Unlock()

	key := strings.ToLower(backendType)

	// Reuse a backend that is already running for this type.
	if existing, found := bm.active[key]; found {
		return existing, nil
	}

	// First request for this type: construct it.
	ctor, known := constructors[key]
	if !known {
		return nil, fmt.Errorf("unknown backend type: %v", key)
	}

	created, err := ctor(bm.sm, bm.extIface)
	if err != nil {
		return nil, err
	}
	bm.active[key] = created

	bm.wg.Add(1)
	go func() {
		defer bm.wg.Done()
		<-bm.ctx.Done()
		// TODO(eyakubovich): this obviously introduces a race: GetBackend()
		// could be called while we are here. Currently, though, all backends'
		// Run exit only on shutdown.
		bm.mux.Lock()
		delete(bm.active, key)
		bm.mux.Unlock()
	}()

	return created, nil
}
befunc(bm.sm, bm.extIface) 调用的是对应 backend 类型的 New 方法,以 vxlan 为例:
backend/vxlan/vxlan.go
// New constructs a VXLAN backend. It only records the subnet manager and the
// external interface; the VXLAN device itself is created later, in
// RegisterNetwork. It never returns an error.
func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backend, error) {
	// Named be (not backend) so the imported backend package is not shadowed.
	be := &VXLANBackend{subnetMgr: sm, extIface: extIface}
	return be, nil
}
注册网络
// RegisterNetwork prepares this node's VXLAN network:
//  1. decodes the optional JSON backend config (VNI defaults to defaultVNI);
//  2. creates the VXLAN device "flannel.<VNI>" via newVXLANDevice;
//  3. acquires a subnet lease whose attributes carry the device's MAC address;
//  4. assigns the lease's subnet IP to the device as a /32;
//  5. returns the backend.Network built by newNetwork.
//
// context.Canceled / context.DeadlineExceeded from AcquireLease are returned
// as-is; any other lease error is wrapped.
func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, config *subnet.Config) (backend.Network, error) {
	// The field names double as the keys of the JSON backend config.
	cfg := struct {
		VNI           int
		Port          int
		GBP           bool
		Learning      bool
		DirectRouting bool
	}{VNI: defaultVNI}

	if len(config.Backend) != 0 {
		if decodeErr := json.Unmarshal(config.Backend, &cfg); decodeErr != nil {
			return nil, fmt.Errorf("error decoding VXLAN backend config: %v", decodeErr)
		}
	}
	log.Infof("VXLAN config: VNI=%d Port=%d GBP=%v Learning=%v DirectRouting=%v", cfg.VNI, cfg.Port, cfg.GBP, cfg.Learning, cfg.DirectRouting)

	// Create the VXLAN device.
	dev, err := newVXLANDevice(&vxlanDeviceAttrs{
		vni:       uint32(cfg.VNI),
		name:      fmt.Sprintf("flannel.%v", cfg.VNI),
		vtepIndex: be.extIface.Iface.Index,
		vtepAddr:  be.extIface.IfaceAddr,
		vtepPort:  cfg.Port,
		gbp:       cfg.GBP,
		learning:  cfg.Learning,
	})
	if err != nil {
		return nil, err
	}
	dev.directRouting = cfg.DirectRouting

	// Build the lease attributes.
	// NOTE(review): the VNI is narrowed to uint16 here but to uint32 for the
	// device above; values above 65535 would be truncated in the lease — confirm.
	subnetAttrs, err := newSubnetAttrs(be.extIface.ExtAddr, uint16(cfg.VNI), dev.MACAddr())
	if err != nil {
		return nil, err
	}

	// Acquire the lease.
	lease, err := be.subnetMgr.AcquireLease(ctx, subnetAttrs)
	if err == context.Canceled || err == context.DeadlineExceeded {
		return nil, err
	}
	if err != nil {
		return nil, fmt.Errorf("failed to acquire lease: %v", err)
	}

	// Give the device a /32 address so that no broadcast routes are created.
	// The address is only the source for host-to-workload traffic, so the return
	// path has an address on the flannel network to use as its destination.
	selfAddr := ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}
	if cfgErr := dev.Configure(selfAddr); cfgErr != nil {
		return nil, fmt.Errorf("failed to configure interface %s: %s", dev.link.Attrs().Name, cfgErr)
	}

	return newNetwork(be.subnetMgr, be.extIface, dev, ip.IP4Net{}, lease)
}
newVXLANDevice 通过调用 ensureLink 方法创建网卡:
// ensureLink makes sure a VXLAN link matching vxlan exists and returns it as
// re-read from the kernel.
//
// If netlink.LinkAdd fails with EEXIST, the existing link with the same name
// is compared using vxlanLinksIncompat:
//   - a compatible *netlink.Vxlan is returned as-is;
//   - anything else is deleted and recreated.
//
// Any other LinkAdd error is returned unchanged. The final lookup relies on
// netlink.LinkAdd having populated vxlan.Index.
func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) {
	err := netlink.LinkAdd(vxlan)
	if err == syscall.EEXIST {
		// It's ok if the device already exists as long as its config is similar.
		log.V(1).Infof("VXLAN device already exists")

		// Load the existing device.
		existing, err := netlink.LinkByName(vxlan.Name)
		if err != nil {
			return nil, err
		}

		// Compare the existing device with the requested configuration.
		incompat := vxlanLinksIncompat(vxlan, existing)
		if incompat == "" {
			// Guard the assertion: a non-vxlan link with this name is treated as
			// incompatible and recreated instead of panicking.
			if existingVxlan, ok := existing.(*netlink.Vxlan); ok {
				log.V(1).Infof("Returning existing device")
				return existingVxlan, nil
			}
			incompat = fmt.Sprintf("existing link has type %q, not vxlan", existing.Type())
		}

		// Incompatible: delete the existing device and create a new one.
		log.Warningf("%q already exists with incompatable configuration: %v; recreating device", vxlan.Name, incompat)
		if err = netlink.LinkDel(existing); err != nil {
			return nil, fmt.Errorf("failed to delete interface: %v", err)
		}

		if err = netlink.LinkAdd(vxlan); err != nil {
			return nil, fmt.Errorf("failed to create vxlan interface: %v", err)
		}
	} else if err != nil {
		return nil, err
	}

	ifindex := vxlan.Index
	link, err := netlink.LinkByIndex(ifindex)
	if err != nil {
		// Keep the underlying lookup error for diagnosis.
		return nil, fmt.Errorf("can't locate created vxlan device with index %v: %v", ifindex, err)
	}

	created, ok := link.(*netlink.Vxlan)
	if !ok {
		return nil, fmt.Errorf("created vxlan device with index %v is not vxlan", ifindex)
	}
	return created, nil
}
网卡通过 netlink.LinkAdd 创建。如果返回 syscall.EEXIST(同名设备已存在),会比较已有设备与期望的配置:兼容则直接复用,不兼容则删除后重新创建。其他错误则直接返回。
租约部分AcquireLease略过
监听子网变化
在 main.go 中通过 bn.Run(ctx) 启动:
// Run watches subnet leases and applies every batch of lease events to this
// node via handleSubnetEvents until ctx is done. The watcher runs in its own
// goroutine; Run waits for it to exit before returning.
func (nw *network) Run(ctx context.Context) {
	var watchers sync.WaitGroup

	log.V(0).Info("watching for new subnet leases")
	batches := make(chan []subnet.Event)

	// Watch leases.
	watchers.Add(1)
	go func() {
		defer watchers.Done()
		subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, batches)
		log.V(1).Info("WatchLeases exited")
	}()
	// NOTE(review): batches is unbuffered; once this loop stops receiving,
	// WatchLeases must not block on a send or this Wait never returns.
	defer watchers.Wait()

	// Dispatch each batch to handleSubnetEvents.
	for {
		select {
		case <-ctx.Done():
			return
		case batch := <-batches:
			nw.handleSubnetEvents(batch)
		}
	}
}
WatchLeases 把 etcd 中租约数据的变化转换成 Event,交给 handleSubnetEvents 处理。main 方法中的 MonitorLease(ctx, sm, bn, &wg) 则只负责监控本节点自己的租约。
// WatchLeases repeatedly calls sm.WatchLeases, resuming from the cursor the
// previous call returned, and converts each result into a batch of Events.
// A result with events yields an incremental update; otherwise the snapshot
// resets the watcher's state. Non-empty batches are sent on receiver.
// ownLease is handed to the leaseWatcher (presumably so this node's own lease
// is excluded — confirm in leaseWatcher.update/reset).
//
// Other watch errors are logged and retried after one second. The function
// returns when ctx is canceled or its deadline passes, including while it is
// backing off or blocked sending on receiver.
func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
	lw := &leaseWatcher{
		ownLease: ownLease,
	}
	var cursor interface{}

	for {
		res, err := sm.WatchLeases(ctx, cursor)
		if err != nil {
			if err == context.Canceled || err == context.DeadlineExceeded {
				return
			}

			log.Errorf("Watch subnets: %v", err)
			// Back off before retrying, but stop promptly on cancellation.
			select {
			case <-time.After(time.Second):
			case <-ctx.Done():
				return
			}
			continue
		}

		cursor = res.Cursor

		var batch []Event
		if len(res.Events) > 0 {
			batch = lw.update(res.Events)
		} else {
			batch = lw.reset(res.Snapshot)
		}

		if len(batch) > 0 {
			// The receiver may be unbuffered and its reader may already have
			// stopped because ctx is done; never block on the send past cancellation.
			select {
			case receiver <- batch:
			case <-ctx.Done():
				return
			}
		}
	}
}
backend/vxlan/vxlan_network.go 中的 handleSubnetEvents 负责把租约事件同步到本机。
- 如果启用了 DirectRouting,且对端与本机可以直接路由,就只添加或删除一条经对端公网 IP 的路由。
- 否则是普通的 vxlan 模式,需要把对端 VTEP 的 MAC 地址写入 flannel 网卡的 ARP 表和 FDB 表,然后再更新路由。
// handleSubnetEvents applies a batch of subnet lease events to the local VXLAN
// device and routing table.
//
// Leases whose backend type is not "vxlan", or whose backend data cannot be
// decoded, are logged and skipped. For every other event:
//   - If the device has direct routing enabled and ip.DirectRouting reports the
//     remote public IP as directly reachable, only a plain route to the remote
//     subnet via its public IP is replaced (add) or deleted (remove).
//   - Otherwise the remote VTEP is programmed on the flannel device:
//     ARP (subnet IP -> VTEP MAC), FDB (public IP -> VTEP MAC), then an onlink
//     route to the subnet via the subnet IP. On add, a failed step rolls back
//     the entries already written. On remove, every deletion is attempted even
//     if an earlier one fails.
func (nw *network) handleSubnetEvents(batch []subnet.Event) {
	for _, evt := range batch {
		remote := evt.Lease.Subnet
		la := evt.Lease.Attrs

		if la.BackendType != "vxlan" {
			log.Warningf("ignoring non-vxlan subnet(%s): type=%v", remote, la.BackendType)
			continue
		}

		// Decode the vxlan-specific lease data (JSON).
		var vxAttrs vxlanLeaseAttrs
		if err := json.Unmarshal(la.BackendData, &vxAttrs); err != nil {
			log.Error("error decoding subnet lease JSON: ", err)
			continue
		}
		vtepMAC := net.HardwareAddr(vxAttrs.VtepMAC)
		arpEntry := neighbor{IP: remote.IP, MAC: vtepMAC}
		fdbEntry := neighbor{IP: la.PublicIP, MAC: vtepMAC}

		// Route used when traffic to the remote subnet is vxlan encapsulated.
		encapRoute := netlink.Route{
			LinkIndex: nw.dev.link.Attrs().Index,
			Scope:     netlink.SCOPE_UNIVERSE,
			Dst:       remote.ToIPNet(),
			Gw:        remote.IP.ToIP(),
		}
		encapRoute.SetFlag(syscall.RTNH_F_ONLINK)

		// Route used when the remote host is on the same subnet, so vxlan isn't required.
		plainRoute := netlink.Route{
			Dst: remote.ToIPNet(),
			Gw:  la.PublicIP.ToIP(),
		}

		useDirect := false
		if nw.dev.directRouting {
			dr, err := ip.DirectRouting(la.PublicIP.ToIP())
			if err != nil {
				log.Error(err)
			} else {
				useDirect = dr
			}
		}

		switch {
		case evt.Type == subnet.EventAdded && useDirect:
			log.V(2).Infof("Adding direct route to subnet: %s PublicIP: %s", remote, la.PublicIP)
			if err := netlink.RouteReplace(&plainRoute); err != nil {
				log.Errorf("Error adding route to %v via %v: %v", remote, la.PublicIP, err)
			}

		case evt.Type == subnet.EventAdded:
			log.V(2).Infof("adding subnet: %s PublicIP: %s VtepMAC: %s", remote, la.PublicIP, vtepMAC)
			if err := nw.dev.AddARP(arpEntry); err != nil {
				log.Error("AddARP failed: ", err)
				continue
			}
			if err := nw.dev.AddFDB(fdbEntry); err != nil {
				log.Error("AddFDB failed: ", err)
				// Best-effort rollback of the ARP entry.
				if delErr := nw.dev.DelARP(arpEntry); delErr != nil {
					log.Error("DelARP failed: ", delErr)
				}
				continue
			}
			// The route goes last: otherwise the kernel would ARP for the Gw
			// address before the ARP entry above existed.
			if err := netlink.RouteReplace(&encapRoute); err != nil {
				log.Errorf("failed to add vxlanRoute (%s -> %s): %v", encapRoute.Dst, encapRoute.Gw, err)
				// Best-effort rollback of both the ARP and FDB entries.
				if delErr := nw.dev.DelARP(arpEntry); delErr != nil {
					log.Error("DelARP failed: ", delErr)
				}
				if delErr := nw.dev.DelFDB(fdbEntry); delErr != nil {
					log.Error("DelFDB failed: ", delErr)
				}
				continue
			}

		case evt.Type == subnet.EventRemoved && useDirect:
			log.V(2).Infof("Removing direct route to subnet: %s PublicIP: %s", remote, la.PublicIP)
			if err := netlink.RouteDel(&plainRoute); err != nil {
				log.Errorf("Error deleting route to %v via %v: %v", remote, la.PublicIP, err)
			}

		case evt.Type == subnet.EventRemoved:
			log.V(2).Infof("removing subnet: %s PublicIP: %s VtepMAC: %s", remote, la.PublicIP, vtepMAC)
			// Attempt every removal; one failure does not stop the others.
			if err := nw.dev.DelARP(arpEntry); err != nil {
				log.Error("DelARP failed: ", err)
			}
			if err := nw.dev.DelFDB(fdbEntry); err != nil {
				log.Error("DelFDB failed: ", err)
			}
			if err := netlink.RouteDel(&encapRoute); err != nil {
				log.Errorf("failed to delete vxlanRoute (%s -> %s): %v", encapRoute.Dst, encapRoute.Gw, err)
			}

		default:
			log.Error("internal error: unknown event type: ", int(evt.Type))
		}
	}
}