From 7345a5c82ccd215a0a59b4348fea263dae18758a Mon Sep 17 00:00:00 2001 From: Alexander North Date: Mon, 19 Jan 2026 17:06:03 +0100 Subject: [PATCH 1/8] split discovery into apiservice and crd discovery and monitoring --- internal/discovery/discovery.go | 129 ++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 30 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 55a79fddf7..de1b13d40e 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -17,10 +17,12 @@ package discovery import ( "context" "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + k8sdiscovery "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" @@ -37,39 +39,27 @@ import ( // Interval is the time interval between two cache sync checks. const Interval = 3 * time.Second +type gvkExtractor func(obj interface{}) []groupVersionKindPlural + // StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds. // resolveGVK needs to be called after StartDiscovery to generate factories. func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error { - client := dynamic.NewForConfigOrDie(config) - factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ - Group: "apiextensions.k8s.io", - Version: "v1", - Resource: "customresourcedefinitions", - }, "", 0, nil, nil) - informer := factory.Informer() - stopper := make(chan struct{}) - extractGVKPs := func(obj interface{}) []groupVersionKindPlural { - objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) - var gvkps []groupVersionKindPlural - for _, version := range objSpec["versions"].([]interface{}) { - g := objSpec["group"].(string) - v := version.(map[string]interface{})["name"].(string) - k := objSpec["names"].(map[string]interface{})["kind"].(string) - p := objSpec["names"].(map[string]interface{})["plural"].(string) - gvkps = append(gvkps, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: v, - Kind: k, - }, - Plural: p, - }) - } - return gvkps + err := r.startCRDDiscovery(ctx, config) + if err != nil { + return err } + err = r.startAPIServiceDiscovery(ctx, config) + if err != nil { + return err + } + return nil +} + +func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedIndexInformer, gvkExtractor gvkExtractor) error { + stopper := make(chan struct{}) _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - gvkps := extractGVKPs(obj) + gvkps := gvkExtractor(obj) r.SafeWrite(func() { r.AppendToMap(gvkps...) r.WasUpdated = true @@ -80,8 +70,8 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) }) }, UpdateFunc: func(oldObj, newObj interface{}) { - oldGVKPs := extractGVKPs(oldObj) - newGVKPs := extractGVKPs(newObj) + oldGVKPs := gvkExtractor(oldObj) + newGVKPs := gvkExtractor(newObj) r.SafeWrite(func() { r.RemoveFromMap(oldGVKPs...) r.AppendToMap(newGVKPs...) @@ -92,7 +82,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) }) }, DeleteFunc: func(obj interface{}) { - gvkps := extractGVKPs(obj) + gvkps := gvkExtractor(obj) r.SafeWrite(func() { r.RemoveFromMap(gvkps...) r.WasUpdated = true @@ -106,6 +96,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) if err != nil { return err } + // Respect context cancellation. go func() { for range ctx.Done() { @@ -118,6 +109,84 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) return nil } +func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Config) error { + client := dynamic.NewForConfigOrDie(config) + factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, "", 0, nil, nil) + + extractGVKPs := func(obj interface{}) []groupVersionKindPlural { + objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) + var gvkps []groupVersionKindPlural + for _, version := range objSpec["versions"].([]interface{}) { + g := objSpec["group"].(string) + v := version.(map[string]interface{})["name"].(string) + k := objSpec["names"].(map[string]interface{})["kind"].(string) + p := objSpec["names"].(map[string]interface{})["plural"].(string) + gvkps = append(gvkps, groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: g, + Version: v, + Kind: k, + }, + Plural: p, + }) + } + return gvkps + } + + return r.runInformer(ctx, factory.Informer(), extractGVKPs) +} + +func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error { + client := dynamic.NewForConfigOrDie(config) + factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + }, "", 0, nil, nil) + + discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config) + + processAPIService := func(obj interface{}) []groupVersionKindPlural { + serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) + if svc, ok := serviceSpec["service"]; !ok || svc == nil { + return nil + } + + group := serviceSpec["group"].(string) + version := serviceSpec["version"].(string) + + resourceList, err := discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) + if err != nil { + klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) + return nil + } + + var gvkps []groupVersionKindPlural + for _, resource := range resourceList.APIResources { + // Skip subresources + if strings.Contains(resource.Name, "/") { + continue + } + + gvkps = append(gvkps, groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: group, + Version: version, + Kind: resource.Kind, + }, + Plural: resource.Name, + }) + } + return gvkps + } + + return r.runInformer(ctx, factory.Informer(), processAPIService) +} + // ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache. func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive g := gvk.Group From 1bf641aacac42bccba03479710176d3253e8b28e Mon Sep 17 00:00:00 2001 From: Alexander North Date: Tue, 20 Jan 2026 11:44:43 +0100 Subject: [PATCH 2/8] refactor gvkExtraction to interface --- internal/discovery/discovery.go | 71 +++--------------- internal/discovery/gvk_extractors.go | 104 +++++++++++++++++++++++++++ internal/discovery/types.go | 4 ++ 3 files changed, 117 insertions(+), 62 deletions(-) create mode 100644 internal/discovery/gvk_extractors.go diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index de1b13d40e..a5ca2e1fec 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -17,10 +17,8 @@ package discovery import ( "context" "fmt" - "strings" "time" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" k8sdiscovery "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" @@ -39,8 +37,6 @@ import ( // Interval is the time interval between two cache sync checks. const Interval = 3 * time.Second -type gvkExtractor func(obj interface{}) []groupVersionKindPlural - // StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds. // resolveGVK needs to be called after StartDiscovery to generate factories. func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error { @@ -59,7 +55,7 @@ func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedInd stopper := make(chan struct{}) _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - gvkps := gvkExtractor(obj) + gvkps := gvkExtractor.ExtractGVKs(obj) r.SafeWrite(func() { r.AppendToMap(gvkps...) r.WasUpdated = true @@ -70,8 +66,8 @@ func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedInd }) }, UpdateFunc: func(oldObj, newObj interface{}) { - oldGVKPs := gvkExtractor(oldObj) - newGVKPs := gvkExtractor(newObj) + oldGVKPs := gvkExtractor.ExtractGVKs(oldObj) + newGVKPs := gvkExtractor.ExtractGVKs(newObj) r.SafeWrite(func() { r.RemoveFromMap(oldGVKPs...) r.AppendToMap(newGVKPs...) @@ -82,7 +78,7 @@ func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedInd }) }, DeleteFunc: func(obj interface{}) { - gvkps := gvkExtractor(obj) + gvkps := gvkExtractor.ExtractGVKs(obj) r.SafeWrite(func() { r.RemoveFromMap(gvkps...) r.WasUpdated = true @@ -117,27 +113,9 @@ func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Confi Resource: "customresourcedefinitions", }, "", 0, nil, nil) - extractGVKPs := func(obj interface{}) []groupVersionKindPlural { - objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) - var gvkps []groupVersionKindPlural - for _, version := range objSpec["versions"].([]interface{}) { - g := objSpec["group"].(string) - v := version.(map[string]interface{})["name"].(string) - k := objSpec["names"].(map[string]interface{})["kind"].(string) - p := objSpec["names"].(map[string]interface{})["plural"].(string) - gvkps = append(gvkps, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: v, - Kind: k, - }, - Plural: p, - }) - } - return gvkps - } + gvkExtractor := &crdGVKExtractor{} - return r.runInformer(ctx, factory.Informer(), extractGVKPs) + return r.runInformer(ctx, factory.Informer(), gvkExtractor) } func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error { @@ -149,42 +127,11 @@ func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *res }, "", 0, nil, nil) discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config) - - processAPIService := func(obj interface{}) []groupVersionKindPlural { - serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) - if svc, ok := serviceSpec["service"]; !ok || svc == nil { - return nil - } - - group := serviceSpec["group"].(string) - version := serviceSpec["version"].(string) - - resourceList, err := discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) - if err != nil { - klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) - return nil - } - - var gvkps []groupVersionKindPlural - for _, resource := range resourceList.APIResources { - // Skip subresources - if strings.Contains(resource.Name, "/") { - continue - } - - gvkps = append(gvkps, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: group, - Version: version, - Kind: resource.Kind, - }, - Plural: resource.Name, - }) - } - return gvkps + gvkExtractor := &apiServiceGVKExtractor{ + discoveryClient: discoveryClient, } - return r.runInformer(ctx, factory.Informer(), processAPIService) + return r.runInformer(ctx, factory.Informer(), gvkExtractor) } // ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache. diff --git a/internal/discovery/gvk_extractors.go b/internal/discovery/gvk_extractors.go new file mode 100644 index 0000000000..e1447a0b2e --- /dev/null +++ b/internal/discovery/gvk_extractors.go @@ -0,0 +1,104 @@ +package discovery + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + k8sdiscovery "k8s.io/client-go/discovery" + "k8s.io/klog/v2" +) + +type crdGVKExtractor struct{} + +func (e *crdGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKindPlural { + objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) + var gvkps []groupVersionKindPlural + for _, version := range objSpec["versions"].([]interface{}) { + g := objSpec["group"].(string) + v := version.(map[string]interface{})["name"].(string) + k := objSpec["names"].(map[string]interface{})["kind"].(string) + p := objSpec["names"].(map[string]interface{})["plural"].(string) + gvkps = append(gvkps, groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: g, + Version: v, + Kind: k, + }, + Plural: p, + }) + } + return gvkps +} + +func isAPIServiceReady(obj interface{}) bool { + status, found, err := unstructured.NestedSlice(obj.(*unstructured.Unstructured).Object, "status", "conditions") + if err != nil || !found { + return false + } + + for _, condition := range status { + conditionMap, ok := condition.(map[string]interface{}) + if !ok { + continue // skip invalid condition + } + if conditionMap["type"] == "Available" && conditionMap["status"] == "True" { + return true + } + } + return false +} + +type apiServiceGVKExtractor struct { + discoveryClient *k8sdiscovery.DiscoveryClient +} + +func (e *apiServiceGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKindPlural { + serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) + group, _, err := unstructured.NestedString(serviceSpec, "group") + if err != nil { + klog.ErrorS(err, "failed to extract group from APIService") + return nil + } + version, _, err := unstructured.NestedString(serviceSpec, "version") + if err != nil { + klog.ErrorS(err, "failed to extract version from APIService") + return nil + } + + // check if APIService has a service defined - i.e. not local + if svc, ok := serviceSpec["service"]; !ok || svc == nil { + klog.V(5).InfoS("skipping local APIService", "group", group, "version", version) + return nil + } + + if !isAPIServiceReady(obj) { + klog.V(5).InfoS("skipping non-ready APIService", "group", group, "version", version) + return nil + } + + resourceList, err := e.discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) + if err != nil { + klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) + return nil + } + + var gvkps []groupVersionKindPlural + for _, resource := range resourceList.APIResources { + // Skip subresources + if strings.Contains(resource.Name, "/") { + continue + } + + gvkps = append(gvkps, groupVersionKindPlural{ + GroupVersionKind: schema.GroupVersionKind{ + Group: group, + Version: version, + Kind: resource.Kind, + }, + Plural: resource.Name, + }) + } + return gvkps +} diff --git a/internal/discovery/types.go b/internal/discovery/types.go index b97e442e68..0d5f1635a8 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -35,6 +35,10 @@ type kindPlural struct { Plural string } +type gvkExtractor interface { + ExtractGVKs(obj interface{}) []groupVersionKindPlural +} + // CRDiscoverer provides a cache of the collected GVKs, along with helper utilities. type CRDiscoverer struct { // CRDsAddEventsCounter tracks the number of times that the CRD informer triggered the "add" event. From 5009bb522e0a15a6305ce2d1b8745e99f1672897 Mon Sep 17 00:00:00 2001 From: Alexander North Date: Tue, 20 Jan 2026 16:33:25 +0100 Subject: [PATCH 3/8] refactor CRDiscoverer so that Resources from CRDs and APIServices can be managed in the same way --- internal/discovery/discovery.go | 147 ++--------- internal/discovery/discovery_test.go | 349 ++++++++++++++++++++------- internal/discovery/gvk_extractors.go | 47 ++-- internal/discovery/types.go | 242 +++++++++++++------ internal/store/builder.go | 13 +- pkg/app/server.go | 31 +-- pkg/builder/types/interfaces.go | 8 + pkg/customresourcestate/config.go | 2 +- 8 files changed, 508 insertions(+), 331 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index a5ca2e1fec..20ad36c8df 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -16,7 +16,6 @@ package discovery import ( "context" - "fmt" "time" "k8s.io/apimachinery/pkg/runtime/schema" @@ -51,42 +50,22 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) return nil } -func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedIndexInformer, gvkExtractor gvkExtractor) error { +func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedIndexInformer, extractor extractor) error { stopper := make(chan struct{}) _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - gvkps := gvkExtractor.ExtractGVKs(obj) - r.SafeWrite(func() { - r.AppendToMap(gvkps...) - r.WasUpdated = true - }) - r.SafeWrite(func() { - r.CRDsAddEventsCounter.Inc() - r.CRDsCacheCountGauge.Inc() - }) + sourceID := extractor.SourceID(obj) + resources := extractor.ExtractGVKs(obj) + r.UpdateSource(sourceID, resources) }, UpdateFunc: func(oldObj, newObj interface{}) { - oldGVKPs := gvkExtractor.ExtractGVKs(oldObj) - newGVKPs := gvkExtractor.ExtractGVKs(newObj) - r.SafeWrite(func() { - r.RemoveFromMap(oldGVKPs...) - r.AppendToMap(newGVKPs...) - r.WasUpdated = true - }) - r.SafeWrite(func() { - r.CRDsUpdateEventsCounter.Inc() - }) + sourceID := extractor.SourceID(newObj) + resources := extractor.ExtractGVKs(newObj) + r.UpdateSource(sourceID, resources) }, DeleteFunc: func(obj interface{}) { - gvkps := gvkExtractor.ExtractGVKs(obj) - r.SafeWrite(func() { - r.RemoveFromMap(gvkps...) - r.WasUpdated = true - }) - r.SafeWrite(func() { - r.CRDsDeleteEventsCounter.Inc() - r.CRDsCacheCountGauge.Dec() - }) + sourceID := extractor.SourceID(obj) + r.DeleteSource(sourceID) }, }) if err != nil { @@ -95,11 +74,9 @@ func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedInd // Respect context cancellation. go func() { - for range ctx.Done() { - klog.InfoS("context cancelled, stopping discovery") - close(stopper) - return - } + <-ctx.Done() + klog.InfoS("context cancelled, stopping discovery") + close(stopper) }() go informer.Run(stopper) return nil @@ -113,9 +90,9 @@ func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Confi Resource: "customresourcedefinitions", }, "", 0, nil, nil) - gvkExtractor := &crdGVKExtractor{} + extractor := &crdExtractor{} - return r.runInformer(ctx, factory.Informer(), gvkExtractor) + return r.runInformer(ctx, factory.Informer(), extractor) } func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error { @@ -127,86 +104,11 @@ func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *res }, "", 0, nil, nil) discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config) - gvkExtractor := &apiServiceGVKExtractor{ + extractor := &apiServiceExtractor{ discoveryClient: discoveryClient, } - return r.runInformer(ctx, factory.Informer(), gvkExtractor) -} - -// ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache. -func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive - g := gvk.Group - v := gvk.Version - k := gvk.Kind - if g == "" || g == "*" { - return nil, fmt.Errorf("group is required in the defined GVK %v", gvk) - } - hasVersion := v != "" && v != "*" - hasKind := k != "" && k != "*" - // No need to resolve, return. - if hasVersion && hasKind { - for _, el := range r.Map[g][v] { - if el.Kind == k { - return []groupVersionKindPlural{ - { - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: v, - Kind: k, - }, - Plural: el.Plural, - }, - }, nil - } - } - } - if hasVersion && !hasKind { - kinds := r.Map[g][v] - for _, el := range kinds { - resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: v, - Kind: el.Kind, - }, - Plural: el.Plural, - }) - } - } - if !hasVersion && hasKind { - versions := r.Map[g] - for version, kinds := range versions { - for _, el := range kinds { - if el.Kind == k { - resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: version, - Kind: k, - }, - Plural: el.Plural, - }) - } - } - } - } - if !hasVersion && !hasKind { - versions := r.Map[g] - for version, kinds := range versions { - for _, el := range kinds { - resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{ - GroupVersionKind: schema.GroupVersionKind{ - Group: g, - Version: version, - Kind: el.Kind, - }, - Plural: el.Plural, - }) - } - } - } - return + return r.runInformer(ctx, factory.Informer(), extractor) } // PollForCacheUpdates polls the cache for updates and updates the stores accordingly. @@ -255,28 +157,19 @@ func (r *CRDiscoverer) PollForCacheUpdates( } // Configure the generation function for the custom resource stores. storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) - // Reset the flag, if there were no errors. Else, we'll try again on the next tick. - // Keep retrying if there were errors. - r.SafeWrite(func() { - r.WasUpdated = false - }) // Update metric handler with the new configs. m.BuildWriters(ctx) } go func() { - for range t.C { + for { select { case <-ctx.Done(): klog.InfoS("context cancelled") t.Stop() return - default: - // Check if cache has been updated. - shouldGenerateMetrics := false - r.SafeRead(func() { - shouldGenerateMetrics = r.WasUpdated - }) - if shouldGenerateMetrics { + case <-t.C: + // Check if cache has been updated and reset the flag. + if r.CheckAndResetUpdated() { generateMetrics() klog.InfoS("discovery finished, cache updated") } diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go index 9ebf631bec..fef7d3f706 100644 --- a/internal/discovery/discovery_test.go +++ b/internal/discovery/discovery_test.go @@ -18,37 +18,51 @@ import ( "sort" "testing" + "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime/schema" ) -func TestGVKMapsResolveGVK(t *testing.T) { +// newTestCRDiscoverer creates a CRDiscoverer with no-op metrics for testing. +func newTestCRDiscoverer() *CRDiscoverer { + return NewCRDiscoverer( + prometheus.NewCounter(prometheus.CounterOpts{Name: "test_update"}), + prometheus.NewCounter(prometheus.CounterOpts{Name: "test_delete"}), + prometheus.NewGauge(prometheus.GaugeOpts{Name: "test_count"}), + ) +} + +func TestResolve(t *testing.T) { type testcase struct { - desc string - gvkmaps *CRDiscoverer - gvk schema.GroupVersionKind - want []groupVersionKindPlural + desc string + resources map[string][]*DiscoveredResource // map[sourceID] -> []resources + gvk schema.GroupVersionKind + want []DiscoveredResource } testcases := []testcase{ { desc: "variable version and kind", - gvkmaps: &CRDiscoverer{ - Map: map[string]map[string][]kindPlural{ - "apps": { - "v1": { - kindPlural{ - Kind: "Deployment", - Plural: "deployments", - }, - kindPlural{ - Kind: "StatefulSet", - Plural: "statefulsets", - }, + resources: map[string][]*DiscoveredResource{ + "crd:deployments.apps": { + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "Deployment", + }, + Plural: "deployments", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "apps", + Version: "v1", + Kind: "StatefulSet", }, + Plural: "statefulsets", }, }, }, gvk: schema.GroupVersionKind{Group: "apps", Version: "*", Kind: "*"}, - want: []groupVersionKindPlural{ + want: []DiscoveredResource{ { GroupVersionKind: schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, Plural: "deployments", @@ -61,30 +75,36 @@ func TestGVKMapsResolveGVK(t *testing.T) { }, { desc: "variable version", - gvkmaps: &CRDiscoverer{ - Map: map[string]map[string][]kindPlural{ - "testgroup": { - "v1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, - kindPlural{ - Kind: "TestObject2", - Plural: "testobjects2", - }, + resources: map[string][]*DiscoveredResource{ + "crd:testobjects.testgroup": { + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject1", }, - "v1alpha1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, + Plural: "testobjects1", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject2", + }, + Plural: "testobjects2", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1alpha1", + Kind: "TestObject1", }, + Plural: "testobjects1", }, }, }, gvk: schema.GroupVersionKind{Group: "testgroup", Version: "*", Kind: "TestObject1"}, - want: []groupVersionKindPlural{ + want: []DiscoveredResource{ { GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, Plural: "testobjects1", @@ -97,30 +117,36 @@ func TestGVKMapsResolveGVK(t *testing.T) { }, { desc: "variable kind", - gvkmaps: &CRDiscoverer{ - Map: map[string]map[string][]kindPlural{ - "testgroup": { - "v1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, - kindPlural{ - Kind: "TestObject2", - Plural: "testobjects2", - }, + resources: map[string][]*DiscoveredResource{ + "crd:testobjects.testgroup": { + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject1", + }, + Plural: "testobjects1", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject2", }, - "v1alpha1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, + Plural: "testobjects2", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1alpha1", + Kind: "TestObject1", }, + Plural: "testobjects1", }, }, }, gvk: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "*"}, - want: []groupVersionKindPlural{ + want: []DiscoveredResource{ { GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, Plural: "testobjects1", @@ -133,30 +159,36 @@ func TestGVKMapsResolveGVK(t *testing.T) { }, { desc: "fixed version and kind", - gvkmaps: &CRDiscoverer{ - Map: map[string]map[string][]kindPlural{ - "testgroup": { - "v1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, - kindPlural{ - Kind: "TestObject2", - Plural: "testobjects2", - }, + resources: map[string][]*DiscoveredResource{ + "crd:testobjects.testgroup": { + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject1", + }, + Plural: "testobjects1", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject2", }, - "v1alpha1": { - kindPlural{ - Kind: "TestObject1", - Plural: "testobjects1", - }, + Plural: "testobjects2", + }, + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1alpha1", + Kind: "TestObject1", }, + Plural: "testobjects1", }, }, }, gvk: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, - want: []groupVersionKindPlural{ + want: []DiscoveredResource{ { GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, Plural: "testobjects1", @@ -165,15 +197,15 @@ func TestGVKMapsResolveGVK(t *testing.T) { }, { desc: "fixed version and kind, no matching cache entry", - gvkmaps: &CRDiscoverer{ - Map: map[string]map[string][]kindPlural{ - "testgroup": { - "v1": { - kindPlural{ - Kind: "TestObject2", - Plural: "testobjects2", - }, + resources: map[string][]*DiscoveredResource{ + "crd:testobjects.testgroup": { + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "testgroup", + Version: "v1", + Kind: "TestObject2", }, + Plural: "testobjects2", }, }, }, @@ -182,19 +214,150 @@ func TestGVKMapsResolveGVK(t *testing.T) { }, } for _, tc := range testcases { - got, err := tc.gvkmaps.ResolveGVKToGVKPs(tc.gvk) - if err != nil { - t.Errorf("testcase: %s: got error %v", tc.desc, err) - } - // Sort got and tc.want to ensure the order of the elements. - sort.Slice(got, func(i, j int) bool { - return got[i].String() < got[j].String() - }) - sort.Slice(tc.want, func(i, j int) bool { - return tc.want[i].String() < tc.want[j].String() + t.Run(tc.desc, func(t *testing.T) { + discoverer := newTestCRDiscoverer() + // Populate the discoverer with test data + for sourceID, resources := range tc.resources { + discoverer.UpdateSource(sourceID, resources) + } + + got, err := discoverer.Resolve(tc.gvk) + if err != nil { + t.Errorf("got error %v", err) + } + // Sort got and tc.want to ensure the order of the elements. + sort.Slice(got, func(i, j int) bool { + return got[i].String() < got[j].String() + }) + sort.Slice(tc.want, func(i, j int) bool { + return tc.want[i].String() < tc.want[j].String() + }) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("got %v, want %v", got, tc.want) + } }) - if !reflect.DeepEqual(got, tc.want) { - t.Errorf("testcase: %s: got %v, want %v", tc.desc, got, tc.want) - } + } +} + +func TestUpdateSourceAndDeleteSource(t *testing.T) { + discoverer := newTestCRDiscoverer() + + // Add resources for a source + resources := []*DiscoveredResource{ + { + GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + Plural: "testobjects1", + }, + } + discoverer.UpdateSource("crd:testobjects.testgroup", resources) + // Verify resource is present + got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 { + t.Fatalf("expected 1 resource, got %d", len(got)) + } + + // Get stop channel + stopChan, ok := discoverer.GetStopChan(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) + if !ok { + t.Fatal("expected stop channel to exist") + } + + // Delete the source + discoverer.DeleteSource("crd:testobjects.testgroup") + + // Verify resource is removed + got, err = discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 0 { + t.Fatalf("expected 0 resources, got %d", len(got)) + } + + // Verify stop channel is closed + select { + case <-stopChan: + // expected - channel is closed + default: + t.Fatal("expected stop channel to be closed") + } +} + +func TestUpdateSourceNilSkipsUpdate(t *testing.T) { + discoverer := newTestCRDiscoverer() + + // Add initial resources + resources := []*DiscoveredResource{ + { + GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + Plural: "testobjects1", + }, + } + discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) + // Update with nil (simulating skipping update) + discoverer.UpdateSource("apiservice:testobjects.testgroup", nil) + + // Verify original resource is still present + got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 1 { + t.Fatalf("expected 1 resource (nil should skip update), got %d", len(got)) + } +} + +func TestUpdateSourceEmptyRemovesResources(t *testing.T) { + discoverer := newTestCRDiscoverer() + + // Add initial resources + resources := []*DiscoveredResource{ + { + GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + Plural: "testobjects1", + }, + } + discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) + + // Update with empty slice (simulating removal) + discoverer.UpdateSource("apiservice:testobjects.testgroup", []*DiscoveredResource{}) + + // Verify resource is removed + got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(got) != 0 { + t.Fatalf("expected 0 resources, got %d", len(got)) + } +} + +func TestCheckAndResetUpdated(t *testing.T) { + discoverer := newTestCRDiscoverer() + + // Initially not updated + if discoverer.CheckAndResetUpdated() { + t.Fatal("expected wasUpdated to be false initially") + } + + // Add a resource + discoverer.UpdateSource("crd:testobjects.testgroup", []*DiscoveredResource{ + { + GroupVersionKind: schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + Plural: "testobjects1", + }, + }) + + // Should be updated now + if !discoverer.CheckAndResetUpdated() { + t.Fatal("expected wasUpdated to be true after UpdateSource") + } + + // Should be reset + if discoverer.CheckAndResetUpdated() { + t.Fatal("expected wasUpdated to be false after CheckAndResetUpdated") } } diff --git a/internal/discovery/gvk_extractors.go b/internal/discovery/gvk_extractors.go index e1447a0b2e..2d6617064c 100644 --- a/internal/discovery/gvk_extractors.go +++ b/internal/discovery/gvk_extractors.go @@ -10,17 +10,24 @@ import ( "k8s.io/klog/v2" ) -type crdGVKExtractor struct{} +type crdExtractor struct{} -func (e *crdGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKindPlural { +// SourceID returns a unique identifier for the CRD. +func (e *crdExtractor) SourceID(obj interface{}) string { + u := obj.(*unstructured.Unstructured) + return "crd:" + u.GetName() +} + +// ExtractGVKs extracts GVK information from a CRD object. +func (e *crdExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) - var gvkps []groupVersionKindPlural + var resources []*DiscoveredResource for _, version := range objSpec["versions"].([]interface{}) { g := objSpec["group"].(string) v := version.(map[string]interface{})["name"].(string) k := objSpec["names"].(map[string]interface{})["kind"].(string) p := objSpec["names"].(map[string]interface{})["plural"].(string) - gvkps = append(gvkps, groupVersionKindPlural{ + resources = append(resources, &DiscoveredResource{ GroupVersionKind: schema.GroupVersionKind{ Group: g, Version: v, @@ -29,7 +36,7 @@ func (e *crdGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKindPlural Plural: p, }) } - return gvkps + return resources } func isAPIServiceReady(obj interface{}) bool { @@ -50,11 +57,19 @@ func isAPIServiceReady(obj interface{}) bool { return false } -type apiServiceGVKExtractor struct { +type apiServiceExtractor struct { discoveryClient *k8sdiscovery.DiscoveryClient } -func (e *apiServiceGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKindPlural { +// SourceID returns a unique identifier for the APIService. +func (e *apiServiceExtractor) SourceID(obj interface{}) string { + u := obj.(*unstructured.Unstructured) + return "apiservice:" + u.GetName() +} + +// ExtractGVKs extracts GVK information from an APIService object. +// Returns nil if the APIService is not ready (signals "skip update"). +func (e *apiServiceExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) group, _, err := unstructured.NestedString(serviceSpec, "group") if err != nil { @@ -67,31 +82,34 @@ func (e *apiServiceGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKind return nil } - // check if APIService has a service defined - i.e. not local + // Check if APIService has a service defined - i.e. not local if svc, ok := serviceSpec["service"]; !ok || svc == nil { klog.V(5).InfoS("skipping local APIService", "group", group, "version", version) - return nil + // Return empty slice to clear any existing resources for this source + return []*DiscoveredResource{} } if !isAPIServiceReady(obj) { - klog.V(5).InfoS("skipping non-ready APIService", "group", group, "version", version) - return nil + klog.InfoS("skipping non-ready APIService", "group", group, "version", version) + // Return empty slice to remove resources for non-ready APIService + return []*DiscoveredResource{} } resourceList, err := e.discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) if err != nil { klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) + // Return nil to skip resources update return nil } - var gvkps []groupVersionKindPlural + var resources []*DiscoveredResource for _, resource := range resourceList.APIResources { // Skip subresources if strings.Contains(resource.Name, "/") { continue } - gvkps = append(gvkps, groupVersionKindPlural{ + resources = append(resources, &DiscoveredResource{ GroupVersionKind: schema.GroupVersionKind{ Group: group, Version: version, @@ -100,5 +118,6 @@ func (e *apiServiceGVKExtractor) ExtractGVKs(obj interface{}) []groupVersionKind Plural: resource.Name, }) } - return gvkps + + return resources } diff --git a/internal/discovery/types.go b/internal/discovery/types.go index 0d5f1635a8..890efb2e40 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -21,103 +21,193 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -type groupVersionKindPlural struct { +// DiscoveredResource represents a discovered custom resource type. +type DiscoveredResource struct { schema.GroupVersionKind - Plural string + Plural string + stopChan chan struct{} } -func (g groupVersionKindPlural) String() string { - return fmt.Sprintf("%s/%s, Kind=%s, Plural=%s", g.Group, g.Version, g.Kind, g.Plural) +// String returns a string representation of the DiscoveredResource. +func (d DiscoveredResource) String() string { + return fmt.Sprintf("%s/%s, Kind=%s, Plural=%s", d.Group, d.Version, d.Kind, d.Plural) } -type kindPlural struct { - Kind string - Plural string +// extractor defines the interface for extracting DiscoveredResources from CRDs/APIServices. +type extractor interface { + // SourceID returns a unique identifier for the source object. + // For CRDs: "crd:", for APIServices: "apiservice:" + SourceID(obj interface{}) string + // ExtractGVKs extracts discovered resources from the object. + // Return nil to skip, empty array to signal deletion of all resources for the source. + ExtractGVKs(obj interface{}) []*DiscoveredResource } -type gvkExtractor interface { - ExtractGVKs(obj interface{}) []groupVersionKindPlural -} - -// CRDiscoverer provides a cache of the collected GVKs, along with helper utilities. +// CRDiscoverer provides discovery and lifecycle management for custom resources. type CRDiscoverer struct { - // CRDsAddEventsCounter tracks the number of times that the CRD informer triggered the "add" event. - CRDsAddEventsCounter prometheus.Counter - // CRDsUpdateEventsCounter tracks the number of times that the CRD informer triggered the "update" event. - CRDsUpdateEventsCounter prometheus.Counter - // CRDsDeleteEventsCounter tracks the number of times that the CRD informer triggered the "remove" event. - CRDsDeleteEventsCounter prometheus.Counter - // CRDsCacheCountGauge tracks the net amount of CRDs affecting the cache at this point. - CRDsCacheCountGauge prometheus.Gauge - // Map is a cache of the collected GVKs. - Map map[string]map[string][]kindPlural - // GVKToReflectorStopChanMap is a map of GVKs to channels that can be used to stop their corresponding reflector. - GVKToReflectorStopChanMap map[string]chan struct{} - // m is a mutex to protect the cache. - m sync.RWMutex - // ShouldUpdate is a flag that indicates whether the cache was updated. - WasUpdated bool -} + // mu protects all fields below. + mu sync.RWMutex + // resourcesBySource maps source objects to their discovered resources. + // Keys: "crd:" or "apiservice:" + resourcesBySource map[string][]*DiscoveredResource + // wasUpdated indicates whether the cache was updated since last check. + wasUpdated bool -// SafeRead executes the given function while holding a read lock. -func (r *CRDiscoverer) SafeRead(f func()) { - r.m.RLock() - defer r.m.RUnlock() - f() + // Metrics for discovery events. + // UpdateEvents counts add and update operations (any source mutation). + UpdateEvents prometheus.Counter + // DeleteEvents counts source deletions. + DeleteEvents prometheus.Counter + // CacheCount tracks the current number of discovered resources. + CacheCount prometheus.Gauge } -// SafeWrite executes the given function while holding a write lock. -func (r *CRDiscoverer) SafeWrite(f func()) { - r.m.Lock() - defer r.m.Unlock() - f() +func NewCRDiscoverer( + updateEvents prometheus.Counter, + deleteEvents prometheus.Counter, + cacheCount prometheus.Gauge, +) *CRDiscoverer { + return &CRDiscoverer{ + resourcesBySource: make(map[string][]*DiscoveredResource), + UpdateEvents: updateEvents, + DeleteEvents: deleteEvents, + CacheCount: cacheCount, + } } -// AppendToMap appends the given GVKs to the cache. -func (r *CRDiscoverer) AppendToMap(gvkps ...groupVersionKindPlural) { - if r.Map == nil { - r.Map = map[string]map[string][]kindPlural{} - } - if r.GVKToReflectorStopChanMap == nil { - r.GVKToReflectorStopChanMap = map[string]chan struct{}{} +// UpdateSource replaces all resources for a source with new resources. +// If resources is nil, this is a noop. +// If resources is empty, all resources for the source are removed. +func (r *CRDiscoverer) UpdateSource(sourceID string, resources []*DiscoveredResource) { + if resources == nil { + return // Skip if nil resources } - for _, gvkp := range gvkps { - if _, ok := r.Map[gvkp.Group]; !ok { - r.Map[gvkp.Group] = map[string][]kindPlural{} - } - if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok { - r.Map[gvkp.Group][gvkp.Version] = []kindPlural{} + + r.mu.Lock() + defer r.mu.Unlock() + + // Close stop channels for old resources + if oldResources, ok := r.resourcesBySource[sourceID]; ok { + for _, old := range oldResources { + if old.stopChan != nil { + close(old.stopChan) + } } - r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version], kindPlural{Kind: gvkp.Kind, Plural: gvkp.Plural}) - r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()] = make(chan struct{}) } + + // Create stop channels for new resources + for _, res := range resources { + res.stopChan = make(chan struct{}) + } + + if len(resources) == 0 { + delete(r.resourcesBySource, sourceID) // empty slice signals deletion + } else { + r.resourcesBySource[sourceID] = resources + } + + r.wasUpdated = true + + r.UpdateEvents.Inc() + r.updateCacheCountLocked() } -// RemoveFromMap removes the given GVKs from the cache. -func (r *CRDiscoverer) RemoveFromMap(gvkps ...groupVersionKindPlural) { - for _, gvkp := range gvkps { - if _, ok := r.Map[gvkp.Group]; !ok { - continue +// DeleteSource removes all resources for a source and closes their stop channels. +func (r *CRDiscoverer) DeleteSource(sourceID string) { + r.mu.Lock() + defer r.mu.Unlock() + + oldResources, ok := r.resourcesBySource[sourceID] + if !ok { + return + } + + // Close stop channels + for _, res := range oldResources { + if res.stopChan != nil { + close(res.stopChan) } - if _, ok := r.Map[gvkp.Group][gvkp.Version]; !ok { - continue + } + + delete(r.resourcesBySource, sourceID) + r.wasUpdated = true + + r.DeleteEvents.Inc() + r.updateCacheCountLocked() +} + +// GetStopChan returns the stop channel for the given GVK. +func (r *CRDiscoverer) GetStopChan(gvk schema.GroupVersionKind) (chan struct{}, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, resources := range r.resourcesBySource { + for _, res := range resources { + if res.GroupVersionKind == gvk { + return res.stopChan, true + } } - for i, el := range r.Map[gvkp.Group][gvkp.Version] { - if el.Kind == gvkp.Kind { - if _, ok := r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()]; ok { - close(r.GVKToReflectorStopChanMap[gvkp.GroupVersionKind.String()]) - delete(r.GVKToReflectorStopChanMap, gvkp.GroupVersionKind.String()) - } - if len(r.Map[gvkp.Group][gvkp.Version]) == 1 { - delete(r.Map[gvkp.Group], gvkp.Version) - if len(r.Map[gvkp.Group]) == 0 { - delete(r.Map, gvkp.Group) - } - break - } - r.Map[gvkp.Group][gvkp.Version] = append(r.Map[gvkp.Group][gvkp.Version][:i], r.Map[gvkp.Group][gvkp.Version][i+1:]...) - break + } + return nil, false +} + +// Resolve resolves a GVK pattern to matching DiscoveredResources. +// Group is required and cannot be a wildcard. +// Supports "*" for Version and/or Kind. +func (r *CRDiscoverer) Resolve(gvk schema.GroupVersionKind) ([]DiscoveredResource, error) { + g := gvk.Group + v := gvk.Version + k := gvk.Kind + + if g == "" || g == "*" { + return nil, fmt.Errorf("group is required in the defined GVK %v", gvk) + } + + hasVersion := v != "" && v != "*" + hasKind := k != "" && k != "*" + + r.mu.RLock() + defer r.mu.RUnlock() + + var results []DiscoveredResource + for _, resources := range r.resourcesBySource { + for _, res := range resources { + if res.GroupVersionKind.Group != g { + continue + } + if hasVersion && res.GroupVersionKind.Version != v { + continue + } + if hasKind && res.GroupVersionKind.Kind != k { + continue + } + results = append(results, DiscoveredResource{ + GroupVersionKind: res.GroupVersionKind, + Plural: res.Plural, + }) + // exit if exact match + if hasVersion && hasKind { + return results, nil } } } + return results, nil +} + +// CheckAndResetUpdated checks if the cache was updated and resets the flag. +func (r *CRDiscoverer) CheckAndResetUpdated() bool { + r.mu.Lock() + defer r.mu.Unlock() + updated := r.wasUpdated + r.wasUpdated = false + return updated +} + +// updateCacheCountLocked updates the cache count gauge. Must be called with mu held. +func (r *CRDiscoverer) updateCacheCountLocked() { + count := 0 + for _, resources := range r.resourcesBySource { + count += len(resources) + } + r.CacheCount.Set(float64(count)) } diff --git a/internal/store/builder.go b/internal/store/builder.go index 890550a355..8c5f32ebad 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -86,7 +86,7 @@ type Builder struct { useAPIServerCache bool objectLimit int64 - GVKToReflectorStopChanMap *map[string]chan struct{} + GVKStopChanProvider ksmtypes.StopChanProvider } // NewBuilder returns a new builder. @@ -621,7 +621,16 @@ func (b *Builder) startReflector( instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { - go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) + if b.GVKStopChanProvider == nil { + klog.ErrorS(nil, "StopChanProvider is nil, cannot start reflector for custom resource", "gvk", cr.GroupVersionKind()) + return + } + stopChan, ok := b.GVKStopChanProvider.GetStopChan(cr.GroupVersionKind()) + if !ok { + klog.ErrorS(nil, "no stop channel found for GVK", "gvk", cr.GroupVersionKind()) + return + } + go reflector.Run(stopChan) } else { go reflector.Run(b.ctx.Done()) } diff --git a/pkg/app/server.go b/pkg/app/server.go index e1fdcc529d..91f305508d 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -118,21 +118,17 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { }, []string{"type", "filename"}) // Register self-metrics to track the state of the cache. - crdsAddEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ - Name: "kube_state_metrics_custom_resource_state_add_events_total", - Help: "Number of times that the CRD informer triggered the add event.", - }) - crdsUpdateEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ + crsUpdateEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ Name: "kube_state_metrics_custom_resource_state_update_events_total", - Help: "Number of times that the CRD informer triggered the update event.", + Help: "Number of times that the Custom Resource discovery triggered the update event.", }) - crdsDeleteEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ + crsDeleteEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ Name: "kube_state_metrics_custom_resource_state_delete_events_total", - Help: "Number of times that the CRD informer triggered the remove event.", + Help: "Number of times that the Custom Resource discovery triggered the remove event.", }) - crdsCacheCountGauge := promauto.With(ksmMetricsRegistry).NewGauge(prometheus.GaugeOpts{ + crsCacheCountGauge := promauto.With(ksmMetricsRegistry).NewGauge(prometheus.GaugeOpts{ Name: "kube_state_metrics_custom_resource_state_cache", - Help: "Net amount of CRDs affecting the cache currently.", + Help: "Net amount of Custom Resources affecting the cache currently.", }) storeBuilder := store.NewBuilder() storeBuilder.WithMetrics(ksmMetricsRegistry) @@ -316,14 +312,13 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { // A nil CRS config implies that we need to hold off on all CRS operations. if config != nil { - discovererInstance := &discovery.CRDiscoverer{ - CRDsAddEventsCounter: crdsAddEventsCounter, - CRDsUpdateEventsCounter: crdsUpdateEventsCounter, - CRDsDeleteEventsCounter: crdsDeleteEventsCounter, - CRDsCacheCountGauge: crdsCacheCountGauge, - } - // storeBuilder starts reflectors for the discovered GVKs, and as such, should close them too. - storeBuilder.GVKToReflectorStopChanMap = &discovererInstance.GVKToReflectorStopChanMap + discovererInstance := discovery.NewCRDiscoverer( + crsUpdateEventsCounter, + crsDeleteEventsCounter, + crsCacheCountGauge, + ) + // storeBuilder uses the discoverer to get stop channels for reflectors. + storeBuilder.GVKStopChanProvider = discovererInstance // This starts a goroutine that will watch for any new GVKs to extract from CRDs. err = discovererInstance.StartDiscovery(ctx, kubeConfig) if err != nil { diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index bc13ca3c3e..e89f9ba975 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -22,6 +22,7 @@ import ( metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store" "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/runtime/schema" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -30,6 +31,13 @@ import ( "k8s.io/kube-state-metrics/v2/pkg/options" ) +// StopChanProvider provides thread-safe access to stop channels for reflectors. +type StopChanProvider interface { + // GetStopChan returns the stop channel for the given GVK. + // Returns false if no stop channel exists for this GVK. + GetStopChan(gvk schema.GroupVersionKind) (chan struct{}, bool) +} + // BuilderInterface represent all methods that a Builder should implements type BuilderInterface interface { WithMetrics(r prometheus.Registerer) diff --git a/pkg/customresourcestate/config.go b/pkg/customresourcestate/config.go index a9e47614a8..f8c052c871 100644 --- a/pkg/customresourcestate/config.go +++ b/pkg/customresourcestate/config.go @@ -188,7 +188,7 @@ func FromConfig(decoder ConfigDecoder, discovererInstance *discovery.CRDiscovere // resolvedGVKPs will have the final list of GVKs, in addition to the resolved G** resources. var resolvedGVKPs []Resource for _, resource := range resources /* G** */ { - resolvedSet /* GVKPs */, err := discovererInstance.ResolveGVKToGVKPs(schema.GroupVersionKind(resource.GroupVersionKind)) + resolvedSet /* DiscoveredResources */, err := discovererInstance.Resolve(schema.GroupVersionKind(resource.GroupVersionKind)) if err != nil { klog.ErrorS(err, "failed to resolve GVK", "gvk", resource.GroupVersionKind) } From 149b22a9edfe8b13805debed7177251859f74e10 Mon Sep 17 00:00:00 2001 From: Alexander North Date: Tue, 20 Jan 2026 17:44:35 +0100 Subject: [PATCH 4/8] add licence header for lint check --- internal/discovery/gvk_extractors.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/internal/discovery/gvk_extractors.go b/internal/discovery/gvk_extractors.go index 2d6617064c..34ab6e0b29 100644 --- a/internal/discovery/gvk_extractors.go +++ b/internal/discovery/gvk_extractors.go @@ -1,3 +1,16 @@ +/* +Copyright 2023 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package discovery import ( From 9586f0b3decb227398b5c1bf37efa246aebb3991 Mon Sep 17 00:00:00 2001 From: Alexander North Date: Tue, 20 Jan 2026 18:12:56 +0100 Subject: [PATCH 5/8] fix linter discovered issues --- internal/discovery/discovery.go | 2 +- internal/discovery/types.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 20ad36c8df..6cce9b9f63 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -58,7 +58,7 @@ func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedInd resources := extractor.ExtractGVKs(obj) r.UpdateSource(sourceID, resources) }, - UpdateFunc: func(oldObj, newObj interface{}) { + UpdateFunc: func(_, newObj interface{}) { sourceID := extractor.SourceID(newObj) resources := extractor.ExtractGVKs(newObj) r.UpdateSource(sourceID, resources) diff --git a/internal/discovery/types.go b/internal/discovery/types.go index 890efb2e40..2c08e634fe 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -62,6 +62,7 @@ type CRDiscoverer struct { CacheCount prometheus.Gauge } +// NewCRDiscoverer creates a new CRDiscoverer instance. func NewCRDiscoverer( updateEvents prometheus.Counter, deleteEvents prometheus.Counter, @@ -172,13 +173,13 @@ func (r *CRDiscoverer) Resolve(gvk schema.GroupVersionKind) ([]DiscoveredResourc var results []DiscoveredResource for _, resources := range r.resourcesBySource { for _, res := range resources { - if res.GroupVersionKind.Group != g { + if res.Group != g { continue } - if hasVersion && res.GroupVersionKind.Version != v { + if hasVersion && res.Version != v { continue } - if hasKind && res.GroupVersionKind.Kind != k { + if hasKind && res.Kind != k { continue } results = append(results, DiscoveredResource{ From a65967fff20f8dd3b20c3d46e1eddc60303e07cb Mon Sep 17 00:00:00 2001 From: Alexander North Date: Tue, 20 Jan 2026 18:58:19 +0100 Subject: [PATCH 6/8] add back 'add' metric --- internal/discovery/discovery_test.go | 1 + internal/discovery/types.go | 15 +++++++++++++-- pkg/app/server.go | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go index fef7d3f706..8ddd9453dc 100644 --- a/internal/discovery/discovery_test.go +++ b/internal/discovery/discovery_test.go @@ -25,6 +25,7 @@ import ( // newTestCRDiscoverer creates a CRDiscoverer with no-op metrics for testing. func newTestCRDiscoverer() *CRDiscoverer { return NewCRDiscoverer( + prometheus.NewCounter(prometheus.CounterOpts{Name: "test_add"}), prometheus.NewCounter(prometheus.CounterOpts{Name: "test_update"}), prometheus.NewCounter(prometheus.CounterOpts{Name: "test_delete"}), prometheus.NewGauge(prometheus.GaugeOpts{Name: "test_count"}), diff --git a/internal/discovery/types.go b/internal/discovery/types.go index 2c08e634fe..a9bb11ad7f 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -54,7 +54,9 @@ type CRDiscoverer struct { wasUpdated bool // Metrics for discovery events. - // UpdateEvents counts add and update operations (any source mutation). + // AddEvents counts add operations. + AddEvents prometheus.Counter + // UpdateEvents counts update operations. UpdateEvents prometheus.Counter // DeleteEvents counts source deletions. DeleteEvents prometheus.Counter @@ -64,12 +66,14 @@ type CRDiscoverer struct { // NewCRDiscoverer creates a new CRDiscoverer instance. func NewCRDiscoverer( + addEvents prometheus.Counter, updateEvents prometheus.Counter, deleteEvents prometheus.Counter, cacheCount prometheus.Gauge, ) *CRDiscoverer { return &CRDiscoverer{ resourcesBySource: make(map[string][]*DiscoveredResource), + AddEvents: addEvents, UpdateEvents: updateEvents, DeleteEvents: deleteEvents, CacheCount: cacheCount, @@ -87,6 +91,8 @@ func (r *CRDiscoverer) UpdateSource(sourceID string, resources []*DiscoveredReso r.mu.Lock() defer r.mu.Unlock() + _, existing := r.resourcesBySource[sourceID] + // Close stop channels for old resources if oldResources, ok := r.resourcesBySource[sourceID]; ok { for _, old := range oldResources { @@ -109,7 +115,12 @@ func (r *CRDiscoverer) UpdateSource(sourceID string, resources []*DiscoveredReso r.wasUpdated = true - r.UpdateEvents.Inc() + if !existing { + r.AddEvents.Inc() + } else { + r.UpdateEvents.Inc() + } + r.updateCacheCountLocked() } diff --git a/pkg/app/server.go b/pkg/app/server.go index 91f305508d..99ddd7f87f 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -118,6 +118,10 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { }, []string{"type", "filename"}) // Register self-metrics to track the state of the cache. + crsAddEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ + Name: "kube_state_metrics_custom_resource_state_add_events_total", + Help: "Number of times that the Custom Resource discovery triggered the add event.", + }) crsUpdateEventsCounter := promauto.With(ksmMetricsRegistry).NewCounter(prometheus.CounterOpts{ Name: "kube_state_metrics_custom_resource_state_update_events_total", Help: "Number of times that the Custom Resource discovery triggered the update event.", @@ -313,6 +317,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { // A nil CRS config implies that we need to hold off on all CRS operations. if config != nil { discovererInstance := discovery.NewCRDiscoverer( + crsAddEventsCounter, crsUpdateEventsCounter, crsDeleteEventsCounter, crsCacheCountGauge, From a0367caba321084b3a95da6610b9d0ccfd00d56e Mon Sep 17 00:00:00 2001 From: Alexander North Date: Wed, 11 Feb 2026 16:07:35 +0100 Subject: [PATCH 7/8] split pr: remove APIService discovery components --- internal/discovery/discovery.go | 22 +------ internal/discovery/discovery_test.go | 8 +-- internal/discovery/gvk_extractors.go | 88 ---------------------------- internal/discovery/types.go | 6 +- 4 files changed, 8 insertions(+), 116 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 6cce9b9f63..846f6487fc 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -19,7 +19,6 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime/schema" - k8sdiscovery "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" @@ -43,10 +42,7 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) if err != nil { return err } - err = r.startAPIServiceDiscovery(ctx, config) - if err != nil { - return err - } + return nil } @@ -95,22 +91,6 @@ func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Confi return r.runInformer(ctx, factory.Informer(), extractor) } -func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error { - client := dynamic.NewForConfigOrDie(config) - factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ - Group: "apiregistration.k8s.io", - Version: "v1", - Resource: "apiservices", - }, "", 0, nil, nil) - - discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config) - extractor := &apiServiceExtractor{ - discoveryClient: discoveryClient, - } - - return r.runInformer(ctx, factory.Informer(), extractor) -} - // PollForCacheUpdates polls the cache for updates and updates the stores accordingly. func (r *CRDiscoverer) PollForCacheUpdates( ctx context.Context, diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go index 8ddd9453dc..afac9ad848 100644 --- a/internal/discovery/discovery_test.go +++ b/internal/discovery/discovery_test.go @@ -297,9 +297,9 @@ func TestUpdateSourceNilSkipsUpdate(t *testing.T) { Plural: "testobjects1", }, } - discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) + discoverer.UpdateSource("crd:testobjects.testgroup", resources) // Update with nil (simulating skipping update) - discoverer.UpdateSource("apiservice:testobjects.testgroup", nil) + discoverer.UpdateSource("crd:testobjects.testgroup", nil) // Verify original resource is still present got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) @@ -321,10 +321,10 @@ func TestUpdateSourceEmptyRemovesResources(t *testing.T) { Plural: "testobjects1", }, } - discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) + discoverer.UpdateSource("crd:testobjects.testgroup", resources) // Update with empty slice (simulating removal) - discoverer.UpdateSource("apiservice:testobjects.testgroup", []*DiscoveredResource{}) + discoverer.UpdateSource("crd:testobjects.testgroup", []*DiscoveredResource{}) // Verify resource is removed got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) diff --git a/internal/discovery/gvk_extractors.go b/internal/discovery/gvk_extractors.go index 34ab6e0b29..f5de176751 100644 --- a/internal/discovery/gvk_extractors.go +++ b/internal/discovery/gvk_extractors.go @@ -14,13 +14,8 @@ limitations under the License. package discovery import ( - "fmt" - "strings" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - k8sdiscovery "k8s.io/client-go/discovery" - "k8s.io/klog/v2" ) type crdExtractor struct{} @@ -51,86 +46,3 @@ func (e *crdExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { } return resources } - -func isAPIServiceReady(obj interface{}) bool { - status, found, err := unstructured.NestedSlice(obj.(*unstructured.Unstructured).Object, "status", "conditions") - if err != nil || !found { - return false - } - - for _, condition := range status { - conditionMap, ok := condition.(map[string]interface{}) - if !ok { - continue // skip invalid condition - } - if conditionMap["type"] == "Available" && conditionMap["status"] == "True" { - return true - } - } - return false -} - -type apiServiceExtractor struct { - discoveryClient *k8sdiscovery.DiscoveryClient -} - -// SourceID returns a unique identifier for the APIService. -func (e *apiServiceExtractor) SourceID(obj interface{}) string { - u := obj.(*unstructured.Unstructured) - return "apiservice:" + u.GetName() -} - -// ExtractGVKs extracts GVK information from an APIService object. -// Returns nil if the APIService is not ready (signals "skip update"). -func (e *apiServiceExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { - serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) - group, _, err := unstructured.NestedString(serviceSpec, "group") - if err != nil { - klog.ErrorS(err, "failed to extract group from APIService") - return nil - } - version, _, err := unstructured.NestedString(serviceSpec, "version") - if err != nil { - klog.ErrorS(err, "failed to extract version from APIService") - return nil - } - - // Check if APIService has a service defined - i.e. not local - if svc, ok := serviceSpec["service"]; !ok || svc == nil { - klog.V(5).InfoS("skipping local APIService", "group", group, "version", version) - // Return empty slice to clear any existing resources for this source - return []*DiscoveredResource{} - } - - if !isAPIServiceReady(obj) { - klog.InfoS("skipping non-ready APIService", "group", group, "version", version) - // Return empty slice to remove resources for non-ready APIService - return []*DiscoveredResource{} - } - - resourceList, err := e.discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) - if err != nil { - klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) - // Return nil to skip resources update - return nil - } - - var resources []*DiscoveredResource - for _, resource := range resourceList.APIResources { - // Skip subresources - if strings.Contains(resource.Name, "/") { - continue - } - - resources = append(resources, &DiscoveredResource{ - GroupVersionKind: schema.GroupVersionKind{ - Group: group, - Version: version, - Kind: resource.Kind, - }, - Plural: resource.Name, - }) - } - - return resources -} diff --git a/internal/discovery/types.go b/internal/discovery/types.go index a9bb11ad7f..9765f3111c 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -33,10 +33,10 @@ func (d DiscoveredResource) String() string { return fmt.Sprintf("%s/%s, Kind=%s, Plural=%s", d.Group, d.Version, d.Kind, d.Plural) } -// extractor defines the interface for extracting DiscoveredResources from CRDs/APIServices. +// extractor defines the interface for extracting DiscoveredResources type extractor interface { // SourceID returns a unique identifier for the source object. - // For CRDs: "crd:", for APIServices: "apiservice:" + // For CRDs: "crd:" SourceID(obj interface{}) string // ExtractGVKs extracts discovered resources from the object. // Return nil to skip, empty array to signal deletion of all resources for the source. @@ -48,7 +48,7 @@ type CRDiscoverer struct { // mu protects all fields below. mu sync.RWMutex // resourcesBySource maps source objects to their discovered resources. - // Keys: "crd:" or "apiservice:" + // Keys e.g. "crd:" resourcesBySource map[string][]*DiscoveredResource // wasUpdated indicates whether the cache was updated since last check. wasUpdated bool From b42ed1cc2988c37746c475f5865681917d50a8f0 Mon Sep 17 00:00:00 2001 From: Alexander North Date: Wed, 11 Feb 2026 17:05:08 +0100 Subject: [PATCH 8/8] implement APIService discovery This reverts commit a0367caba321084b3a95da6610b9d0ccfd00d56e. --- internal/discovery/discovery.go | 22 ++++++- internal/discovery/discovery_test.go | 8 +-- internal/discovery/gvk_extractors.go | 88 ++++++++++++++++++++++++++++ internal/discovery/types.go | 6 +- 4 files changed, 116 insertions(+), 8 deletions(-) diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go index 846f6487fc..6cce9b9f63 100644 --- a/internal/discovery/discovery.go +++ b/internal/discovery/discovery.go @@ -19,6 +19,7 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime/schema" + k8sdiscovery "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" @@ -42,7 +43,10 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) if err != nil { return err } - + err = r.startAPIServiceDiscovery(ctx, config) + if err != nil { + return err + } return nil } @@ -91,6 +95,22 @@ func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Confi return r.runInformer(ctx, factory.Informer(), extractor) } +func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error { + client := dynamic.NewForConfigOrDie(config) + factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + }, "", 0, nil, nil) + + discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config) + extractor := &apiServiceExtractor{ + discoveryClient: discoveryClient, + } + + return r.runInformer(ctx, factory.Informer(), extractor) +} + // PollForCacheUpdates polls the cache for updates and updates the stores accordingly. func (r *CRDiscoverer) PollForCacheUpdates( ctx context.Context, diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go index afac9ad848..8ddd9453dc 100644 --- a/internal/discovery/discovery_test.go +++ b/internal/discovery/discovery_test.go @@ -297,9 +297,9 @@ func TestUpdateSourceNilSkipsUpdate(t *testing.T) { Plural: "testobjects1", }, } - discoverer.UpdateSource("crd:testobjects.testgroup", resources) + discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) // Update with nil (simulating skipping update) - discoverer.UpdateSource("crd:testobjects.testgroup", nil) + discoverer.UpdateSource("apiservice:testobjects.testgroup", nil) // Verify original resource is still present got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) @@ -321,10 +321,10 @@ func TestUpdateSourceEmptyRemovesResources(t *testing.T) { Plural: "testobjects1", }, } - discoverer.UpdateSource("crd:testobjects.testgroup", resources) + discoverer.UpdateSource("apiservice:testobjects.testgroup", resources) // Update with empty slice (simulating removal) - discoverer.UpdateSource("crd:testobjects.testgroup", []*DiscoveredResource{}) + discoverer.UpdateSource("apiservice:testobjects.testgroup", []*DiscoveredResource{}) // Verify resource is removed got, err := discoverer.Resolve(schema.GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}) diff --git a/internal/discovery/gvk_extractors.go b/internal/discovery/gvk_extractors.go index f5de176751..34ab6e0b29 100644 --- a/internal/discovery/gvk_extractors.go +++ b/internal/discovery/gvk_extractors.go @@ -14,8 +14,13 @@ limitations under the License. package discovery import ( + "fmt" + "strings" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + k8sdiscovery "k8s.io/client-go/discovery" + "k8s.io/klog/v2" ) type crdExtractor struct{} @@ -46,3 +51,86 @@ func (e *crdExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { } return resources } + +func isAPIServiceReady(obj interface{}) bool { + status, found, err := unstructured.NestedSlice(obj.(*unstructured.Unstructured).Object, "status", "conditions") + if err != nil || !found { + return false + } + + for _, condition := range status { + conditionMap, ok := condition.(map[string]interface{}) + if !ok { + continue // skip invalid condition + } + if conditionMap["type"] == "Available" && conditionMap["status"] == "True" { + return true + } + } + return false +} + +type apiServiceExtractor struct { + discoveryClient *k8sdiscovery.DiscoveryClient +} + +// SourceID returns a unique identifier for the APIService. +func (e *apiServiceExtractor) SourceID(obj interface{}) string { + u := obj.(*unstructured.Unstructured) + return "apiservice:" + u.GetName() +} + +// ExtractGVKs extracts GVK information from an APIService object. +// Returns nil if the APIService is not ready (signals "skip update"). +func (e *apiServiceExtractor) ExtractGVKs(obj interface{}) []*DiscoveredResource { + serviceSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{}) + group, _, err := unstructured.NestedString(serviceSpec, "group") + if err != nil { + klog.ErrorS(err, "failed to extract group from APIService") + return nil + } + version, _, err := unstructured.NestedString(serviceSpec, "version") + if err != nil { + klog.ErrorS(err, "failed to extract version from APIService") + return nil + } + + // Check if APIService has a service defined - i.e. not local + if svc, ok := serviceSpec["service"]; !ok || svc == nil { + klog.V(5).InfoS("skipping local APIService", "group", group, "version", version) + // Return empty slice to clear any existing resources for this source + return []*DiscoveredResource{} + } + + if !isAPIServiceReady(obj) { + klog.InfoS("skipping non-ready APIService", "group", group, "version", version) + // Return empty slice to remove resources for non-ready APIService + return []*DiscoveredResource{} + } + + resourceList, err := e.discoveryClient.ServerResourcesForGroupVersion(fmt.Sprintf("%s/%s", group, version)) + if err != nil { + klog.ErrorS(err, "failed to fetch server resources for group version", "groupVersion", fmt.Sprintf("%s/%s", group, version)) + // Return nil to skip resources update + return nil + } + + var resources []*DiscoveredResource + for _, resource := range resourceList.APIResources { + // Skip subresources + if strings.Contains(resource.Name, "/") { + continue + } + + resources = append(resources, &DiscoveredResource{ + GroupVersionKind: schema.GroupVersionKind{ + Group: group, + Version: version, + Kind: resource.Kind, + }, + Plural: resource.Name, + }) + } + + return resources +} diff --git a/internal/discovery/types.go b/internal/discovery/types.go index 9765f3111c..a9bb11ad7f 100644 --- a/internal/discovery/types.go +++ b/internal/discovery/types.go @@ -33,10 +33,10 @@ func (d DiscoveredResource) String() string { return fmt.Sprintf("%s/%s, Kind=%s, Plural=%s", d.Group, d.Version, d.Kind, d.Plural) } -// extractor defines the interface for extracting DiscoveredResources +// extractor defines the interface for extracting DiscoveredResources from CRDs/APIServices. type extractor interface { // SourceID returns a unique identifier for the source object. - // For CRDs: "crd:" + // For CRDs: "crd:", for APIServices: "apiservice:" SourceID(obj interface{}) string // ExtractGVKs extracts discovered resources from the object. // Return nil to skip, empty array to signal deletion of all resources for the source. @@ -48,7 +48,7 @@ type CRDiscoverer struct { // mu protects all fields below. mu sync.RWMutex // resourcesBySource maps source objects to their discovered resources. - // Keys e.g. "crd:" + // Keys: "crd:" or "apiservice:" resourcesBySource map[string][]*DiscoveredResource // wasUpdated indicates whether the cache was updated since last check. wasUpdated bool