flannel源码

时间:Dec. 24, 2020 分类:

目录:

flannel源码

main方法

main.go 中的 main 方法主要做了以下几件事:

  1. 确定要使用的网卡,如果没有指定,则根据默认路由选择
  2. 创建子网管理器(SubnetManager),并与 etcd 建立连接
  3. 注册信号处理(SIGINT、SIGTERM)
  4. 获取网络配置,创建 backend
  5. 注册网络:创建网卡并激活(vxlan 的网卡名为 flannel.VNI,udp 为 flannel.数字),获取子网租约,并将子网信息写入本地文件
  6. 运行 backend
  7. 进行租约续约
// main is the flanneld entry point. In order it:
//  1. validates flags and picks the external interface to use,
//  2. creates the subnet manager,
//  3. installs SIGINT/SIGTERM handling,
//  4. fetches the network config and creates/registers the configured backend,
//  5. optionally installs iptables masquerade and FORWARD rules,
//  6. writes the subnet file and runs the backend network,
//  7. unless the kube subnet manager is used, monitors the lease.
//
// All long-running goroutines share ctx and are tracked by wg. On a fatal
// setup error the context is canceled and main waits for those goroutines
// before exiting with status 1.
func main() {
    // Print the version and exit.
    if opts.version {
        fmt.Fprintln(os.Stderr, version.Version)
        os.Exit(0)
    }

    // Populate flags from environment variables using the "FLANNELD" prefix.
    flagutil.SetFlagsFromEnv(flannelFlags, "FLANNELD")

    // Validate flags.
    // The subnet lease renew margin must lie strictly between 0 and 24*60
    // (presumably minutes, i.e. one day - confirm against the flag definition).
    if opts.subnetLeaseRenewMargin >= 24*60 || opts.subnetLeaseRenewMargin <= 0 {
        log.Error("Invalid subnet-lease-renew-margin option, out of acceptable range")
        os.Exit(1)
    }

    // Work out which interface to use.
    var extIface *backend.ExternalInterface
    var err error
    // Check the default interface only if no interfaces are specified
    if len(opts.iface) == 0 && len(opts.ifaceRegex) == 0 {
        extIface, err = LookupExtIface(opts.publicIP, "")
        if err != nil {
            log.Error("Failed to find any valid interface to use: ", err)
            os.Exit(1)
        }
    } else {
        // Check explicitly specified interfaces; the first match wins.
        for _, iface := range opts.iface {
            extIface, err = LookupExtIface(iface, "")
            if err != nil {
                log.Infof("Could not find valid interface matching %s: %s", iface, err)
            }

            if extIface != nil {
                break
            }
        }

        // Check interfaces that match any specified regexes
        if extIface == nil {
            for _, ifaceRegex := range opts.ifaceRegex {
                extIface, err = LookupExtIface("", ifaceRegex)
                if err != nil {
                    log.Infof("Could not find valid interface matching %s: %s", ifaceRegex, err)
                }

                if extIface != nil {
                    break
                }
            }
        }

        if extIface == nil {
            // Exit if any of the specified interfaces do not match
            log.Error("Failed to find interface to use that matches the interfaces and/or regexes provided")
            os.Exit(1)
        }
    }

    // This is the main context that everything should run in.
    // All spawned goroutines should exit when cancel is called on this context.
    // Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
    // to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
    // blocking and returning only when cancel() is called.
    ctx, cancel := context.WithCancel(context.Background())

    // Create the subnet manager.
    sm, err := newSubnetManager(ctx)
    if err != nil {
        log.Error("Failed to create SubnetManager: ", err)
        os.Exit(1)
    }
    log.Infof("Created subnet manager: %s", sm.Name())

    // Register for SIGINT and SIGTERM
    log.Info("Installing signal handlers")
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        shutdownHandler(ctx, sigs, cancel)
        wg.Done()
    }()

    if opts.healthzPort > 0 {
        // It's not super easy to shutdown the HTTP server so don't attempt to stop it cleanly
        go mustRunHealthz()
    }

    // Fetch the network config (i.e. what backend to use etc..).
    config, err := getConfig(ctx, sm)
    if err == errCanceled {
        wg.Wait()
        os.Exit(0)
    }
    if err != nil {
        // Any other error leaves config unusable; stop here instead of
        // dereferencing it below.
        log.Errorf("Failed to fetch network config: %s", err)
        cancel()
        wg.Wait()
        os.Exit(1)
    }

    // Create a backend manager then use it to create the backend and register the network with it.
    bm := backend.NewManager(ctx, sm, extIface)
    be, err := bm.GetBackend(config.BackendType)
    if err != nil {
        log.Errorf("Error fetching backend: %s", err)
        cancel()
        wg.Wait()
        os.Exit(1)
    }

    bn, err := be.RegisterNetwork(ctx, &wg, config)
    if err != nil {
        log.Errorf("Error registering network: %s", err)
        cancel()
        wg.Wait()
        os.Exit(1)
    }

    // Set up ipMasq if needed
    if opts.ipMasq {
        if err = recycleIPTables(config.Network, bn.Lease()); err != nil {
            log.Errorf("Failed to recycle IPTables rules, %v", err)
            cancel()
            wg.Wait()
            os.Exit(1)
        }
        log.Infof("Setting up masking rules")
        // Install (and periodically re-sync) the masquerade iptables rules.
        go network.SetupAndEnsureIPTables(network.MasqRules(config.Network, bn.Lease()), opts.iptablesResyncSeconds)
    }

    // Always enables forwarding rules. This is needed for Docker versions >1.13 (https://docs.docker.com/engine/userguide/networking/default_network/container-communication/#container-communication-between-hosts)
    // In Docker 1.12 and earlier, the default FORWARD chain policy was ACCEPT.
    // In Docker 1.13 and later, Docker sets the default policy of the FORWARD chain to DROP.
    if opts.iptablesForwardRules {
        log.Infof("Changing default FORWARD chain policy to ACCEPT")
        go network.SetupAndEnsureIPTables(network.ForwardRules(config.Network.String()), opts.iptablesResyncSeconds)
    }

    if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
        // Continue, even though it failed.
        log.Warningf("Failed to write subnet file: %s", err)
    } else {
        log.Infof("Wrote subnet file to %s", opts.subnetFile)
    }

    // Start "Running" the backend network. This will block until the context is done so run in another goroutine.
    log.Info("Running backend.")
    wg.Add(1)
    go func() {
        bn.Run(ctx)
        wg.Done()
    }()

    daemon.SdNotify(false, "READY=1")

    // Kube subnet mgr doesn't lease the subnet for this node - it just uses the podCidr that's already assigned.
    if !opts.kubeSubnetMgr {
        // Monitor (and renew) this node's own lease; blocks until it returns.
        err = MonitorLease(ctx, sm, bn, &wg)
        if err == errInterrupted {
            // The lease was "revoked" - shut everything down
            cancel()
        }
    }

    log.Info("Waiting for all goroutines to exit")
    // Block waiting for all the goroutines to finish.
    wg.Wait()
    log.Info("Exiting cleanly...")
    os.Exit(0)
}

