diff --git a/prometheus/counter.go b/prometheus/counter.go index 7d963d3af..0c4dc6e8c 100644 --- a/prometheus/counter.go +++ b/prometheus/counter.go @@ -211,16 +211,17 @@ func (v2) NewCounterVec(opts CounterVecOpts) *CounterVec { if opts.now == nil { opts.now = time.Now } + newMetric := func(lvs ...string) Metric { + if len(lvs) != len(desc.variableLabels.names) { + panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) + } + result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now} + result.init(result) // Init self-collection. + result.createdTs = timestamppb.New(opts.now()) + return result + } return &CounterVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { - if len(lvs) != len(desc.variableLabels.names) { - panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) - } - result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now} - result.init(result) // Init self-collection. - result.createdTs = timestamppb.New(opts.now()) - return result - }), + MetricVec: NewMetricVecWithTTL(desc, newMetric, opts.TTL), } } diff --git a/prometheus/gauge.go b/prometheus/gauge.go index 41e54bf27..e79dca756 100644 --- a/prometheus/gauge.go +++ b/prometheus/gauge.go @@ -166,15 +166,16 @@ func (v2) NewGaugeVec(opts GaugeVecOpts) *GaugeVec { opts.ConstLabels, WithUnit(opts.Unit), ) + newMetric := func(lvs ...string) Metric { + if len(lvs) != len(desc.variableLabels.names) { + panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) + } + result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)} + result.init(result) // Init self-collection. + return result + } return &GaugeVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { - if len(lvs) != len(desc.variableLabels.names) { - panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs)) - } - result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)} - result.init(result) // Init self-collection. - return result - }), + MetricVec: NewMetricVecWithTTL(desc, newMetric, opts.TTL), } } diff --git a/prometheus/histogram.go b/prometheus/histogram.go index 0e788f715..265584a04 100644 --- a/prometheus/histogram.go +++ b/prometheus/histogram.go @@ -498,6 +498,12 @@ type HistogramOpts struct { // 5m is used. To always delete the oldest exemplar, set it to a negative value. NativeHistogramExemplarTTL time.Duration + // TTL specifies the time-to-live for Vec children. When set to a value > 0, + // children that have not been accessed for longer than TTL will be excluded + // from Collect output and can be removed via CleanupExpired. This is only + // relevant for HistogramVec metrics; it is ignored for non-Vec Histograms. + TTL time.Duration + // now is for testing purposes, by default it's time.Now. now func() time.Time @@ -1197,9 +1203,9 @@ func (v2) NewHistogramVec(opts HistogramVecOpts) *HistogramVec { WithUnit(opts.Unit), ) return &HistogramVec{ - MetricVec: NewMetricVec(desc, func(lvs ...string) Metric { + MetricVec: NewMetricVecWithTTL(desc, func(lvs ...string) Metric { return newHistogram(desc, opts.HistogramOpts, lvs...) - }), + }, opts.TTL), } } diff --git a/prometheus/metric.go b/prometheus/metric.go index c5cb90adf..dae71672e 100644 --- a/prometheus/metric.go +++ b/prometheus/metric.go @@ -96,6 +96,13 @@ type Opts struct { // https://prometheus.io/docs/instrumenting/writing_exporters/#target-labels-not-static-scraped-labels ConstLabels Labels + // TTL specifies the time-to-live for Vec children. When set to a value > 0, + // children that have not been accessed (via GetMetricWith or similar) for + // longer than TTL will be excluded from Collect output and can be removed + // via CleanupExpired. This is only relevant for *Vec metrics; it is ignored + // for non-Vec metrics. + TTL time.Duration + // now is for testing purposes, by default it's time.Now. now func() time.Time } diff --git a/prometheus/vec.go b/prometheus/vec.go index b405a64de..5538f66dd 100644 --- a/prometheus/vec.go +++ b/prometheus/vec.go @@ -16,6 +16,8 @@ package prometheus import ( "fmt" "sync" + "sync/atomic" + "time" "github.com/prometheus/common/model" ) @@ -47,7 +49,7 @@ type MetricVec struct { func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec { return &MetricVec{ metricMap: &metricMap{ - metrics: map[uint64][]metricWithLabelValues{}, + metrics: map[uint64][]*metricWithLabelValues{}, desc: desc, newMetric: newMetric, }, @@ -56,6 +58,30 @@ func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec { } } +// NewMetricVecWithTTL returns an initialized MetricVec with TTL-based expiration. +// Children that have not been accessed (via GetMetricWith or GetMetricWithLabelValues) +// for longer than ttl will be excluded from Collect and can be cleaned up via +// CleanupExpired. If ttl is 0, this behaves identically to NewMetricVec. +func NewMetricVecWithTTL(desc *Desc, newMetric func(lvs ...string) Metric, ttl time.Duration) *MetricVec { + return &MetricVec{ + metricMap: &metricMap{ + metrics: map[uint64][]*metricWithLabelValues{}, + desc: desc, + newMetric: newMetric, + ttl: ttl, + }, + hashAdd: hashAdd, + hashAddByte: hashAddByte, + } +} + +// CleanupExpired removes all children that have not been accessed within the +// configured TTL. It returns the number of children removed. If TTL is not +// configured (zero), this is a no-op and returns 0. +func (m *MetricVec) CleanupExpired() int { + return m.cleanupExpired() +} + // DeleteLabelValues removes the metric where the variable labels are the same // as those passed in as labels (same order as the VariableLabels in Desc). It // returns true if a metric was deleted. @@ -304,8 +330,9 @@ func (m *MetricVec) hashLabels(labels Labels) (uint64, error) { // metricWithLabelValues provides the metric and its label values for // disambiguation on hash collision. type metricWithLabelValues struct { - values []string - metric Metric + values []string + metric Metric + lastAccessed atomic.Int64 // unix timestamp in milliseconds; only used when TTL > 0 } // curriedLabelValue sets the curried value for a label at the given index. @@ -318,9 +345,10 @@ type curriedLabelValue struct { // metricVecs. type metricMap struct { mtx sync.RWMutex // Protects metrics. - metrics map[uint64][]metricWithLabelValues + metrics map[uint64][]*metricWithLabelValues desc *Desc newMetric func(labelValues ...string) Metric + ttl time.Duration // if > 0, enables TTL-based expiration } // Describe implements Collector. It will send exactly one Desc to the provided @@ -334,9 +362,17 @@ func (m *metricMap) Collect(ch chan<- Metric) { m.mtx.RLock() defer m.mtx.RUnlock() + var deadline int64 + if m.ttl > 0 { + deadline = time.Now().Add(-m.ttl).UnixMilli() + } + for _, metrics := range m.metrics { - for _, metric := range metrics { - ch <- metric.metric + for i := range metrics { + if m.ttl > 0 && metrics[i].lastAccessed.Load() < deadline { + continue + } + ch <- metrics[i].metric } } } @@ -351,6 +387,93 @@ func (m *metricMap) Reset() { } } +// touchByHash updates lastAccessed for a metric found by hash and label values. +// Acquires RLock internally. +func (m *metricMap) touchByHash(h uint64, lvs []string, curry []curriedLabelValue) { + m.mtx.RLock() + defer m.mtx.RUnlock() + m.touchByHashRLocked(h, lvs, curry) +} + +func (m *metricMap) touchByHashRLocked(h uint64, lvs []string, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *metricMap) touchByHashLocked(h uint64, lvs []string, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *metricMap) touchByHashLabels(h uint64, labels Labels, curry []curriedLabelValue) { + m.mtx.RLock() + defer m.mtx.RUnlock() + m.touchByHashLabelsRLocked(h, labels, curry) +} + +func (m *metricMap) touchByHashLabelsRLocked(h uint64, labels Labels, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +func (m *metricMap) touchByHashLabelsLocked(h uint64, labels Labels, curry []curriedLabelValue) { + now := time.Now().UnixMilli() + metrics, ok := m.metrics[h] + if !ok { + return + } + if i := findMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) { + metrics[i].lastAccessed.Store(now) + } +} + +// cleanupExpired removes all children whose lastAccessed is older than TTL. +func (m *metricMap) cleanupExpired() int { + if m.ttl <= 0 { + return 0 + } + + deadline := time.Now().Add(-m.ttl).UnixMilli() + m.mtx.Lock() + defer m.mtx.Unlock() + + var numDeleted int + for h, metrics := range m.metrics { + remaining := metrics[:0] + for i := range metrics { + if metrics[i].lastAccessed.Load() >= deadline { + remaining = append(remaining, metrics[i]) + } else { + numDeleted++ + } + } + if len(remaining) == 0 { + delete(m.metrics, h) + } else { + m.metrics[h] = remaining + } + } + return numDeleted +} + // deleteByHashWithLabelValues removes the metric from the hash bucket h. If // there are multiple matches in the bucket, use lvs to select a metric and // remove only that metric. @@ -373,7 +496,7 @@ func (m *metricMap) deleteByHashWithLabelValues( if len(metrics) > 1 { old := metrics m.metrics[h] = append(metrics[:i], metrics[i+1:]...) - old[len(old)-1] = metricWithLabelValues{} + old[len(old)-1] = nil } else { delete(m.metrics, h) } @@ -401,7 +524,7 @@ func (m *metricMap) deleteByHashWithLabels( if len(metrics) > 1 { old := metrics m.metrics[h] = append(metrics[:i], metrics[i+1:]...) - old[len(old)-1] = metricWithLabelValues{} + old[len(old)-1] = nil } else { delete(m.metrics, h) } @@ -431,10 +554,10 @@ func (m *metricMap) deleteByLabels(labels Labels, curry []curriedLabelValue) int // findMetricWithPartialLabel returns the index of the matching metric or // len(metrics) if not found. func findMetricWithPartialLabels( - desc *Desc, metrics []metricWithLabelValues, labels Labels, curry []curriedLabelValue, + desc *Desc, metrics []*metricWithLabelValues, labels Labels, curry []curriedLabelValue, ) int { - for i, metric := range metrics { - if matchPartialLabels(desc, metric.values, labels, curry) { + for i := range metrics { + if matchPartialLabels(desc, metrics[i].values, labels, curry) { return i } } @@ -495,6 +618,9 @@ func (m *metricMap) getOrCreateMetricWithLabelValues( metric, ok := m.getMetricWithHashAndLabelValues(hash, lvs, curry) m.mtx.RUnlock() if ok { + if m.ttl > 0 { + m.touchByHash(hash, lvs, curry) + } return metric } @@ -504,7 +630,13 @@ func (m *metricMap) getOrCreateMetricWithLabelValues( if !ok { inlinedLVs := inlineLabelValues(lvs, curry) metric = m.newMetric(inlinedLVs...) - m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: inlinedLVs, metric: metric}) + entry := &metricWithLabelValues{values: inlinedLVs, metric: metric} + if m.ttl > 0 { + entry.lastAccessed.Store(time.Now().UnixMilli()) + } + m.metrics[hash] = append(m.metrics[hash], entry) + } else if m.ttl > 0 { + m.touchByHashLocked(hash, lvs, curry) } return metric } @@ -520,6 +652,9 @@ func (m *metricMap) getOrCreateMetricWithLabels( metric, ok := m.getMetricWithHashAndLabels(hash, labels, curry) m.mtx.RUnlock() if ok { + if m.ttl > 0 { + m.touchByHashLabels(hash, labels, curry) + } return metric } @@ -529,7 +664,13 @@ func (m *metricMap) getOrCreateMetricWithLabels( if !ok { lvs := extractLabelValues(m.desc, labels, curry) metric = m.newMetric(lvs...) - m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: lvs, metric: metric}) + entry := &metricWithLabelValues{values: lvs, metric: metric} + if m.ttl > 0 { + entry.lastAccessed.Store(time.Now().UnixMilli()) + } + m.metrics[hash] = append(m.metrics[hash], entry) + } else if m.ttl > 0 { + m.touchByHashLabelsLocked(hash, labels, curry) } return metric } @@ -565,10 +706,10 @@ func (m *metricMap) getMetricWithHashAndLabels( // findMetricWithLabelValues returns the index of the matching metric or // len(metrics) if not found. func findMetricWithLabelValues( - metrics []metricWithLabelValues, lvs []string, curry []curriedLabelValue, + metrics []*metricWithLabelValues, lvs []string, curry []curriedLabelValue, ) int { - for i, metric := range metrics { - if matchLabelValues(metric.values, lvs, curry) { + for i := range metrics { + if matchLabelValues(metrics[i].values, lvs, curry) { return i } } @@ -578,10 +719,10 @@ func findMetricWithLabelValues( // findMetricWithLabels returns the index of the matching metric or len(metrics) // if not found. func findMetricWithLabels( - desc *Desc, metrics []metricWithLabelValues, labels Labels, curry []curriedLabelValue, + desc *Desc, metrics []*metricWithLabelValues, labels Labels, curry []curriedLabelValue, ) int { - for i, metric := range metrics { - if matchLabels(desc, metric.values, labels, curry) { + for i := range metrics { + if matchLabels(desc, metrics[i].values, labels, curry) { return i } } diff --git a/prometheus/vec_test.go b/prometheus/vec_test.go index 03223f2f6..977ad7103 100644 --- a/prometheus/vec_test.go +++ b/prometheus/vec_test.go @@ -18,6 +18,7 @@ import ( "reflect" "strconv" "testing" + "time" dto "github.com/prometheus/client_model/go" ) @@ -1004,3 +1005,160 @@ func benchmarkMetricVecWithLabelValues(b *testing.B, labels map[string][]string) vec.WithLabelValues(values...) } } + +func collectCount(c Collector) int { + ch := make(chan Metric, 100) + c.Collect(ch) + close(ch) + n := 0 + for range ch { + n++ + } + return n +} + +func TestTTLCounterVec(t *testing.T) { + ttl := 100 * time.Millisecond + vec := NewCounterVec(CounterOpts{ + Name: "test_ttl_counter", + Help: "test", + TTL: ttl, + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + vec.WithLabelValues("404").Add(1) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 metrics after TTL, got %d", n) + } + + cleaned := vec.CleanupExpired() + if cleaned != 2 { + t.Fatalf("expected 2 cleaned, got %d", cleaned) + } +} + +func TestTTLGaugeVec(t *testing.T) { + ttl := 100 * time.Millisecond + vec := NewGaugeVec(GaugeOpts{ + Name: "test_ttl_gauge", + Help: "test", + TTL: ttl, + }, []string{"method"}) + + vec.WithLabelValues("GET").Set(10) + vec.WithLabelValues("POST").Set(20) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + // Touch only one + vec.WithLabelValues("GET").Set(30) + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric after partial TTL, got %d", n) + } + + cleaned := vec.CleanupExpired() + if cleaned != 1 { + t.Fatalf("expected 1 cleaned, got %d", cleaned) + } +} + +func TestTTLHistogramVec(t *testing.T) { + ttl := 100 * time.Millisecond + vec := NewHistogramVec(HistogramOpts{ + Name: "test_ttl_histo", + Help: "test", + TTL: ttl, + }, []string{"status"}) + + vec.WithLabelValues("ok").Observe(0.5) + vec.WithLabelValues("err").Observe(1.5) + + if n := collectCount(vec); n != 2 { + t.Fatalf("expected 2 metrics, got %d", n) + } + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 metrics after TTL, got %d", n) + } +} + +func TestTTLRefreshPreventsExpiration(t *testing.T) { + ttl := 150 * time.Millisecond + vec := NewCounterVec(CounterOpts{ + Name: "test_ttl_refresh", + Help: "test", + TTL: ttl, + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + + // Keep refreshing before TTL expires + for i := 0; i < 5; i++ { + time.Sleep(80 * time.Millisecond) + vec.WithLabelValues("200").Add(1) + } + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric still alive, got %d", n) + } +} + +func TestTTLZeroMeansNoExpiration(t *testing.T) { + vec := NewCounterVec(CounterOpts{ + Name: "test_no_ttl", + Help: "test", + }, []string{"code"}) + + vec.WithLabelValues("200").Add(1) + + cleaned := vec.CleanupExpired() + if cleaned != 0 { + t.Fatalf("expected 0 cleaned with no TTL, got %d", cleaned) + } + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 metric, got %d", n) + } +} + +func TestTTLWithGetMetricWith(t *testing.T) { + ttl := 100 * time.Millisecond + vec := NewGaugeVec(GaugeOpts{ + Name: "test_ttl_getmetricwith", + Help: "test", + TTL: ttl, + }, []string{"method"}) + + g, err := vec.GetMetricWith(Labels{"method": "GET"}) + if err != nil { + t.Fatal(err) + } + g.Set(42) + + time.Sleep(ttl + 50*time.Millisecond) + + if n := collectCount(vec); n != 0 { + t.Fatalf("expected 0 after TTL, got %d", n) + } + + // Re-access refreshes + g, _ = vec.GetMetricWith(Labels{"method": "GET"}) + g.Set(99) + + if n := collectCount(vec); n != 1 { + t.Fatalf("expected 1 after re-access, got %d", n) + } +} diff --git a/tutorials/whatsup/internal/acceptance_test.go b/tutorials/whatsup/internal/acceptance_test.go index 38533c495..57848a9b9 100644 --- a/tutorials/whatsup/internal/acceptance_test.go +++ b/tutorials/whatsup/internal/acceptance_test.go @@ -78,5 +78,4 @@ func TestAcceptance(t *testing.T) { if gotErr { fmt.Println("Got this response from ", fmt.Sprintf("http://localhost:%v", WhatsupPort), ":", metrics) } - } diff --git a/tutorials/whatsup/main.go b/tutorials/whatsup/main.go index 5943eefb0..15ebd4e72 100644 --- a/tutorials/whatsup/main.go +++ b/tutorials/whatsup/main.go @@ -73,7 +73,7 @@ func runMain(opts internal.Config) (err error) { m := http.NewServeMux() // Create HTTP handler for Prometheus metrics. // TODO - //m.Handle("/metrics", ... + // m.Handle("/metrics", ... promClient, err := api.NewClient(api.Config{ Client: &http.Client{Transport: instrumentRoundTripper(nil, "prometheus", http.DefaultTransport)},