From 6863cff2d9fd4c34e87ab84830bcd04850663cd9 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Fri, 6 Mar 2026 09:34:24 +0100 Subject: [PATCH 1/2] fix: stop custom resource reflectors on context cancellation Custom resource reflectors only listened to their per-GVK stop channel, ignoring context cancellation from BuildWriters. When BuildWriters rebuilt stores (e.g. after a CRD update event), it cancelled the context to stop old reflectors, but custom resource reflectors kept running. Each rebuild accumulated leaked Reflector.Run and StreamWatcher.receive goroutine pairs, causing unbounded memory, CPU, and network growth. Merge both stop signals so CR reflectors stop on either context cancellation or per-GVK stop channel close. --- internal/store/builder.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index 890550a355..b2a16ee23c 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -621,7 +621,16 @@ func (b *Builder) startReflector( instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { - go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) + stopCh := make(chan struct{}) + crStopCh := (*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()] + go func() { + select { + case <-b.ctx.Done(): + case <-crStopCh: + } + close(stopCh) + }() + go reflector.Run(stopCh) } else { go reflector.Run(b.ctx.Done()) } From 6efdcb452b2465c0269f2aa76fd8fd875169e62e Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Fri, 6 Mar 2026 15:16:57 +0100 Subject: [PATCH 2/2] fix: Builder owns its context lifecycle to avoid data races Builder now creates its own cancellable context in NewBuilder instead of accepting one via WithContext. WithContext subscribes to the provided context's cancellation rather than replacing b.ctx, which avoids a data race with concurrent startReflector reads and ensures already-running reflectors get notified on cancellation. --- internal/store/builder.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index b2a16ee23c..886c9827ec 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -68,6 +68,7 @@ var _ ksmtypes.BuilderInterface = &Builder{} type Builder struct { kubeClient clientset.Interface ctx context.Context + cancel func() familyGeneratorFilter generator.FamilyGeneratorFilter customResourceClients map[string]interface{} listWatchMetrics *watch.ListWatchMetrics @@ -91,7 +92,8 @@ type Builder struct { // NewBuilder returns a new builder. func NewBuilder() *Builder { - b := &Builder{} + ctx, cancel := context.WithCancel(context.Background()) + b := &Builder{ctx: ctx, cancel: cancel} return b } @@ -149,9 +151,17 @@ func (b *Builder) WithSharding(shard int32, totalShards int) { b.shardingMetrics.Total.Set(float64(totalShards)) } -// WithContext sets the ctx property of a Builder. +// WithContext will observe the cancellations of the provided context +// to cancel the internal b.ctx. func (b *Builder) WithContext(ctx context.Context) { - b.ctx = ctx + // WithContext might be called concurrently with startReflector. + // In order to avoid the data race, and also to notify the reflectors that + // are already running with the context, we don't replace b.ctx, + // but just subscribe to the cancellations of the provided context. + go func() { + <-ctx.Done() + b.cancel() + }() } // WithKubeClient sets the kubeClient property of a Builder. @@ -623,13 +633,13 @@ func (b *Builder) startReflector( if cr, ok := expectedType.(*unstructured.Unstructured); ok { stopCh := make(chan struct{}) crStopCh := (*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()] - go func() { + go func(builderContext context.Context) { select { - case <-b.ctx.Done(): + case <-builderContext.Done(): case <-crStopCh: } close(stopCh) - }() + }(b.ctx) go reflector.Run(stopCh) } else { go reflector.Run(b.ctx.Done())