prometheus的remote_write和remote_read使用global的label的问题

时间:Sept. 27, 2020 分类:

目录:

场景

3个集群prometheus使用的一个m3db,数据查询不方便,在global加了cluster的label配置

global:
  # The labels to add to any time series or alerts when communicating with
  # external systems (federation, remote storage, Alertmanager).
  external_labels:
    [ <labelname>: <labelvalue> ... ]

加上该配置后,仅使用{storage="influxdb"}这样的条件查询不到数据,需要在查询条件中再加上{cluster=~".*"}才能查到

remote_write

参考/storage/remote/write.go

// WriteStorage holds the state shared by the remote-write queues. In the
// ApplyConfig excerpt, most of these fields are passed directly to
// NewQueueManager each time a queue is built.
type WriteStorage struct {
    logger log.Logger
    // reg is handed to newQueueManagerMetrics when a queue is created.
    reg    prometheus.Registerer
    // NOTE(review): presumably guards the mutable fields below; confirm
    // against the full ApplyConfig implementation.
    mtx    sync.Mutex

    watcherMetrics    *wal.WatcherMetrics
    liveReaderMetrics *wal.LiveReaderMetrics
    // externalLabels is assigned from conf.GlobalConfig.ExternalLabels in
    // ApplyConfig.
    externalLabels    labels.Labels
    walDir            string
    // queues holds the queue managers; ApplyConfig builds its replacement
    // map keyed by a hash (presumably of the remote-write config — confirm).
    queues            map[string]*QueueManager
    samplesIn         *ewmaRate
    flushDeadline     time.Duration
    interner          *pool

    // For timestampTracker.
    highestTimestamp *maxGauge
}

NewWriteStorage方法没有设置externalLabels

ApplyConfig方法中使用的是global配置里的external_labels

func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
    ...
    rws.externalLabels = conf.GlobalConfig.ExternalLabels
    ...
    newQueues[hash] = NewQueueManager(
            newQueueManagerMetrics(rws.reg, name, endpoint),
            rws.watcherMetrics,
            rws.liveReaderMetrics,
            rws.logger,
            rws.walDir,
            rws.samplesIn,
            rwConf.QueueConfig,
            conf.GlobalConfig.ExternalLabels,
            rwConf.WriteRelabelConfigs,
            c,
            rws.flushDeadline,
            rws.interner,
            rws.highestTimestamp,
        )
    ...

参考/storage/remote/queue_manager.go

type QueueManager struct {
    ...
    externalLabels labels.Labels
    relabelConfigs []*relabel.Config
    ...
}

NewQueueManager中,externalLabels是global.external_labels,relabelConfigs为remote_write.write_relabel_configs

func NewQueueManager(
    t := &QueueManager{
        ...
        externalLabels: externalLabels,
        relabelConfigs: relabelConfigs,
        ...
    }

StoreSeries中先将externalLabels合并到series自身的label中,再对合并后的label应用relabelConfigs,得到的结果用于写入远程存储

// StoreSeries records the final label set of every series in the batch, keyed
// by series ref, so that samples sent to the remote endpoint can be looked up
// later. Series whose label set is empty after relabelling are marked as
// dropped instead.
func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
    t.seriesMtx.Lock()
    defer t.seriesMtx.Unlock()

    for i := range series {
        ref := series[i].Ref

        // External labels are merged in first, so the relabel rules operate
        // on the combined label set.
        withExternal := processExternalLabels(series[i].Labels, t.externalLabels)
        final := relabel.Process(withExternal, t.relabelConfigs...)
        if len(final) == 0 {
            t.droppedSeries[ref] = struct{}{}
            continue
        }

        t.seriesSegmentIndexes[ref] = index
        t.internLabels(final)

        // A ref is not expected to be stored twice, but if it is, release the
        // previously interned strings so they are not leaked.
        if previous, found := t.seriesLabels[ref]; found {
            t.releaseLabels(previous)
        }
        t.seriesLabels[ref] = final
    }
}

processExternalLabels

