Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions prometheus/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,17 @@ func (v2) NewCounterVec(opts CounterVecOpts) *CounterVec {
if opts.now == nil {
opts.now = time.Now
}
newMetric := func(lvs ...string) Metric {
if len(lvs) != len(desc.variableLabels.names) {
panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs))
}
result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now}
result.init(result) // Init self-collection.
result.createdTs = timestamppb.New(opts.now())
return result
}
return &CounterVec{
MetricVec: NewMetricVec(desc, func(lvs ...string) Metric {
if len(lvs) != len(desc.variableLabels.names) {
panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs))
}
result := &counter{desc: desc, labelPairs: MakeLabelPairs(desc, lvs), now: opts.now}
result.init(result) // Init self-collection.
result.createdTs = timestamppb.New(opts.now())
return result
}),
MetricVec: NewMetricVecWithTTL(desc, newMetric, opts.TTL),
}
}

Expand Down
17 changes: 9 additions & 8 deletions prometheus/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,16 @@ func (v2) NewGaugeVec(opts GaugeVecOpts) *GaugeVec {
opts.ConstLabels,
WithUnit(opts.Unit),
)
newMetric := func(lvs ...string) Metric {
if len(lvs) != len(desc.variableLabels.names) {
panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs))
}
result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)}
result.init(result) // Init self-collection.
return result
}
return &GaugeVec{
MetricVec: NewMetricVec(desc, func(lvs ...string) Metric {
if len(lvs) != len(desc.variableLabels.names) {
panic(makeInconsistentCardinalityError(desc.fqName, desc.variableLabels.names, lvs))
}
result := &gauge{desc: desc, labelPairs: MakeLabelPairs(desc, lvs)}
result.init(result) // Init self-collection.
return result
}),
MetricVec: NewMetricVecWithTTL(desc, newMetric, opts.TTL),
}
}

Expand Down
10 changes: 8 additions & 2 deletions prometheus/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,12 @@ type HistogramOpts struct {
// 5m is used. To always delete the oldest exemplar, set it to a negative value.
NativeHistogramExemplarTTL time.Duration

// TTL specifies the time-to-live for Vec children. When set to a value > 0,
// children that have not been accessed for longer than TTL will be excluded
// from Collect output and can be removed via CleanupExpired. This is only
// relevant for HistogramVec metrics; it is ignored for non-Vec Histograms.
TTL time.Duration

// now is for testing purposes, by default it's time.Now.
now func() time.Time

Expand Down Expand Up @@ -1197,9 +1203,9 @@ func (v2) NewHistogramVec(opts HistogramVecOpts) *HistogramVec {
WithUnit(opts.Unit),
)
return &HistogramVec{
MetricVec: NewMetricVec(desc, func(lvs ...string) Metric {
MetricVec: NewMetricVecWithTTL(desc, func(lvs ...string) Metric {
return newHistogram(desc, opts.HistogramOpts, lvs...)
}),
}, opts.TTL),
}
}

