Skip to content
210 changes: 62 additions & 148 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package discovery

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand All @@ -40,157 +38,82 @@ const Interval = 3 * time.Second
// StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds.
// resolveGVK needs to be called after StartDiscovery to generate factories.
func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error {
client := dynamic.NewForConfigOrDie(config)
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}, "", 0, nil, nil)
informer := factory.Informer()
stopper := make(chan struct{})
extractGVKPs := func(obj interface{}) []groupVersionKindPlural {
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})
var gvkps []groupVersionKindPlural
for _, version := range objSpec["versions"].([]interface{}) {
g := objSpec["group"].(string)
v := version.(map[string]interface{})["name"].(string)
k := objSpec["names"].(map[string]interface{})["kind"].(string)
p := objSpec["names"].(map[string]interface{})["plural"].(string)
gvkps = append(gvkps, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: k,
},
Plural: p,
})
}
return gvkps
err := r.startCRDDiscovery(ctx, config)
if err != nil {
return err
}

return nil
}

func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedIndexInformer, extractor extractor) error {
stopper := make(chan struct{})

// Each handler recovers from panics so a single bad object cannot kill
// the informer goroutine
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
gvkps := extractGVKPs(obj)
r.SafeWrite(func() {
r.AppendToMap(gvkps...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsAddEventsCounter.Inc()
r.CRDsCacheCountGauge.Inc()
})
defer func() {
if rec := recover(); rec != nil {
klog.ErrorS(nil, "recovered from panic in discovery add handler", "panic", rec)
}
}()
sourceID := extractor.SourceID(obj)
if sourceID == "" {
return
}
r.UpdateSource(sourceID, extractor.ExtractGVKs(obj))
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldGVKPs := extractGVKPs(oldObj)
newGVKPs := extractGVKPs(newObj)
r.SafeWrite(func() {
r.RemoveFromMap(oldGVKPs...)
r.AppendToMap(newGVKPs...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsUpdateEventsCounter.Inc()
})
UpdateFunc: func(_, newObj interface{}) {
defer func() {
if rec := recover(); rec != nil {
klog.ErrorS(nil, "recovered from panic in discovery update handler", "panic", rec)
}
}()
sourceID := extractor.SourceID(newObj)
if sourceID == "" {
return
}
r.UpdateSource(sourceID, extractor.ExtractGVKs(newObj))
},
DeleteFunc: func(obj interface{}) {
gvkps := extractGVKPs(obj)
r.SafeWrite(func() {
r.RemoveFromMap(gvkps...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsDeleteEventsCounter.Inc()
r.CRDsCacheCountGauge.Dec()
})
defer func() {
if rec := recover(); rec != nil {
klog.ErrorS(nil, "recovered from panic in discovery delete handler", "panic", rec)
}
}()
sourceID := extractor.SourceID(obj)
if sourceID == "" {
return
}
r.DeleteSource(sourceID)
},
})
Comment thread
alexandernorth marked this conversation as resolved.
if err != nil {
return err
}

// Respect context cancellation.
go func() {
for range ctx.Done() {
klog.InfoS("context cancelled, stopping discovery")
close(stopper)
return
}
<-ctx.Done()
klog.InfoS("context cancelled, stopping discovery")
close(stopper)
}()
go informer.Run(stopper)
return nil
}

// ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache.
func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive
g := gvk.Group
v := gvk.Version
k := gvk.Kind
if g == "" || g == "*" {
return nil, fmt.Errorf("group is required in the defined GVK %v", gvk)
}
hasVersion := v != "" && v != "*"
hasKind := k != "" && k != "*"
// No need to resolve, return.
if hasVersion && hasKind {
for _, el := range r.Map[g][v] {
if el.Kind == k {
return []groupVersionKindPlural{
{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: k,
},
Plural: el.Plural,
},
}, nil
}
}
}
if hasVersion && !hasKind {
kinds := r.Map[g][v]
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: el.Kind,
},
Plural: el.Plural,
})
}
}
if !hasVersion && hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, el := range kinds {
if el.Kind == k {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Kind: k,
},
Plural: el.Plural,
})
}
}
}
}
if !hasVersion && !hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Kind: el.Kind,
},
Plural: el.Plural,
})
}
}
}
return
func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Config) error {
client := dynamic.NewForConfigOrDie(config)
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}, "", 0, nil, nil)

extractor := &crdExtractor{}

return r.runInformer(ctx, factory.Informer(), extractor)
}

// PollForCacheUpdates polls the cache for updates and updates the stores accordingly.
Expand Down Expand Up @@ -239,28 +162,19 @@ func (r *CRDiscoverer) PollForCacheUpdates(
}
// Configure the generation function for the custom resource stores.
storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc())
// Reset the flag, if there were no errors. Else, we'll try again on the next tick.
// Keep retrying if there were errors.
r.SafeWrite(func() {
r.WasUpdated = false
})
// Update metric handler with the new configs.
m.BuildWriters(ctx)
}
go func() {
for range t.C {
for {
select {
case <-ctx.Done():
klog.InfoS("context cancelled")
t.Stop()
return
default:
// Check if cache has been updated.
shouldGenerateMetrics := false
r.SafeRead(func() {
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
case <-t.C:
// Check if cache has been updated and reset the flag.
if r.CheckAndResetUpdated() {
generateMetrics()
klog.InfoS("discovery finished, cache updated")
}
Expand Down
Loading