Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Following is the supported API format for kafka encode:
readTimeout: timeout (in seconds) for read operation performed by the Writer
batchBytes: limit the maximum size of a request in bytes before being sent to a partition
batchSize: limit on how many messages will be buffered before being sent to a partition
compression: compression codec: none (default), gzip, snappy, lz4, zstd
tls: TLS client configuration (optional)
insecureSkipVerify: skip client verifying the server's certificate chain and host name
caCertPath: path to the CA certificate
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.67.5
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47
github.com/segmentio/kafka-go v0.4.50
github.com/sirupsen/logrus v1.9.4
github.com/spf13/cobra v1.10.2
Expand Down Expand Up @@ -49,7 +49,7 @@ require (
)

require (
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
Expand Down Expand Up @@ -93,7 +93,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
Expand All @@ -114,7 +114,7 @@ require (
github.com/pion/logging v0.2.3 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/prometheus/prometheus v0.304.0 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
Expand Down Expand Up @@ -157,3 +157,6 @@ require (
)

replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101

// Local client_golang (TTLRegistry); switch to upstream tag or fork pseudo-version when published.
replace github.com/prometheus/client_golang => ../client_golang
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9L
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
Expand Down Expand Up @@ -156,8 +156,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
Expand Down Expand Up @@ -245,14 +245,12 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4=
github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47 h1:DXz2mpU9UOivlNi73hwwVm7KSYGImaPDdYsniI6uOdE=
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47/go.mod h1:6wuJRcHcgBM1w2vIgTuCW7XSzlfCat/lS3C39U02oDA=
github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc=
github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/prometheus/prometheus v0.304.0 h1:otXBqfF7bbTcW7IrXrB6HMjo4dThQbayCPFr2yTlqrQ=
github.com/prometheus/prometheus v0.304.0/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so=
github.com/prometheus/sigv4 v0.1.2 h1:R7570f8AoM5YnTUPFm3mjZH5q2k4D+I/phCWvZ4PXG8=
Expand Down
88 changes: 54 additions & 34 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,48 +97,65 @@ func (e *Prometheus) ProcessAggHist(m interface{}, _ string, labels map[string]s
return nil
}

func (e *Prometheus) GetCacheEntry(entryLabels map[string]string, m interface{}) interface{} {
// In prom_encode, the metrics cache just contains cleanup callbacks
switch mv := m.(type) {
case *prometheus.CounterVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.GaugeVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.HistogramVec:
return func() { mv.Delete(entryLabels) }
}
func (e *Prometheus) GetCacheEntry(_ map[string]string, _ interface{}) interface{} {
// With Vec-native TTL, no cache entry is needed for Prometheus
return nil
}

// callback function from lru cleanup
func (e *Prometheus) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *Prometheus) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
var counter *prometheus.CounterVec
if ttlReg, ok := e.registerer.(*prometheus.TTLRegistry); ok {
counter = ttlReg.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
} else {
counter = prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
}
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *Prometheus) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
var gauge *prometheus.GaugeVec
if ttlReg, ok := e.registerer.(*prometheus.TTLRegistry); ok {
gauge = ttlReg.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
} else {
gauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
}
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}

func (e *Prometheus) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
var histogram *prometheus.HistogramVec
if ttlReg, ok := e.registerer.(*prometheus.TTLRegistry); ok {
histogram = ttlReg.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
} else {
histogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
}
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}

func (e *Prometheus) addAgghistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
var agghistogram *prometheus.HistogramVec
if ttlReg, ok := e.registerer.(*prometheus.TTLRegistry); ok {
agghistogram = ttlReg.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
} else {
agghistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
}
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
}

// registerIfNeeded registers c unless it was already registered by TTLRegistry constructors.
func (e *Prometheus) registerIfNeeded(c prometheus.Collector) {
if _, ok := e.registerer.(*prometheus.TTLRegistry); ok {
return
}
if err := e.registerer.Register(c); err != nil {
plog.Errorf("error in prometheus.Register: %v", err)
}
}

func (e *Prometheus) unregisterMetric(c interface{}) {
if c, ok := c.(prometheus.Collector); ok {
e.registerer.Unregister(c)
Expand Down Expand Up @@ -190,20 +207,14 @@ func (e *Prometheus) checkMetricUpdate(prefix string, apiItem *api.MetricsItem,
plog.Debug("Changes detected: unregistering and replacing")
e.unregisterMetric(oldMetric.genericMetric)
c := createMetric(fullMetricName, mInfo)
err := e.registerer.Register(c)
if err != nil {
plog.Errorf("error in prometheus.Register: %v", err)
}
e.registerIfNeeded(c)
} else {
plog.Debug("No changes found")
}
} else {
plog.Debug("New metric")
c := createMetric(fullMetricName, mInfo)
err := e.registerer.Register(c)
if err != nil {
plog.Errorf("error in prometheus.Register: %v", err)
}
e.registerIfNeeded(c)
}
return false
}
Expand Down Expand Up @@ -253,8 +264,16 @@ func (e *Prometheus) checkConfUpdate() {

func (e *Prometheus) resetRegistry() {
e.metricCommon.cleanupInfoStructs()
reg := prometheus.NewRegistry()
e.registerer = reg
var reg prometheus.Gatherer
if e.metricCommon.mCache == nil {
ttlReg := prometheus.NewTTLRegistry(e.metricCommon.expiryTime)
e.registerer = ttlReg
reg = ttlReg
} else {
r := prometheus.NewRegistry()
e.registerer = r
reg = r
}
for i := range e.cfg.Metrics {
mCfg := &e.cfg.Metrics[i]
fullMetricName := e.cfg.Prefix + mCfg.Name
Expand All @@ -275,10 +294,7 @@ func (e *Prometheus) resetRegistry() {
continue
}
if m != nil {
err := e.registerer.Register(m)
if err != nil {
plog.Errorf("error in prometheus.Register: %v", err)
}
e.registerIfNeeded(m)
}
}
e.server.SetRegistry(e.regName, reg)
Expand All @@ -296,6 +312,8 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
}
plog.Debugf("expiryTime = %v", expiryTime)

// Placeholder gatherer; resetRegistry replaces it with a TTLRegistry (Vec TTL path)
// or a plain Registry (TimedCache path) before scraping.
registry := prometheus.NewRegistry()

w := &Prometheus{
Expand All @@ -311,12 +329,14 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
w.server = promserver.StartServerAsync(cfg.PromConnectionInfo, params.Name, registry)
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
metricCommon := NewMetricsCommonStructWithVecTTL(opMetrics, cfg.MaxMetrics, params.Name, expiryTime)
w.metricCommon = metricCommon

// Init metrics
w.resetRegistry()

w.metricCommon.StartCleanupLoop()

return w, nil
}

Expand Down
Loading
Loading