Expand Down
7 changes: 7 additions & 0 deletions prometheus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ type Opts struct {
// https://prometheus.io/docs/instrumenting/writing_exporters/#target-labels-not-static-scraped-labels
ConstLabels Labels

// TTL specifies the time-to-live for Vec children. When set to a value > 0,
// children that have not been accessed (via GetMetricWith or similar) for
// longer than TTL will be excluded from Collect output and can be removed
// via CleanupExpired. This is only relevant for *Vec metrics; it is ignored
// for non-Vec metrics.
TTL time.Duration

// now is for testing purposes, by default it's time.Now.
now func() time.Time
}
Expand Down
179 changes: 160 additions & 19 deletions prometheus/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package prometheus
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/common/model"
)
Expand Down Expand Up @@ -47,7 +49,7 @@ type MetricVec struct {
func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec {
return &MetricVec{
metricMap: &metricMap{
metrics: map[uint64][]metricWithLabelValues{},
metrics: map[uint64][]*metricWithLabelValues{},
desc: desc,
newMetric: newMetric,
},
Expand All @@ -56,6 +58,30 @@ func NewMetricVec(desc *Desc, newMetric func(lvs ...string) Metric) *MetricVec {
}
}

// NewMetricVecWithTTL returns an initialized MetricVec with TTL-based expiration.
// Children that have not been accessed (via GetMetricWith or GetMetricWithLabelValues)
// for longer than ttl will be excluded from Collect and can be cleaned up via
// CleanupExpired. If ttl is 0, this behaves identically to NewMetricVec.
func NewMetricVecWithTTL(desc *Desc, newMetric func(lvs ...string) Metric, ttl time.Duration) *MetricVec {
return &MetricVec{
metricMap: &metricMap{
metrics: map[uint64][]*metricWithLabelValues{},
desc: desc,
newMetric: newMetric,
ttl: ttl,
},
hashAdd: hashAdd,
hashAddByte: hashAddByte,
}
}

// CleanupExpired removes all children that have not been accessed within the
// configured TTL. It returns the number of children removed. If TTL is not
// configured (zero), this is a no-op and returns 0.
func (m *MetricVec) CleanupExpired() int {
return m.cleanupExpired()
}

// DeleteLabelValues removes the metric where the variable labels are the same
// as those passed in as labels (same order as the VariableLabels in Desc). It
// returns true if a metric was deleted.
Expand Down Expand Up @@ -304,8 +330,9 @@ func (m *MetricVec) hashLabels(labels Labels) (uint64, error) {
// metricWithLabelValues provides the metric and its label values for
// disambiguation on hash collision.
type metricWithLabelValues struct {
values []string
metric Metric
values []string
metric Metric
lastAccessed atomic.Int64 // unix timestamp in milliseconds; only used when TTL > 0
}

// curriedLabelValue sets the curried value for a label at the given index.
Expand All @@ -318,9 +345,10 @@ type curriedLabelValue struct {
// metricVecs.
type metricMap struct {
mtx sync.RWMutex // Protects metrics.
metrics map[uint64][]metricWithLabelValues
metrics map[uint64][]*metricWithLabelValues
desc *Desc
newMetric func(labelValues ...string) Metric
ttl time.Duration // if > 0, enables TTL-based expiration
}

// Describe implements Collector. It will send exactly one Desc to the provided
Expand All @@ -334,9 +362,17 @@ func (m *metricMap) Collect(ch chan<- Metric) {
m.mtx.RLock()
defer m.mtx.RUnlock()

var deadline int64
if m.ttl > 0 {
deadline = time.Now().Add(-m.ttl).UnixMilli()
}

for _, metrics := range m.metrics {
for _, metric := range metrics {
ch <- metric.metric
for i := range metrics {
if m.ttl > 0 && metrics[i].lastAccessed.Load() < deadline {
continue
}
ch <- metrics[i].metric
}
}
}
Expand All @@ -351,6 +387,93 @@ func (m *metricMap) Reset() {
}
}

// touchByHash updates lastAccessed for a metric found by hash and label values.
// Acquires RLock internally.
func (m *metricMap) touchByHash(h uint64, lvs []string, curry []curriedLabelValue) {
m.mtx.RLock()
defer m.mtx.RUnlock()
m.touchByHashRLocked(h, lvs, curry)
}

func (m *metricMap) touchByHashRLocked(h uint64, lvs []string, curry []curriedLabelValue) {
now := time.Now().UnixMilli()
metrics, ok := m.metrics[h]
if !ok {
return
}
if i := findMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) {
metrics[i].lastAccessed.Store(now)
}
}

func (m *metricMap) touchByHashLocked(h uint64, lvs []string, curry []curriedLabelValue) {
now := time.Now().UnixMilli()
metrics, ok := m.metrics[h]
if !ok {
return
}
if i := findMetricWithLabelValues(metrics, lvs, curry); i < len(metrics) {
metrics[i].lastAccessed.Store(now)
}
}

func (m *metricMap) touchByHashLabels(h uint64, labels Labels, curry []curriedLabelValue) {
m.mtx.RLock()
defer m.mtx.RUnlock()
m.touchByHashLabelsRLocked(h, labels, curry)
}

func (m *metricMap) touchByHashLabelsRLocked(h uint64, labels Labels, curry []curriedLabelValue) {
now := time.Now().UnixMilli()
metrics, ok := m.metrics[h]
if !ok {
return
}
if i := findMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) {
metrics[i].lastAccessed.Store(now)
}
}

func (m *metricMap) touchByHashLabelsLocked(h uint64, labels Labels, curry []curriedLabelValue) {
now := time.Now().UnixMilli()
metrics, ok := m.metrics[h]
if !ok {
return
}
if i := findMetricWithLabels(m.desc, metrics, labels, curry); i < len(metrics) {
metrics[i].lastAccessed.Store(now)
}
}

// cleanupExpired removes all children whose lastAccessed is older than TTL.
func (m *metricMap) cleanupExpired() int {
if m.ttl <= 0 {
return 0
}

deadline := time.Now().Add(-m.ttl).UnixMilli()
m.mtx.Lock()
defer m.mtx.Unlock()

var numDeleted int
for h, metrics := range m.metrics {
remaining := metrics[:0]
for i := range metrics {
if metrics[i].lastAccessed.Load() >= deadline {
remaining = append(remaining, metrics[i])
} else {
numDeleted++
}
}
if len(remaining) == 0 {
delete(m.metrics, h)
} else {
m.metrics[h] = remaining
}
}
return numDeleted
}

// deleteByHashWithLabelValues removes the metric from the hash bucket h. If
// there are multiple matches in the bucket, use lvs to select a metric and
// remove only that metric.
Expand All @@ -373,7 +496,7 @@ func (m *metricMap) deleteByHashWithLabelValues(
if len(metrics) > 1 {
old := metrics
m.metrics[h] = append(metrics[:i], metrics[i+1:]...)
old[len(old)-1] = metricWithLabelValues{}
old[len(old)-1] = nil
} else {
delete(m.metrics, h)
}
Expand Down Expand Up @@ -401,7 +524,7 @@ func (m *metricMap) deleteByHashWithLabels(
if len(metrics) > 1 {
old := metrics
m.metrics[h] = append(metrics[:i], metrics[i+1:]...)
old[len(old)-1] = metricWithLabelValues{}
old[len(old)-1] = nil
} else {
delete(m.metrics, h)
}
Expand Down Expand Up @@ -431,10 +554,10 @@ func (m *metricMap) deleteByLabels(labels Labels, curry []curriedLabelValue) int
// findMetricWithPartialLabel returns the index of the matching metric or
// len(metrics) if not found.
func findMetricWithPartialLabels(
desc *Desc, metrics []metricWithLabelValues, labels Labels, curry []curriedLabelValue,
desc *Desc, metrics []*metricWithLabelValues, labels Labels, curry []curriedLabelValue,
) int {
for i, metric := range metrics {
if matchPartialLabels(desc, metric.values, labels, curry) {
for i := range metrics {
if matchPartialLabels(desc, metrics[i].values, labels, curry) {
return i
}
}
Expand Down Expand Up @@ -495,6 +618,9 @@ func (m *metricMap) getOrCreateMetricWithLabelValues(
metric, ok := m.getMetricWithHashAndLabelValues(hash, lvs, curry)
m.mtx.RUnlock()
if ok {
if m.ttl > 0 {
m.touchByHash(hash, lvs, curry)
}
return metric
}

Expand All @@ -504,7 +630,13 @@ func (m *metricMap) getOrCreateMetricWithLabelValues(
if !ok {
inlinedLVs := inlineLabelValues(lvs, curry)
metric = m.newMetric(inlinedLVs...)
m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: inlinedLVs, metric: metric})
entry := &metricWithLabelValues{values: inlinedLVs, metric: metric}
if m.ttl > 0 {
entry.lastAccessed.Store(time.Now().UnixMilli())
}
m.metrics[hash] = append(m.metrics[hash], entry)
} else if m.ttl > 0 {
m.touchByHashLocked(hash, lvs, curry)
}
return metric
}
Expand All @@ -520,6 +652,9 @@ func (m *metricMap) getOrCreateMetricWithLabels(
metric, ok := m.getMetricWithHashAndLabels(hash, labels, curry)
m.mtx.RUnlock()
if ok {
if m.ttl > 0 {
m.touchByHashLabels(hash, labels, curry)
}
return metric
}

Expand All @@ -529,7 +664,13 @@ func (m *metricMap) getOrCreateMetricWithLabels(
if !ok {
lvs := extractLabelValues(m.desc, labels, curry)
metric = m.newMetric(lvs...)
m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: lvs, metric: metric})
entry := &metricWithLabelValues{values: lvs, metric: metric}
if m.ttl > 0 {
entry.lastAccessed.Store(time.Now().UnixMilli())
}
m.metrics[hash] = append(m.metrics[hash], entry)
} else if m.ttl > 0 {
m.touchByHashLabelsLocked(hash, labels, curry)
}
return metric
}
Expand Down Expand Up @@ -565,10 +706,10 @@ func (m *metricMap) getMetricWithHashAndLabels(
// findMetricWithLabelValues returns the index of the matching metric or
// len(metrics) if not found.
func findMetricWithLabelValues(
metrics []metricWithLabelValues, lvs []string, curry []curriedLabelValue,
metrics []*metricWithLabelValues, lvs []string, curry []curriedLabelValue,
) int {
for i, metric := range metrics {
if matchLabelValues(metric.values, lvs, curry) {
for i := range metrics {
if matchLabelValues(metrics[i].values, lvs, curry) {
return i
}
}
Expand All @@ -578,10 +719,10 @@ func findMetricWithLabelValues(
// findMetricWithLabels returns the index of the matching metric or len(metrics)
// if not found.
func findMetricWithLabels(
desc *Desc, metrics []metricWithLabelValues, labels Labels, curry []curriedLabelValue,
desc *Desc, metrics []*metricWithLabelValues, labels Labels, curry []curriedLabelValue,
) int {
for i, metric := range metrics {
if matchLabels(desc, metric.values, labels, curry) {
for i := range metrics {
if matchLabels(desc, metrics[i].values, labels, curry) {
return i
}
}
Expand Down
Loading
Loading