// processExternalLabels returns the union of ls and externalLabels as a new
// label set. Both inputs are walked in parallel by label name (this relies on
// each being sorted by name); when a name occurs in both, the value from ls
// is kept and the external one is discarded.
func processExternalLabels(ls labels.Labels, externalLabels labels.Labels) labels.Labels {
    merged := make(labels.Labels, 0, len(ls)+len(externalLabels))

    li, ei := 0, 0
    for li < len(ls) && ei < len(externalLabels) {
        own, ext := ls[li], externalLabels[ei]
        switch {
        case own.Name > ext.Name:
            // Only the external set has this name.
            merged = append(merged, ext)
            ei++
        case own.Name < ext.Name:
            // Only the series has this name.
            merged = append(merged, labels.Label{Name: own.Name, Value: own.Value})
            li++
        default:
            // Same name on both sides: the series' own value wins.
            merged = append(merged, labels.Label{Name: own.Name, Value: own.Value})
            li++
            ei++
        }
    }

    // At most one of the two inputs still has entries left; flush them.
    for _, own := range ls[li:] {
        merged = append(merged, labels.Label{Name: own.Name, Value: own.Value})
    }
    return append(merged, externalLabels[ei:]...)
}


remote_read

参考/storage/remote/read.go

// querier reads series from a remote endpoint through client. Select builds
// its remote query from mint, maxt and the matchers extended by
// addExternalLabels.
type querier struct {
    ctx        context.Context
    // mint and maxt are the time bounds passed to ToQuery by Select.
    mint, maxt int64
    client     ReadClient

    // Derived from configuration.
    // externalLabels are turned into extra equality matchers by
    // addExternalLabels, except for names the caller already matches on.
    externalLabels   labels.Labels
    // requiredMatchers: per Select's doc comment, a query whose matchers do
    // not match these gets a no-op series set instead of a remote call.
    requiredMatchers []*labels.Matcher
}

externalLabels还是global.external_labels,细节参考/storage/remote/read.go的ApplyConfig

Select方法进行查询

// Select implements storage.Querier and uses the given matchers to read series sets from the client.
// Select also adds equality matchers for all external labels to the list of matchers before calling remote endpoint.
// The added external labels are removed from the returned series sets.
//
// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the
// requiredMatchers. Otherwise it'll just call remote endpoint.
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
    ...
    m, added := q.addExternalLabels(matchers)
    query, err := ToQuery(q.mint, q.maxt, m, hints)
    ...
}

addExternalLabels将global.external_labels以等值匹配(=)的方式加入查询条件;如果查询条件里已经有同名label的matcher,则跳过该label,这也是加上{cluster=~".*"}后就能查到数据的原因

// addExternalLabels extends ms with an equality matcher for every external
// label that the caller has not already constrained. An external label whose
// name matches any user-supplied matcher is left out, on the assumption that
// the user deliberately wants a different value for it.
//
// It returns the extended matcher list together with the labels for which
// matchers were actually added, so the caller can strip those labels from the
// resulting series again.
func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, labels.Labels) {
    // Work on a private copy so that q.externalLabels is never modified.
    remaining := make(labels.Labels, len(q.externalLabels))
    copy(remaining, q.externalLabels)

    // ms is not sorted, so every matcher is compared against every label
    // that is still a candidate.
    for _, matcher := range ms {
        idx := 0
        for idx < len(remaining) {
            if remaining[idx].Name != matcher.Name {
                idx++
                continue
            }
            // Delete in place; idx now addresses the next candidate.
            remaining = append(remaining[:idx], remaining[idx+1:]...)
        }
    }

    for _, lbl := range remaining {
        extra, err := labels.NewMatcher(labels.MatchEqual, lbl.Name, lbl.Value)
        if err != nil {
            panic(err)
        }
        ms = append(ms, extra)
    }
    return ms, remaining
}

结论

  • 远程写入的时候,global.external_labels会先合并到series的label中(同名时以series自身的值为准),再经过remote_write.write_relabel_configs处理后写入
  • 远程读取的时候global.external_labels会以等值匹配的方式加在查询条件里,查询条件中已有同名label的matcher时则不会再加