创建backend

backend/manager.go

// NewManager returns a backend Manager bound to ctx, the subnet manager sm and
// the external interface extIface. Its cache of active backends starts empty.
func NewManager(ctx context.Context, sm subnet.Manager, extIface *ExternalInterface) Manager {
    m := new(manager)
    m.ctx = ctx
    m.sm = sm
    m.extIface = extIface
    m.active = map[string]Backend{}
    return m
}

获取(或创建)backend 的方法 GetBackend:已创建过的 backend 直接从缓存中返回,否则调用对应类型的构造函数创建

// GetBackend returns the backend registered for backendType, which is matched
// case-insensitively. The backend is constructed on first use.
//
// Constructed backends are cached in bm.active. For each newly constructed
// backend a goroutine tracked by bm.wg removes it from the cache once bm.ctx
// is done. The whole lookup/construct sequence runs with bm.mux held.
//
// It returns an error if no constructor is registered for the type, or if the
// constructor itself fails.
func (bm *manager) GetBackend(backendType string) (Backend, error) {
    bm.mux.Lock()
    defer bm.mux.Unlock()

    key := strings.ToLower(backendType)

    // Reuse an already-running backend of this type.
    if cached, found := bm.active[key]; found {
        return cached, nil
    }

    // First request for this type: look up its constructor and build it.
    ctor, known := constructors[key]
    if !known {
        return nil, fmt.Errorf("unknown backend type: %v", key)
    }

    created, err := ctor(bm.sm, bm.extIface)
    if err != nil {
        return nil, err
    }
    bm.active[key] = created

    bm.wg.Add(1)
    go func() {
        defer bm.wg.Done()
        <-bm.ctx.Done()

        // TODO(eyakubovich): this obviously introduces a race: GetBackend()
        // could be called while we are here. Currently, though, all backends'
        // Run exits only on shutdown.
        bm.mux.Lock()
        delete(bm.active, key)
        bm.mux.Unlock()
    }()

    return created, nil
}

