diff --git a/prometheus/counter.go b/prometheus/counter.go index 7d963d3af..26b53400e 100644 --- a/prometheus/counter.go +++ b/prometheus/counter.go @@ -201,6 +201,12 @@ func NewCounterVec(opts CounterOpts, labelNames []string) *CounterVec { // NewCounterVec creates a new CounterVec based on the provided CounterVecOpts. func (v2) NewCounterVec(opts CounterVecOpts) *CounterVec { + return newCounterVecWithTTL(opts, 0) +} + +// newCounterVecWithTTL creates a CounterVec. ttl must be >= 0; ttl == 0 disables +// TTL behavior (identical to NewMetricVec). +func newCounterVecWithTTL(opts CounterVecOpts, ttl time.Duration) *CounterVec { desc := V2.NewDesc( BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), opts.Help, @@ -211,16 +217,22 @@ func (v2) NewCounterVec(opts CounterVecOpts) *CounterVec { if opts.now == nil { opts.now = time.Now } + newMetric := func(lvs ...string) Metric { + if len(lvs) != len(desc.variableLabels.names) { + panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) + } + result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now} + result.init(result) // Init self-collection. + result.createdTs = timestamppb.New(opts.now()) + return result + } + if ttl > 0 { + return &CounterVec{ + MetricVec: NewMetricVecWithTTL(desc, newMetric, ttl), + } + } return &CounterVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { - if len(lvs) != len(desc.variableLabels.names) { - panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) - } - result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now} - result.init(result) // Init self-collection. - result.createdTs = timestamppb.New(opts.now()) - return result - }), + MetricVec: NewMetricVec(desc, newMetric), } } diff --git a/prometheus/gauge.go b/prometheus/gauge.go index 41e54bf27..adebbf673 100644 --- a/prometheus/gauge.go +++ b/prometheus/gauge.go @@ -159,6 +159,10 @@ func NewGaugeVec(opts GaugeOpts, labelNames []string) *GaugeVec { // NewGaugeVec creates a new GaugeVec based on the provided GaugeVecOpts. func (v2) NewGaugeVec(opts GaugeVecOpts) *GaugeVec { + return newGaugeVecWithTTL(opts, 0) +} + +func newGaugeVecWithTTL(opts GaugeVecOpts, ttl time.Duration) *GaugeVec { desc := V2.NewDesc( BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), opts.Help, @@ -166,15 +170,21 @@ func (v2) NewGaugeVec(opts GaugeVecOpts) *GaugeVec { opts.ConstLabels, WithUnit(opts.Unit), ) + newMetric := func(lvs ...string) Metric { + if len(lvs) != len(desc.variableLabels.names) { + panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) + } + result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)} + result.init(result) // Init self-collection. + return result + } + if ttl > 0 { + return &GaugeVec{ + MetricVec: NewMetricVecWithTTL(desc, newMetric, ttl), + } + } return &GaugeVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { - if len(lvs) != len(desc.variableLabels.names) { - panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) - } - result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)} - result.init(result) // Init self-collection. - return result - }), + MetricVec: NewMetricVec(desc, newMetric), } } diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 0e788f715..433e75195 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -1189,6 +1189,10 @@ func NewHistogramVec(opts HistogramOpts, labelNames []string) *HistogramVec { // NewHistogramVec creates a new HistogramVec based on the provided HistogramVecOpts. func (v2) NewHistogramVec(opts HistogramVecOpts) *HistogramVec { + return newHistogramVecWithTTL(opts, 0) +} + +func newHistogramVecWithTTL(opts HistogramVecOpts, ttl time.Duration) *HistogramVec { desc := V2.NewDesc( BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), opts.Help, @@ -1196,10 +1200,16 @@ func (v2) NewHistogramVec(opts HistogramVecOpts) *HistogramVec { opts.ConstLabels, WithUnit(opts.Unit), ) + newMetric := func(lvs ...string) Metric { + return newHistogram(desc, opts.HistogramOpts, lvs...) + } + if ttl > 0 { + return &HistogramVec{ + MetricVec: NewMetricVecWithTTL(desc, newMetric, ttl), + } + } return &HistogramVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { - return newHistogram(desc, opts.HistogramOpts, lvs...) - }), + MetricVec: NewMetricVec(desc, newMetric), } } diff --git a/prometheus/ttl_registry.go b/prometheus/ttl_registry.go new file mode 100644 index 000000000..fa30f7c0a --- /dev/null +++ b/prometheus/ttl_registry.go @@ -0,0 +1,165 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "sync" + "time" + + dto "github.com/prometheus/client_model/go" +) + +// TTLRegistry is a dedicated Prometheus registry for metrics that need +// time-to-live behavior on *Vec children. It embeds a plain *Registry and adds: +// - Vec constructors that enable per-child TTL (same semantics as MetricVec +// built with NewMetricVecWithTTL). +// - Automatic CleanupExpired on all Vecs created through this registry before +// each Gather, so memory can be reclaimed even when scrapes are infrequent +// (see discussion in https://github.com/prometheus/client_golang/issues/1983). +// +// Default prometheus.NewRegistry, NewCounterVec, and Opts are unchanged; use +// TTLRegistry only when you explicitly opt in to Vec TTL. +type TTLRegistry struct { + *Registry + + ttl time.Duration + + mu sync.Mutex + vecs []*MetricVec +} + +// NewTTLRegistry returns a registry backed by a new empty *Registry. ttl must +// be greater than zero; it applies to every Vec created via this registry's +// constructor methods. +func NewTTLRegistry(ttl time.Duration) *TTLRegistry { + if ttl <= 0 { + panic("NewTTLRegistry: ttl must be > 0") + } + return &TTLRegistry{ + Registry: NewRegistry(), + ttl: ttl, + } +} + +func (r *TTLRegistry) track(mv *MetricVec) { + r.mu.Lock() + defer r.mu.Unlock() + r.vecs = append(r.vecs, mv) +} + +func (r *TTLRegistry) untrack(mv *MetricVec) { + r.mu.Lock() + defer r.mu.Unlock() + for i, v := range r.vecs { + if v == mv { + last := len(r.vecs) - 1 + r.vecs[i] = r.vecs[last] + r.vecs[last] = nil + r.vecs = r.vecs[:last] + return + } + } +} + +func metricVecFromCollector(c Collector) *MetricVec { + switch v := c.(type) { + case *CounterVec: + return v.MetricVec + case *GaugeVec: + return v.MetricVec + case *HistogramVec: + return v.MetricVec + case *MetricVec: + return v + default: + return nil + } +} + +// Unregister implements Registerer. It untracks Vecs created through this +// registry so they are no longer retained or cleaned up on Gather. +func (r *TTLRegistry) Unregister(c Collector) bool { + ok := r.Registry.Unregister(c) + if ok { + if mv := metricVecFromCollector(c); mv != nil { + r.untrack(mv) + } + } + return ok +} + +func (r *TTLRegistry) runCleanup() { + r.mu.Lock() + vecs := append([]*MetricVec(nil), r.vecs...) + r.mu.Unlock() + for _, mv := range vecs { + mv.CleanupExpired() + } +} + +// Gather implements Gatherer. It runs CleanupExpired on all Vecs created +// through this TTLRegistry, then delegates to the embedded Registry. +func (r *TTLRegistry) Gather() ([]*dto.MetricFamily, error) { + r.runCleanup() + return r.Registry.Gather() +} + +// NewCounterVec is like prometheus.NewCounterVec but enables Vec TTL using this +// registry's ttl, registers the Vec, and tracks it for Gather-time cleanup. +func (r *TTLRegistry) NewCounterVec(opts CounterOpts, labelNames []string) *CounterVec { + return r.NewCounterVecOpts(CounterVecOpts{ + CounterOpts: opts, + VariableLabels: UnconstrainedLabels(labelNames), + }) +} + +// NewCounterVecOpts is like V2.NewCounterVec with TTL and automatic registration. +func (r *TTLRegistry) NewCounterVecOpts(opts CounterVecOpts) *CounterVec { + cv := newCounterVecWithTTL(opts, r.ttl) + r.MustRegister(cv) + r.track(cv.MetricVec) + return cv +} + +// NewGaugeVec is like prometheus.NewGaugeVec with TTL, registration, and tracking. +func (r *TTLRegistry) NewGaugeVec(opts GaugeOpts, labelNames []string) *GaugeVec { + return r.NewGaugeVecOpts(GaugeVecOpts{ + GaugeOpts: opts, + VariableLabels: UnconstrainedLabels(labelNames), + }) +} + +// NewGaugeVecOpts is like V2.NewGaugeVec with TTL and automatic registration. +func (r *TTLRegistry) NewGaugeVecOpts(opts GaugeVecOpts) *GaugeVec { + gv := newGaugeVecWithTTL(opts, r.ttl) + r.MustRegister(gv) + r.track(gv.MetricVec) + return gv +} + +// NewHistogramVec is like prometheus.NewHistogramVec with TTL, registration, and tracking. +func (r *TTLRegistry) NewHistogramVec(opts HistogramOpts, labelNames []string) *HistogramVec { + return r.NewHistogramVecOpts(HistogramVecOpts{ + HistogramOpts: opts, + VariableLabels: UnconstrainedLabels(labelNames), + }) +} + +// NewHistogramVecOpts is like V2.NewHistogramVec with TTL and automatic registration. +func (r *TTLRegistry) NewHistogramVecOpts(opts HistogramVecOpts) *HistogramVec { + hv := newHistogramVecWithTTL(opts, r.ttl) + r.MustRegister(hv) + r.track(hv.MetricVec) + return hv +} diff --git a/prometheus/ttl_registry_test.go b/prometheus/ttl_registry_test.go new file mode 100644 index 000000000..7c43c933f --- /dev/null +++ b/prometheus/ttl_registry_test.go @@ -0,0 +1,95 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "testing" + "testing/synctest" + "time" +) + +func TestNewTTLRegistryPanicsOnNonPositiveTTL(t *testing.T) { + defer func() { + if recover() == nil { + t.Fatal("expected panic for ttl <= 0") + } + }() + NewTTLRegistry(0) +} + +func TestTTLRegistryUnregisterUntracks(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 80 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewCounterVec(CounterOpts{ + Name: "ttl_reg_unregister", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + if !reg.Unregister(vec) { + t.Fatal("expected Unregister to succeed") + } + + time.Sleep(ttl + 40*time.Millisecond) + + if _, err := reg.Gather(); err != nil { + t.Fatal(err) + } + + // Gather should not have run CleanupExpired on the unregistered vec. + if cleaned := vec.CleanupExpired(); cleaned != 1 { + t.Fatalf("expected 1 expired child after explicit cleanup, got %d", cleaned) + } + + vec2 := reg.NewCounterVec(CounterOpts{ + Name: "ttl_reg_still_tracked", + Help: "test", + }, []string{"code"}) + vec2.WithLabelValues("200").Add(1) + time.Sleep(ttl + 40*time.Millisecond) + if _, err := reg.Gather(); err != nil { + t.Fatal(err) + } + if cleaned := vec2.CleanupExpired(); cleaned != 0 { + t.Fatalf("expected tracked vec cleaned on Gather, got %d remaining", cleaned) + } + }) +} + +func TestTTLRegistryGatherRunsCleanup(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 80 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewCounterVec(CounterOpts{ + Name: "ttl_reg_gather", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric before sleep, got %d", n) + } + + time.Sleep(ttl + 40*time.Millisecond) + + if _, err := reg.Gather(); err != nil { + t.Fatal(err) + } + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 metrics after Gather cleanup, got %d", n) + } + }) +} diff --git a/prometheus/ttl_vec.go b/prometheus/ttl_vec.go new file mode 100644 index 000000000..b33a6fddc --- /dev/null +++ b/prometheus/ttl_vec.go @@ -0,0 +1,327 @@ +// Copyright 2014 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "sync" + "sync/atomic" + "time" +) + +// ttlMetricWithLabelValues is the TTL variant of metricWithLabelValues. +type ttlMetricWithLabelValues struct { + values []string + metric Metric + lastAccessed atomic.Int64 // unix timestamp in milliseconds +} + +// ttlMetricMap backs MetricVec instances created with NewMetricVecWithTTL. +// It is separate from metricMap so the default Vec path stays unchanged. +type ttlMetricMap struct { + mtx sync.RWMutex + metrics map[uint64][]*ttlMetricWithLabelValues + desc *Desc + newMetric func(labelValues ...string) Metric + ttl time.Duration +} + +func (m *ttlMetricMap) Describe(ch chan<- *Desc) { + ch <- m.desc +} + +func (m *ttlMetricMap) Collect(ch chan<- Metric) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + deadline := time.Now().Add(-m.ttl).UnixMilli() + for _, metrics := range m.metrics { + for i := range metrics { + if metrics[i].lastAccessed.Load() < deadline { + continue + } + ch <- metrics[i].metric + } + } +} + +func (m *ttlMetricMap) Reset() { + m.mtx.Lock() + defer m.mtx.Unlock() + + for h := range m.metrics { + delete(m.metrics, h) + } +} + +func (m *ttlMetricMap) touchByHash(h uint64, lvs []string, curry []curriedLabelValue) { + m.mtx.RLock() + defer m.mtx.RUnlock() + m.touchByHashRLocked(h, lvs, curry) +} + +func (m *ttlMetricMap) touchByHashRLocked(h uint64, lvs []string, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findTTLMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *ttlMetricMap) touchByHashLocked(h uint64, lvs []string, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findTTLMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *ttlMetricMap) touchByHashLabels(h uint64, labels Labels, curry []curriedLabelValue) { + m.mtx.RLock() + defer m.mtx.RUnlock() + m.touchByHashLabelsRLocked(h, labels, curry) +} + +func (m *ttlMetricMap) touchByHashLabelsRLocked(h uint64, labels Labels, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findTTLMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *ttlMetricMap) touchByHashLabelsLocked(h uint64, labels Labels, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findTTLMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *ttlMetricMap) cleanupExpired() int { + deadline := time.Now().Add(-m.ttl).UnixMilli() + m.mtx.Lock() + defer m.mtx.Unlock() + + var numDeleted int + for h, metrics := range m.metrics { + origLen := len(metrics) + remaining := metrics[:0] + for i := range metrics { + if metrics[i].lastAccessed.Load() >= deadline { + remaining = append(remaining, metrics[i]) + } else { + numDeleted++ + } + } + if len(remaining) == 0 { + delete(m.metrics, h) + } else { + for i := len(remaining); i < origLen; i++ { + metrics[i] = nil + } + m.metrics[h] = remaining + } + } + return numDeleted +} + +func (m *ttlMetricMap) deleteByHashWithLabelValues( + h uint64, lvs []string, curry []curriedLabelValue, +) bool { + m.mtx.Lock() + defer m.mtx.Unlock() + + metrics, ok := m.metrics[h] + if !ok { + return false + } + + i := findTTLMetricWithLabelValues(metrics, lvs, curry) + if i >= len(metrics) { + return false + } + + if len(metrics) > 1 { + old := metrics + m.metrics[h] = append(metrics[:i], metrics[i+1:]...) + old[len(old)-1] = nil + } else { + delete(m.metrics, h) + } + return true +} + +func (m *ttlMetricMap) deleteByHashWithLabels( + h uint64, labels Labels, curry []curriedLabelValue, +) bool { + m.mtx.Lock() + defer m.mtx.Unlock() + + metrics, ok := m.metrics[h] + if !ok { + return false + } + i := findTTLMetricWithLabels(m.desc, metrics, labels, curry) + if i >= len(metrics) { + return false + } + + if len(metrics) > 1 { + old := metrics + m.metrics[h] = append(metrics[:i], metrics[i+1:]...) + old[len(old)-1] = nil + } else { + delete(m.metrics, h) + } + return true +} + +func (m *ttlMetricMap) deleteByLabels(labels Labels, curry []curriedLabelValue) int { + m.mtx.Lock() + defer m.mtx.Unlock() + + var numDeleted int + + for h, metrics := range m.metrics { + i := findTTLMetricWithPartialLabels(m.desc, metrics, labels, curry) + if i >= len(metrics) { + continue + } + delete(m.metrics, h) + numDeleted++ + } + + return numDeleted +} + +func findTTLMetricWithPartialLabels( + desc *Desc, metrics []*ttlMetricWithLabelValues, labels Labels, curry []curriedLabelValue, +) int { + for i := range metrics { + if matchPartialLabels(desc, metrics[i].values, labels, curry) { + return i + } + } + return len(metrics) +} + +func (m *ttlMetricMap) getOrCreateMetricWithLabelValues( + hash uint64, lvs []string, curry []curriedLabelValue, +) Metric { + m.mtx.RLock() + metric, ok := m.getMetricWithHashAndLabelValues(hash, lvs, curry) + m.mtx.RUnlock() + if ok { + m.touchByHash(hash, lvs, curry) + return metric + } + + m.mtx.Lock() + defer m.mtx.Unlock() + metric, ok = m.getMetricWithHashAndLabelValues(hash, lvs, curry) + if !ok { + inlinedLVs := inlineLabelValues(lvs, curry) + metric = m.newMetric(inlinedLVs...) + entry := &ttlMetricWithLabelValues{values: inlinedLVs, metric: metric} + entry.lastAccessed.Store(time.Now().UnixMilli()) + m.metrics[hash] = append(m.metrics[hash], entry) + } else { + m.touchByHashLocked(hash, lvs, curry) + } + return metric +} + +func (m *ttlMetricMap) getOrCreateMetricWithLabels( + hash uint64, labels Labels, curry []curriedLabelValue, +) Metric { + m.mtx.RLock() + metric, ok := m.getMetricWithHashAndLabels(hash, labels, curry) + m.mtx.RUnlock() + if ok { + m.touchByHashLabels(hash, labels, curry) + return metric + } + + m.mtx.Lock() + defer m.mtx.Unlock() + metric, ok = m.getMetricWithHashAndLabels(hash, labels, curry) + if !ok { + lvs := extractLabelValues(m.desc, labels, curry) + metric = m.newMetric(lvs...) + entry := &ttlMetricWithLabelValues{values: lvs, metric: metric} + entry.lastAccessed.Store(time.Now().UnixMilli()) + m.metrics[hash] = append(m.metrics[hash], entry) + } else { + m.touchByHashLabelsLocked(hash, labels, curry) + } + return metric +} + +func (m *ttlMetricMap) getMetricWithHashAndLabelValues( + h uint64, lvs []string, curry []curriedLabelValue, +) (Metric, bool) { + metrics, ok := m.metrics[h] + if ok { + if i := findTTLMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { + return metrics[i].metric, true + } + } + return nil, false +} + +func (m *ttlMetricMap) getMetricWithHashAndLabels( + h uint64, labels Labels, curry []curriedLabelValue, +) (Metric, bool) { + metrics, ok := m.metrics[h] + if ok { + if i := findTTLMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) { + return metrics[i].metric, true + } + } + return nil, false +} + +func findTTLMetricWithLabelValues( + metrics []*ttlMetricWithLabelValues, lvs []string, curry []curriedLabelValue, +) int { + for i := range metrics { + if matchLabelValues(metrics[i].values, lvs, curry) { + return i + } + } + return len(metrics) +} + +func findTTLMetricWithLabels( + desc *Desc, metrics []*ttlMetricWithLabelValues, labels Labels, curry []curriedLabelValue, +) int { + for i := range metrics { + if matchLabels(desc, metrics[i].values, labels, curry) { + return i + } + } + return len(metrics) +} diff --git a/prometheus/vec.go b/prometheus/vec.go index b405a64de..44ed8a49a 100644 --- a/prometheus/vec.go +++ b/prometheus/vec.go @@ -16,6 +16,7 @@ package prometheus import ( "fmt" "sync" + "time" "github.com/prometheus/common/model" ) @@ -35,6 +36,7 @@ import ( // panic instead of returning errors. See also the MetricVec example. type MetricVec struct { *metricMap + ttlMap *ttlMetricMap curry []curriedLabelValue @@ -56,6 +58,47 @@ func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec { } } +// NewMetricVecWithTTL returns an initialized MetricVec with TTL-based expiration. +// Children that have not been accessed (via GetMetricWith or GetMetricWithLabelValues) +// for longer than ttl will be excluded from Collect and can be cleaned up via +// CleanupExpired. If ttl is 0, this behaves identically to NewMetricVec. A +// negative ttl is invalid and will cause a panic. +func NewMetricVecWithTTL(desc *Desc, newMetric func(lvs ...string) Metric, ttl time.Duration) *MetricVec { + if ttl < 0 { + panic(fmt.Sprintf("invalid negative ttl: %v", ttl)) + } + if ttl == 0 { + return NewMetricVec(desc, newMetric) + } + return &MetricVec{ + ttlMap: &ttlMetricMap{ + metrics: map[uint64][]*ttlMetricWithLabelValues{}, + desc: desc, + newMetric: newMetric, + ttl: ttl, + }, + hashAdd: hashAdd, + hashAddByte: hashAddByte, + } +} + +// CleanupExpired removes all children that have not been accessed within the +// configured TTL. It returns the number of children removed. If TTL is not +// configured (zero), this is a no-op and returns 0. +func (m *MetricVec) CleanupExpired() int { + if m.ttlMap == nil { + return 0 + } + return m.ttlMap.cleanupExpired() +} + +func (m *MetricVec) vecDesc() *Desc { + if m.ttlMap != nil { + return m.ttlMap.desc + } + return m.desc +} + // DeleteLabelValues removes the metric where the variable labels are the same // as those passed in as labels (same order as the VariableLabels in Desc). It // returns true if a metric was deleted. @@ -72,13 +115,16 @@ func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec { // with a performance overhead (for creating and processing the Labels map). // See also the CounterVec example. func (m *MetricVec) DeleteLabelValues(lvs ...string) bool { - lvs = constrainLabelValues(m.desc, lvs, m.curry) + lvs = constrainLabelValues(m.vecDesc(), lvs, m.curry) h, err := m.hashLabelValues(lvs) if err != nil { return false } + if m.ttlMap != nil { + return m.ttlMap.deleteByHashWithLabelValues(h, lvs, m.curry) + } return m.deleteByHashWithLabelValues(h, lvs, m.curry) } @@ -93,7 +139,7 @@ func (m *MetricVec) DeleteLabelValues(lvs ...string) bool { // This method is used for the same purpose as DeleteLabelValues(...string). See // there for pros and cons of the two methods. func (m *MetricVec) Delete(labels Labels) bool { - labels, closer := constrainLabels(m.desc, labels) + labels, closer := constrainLabels(m.vecDesc(), labels) defer closer() h, err := m.hashLabels(labels) @@ -101,6 +147,9 @@ func (m *MetricVec) Delete(labels Labels) bool { return false } + if m.ttlMap != nil { + return m.ttlMap.deleteByHashWithLabels(h, labels, m.curry) + } return m.deleteByHashWithLabels(h, labels, m.curry) } @@ -111,9 +160,12 @@ func (m *MetricVec) Delete(labels Labels) bool { // Note that curried labels will never be matched if deleting from the curried vector. // To match curried labels with DeletePartialMatch, it must be called on the base vector. func (m *MetricVec) DeletePartialMatch(labels Labels) int { - labels, closer := constrainLabels(m.desc, labels) + labels, closer := constrainLabels(m.vecDesc(), labels) defer closer() + if m.ttlMap != nil { + return m.ttlMap.deleteByLabels(labels, m.curry) + } return m.deleteByLabels(labels, m.curry) } @@ -121,13 +173,31 @@ func (m *MetricVec) DeletePartialMatch(labels Labels) int { // show up in GoDoc. // Describe implements Collector. -func (m *MetricVec) Describe(ch chan<- *Desc) { m.metricMap.Describe(ch) } +func (m *MetricVec) Describe(ch chan<- *Desc) { + if m.ttlMap != nil { + m.ttlMap.Describe(ch) + return + } + m.metricMap.Describe(ch) +} // Collect implements Collector. -func (m *MetricVec) Collect(ch chan<- Metric) { m.metricMap.Collect(ch) } +func (m *MetricVec) Collect(ch chan<- Metric) { + if m.ttlMap != nil { + m.ttlMap.Collect(ch) + return + } + m.metricMap.Collect(ch) +} // Reset deletes all metrics in this vector. -func (m *MetricVec) Reset() { m.metricMap.Reset() } +func (m *MetricVec) Reset() { + if m.ttlMap != nil { + m.ttlMap.Reset() + return + } + m.metricMap.Reset() +} // CurryWith returns a vector curried with the provided labels, i.e. the // returned vector has those labels pre-set for all labeled operations performed @@ -152,7 +222,7 @@ func (m *MetricVec) CurryWith(labels Labels) (*MetricVec, error) { oldCurry = m.curry iCurry int ) - for i, labelName := range m.desc.variableLabels.names { + for i, labelName := range m.vecDesc().variableLabels.names { val, ok := labels[labelName] if iCurry < len(oldCurry) && oldCurry[iCurry].index == i { if ok { @@ -166,7 +236,7 @@ func (m *MetricVec) CurryWith(labels Labels) (*MetricVec, error) { } newCurry = append(newCurry, curriedLabelValue{ i, - m.desc.variableLabels.constrain(labelName, val), + m.vecDesc().variableLabels.constrain(labelName, val), }) } } @@ -176,6 +246,7 @@ func (m *MetricVec) CurryWith(labels Labels) (*MetricVec, error) { return &MetricVec{ metricMap: m.metricMap, + ttlMap: m.ttlMap, curry: newCurry, hashAdd: m.hashAdd, hashAddByte: m.hashAddByte, @@ -212,12 +283,15 @@ func (m *MetricVec) CurryWith(labels Labels) (*MetricVec, error) { // a wrapper around MetricVec, implementing a vector for a specific Metric // implementation, for example GaugeVec. func (m *MetricVec) GetMetricWithLabelValues(lvs ...string) (Metric, error) { - lvs = constrainLabelValues(m.desc, lvs, m.curry) + lvs = constrainLabelValues(m.vecDesc(), lvs, m.curry) h, err := m.hashLabelValues(lvs) if err != nil { return nil, err } + if m.ttlMap != nil { + return m.ttlMap.getOrCreateMetricWithLabelValues(h, lvs, m.curry), nil + } return m.getOrCreateMetricWithLabelValues(h, lvs, m.curry), nil } @@ -238,7 +312,7 @@ func (m *MetricVec) GetMetricWithLabelValues(lvs ...string) (Metric, error) { // around MetricVec, implementing a vector for a specific Metric implementation, // for example GaugeVec. func (m *MetricVec) GetMetricWith(labels Labels) (Metric, error) { - labels, closer := constrainLabels(m.desc, labels) + labels, closer := constrainLabels(m.vecDesc(), labels) defer closer() h, err := m.hashLabels(labels) @@ -246,11 +320,14 @@ func (m *MetricVec) GetMetricWith(labels Labels) (Metric, error) { return nil, err } + if m.ttlMap != nil { + return m.ttlMap.getOrCreateMetricWithLabels(h, labels, m.curry), nil + } return m.getOrCreateMetricWithLabels(h, labels, m.curry), nil } func (m *MetricVec) hashLabelValues(vals []string) (uint64, error) { - if err := validateLabelValues(vals, len(m.desc.variableLabels.names)-len(m.curry)); err != nil { + if err := validateLabelValues(vals, len(m.vecDesc().variableLabels.names)-len(m.curry)); err != nil { return 0, err } @@ -259,7 +336,7 @@ func (m *MetricVec) hashLabelValues(vals []string) (uint64, error) { curry = m.curry iVals, iCurry int ) - for i := 0; i < len(m.desc.variableLabels.names); i++ { + for i := 0; i < len(m.vecDesc().variableLabels.names); i++ { if iCurry < len(curry) && curry[iCurry].index == i { h = m.hashAdd(h, curry[iCurry].value) iCurry++ @@ -273,7 +350,7 @@ func (m *MetricVec) hashLabelValues(vals []string) (uint64, error) { } func (m *MetricVec) hashLabels(labels Labels) (uint64, error) { - if err := validateValuesInLabels(labels, len(m.desc.variableLabels.names)-len(m.curry)); err != nil { + if err := validateValuesInLabels(labels, len(m.vecDesc().variableLabels.names)-len(m.curry)); err != nil { return 0, err } @@ -282,7 +359,7 @@ func (m *MetricVec) hashLabels(labels Labels) (uint64, error) { curry = m.curry iCurry int ) - for i, labelName := range m.desc.variableLabels.names { + for i, labelName := range m.vecDesc().variableLabels.names { val, ok := labels[labelName] if iCurry < len(curry) && curry[iCurry].index == i { if ok { diff --git a/prometheus/vec_test.go b/prometheus/vec_test.go index 03223f2f6..ada97032e 100644 --- a/prometheus/vec_test.go +++ b/prometheus/vec_test.go @@ -18,6 +18,8 @@ import ( "reflect" "strconv" "testing" + "testing/synctest" + "time" dto "github.com/prometheus/client_model/go" ) @@ -1004,3 +1006,187 @@ func benchmarkMetricVecWithLabelValues(b *testing.B, labels map[string][]string) vec.WithLabelValues(values...) } } + +func collectCount(c Collector) int { + ch := make(chan Metric, 100) + c.Collect(ch) + close(ch) + n := 0 + for range ch { + n++ + } + return n +} + +func TestTTLCounterVec(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 100 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewCounterVec(CounterOpts{ + Name: "test_ttl_counter", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + vec.WithLabelValues("404").Add(1) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 metrics after TTL, got %d", n) + } + + cleaned := vec.CleanupExpired() + if cleaned != 2 { + t.Fatalf("expected 2 cleaned, got %d", cleaned) + } + }) +} + +func TestTTLGaugeVec(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 100 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewGaugeVec(GaugeOpts{ + Name: "test_ttl_gauge", + Help: "test", + }, []string{"method"}) + + vec.WithLabelValues("GET").Set(10) + vec.WithLabelValues("POST").Set(20) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + // Touch only one + vec.WithLabelValues("GET").Set(30) + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric after partial TTL, got %d", n) + } + + cleaned := vec.CleanupExpired() + if cleaned != 1 { + t.Fatalf("expected 1 cleaned, got %d", cleaned) + } + }) +} + +func TestTTLHistogramVec(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 100 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewHistogramVec(HistogramOpts{ + Name: "test_ttl_histo", + Help: "test", + }, []string{"status"}) + + vec.WithLabelValues("ok").Observe(0.5) + vec.WithLabelValues("err").Observe(1.5) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 metrics after TTL, got %d", n) + } + }) +} + +func TestTTLRefreshPreventsExpiration(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 150 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewCounterVec(CounterOpts{ + Name: "test_ttl_refresh", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + + // Keep refreshing before TTL expires + for i := 0; i < 5; i++ { + time.Sleep(80 * time.Millisecond) + vec.WithLabelValues("200").Add(1) + } + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric still alive, got %d", n) + } + }) +} + +func TestNewMetricVecWithTTLZeroAndNegative(t *testing.T) { + desc := NewDesc("test", "help", []string{"l"}, nil) + newMetric := func(lvs ...string) Metric { return &counter{} } + + mv0 := NewMetricVecWithTTL(desc, newMetric, 0) + if mv0.ttlMap != nil { + t.Fatal("ttl==0 should use plain MetricVec without ttlMap") + } + + defer func() { + if recover() == nil { + t.Fatal("expected panic for negative ttl") + } + }() + NewMetricVecWithTTL(desc, newMetric, -time.Second) +} + +func TestTTLZeroMeansNoExpiration(t *testing.T) { + vec := NewCounterVec(CounterOpts{ + Name: "test_no_ttl", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + + cleaned := vec.CleanupExpired() + if cleaned != 0 { + t.Fatalf("expected 0 cleaned with no TTL, got %d", cleaned) + } + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric, got %d", n) + } +} + +func TestTTLWithGetMetricWith(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ttl := 100 * time.Millisecond + reg := NewTTLRegistry(ttl) + vec := reg.NewGaugeVec(GaugeOpts{ + Name: "test_ttl_getmetricwith", + Help: "test", + }, []string{"method"}) + + g, err := vec.GetMetricWith(Labels{"method": "GET"}) + if err != nil { + t.Fatal(err) + } + g.Set(42) + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 after TTL, got %d", n) + } + + // Re-access refreshes + g, _ = vec.GetMetricWith(Labels{"method": "GET"}) + g.Set(99) + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 after re-access, got %d", n) + } + }) +}