Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 56 additions & 147 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ package discovery

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sdiscovery "k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
Expand All @@ -40,157 +39,76 @@ const Interval = 3 * time.Second
// StartDiscovery starts the discovery process, fetching all the objects that can be listed from the apiserver, every `Interval` seconds.
// resolveGVK needs to be called after StartDiscovery to generate factories.
func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config) error {
client := dynamic.NewForConfigOrDie(config)
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}, "", 0, nil, nil)
informer := factory.Informer()
stopper := make(chan struct{})
extractGVKPs := func(obj interface{}) []groupVersionKindPlural {
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})
var gvkps []groupVersionKindPlural
for _, version := range objSpec["versions"].([]interface{}) {
g := objSpec["group"].(string)
v := version.(map[string]interface{})["name"].(string)
k := objSpec["names"].(map[string]interface{})["kind"].(string)
p := objSpec["names"].(map[string]interface{})["plural"].(string)
gvkps = append(gvkps, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: k,
},
Plural: p,
})
}
return gvkps
err := r.startCRDDiscovery(ctx, config)
if err != nil {
return err
}
err = r.startAPIServiceDiscovery(ctx, config)
if err != nil {
return err
}
return nil
}

func (r *CRDiscoverer) runInformer(ctx context.Context, informer cache.SharedIndexInformer, extractor extractor) error {
stopper := make(chan struct{})
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
gvkps := extractGVKPs(obj)
r.SafeWrite(func() {
r.AppendToMap(gvkps...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsAddEventsCounter.Inc()
r.CRDsCacheCountGauge.Inc()
})
sourceID := extractor.SourceID(obj)
resources := extractor.ExtractGVKs(obj)
r.UpdateSource(sourceID, resources)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldGVKPs := extractGVKPs(oldObj)
newGVKPs := extractGVKPs(newObj)
r.SafeWrite(func() {
r.RemoveFromMap(oldGVKPs...)
r.AppendToMap(newGVKPs...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsUpdateEventsCounter.Inc()
})
UpdateFunc: func(_, newObj interface{}) {
sourceID := extractor.SourceID(newObj)
resources := extractor.ExtractGVKs(newObj)
r.UpdateSource(sourceID, resources)
},
DeleteFunc: func(obj interface{}) {
gvkps := extractGVKPs(obj)
r.SafeWrite(func() {
r.RemoveFromMap(gvkps...)
r.WasUpdated = true
})
r.SafeWrite(func() {
r.CRDsDeleteEventsCounter.Inc()
r.CRDsCacheCountGauge.Dec()
})
sourceID := extractor.SourceID(obj)
r.DeleteSource(sourceID)
},
})
if err != nil {
return err
}

// Respect context cancellation.
go func() {
for range ctx.Done() {
klog.InfoS("context cancelled, stopping discovery")
close(stopper)
return
}
<-ctx.Done()
klog.InfoS("context cancelled, stopping discovery")
close(stopper)
}()
go informer.Run(stopper)
return nil
}

// ResolveGVKToGVKPs resolves the variable VKs to a GVK list, based on the current cache.
func (r *CRDiscoverer) ResolveGVKToGVKPs(gvk schema.GroupVersionKind) (resolvedGVKPs []groupVersionKindPlural, err error) { // nolint:revive
g := gvk.Group
v := gvk.Version
k := gvk.Kind
if g == "" || g == "*" {
return nil, fmt.Errorf("group is required in the defined GVK %v", gvk)
}
hasVersion := v != "" && v != "*"
hasKind := k != "" && k != "*"
// No need to resolve, return.
if hasVersion && hasKind {
for _, el := range r.Map[g][v] {
if el.Kind == k {
return []groupVersionKindPlural{
{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: k,
},
Plural: el.Plural,
},
}, nil
}
}
}
if hasVersion && !hasKind {
kinds := r.Map[g][v]
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: v,
Kind: el.Kind,
},
Plural: el.Plural,
})
}
}
if !hasVersion && hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, el := range kinds {
if el.Kind == k {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Kind: k,
},
Plural: el.Plural,
})
}
}
}
}
if !hasVersion && !hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, el := range kinds {
resolvedGVKPs = append(resolvedGVKPs, groupVersionKindPlural{
GroupVersionKind: schema.GroupVersionKind{
Group: g,
Version: version,
Kind: el.Kind,
},
Plural: el.Plural,
})
}
}
func (r *CRDiscoverer) startCRDDiscovery(ctx context.Context, config *rest.Config) error {
client := dynamic.NewForConfigOrDie(config)
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
}, "", 0, nil, nil)

extractor := &crdExtractor{}

return r.runInformer(ctx, factory.Informer(), extractor)
}

func (r *CRDiscoverer) startAPIServiceDiscovery(ctx context.Context, config *rest.Config) error {
client := dynamic.NewForConfigOrDie(config)
factory := dynamicinformer.NewFilteredDynamicInformer(client, schema.GroupVersionResource{
Group: "apiregistration.k8s.io",
Version: "v1",
Resource: "apiservices",
}, "", 0, nil, nil)

discoveryClient := k8sdiscovery.NewDiscoveryClientForConfigOrDie(config)
extractor := &apiServiceExtractor{
discoveryClient: discoveryClient,
}
return

return r.runInformer(ctx, factory.Informer(), extractor)
}

// PollForCacheUpdates polls the cache for updates and updates the stores accordingly.
Expand Down Expand Up @@ -239,28 +157,19 @@ func (r *CRDiscoverer) PollForCacheUpdates(
}
// Configure the generation function for the custom resource stores.
storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc())
// Reset the flag, if there were no errors. Else, we'll try again on the next tick.
// Keep retrying if there were errors.
r.SafeWrite(func() {
r.WasUpdated = false
})
// Update metric handler with the new configs.
m.BuildWriters(ctx)
}
go func() {
for range t.C {
for {
select {
case <-ctx.Done():
klog.InfoS("context cancelled")
t.Stop()
return
default:
// Check if cache has been updated.
shouldGenerateMetrics := false
r.SafeRead(func() {
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
case <-t.C:
// Check if cache has been updated and reset the flag.
if r.CheckAndResetUpdated() {
generateMetrics()
klog.InfoS("discovery finished, cache updated")
}
Expand Down
Loading
Loading