befunc(bm.sm, bm.extIface) 调用的是对应 backend 类型注册的构造函数(即该类型的 New 方法),以 vxlan 为例:

backend/vxlan/vxlan.go

// New is the vxlan backend constructor. It only records the subnet manager and
// the external interface; it creates no devices and always returns a nil error.
func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backend, error) {
    be := &VXLANBackend{subnetMgr: sm, extIface: extIface}
    return be, nil
}

注册网络

// RegisterNetwork sets up the vxlan network for this node:
//  1. decodes the backend section of config (VNI, Port, GBP, Learning,
//     DirectRouting), with VNI defaulting to defaultVNI;
//  2. creates the VXLAN device "flannel.<VNI>" on the external interface;
//  3. acquires a subnet lease advertising this node's VTEP attributes;
//  4. assigns the lease's subnet IP to the device as a /32 address.
//
// context.Canceled / context.DeadlineExceeded from AcquireLease are returned
// unchanged; other failures are returned with a descriptive message.
// wg is not used by this implementation.
func (be *VXLANBackend) RegisterNetwork(ctx context.Context, wg *sync.WaitGroup, config *subnet.Config) (backend.Network, error) {
    // Field names double as the JSON keys of the backend config.
    var cfg struct {
        VNI           int
        Port          int
        GBP           bool
        Learning      bool
        DirectRouting bool
    }
    cfg.VNI = defaultVNI

    if len(config.Backend) != 0 {
        if err := json.Unmarshal(config.Backend, &cfg); err != nil {
            return nil, fmt.Errorf("error decoding VXLAN backend config: %v", err)
        }
    }
    log.Infof("VXLAN config: VNI=%d Port=%d GBP=%v Learning=%v DirectRouting=%v", cfg.VNI, cfg.Port, cfg.GBP, cfg.Learning, cfg.DirectRouting)

    // Create the VXLAN device, anchored on the external interface.
    dev, err := newVXLANDevice(&vxlanDeviceAttrs{
        vni:       uint32(cfg.VNI),
        name:      fmt.Sprintf("flannel.%v", cfg.VNI),
        vtepIndex: be.extIface.Iface.Index,
        vtepAddr:  be.extIface.IfaceAddr,
        vtepPort:  cfg.Port,
        gbp:       cfg.GBP,
        learning:  cfg.Learning,
    })
    if err != nil {
        return nil, err
    }
    dev.directRouting = cfg.DirectRouting

    // Build the lease attributes advertised to other nodes.
    attrs, err := newSubnetAttrs(be.extIface.ExtAddr, uint16(cfg.VNI), dev.MACAddr())
    if err != nil {
        return nil, err
    }

    lease, err := be.subnetMgr.AcquireLease(ctx, attrs)
    if err == context.Canceled || err == context.DeadlineExceeded {
        return nil, err
    }
    if err != nil {
        return nil, fmt.Errorf("failed to acquire lease: %v", err)
    }

    // A /32 address means no broadcast routes are created; the IP only serves
    // as a source address for host-to-workload traffic, so replies have a
    // flannel-network destination to return to.
    hostAddr := ip.IP4Net{IP: lease.Subnet.IP, PrefixLen: 32}
    if err := dev.Configure(hostAddr); err != nil {
        return nil, fmt.Errorf("failed to configure interface %s: %s", dev.link.Attrs().Name, err)
    }

    return newNetwork(be.subnetMgr, be.extIface, dev, ip.IP4Net{}, lease)
}

