diff --git a/internal/store/builder.go b/internal/store/builder.go index 890550a355..886c9827ec 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -68,6 +68,7 @@ var _ ksmtypes.BuilderInterface = &Builder{} type Builder struct { kubeClient clientset.Interface ctx context.Context + cancel func() familyGeneratorFilter generator.FamilyGeneratorFilter customResourceClients map[string]interface{} listWatchMetrics *watch.ListWatchMetrics @@ -91,7 +92,8 @@ type Builder struct { // NewBuilder returns a new builder. func NewBuilder() *Builder { - b := &Builder{} + ctx, cancel := context.WithCancel(context.Background()) + b := &Builder{ctx: ctx, cancel: cancel} return b } @@ -149,9 +151,17 @@ func (b *Builder) WithSharding(shard int32, totalShards int) { b.shardingMetrics.Total.Set(float64(totalShards)) } -// WithContext sets the ctx property of a Builder. +// WithContext will observe the cancellations of the provided context +// to cancel the internal b.ctx. func (b *Builder) WithContext(ctx context.Context) { - b.ctx = ctx + // WithContext might be called concurrently with startReflector. + // In order to avoid the data race, and also to notify the reflectors that + // are already running with the context, we don't replace b.ctx, + // but just subscribe to the cancellations of the provided context. + go func() { + <-ctx.Done() + b.cancel() + }() } // WithKubeClient sets the kubeClient property of a Builder. @@ -621,7 +631,16 @@ func (b *Builder) startReflector( instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { - go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) + stopCh := make(chan struct{}) + crStopCh := (*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()] + go func(builderContext context.Context) { + select { + case <-builderContext.Done(): + case <-crStopCh: + } + close(stopCh) + }(b.ctx) + go reflector.Run(stopCh) } else { go reflector.Run(b.ctx.Done()) }