-
Notifications
You must be signed in to change notification settings - Fork 221
Expand file tree
/
Copy pathvirtualmcpserver_vmcpconfig.go
More file actions
464 lines (407 loc) · 17.5 KB
/
virtualmcpserver_vmcpconfig.go
File metadata and controls
464 lines (407 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc.
// SPDX-License-Identifier: Apache-2.0
package controllers
import (
"context"
"fmt"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
mcpv1beta1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1beta1"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/kubernetes/configmaps"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/oidc"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/runconfig/configmap/checksum"
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/virtualmcpserverstatus"
operatorvmcpconfig "github.com/stacklok/toolhive/cmd/thv-operator/pkg/vmcpconfig"
"github.com/stacklok/toolhive/pkg/groups"
vmcptypes "github.com/stacklok/toolhive/pkg/vmcp"
"github.com/stacklok/toolhive/pkg/vmcp/aggregator"
vmcpconfig "github.com/stacklok/toolhive/pkg/vmcp/config"
vmcpruntimeconfig "github.com/stacklok/toolhive/pkg/vmcp/config/runtime"
"github.com/stacklok/toolhive/pkg/vmcp/workloads"
)
// ensureVmcpConfigConfigMap renders the vmcp Config for a VirtualMCPServer and upserts it
// into a ConfigMap owned by that server.
//
// Parameters:
//   - typedWorkloads: the workloads in the group, supplied by the caller so that every
//     step working on this server sees the same list.
//   - telemetryCfg: the MCPTelemetryConfig the caller already fetched (nil when none is
//     referenced); it is forwarded to the converter.
//   - statusManager: receives the auth config conditions and, when the auth server
//     integration is invalid, the Failed phase, message and observed generation.
//
// An invalid auth server integration is returned as *SpecValidationError. Every other
// failure is returned as a regular (mostly wrapped) error.
func (r *VirtualMCPServerReconciler) ensureVmcpConfigConfigMap(
	ctx context.Context,
	vmcp *mcpv1beta1.VirtualMCPServer,
	typedWorkloads []workloads.TypedWorkload,
	telemetryCfg *mcpv1beta1.MCPTelemetryConfig,
	statusManager virtualmcpserverstatus.StatusManager,
) error {
	// The converter performs the CRD-to-config transformation and needs an OIDC resolver.
	converter, err := operatorvmcpconfig.NewConverter(oidc.NewResolver(r.Client), r.Client)
	if err != nil {
		return fmt.Errorf("failed to create vmcp converter: %w", err)
	}

	cfg, authRC, err := converter.Convert(ctx, vmcp, telemetryCfg)
	if err != nil {
		return fmt.Errorf("failed to create vmcp Config from VirtualMCPServer: %w", err)
	}

	// Outgoing auth (inline and discovered modes) is resolved first, then the optimizer's
	// embedding service; both steps receive cfg and may modify it.
	if authErr := r.processOutgoingAuth(ctx, vmcp, cfg, typedWorkloads, statusManager); authErr != nil {
		return authErr
	}
	if optimizerErr := r.populateOptimizerEmbeddingService(ctx, vmcp, cfg); optimizerErr != nil {
		return optimizerErr
	}

	// Nothing is written to the cluster unless the resulting Config validates.
	validator := operatorvmcpconfig.NewValidator()
	if validateErr := validator.Validate(ctx, cfg); validateErr != nil {
		return fmt.Errorf("invalid vmcp Config: %w", validateErr)
	}

	// Cross-check the auth server RunConfig against the backend strategies.
	// TODO: Move this into the operator's vmcpconfig.Validator wrapper so callers
	// don't need to know about the two-step validation sequence.
	if integrationErr := vmcpconfig.ValidateAuthServerIntegration(cfg, authRC); integrationErr != nil {
		reason := fmt.Sprintf("invalid auth server integration: %v", integrationErr)
		statusManager.SetPhase(mcpv1beta1.VirtualMCPServerPhaseFailed)
		statusManager.SetMessage(reason)
		statusManager.SetAuthServerConfigValidatedCondition(
			mcpv1beta1.ConditionReasonAuthServerConfigInvalid,
			reason,
			metav1.ConditionFalse,
		)
		statusManager.SetObservedGeneration(vmcp.Generation)
		return &SpecValidationError{Message: reason}
	}

	// Serialize to YAML for the ConfigMap. gopkg.in/yaml.v3 sorts map keys, so the output
	// (and therefore the checksum that triggers pod rollouts) only changes when the
	// content changes.
	//
	// The Config is wrapped in runtime.Config so operator-resolved fields can be added to
	// the ConfigMap later without leaking into the public vmcpconfig.Config (and hence
	// the CRD schema). Today the wrapper embeds vmcpconfig.Config inline and adds no
	// extra keys, so the YAML is byte-identical.
	configYAML, err := yaml.Marshal(vmcpruntimeconfig.Config{Config: *cfg})
	if err != nil {
		return fmt.Errorf("failed to marshal vmcp config: %w", err)
	}
	data := map[string]string{"config.yaml": string(configYAML)}

	// An embedded auth server gets its RunConfig stored under a separate key. RunConfig
	// holds only references (file paths, env var names), never actual secrets, so it is
	// safe to keep in a ConfigMap. The vMCP binary loads it alongside config.yaml.
	if authRC != nil {
		authYAML, authMarshalErr := yaml.Marshal(authRC)
		if authMarshalErr != nil {
			return fmt.Errorf("failed to marshal auth server config: %w", authMarshalErr)
		}
		data["authserver-config.yaml"] = string(authYAML)
	}

	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      vmcpConfigMapName(vmcp.Name),
			Namespace: vmcp.Namespace,
			Labels:    labelsForVmcpConfig(vmcp.Name),
		},
		Data: data,
	}

	// The content checksum is computed before the annotations are attached and then
	// recorded as an annotation on the ConfigMap.
	hasher := checksum.NewRunConfigConfigMapChecksum()
	sum := hasher.ComputeConfigMapChecksum(cm)
	cm.Annotations = map[string]string{
		checksum.ContentChecksumAnnotation: sum,
	}

	// Create or update the ConfigMap with the VirtualMCPServer as its owner.
	cmClient := configmaps.NewClient(r.Client, r.Scheme)
	if _, upsertErr := cmClient.UpsertWithOwnerReference(ctx, cm, vmcp); upsertErr != nil {
		return fmt.Errorf("failed to upsert vmcp Config ConfigMap: %w", upsertErr)
	}
	return nil
}
// populateOptimizerEmbeddingService connects the optimizer config to the embedding
// service and warns about non-recommended setups.
//
// Behavior by input (ref = spec.embeddingServerRef, svc = config.optimizer.embeddingService):
//
//	optimizer nil                  → nothing to do
//	optimizer set + ref set        → delegate to populateOptimizerFromRef (ref wins over svc)
//	optimizer set + ref nil + svc  → log and emit a warning event recommending embeddingServerRef
//	optimizer set + ref nil, empty → nothing to do here
//
// NOTE(review): earlier documentation states that Validate() fills in a default optimizer
// when only the ref is set and rejects "no ref, no svc"; neither is visible in this
// function, so confirm against the validator.
//
// The only error returned is the one propagated from populateOptimizerFromRef.
func (r *VirtualMCPServerReconciler) populateOptimizerEmbeddingService(
	ctx context.Context,
	vmcp *mcpv1beta1.VirtualMCPServer,
	config *vmcpconfig.Config,
) error {
	refSet := vmcp.Spec.EmbeddingServerRef != nil
	optimizer := config.Optimizer

	switch {
	case optimizer == nil:
		// Without an optimizer there is nothing to populate or warn about.
		return nil
	case refSet:
		// The referenced EmbeddingServer supplies the URL; an existing value is overridden.
		return r.populateOptimizerFromRef(ctx, vmcp, config)
	case optimizer.EmbeddingService == "":
		return nil
	}

	// A manually configured embedding service without a ref is accepted but discouraged.
	logger := log.FromContext(ctx)
	logger.Info("config.optimizer.embeddingService is set without embeddingServerRef; "+
		"consider using embeddingServerRef for managed lifecycle",
		"embeddingService", optimizer.EmbeddingService)
	if r.Recorder != nil {
		r.Recorder.Eventf(vmcp, nil, corev1.EventTypeWarning, "EmbeddingServiceManual", "ValidateEmbeddingService",
			"config.optimizer.embeddingService is set without embeddingServerRef; "+
				"specifying an embeddingServerRef is the recommended configuration")
	}
	return nil
}
// populateOptimizerFromRef resolves the URL of the referenced EmbeddingServer and stores it
// in config.Optimizer.EmbeddingService. If a different value was configured manually, the
// override is logged and reported as a warning event. An empty resolved URL leaves the
// config untouched. Callers must ensure config.Optimizer is non-nil.
func (r *VirtualMCPServerReconciler) populateOptimizerFromRef(
	ctx context.Context,
	vmcp *mcpv1beta1.VirtualMCPServer,
	config *vmcpconfig.Config,
) error {
	resolvedURL, err := r.resolveEmbeddingServiceURL(ctx, vmcp)
	if err != nil {
		return fmt.Errorf("failed to resolve embedding service URL: %w", err)
	}

	previous := config.Optimizer.EmbeddingService
	if resolvedURL == "" {
		// Nothing resolved: keep whatever is currently configured.
		return nil
	}

	if previous != "" {
		// The ref takes precedence; make the override visible to the user.
		logger := log.FromContext(ctx)
		logger.Info("EmbeddingServerRef overrides config.optimizer.embeddingService",
			"ref", vmcp.Spec.EmbeddingServerRef.Name,
			"overridden", previous,
			"new", resolvedURL)
		if r.Recorder != nil {
			r.Recorder.Eventf(vmcp, nil, corev1.EventTypeWarning, "EmbeddingServiceOverridden", "ResolveEmbeddingService",
				"config.optimizer.embeddingService will be replaced by EmbeddingServerRef %q URL",
				vmcp.Spec.EmbeddingServerRef.Name)
		}
	}

	config.Optimizer.EmbeddingService = resolvedURL
	return nil
}
// labelsForVmcpConfig builds the label set attached to the vmcp config ConfigMap of the
// VirtualMCPServer named vmcpName. A new map is returned on every call.
func labelsForVmcpConfig(vmcpName string) map[string]string {
	labels := make(map[string]string, 3)
	labels["toolhive.stacklok.io/component"] = "vmcp-config"
	labels["toolhive.stacklok.io/virtual-mcp-server"] = vmcpName
	labels["toolhive.stacklok.io/managed-by"] = "toolhive-operator"
	return labels
}
// discoverBackendsWithMetadata returns the full Backend objects (including metadata) for
// the group resolved from the VirtualMCPServer. It is used in static mode so the generated
// ConfigMap keeps the backend metadata.
//
// When spec.outgoingAuth is set, the group's workloads are listed and an outgoing auth
// config is built and handed to the backend discoverer. Errors from listing workloads or
// from discovery are returned wrapped; errors from building the auth config are
// intentionally dropped here.
func (r *VirtualMCPServerReconciler) discoverBackendsWithMetadata(
	ctx context.Context,
	vmcp *mcpv1beta1.VirtualMCPServer,
) ([]vmcptypes.Backend, error) {
	crdGroups := groups.NewCRDManager(r.Client, vmcp.Namespace)
	k8sWorkloads := workloads.NewK8SDiscovererWithClient(r.Client, vmcp.Namespace)

	// Stays nil unless outgoing auth is configured.
	var outgoing *vmcpconfig.OutgoingAuthConfig
	if vmcp.Spec.OutgoingAuth != nil {
		members, listErr := k8sWorkloads.ListWorkloadsInGroup(ctx, vmcp.ResolveGroupName())
		if listErr != nil {
			return nil, fmt.Errorf("failed to list workloads in group: %w", listErr)
		}
		// Auth build errors do not fail discovery: processOutgoingAuth collects them and
		// reports them through status conditions. In static mode the (possibly partial)
		// auth config is still built so it can be embedded in the ConfigMap.
		outgoing, _, _ = r.buildOutgoingAuthConfig(ctx, vmcp, members)
	}

	discoverer := aggregator.NewUnifiedBackendDiscoverer(k8sWorkloads, crdGroups, outgoing)
	discovered, err := discoverer.Discover(ctx, vmcp.ResolveGroupName())
	if err != nil {
		return nil, fmt.Errorf("failed to discover backends: %w", err)
	}
	return discovered, nil
}
// buildTransportMap returns backend name → transport type, read from the spec of the
// resource backing each workload. It is used in static mode to fill the transport field in
// the ConfigMap. Workloads whose resource is not found, or whose transport is empty, are
// left out of the result. Any failure to list MCPServers, MCPRemoteProxies or
// MCPServerEntries in the namespace is returned wrapped.
func (r *VirtualMCPServerReconciler) buildTransportMap(
	ctx context.Context,
	namespace string,
	typedWorkloads []workloads.TypedWorkload,
) (map[string]string, error) {
	servers, err := r.listMCPServersAsMap(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("failed to list MCPServers: %w", err)
	}
	proxies, err := r.listMCPRemoteProxiesAsMap(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("failed to list MCPRemoteProxies: %w", err)
	}
	entries, err := r.listMCPServerEntriesAsMap(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("failed to list MCPServerEntries: %w", err)
	}

	transports := make(map[string]string, len(typedWorkloads))
	for _, wl := range typedWorkloads {
		transport := ""
		switch wl.Type {
		case workloads.WorkloadTypeMCPServer:
			server, ok := servers[wl.Name]
			if !ok {
				break
			}
			// Effective transport: ProxyMode, when set, takes precedence over Transport.
			// For stdio servers ProxyMode says how they are proxied (sse or streamable-http).
			transport = string(server.Spec.Transport)
			if server.Spec.ProxyMode != "" {
				transport = string(server.Spec.ProxyMode)
			}
		case workloads.WorkloadTypeMCPRemoteProxy:
			if proxy, ok := proxies[wl.Name]; ok {
				transport = string(proxy.Spec.Transport)
			}
		case workloads.WorkloadTypeMCPServerEntry:
			if entry, ok := entries[wl.Name]; ok {
				transport = entry.Spec.Transport
			}
		}

		if transport == "" {
			continue
		}
		transports[wl.Name] = transport
	}
	return transports, nil
}
// buildCABundlePathMap returns backend name → CA bundle file path for MCPServerEntry
// backends. Only entries that exist and carry a caBundleRef with a ConfigMapRef are
// included. The returned map is never nil on success; it is empty (and no MCPServerEntries
// are listed) when the workloads contain no MCPServerEntry at all.
func (r *VirtualMCPServerReconciler) buildCABundlePathMap(
	ctx context.Context,
	namespace string,
	typedWorkloads []workloads.TypedWorkload,
) (map[string]string, error) {
	paths := make(map[string]string)

	// Collect the MCPServerEntry workloads up front; with none there is no reason to hit
	// the API.
	var entryNames []string
	for _, wl := range typedWorkloads {
		if wl.Type == workloads.WorkloadTypeMCPServerEntry {
			entryNames = append(entryNames, wl.Name)
		}
	}
	if len(entryNames) == 0 {
		return paths, nil
	}

	entries, err := r.listMCPServerEntriesAsMap(ctx, namespace)
	if err != nil {
		return nil, fmt.Errorf("failed to list MCPServerEntries: %w", err)
	}

	for _, name := range entryNames {
		entry, ok := entries[name]
		if !ok {
			continue
		}
		ref := entry.Spec.CABundleRef
		if ref == nil || ref.ConfigMapRef == nil {
			continue
		}
		paths[name] = caBundleMountPath(name, ref)
	}
	return paths, nil
}
// extractInlineBackendNames returns the keys of spec.outgoingAuth.backends, i.e. the names
// of the backends configured inline on the VirtualMCPServer. It returns nil when outgoing
// auth or its backends map is unset. The order of the names follows map iteration and is
// therefore unspecified.
func extractInlineBackendNames(vmcp *mcpv1beta1.VirtualMCPServer) []string {
	outgoing := vmcp.Spec.OutgoingAuth
	if outgoing == nil || outgoing.Backends == nil {
		return nil
	}

	names := make([]string, len(outgoing.Backends))
	next := 0
	for name := range outgoing.Backends {
		names[next] = name
		next++
	}
	return names
}
// determineValidInlineBackends returns the inline backends that ended up with an auth
// config: every key of authConfig.Backends that also appears in inlineBackendNames.
// Backends present in authConfig but not listed inline (e.g. discovered ones) are excluded.
//
// It returns nil when authConfig or authConfig.Backends is nil, and a non-nil (possibly
// empty) slice otherwise. Each backend appears at most once; the order follows map
// iteration and is unspecified.
func determineValidInlineBackends(authConfig *vmcpconfig.OutgoingAuthConfig, inlineBackendNames []string) []string {
	if authConfig == nil || authConfig.Backends == nil {
		return nil
	}

	// Index the inline names once so each backend is a constant-time membership check
	// instead of a rescan of the whole slice.
	inline := make(map[string]struct{}, len(inlineBackendNames))
	for _, name := range inlineBackendNames {
		inline[name] = struct{}{}
	}

	valid := make([]string, 0, len(inline))
	for backendName := range authConfig.Backends {
		if _, isInline := inline[backendName]; isInline {
			valid = append(valid, backendName)
		}
	}
	return valid
}
// processOutgoingAuth handles the outgoing auth configuration for both inline and
// discovered sources.
//
// It builds the auth config, records status conditions for every auth config type
// (default, backend-specific, discovered) and, for the inline source only, embeds the
// statically discovered backends into config. When outgoing auth is absent or uses any
// other source, the auth conditions are reset and config is left as is.
//
// Auth build errors are reported through conditions and never returned. The returned error
// is non-nil only in inline mode: when backend discovery, the transport map or the CA
// bundle path map fails, or when no backend remains after conversion.
func (r *VirtualMCPServerReconciler) processOutgoingAuth(
	ctx context.Context,
	vmcp *mcpv1beta1.VirtualMCPServer,
	config *vmcpconfig.Config,
	typedWorkloads []workloads.TypedWorkload,
	statusManager virtualmcpserverstatus.StatusManager,
) error {
	// resetConditions clears auth conditions left over from a previous configuration.
	resetConditions := func() {
		setAuthConfigConditions(statusManager, nil, nil, false, nil, nil)
	}

	if config.OutgoingAuth == nil {
		resetConditions()
		return nil
	}

	source := config.OutgoingAuth.Source
	inline := source == OutgoingAuthSourceInline
	if !inline && source != OutgoingAuthSourceDiscovered {
		resetConditions()
		return nil
	}

	// Errors of every kind (default, backend-specific, discovered) are collected rather
	// than returned: the system keeps going in degraded mode with a partial auth config.
	built, backendsWithAuth, authErrs := r.buildOutgoingAuthConfig(ctx, vmcp, typedWorkloads)

	// Report the outcome per auth config type: True on success, False on error.
	inlineNames := extractInlineBackendNames(vmcp)
	defaultAuthValid := built != nil && built.Default != nil
	validInline := determineValidInlineBackends(built, inlineNames)
	setAuthConfigConditions(
		statusManager,
		backendsWithAuth,
		inlineNames,
		defaultAuthValid,
		validInline,
		authErrs,
	)

	if !inline {
		// Dynamic mode (discovered): vMCP finds its backends at runtime through the K8s
		// API. The conditions are set above and the ConfigMap needs nothing more.
		return nil
	}

	// Static mode (inline): the ConfigMap carries the complete backend details.
	if built != nil {
		config.OutgoingAuth = built
	}

	backends, err := r.discoverBackendsWithMetadata(ctx, vmcp)
	if err != nil {
		return fmt.Errorf("failed to discover backends for static mode: %w", err)
	}

	// Transport types come from the workload specs.
	transports, err := r.buildTransportMap(ctx, vmcp.Namespace, typedWorkloads)
	if err != nil {
		return fmt.Errorf("failed to build transport map for static mode: %w", err)
	}

	// CA bundle paths apply to MCPServerEntry backends.
	caBundles, err := r.buildCABundlePathMap(ctx, vmcp.Namespace, typedWorkloads)
	if err != nil {
		return fmt.Errorf("failed to build CA bundle path map for static mode: %w", err)
	}

	config.Backends = convertBackendsToStaticBackends(ctx, backends, transports, caBundles)
	if len(config.Backends) > 0 {
		return nil
	}

	// Static mode cannot run without at least one backend.
	return fmt.Errorf(
		"static mode requires at least one backend with valid transport (%v), "+
			"but none were discovered in group %s",
		vmcpconfig.StaticModeAllowedTransports,
		config.Group,
	)
}