newVXLANDevice 通过调用 ensureLink 方法来创建网卡:

// ensureLink creates the VXLAN device described by vxlan. If a device with the
// same name already exists (netlink.LinkAdd returns EEXIST), it is reused when
// its configuration is compatible.
//
// vxlanLinksIncompat compares the existing device with the requested one:
//   - an empty result means the existing device is returned as-is;
//   - otherwise the existing device is deleted and recreated.
//
// Any other LinkAdd error is returned unchanged. On success the link is
// re-read from the kernel by index, so the returned *netlink.Vxlan reflects the
// kernel's view of the device.
func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) {
    err := netlink.LinkAdd(vxlan)
    if err == syscall.EEXIST {
        // it's ok if the device already exists as long as config is similar
        log.V(1).Infof("VXLAN device already exists")
        // Fetch the existing device to compare it with the requested config.
        existing, err := netlink.LinkByName(vxlan.Name)
        if err != nil {
            return nil, err
        }

        incompat := vxlanLinksIncompat(vxlan, existing)
        if incompat == "" {
            // Checked assertion: a same-named link of another type must not
            // panic the daemon.
            existingVxlan, ok := existing.(*netlink.Vxlan)
            if !ok {
                return nil, fmt.Errorf("existing device %q is not a vxlan device", vxlan.Name)
            }
            log.V(1).Infof("Returning existing device")
            return existingVxlan, nil
        }

        // Incompatible configuration: delete the existing device and create a new one.
        log.Warningf("%q already exists with incompatable configuration: %v; recreating device", vxlan.Name, incompat)
        if err = netlink.LinkDel(existing); err != nil {
            return nil, fmt.Errorf("failed to delete interface: %v", err)
        }

        if err = netlink.LinkAdd(vxlan); err != nil {
            return nil, fmt.Errorf("failed to create vxlan interface: %v", err)
        }
    } else if err != nil {
        return nil, err
    }

    ifindex := vxlan.Index
    link, err := netlink.LinkByIndex(ifindex)
    if err != nil {
        // Keep the underlying netlink error; it explains why the lookup failed.
        return nil, fmt.Errorf("can't locate created vxlan device with index %v: %v", ifindex, err)
    }

    var ok bool
    if vxlan, ok = link.(*netlink.Vxlan); !ok {
        return nil, fmt.Errorf("created vxlan device with index %v is not vxlan", ifindex)
    }

    return vxlan, nil
}

网卡通过 netlink.LinkAdd 创建。如果返回 syscall.EEXIST(同名设备已存在),会将已有设备与期望的配置进行比较:兼容则直接复用已有设备,不兼容则删除已有设备后重新创建;其他错误则直接返回。

租约部分AcquireLease略过

监听子网变化

main.go 中通过 bn.Run(ctx) 启动 backend 网络,vxlan 对应的 Run 方法如下:

// Run watches for subnet lease changes and applies each batch of events to the
// local VXLAN device via handleSubnetEvents. It blocks until ctx is done and,
// before returning, waits for the lease-watching goroutine to exit.
func (nw *network) Run(ctx context.Context) {
    var watchers sync.WaitGroup
    defer watchers.Wait()

    log.V(0).Info("watching for new subnet leases")
    batches := make(chan []subnet.Event)

    // Producer: turns lease changes into event batches on the channel.
    watchers.Add(1)
    go func() {
        defer watchers.Done()
        subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, batches)
        log.V(1).Info("WatchLeases exited")
    }()

    // Consumer: apply batches until the context is done.
    for {
        select {
        case <-ctx.Done():
            return
        case batch := <-batches:
            nw.handleSubnetEvents(batch)
        }
    }
}

WatchLeases 监听 etcd 中租约数据的变化并将其转换为 Event,交给 handleSubnetEvents 处理;而 main 方法中的 MonitorLease(ctx, sm, bn, &wg) 监控的是本节点自己的租约。

// WatchLeases repeatedly calls sm.WatchLeases and forwards the resulting
// batches of lease events to receiver until ctx is canceled.
//
// Results that carry events (res.Events) are turned into a batch with
// lw.update. A result without events is treated as a full snapshot and
// reconciled with lw.reset. ownLease is handed to the leaseWatcher (presumably
// so this node's own lease can be told apart from others - confirm in
// leaseWatcher). Empty batches are not sent.
//
// Watch errors other than context cancellation/deadline are logged and
// retried after one second.
//
// Both blocking points, the retry back-off and the send on receiver, also
// watch ctx. A consumer may stop reading once ctx is done (network.Run does,
// and then waits for this function), so an unconditional send could block
// forever and deadlock shutdown.
func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
    lw := &leaseWatcher{
        ownLease: ownLease,
    }
    var cursor interface{}

    for {
        res, err := sm.WatchLeases(ctx, cursor)
        if err != nil {
            if err == context.Canceled || err == context.DeadlineExceeded {
                return
            }

            log.Errorf("Watch subnets: %v", err)
            // Back off before retrying, but never outlive the context.
            select {
            case <-time.After(time.Second):
            case <-ctx.Done():
                return
            }
            continue
        }

        cursor = res.Cursor

        var batch []Event

        if len(res.Events) > 0 {
            batch = lw.update(res.Events)
        } else {
            batch = lw.reset(res.Snapshot)
        }

        if len(batch) > 0 {
            select {
            case receiver <- batch:
            case <-ctx.Done():
                return
            }
        }
    }
}

backend/vxlan/vxlan_network.go 中的 handleSubnetEvents 负责把租约事件同步到本机的 flannel 网卡。如果启用了 directRouting 且对端主机与本机位于同一子网,就直接添加/删除指向对端公网 IP 的路由。否则走普通的 vxlan 流程:需要把对端 VTEP 的 MAC 地址写入 flannel 网卡的 ARP 表和 FDB 表,再添加经由 flannel 网卡的路由。

// handleSubnetEvents applies a batch of subnet lease events to this node.
//
// Events whose lease BackendType is not "vxlan", or whose BackendData cannot be
// decoded as vxlanLeaseAttrs, are logged and skipped.
//
// For each remaining event two candidate routes to the remote subnet are built:
//   - vxlanRoute: through the local flannel VXLAN device, using the remote
//     subnet's IP as an on-link gateway;
//   - directRoute: through the remote host's PublicIP. It is used only when the
//     device has directRouting enabled and ip.DirectRouting returns true for
//     that PublicIP (per the comment below: the remote host is on the same
//     subnet).
//
// EventAdded installs either the direct route, or (vxlan path) an ARP entry
// (remote subnet IP -> remote VtepMAC), then an FDB entry (remote PublicIP /
// VtepMAC), then the vxlan route. If a later step fails, entries added by
// earlier steps are removed on a best-effort basis. EventRemoved deletes the
// corresponding entries, logging but not stopping on individual failures.
// Errors are only logged, never returned.
func (nw *network) handleSubnetEvents(batch []subnet.Event) {
    for _, event := range batch {
        sn := event.Lease.Subnet
        attrs := event.Lease.Attrs
        if attrs.BackendType != "vxlan" {
            log.Warningf("ignoring non-vxlan subnet(%s): type=%v", sn, attrs.BackendType)
            continue
        }

        // Decode the vxlan-specific lease attributes (JSON in BackendData).
        var vxlanAttrs vxlanLeaseAttrs
        if err := json.Unmarshal(attrs.BackendData, &vxlanAttrs); err != nil {
            log.Error("error decoding subnet lease JSON: ", err)
            continue
        }

        // This route is used when traffic should be vxlan encapsulated
        vxlanRoute := netlink.Route{
            LinkIndex: nw.dev.link.Attrs().Index,
            Scope:     netlink.SCOPE_UNIVERSE,
            Dst:       sn.ToIPNet(),
            Gw:        sn.IP.ToIP(),
        }
        // ONLINK: the gateway is not on any locally configured subnet of the device.
        vxlanRoute.SetFlag(syscall.RTNH_F_ONLINK)

        // directRouting is where the remote host is on the same subnet so vxlan isn't required.
        directRoute := netlink.Route{
            Dst: sn.ToIPNet(),
            Gw:  attrs.PublicIP.ToIP(),
        }
        // NOTE(review): recomputed per event, so a removal takes the same path
        // as the add only if reachability has not changed in between.
        var directRoutingOK = false
        if nw.dev.directRouting {
            if dr, err := ip.DirectRouting(attrs.PublicIP.ToIP()); err != nil {
                log.Error(err)
            } else {
                directRoutingOK = dr
            }
        }

        switch event.Type {
        case subnet.EventAdded:
            if directRoutingOK {
                log.V(2).Infof("Adding direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)

                if err := netlink.RouteReplace(&directRoute); err != nil {
                    log.Errorf("Error adding route to %v via %v: %v", sn, attrs.PublicIP, err)
                    continue
                }
            } else {
                log.V(2).Infof("adding subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, net.HardwareAddr(vxlanAttrs.VtepMAC))
                if err := nw.dev.AddARP(neighbor{IP: sn.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                    log.Error("AddARP failed: ", err)
                    continue
                }

                if err := nw.dev.AddFDB(neighbor{IP: attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                    log.Error("AddFDB failed: ", err)

                    // Try to clean up the ARP entry then continue
                    if err := nw.dev.DelARP(neighbor{IP: event.Lease.Subnet.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                        log.Error("DelARP failed: ", err)
                    }

                    continue
                }

                // Set the route - the kernel would ARP for the Gw IP address if it hadn't already been set above so make sure
                // this is done last.
                // Add or replace the route entry.
                if err := netlink.RouteReplace(&vxlanRoute); err != nil {
                    log.Errorf("failed to add vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)

                    // Try to clean up both the ARP and FDB entries then continue
                    if err := nw.dev.DelARP(neighbor{IP: event.Lease.Subnet.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                        log.Error("DelARP failed: ", err)
                    }

                    if err := nw.dev.DelFDB(neighbor{IP: event.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                        log.Error("DelFDB failed: ", err)
                    }

                    continue
                }
            }
        case subnet.EventRemoved:
            if directRoutingOK {
                log.V(2).Infof("Removing direct route to subnet: %s PublicIP: %s", sn, attrs.PublicIP)
                if err := netlink.RouteDel(&directRoute); err != nil {
                    log.Errorf("Error deleting route to %v via %v: %v", sn, attrs.PublicIP, err)
                }
            } else {
                log.V(2).Infof("removing subnet: %s PublicIP: %s VtepMAC: %s", sn, attrs.PublicIP, net.HardwareAddr(vxlanAttrs.VtepMAC))

                // Try to remove all entries - don't bail out if one of them fails.
                if err := nw.dev.DelARP(neighbor{IP: sn.IP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                    log.Error("DelARP failed: ", err)
                }

                if err := nw.dev.DelFDB(neighbor{IP: attrs.PublicIP, MAC: net.HardwareAddr(vxlanAttrs.VtepMAC)}); err != nil {
                    log.Error("DelFDB failed: ", err)
                }

                if err := netlink.RouteDel(&vxlanRoute); err != nil {
                    log.Errorf("failed to delete vxlanRoute (%s -> %s): %v", vxlanRoute.Dst, vxlanRoute.Gw, err)
                }
            }
        default:
            log.Error("internal error: unknown event type: ", int(event.Type))
        }
    }
}