diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index df21df40d7e..3b8fb6bdaf4 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -164,6 +164,8 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra // DefaultHeadPodTemplate sets the config values func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec { + log := ctrl.LoggerFrom(ctx) + // TODO (Dmitri) The argument headPort is essentially unused; // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. @@ -215,7 +217,12 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head if utils.IsAutoscalingV2Enabled(&instance.Spec) { setAutoscalerV2EnvVars(&podTemplate) - podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever + if autoscalerRestartValid, err := utils.IsRayVersionAtLeast(instance.Spec.RayVersion, utils.MinAutoscalerRestartValidVersion); !autoscalerRestartValid { + if err != nil { + log.Info("Parsing ray version failed, fall back the restart policy to Never.", "rayVersion", instance.Spec.RayVersion, "error", err) + } + podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever + } } else if utils.IsAutoscalingV1Enabled(&instance.Spec) { setAutoscalerV1EnvVars(&podTemplate) } @@ -364,6 +371,8 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + log := ctrl.LoggerFrom(ctx) + podTemplate := workerSpec.Template podTemplate.GenerateName = podName // Pods created by RayCluster should be restricted to the namespace of the RayCluster. @@ -467,7 +476,13 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo } if utils.IsAutoscalingEnabled(&instance.Spec) && utils.IsAutoscalingV2Enabled(&instance.Spec) { - podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever + // Use the RayVersion and autoscaler version to determine whether the RestartPolicy should be Never or not, since the head pod is the one that runs the autoscaler. + if autoscalerRestartValid, err := utils.IsRayVersionAtLeast(instance.Spec.RayVersion, utils.MinAutoscalerRestartValidVersion); !autoscalerRestartValid { + if err != nil { + log.Info("Parsing ray version failed, fallback the decision of restart policy.", "rayVersion", instance.Spec.RayVersion, "error", err) + } + podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever + } } if utils.IsAuthEnabled(&instance.Spec) { diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index f9c85849284..c8d39c3b795 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -1175,6 +1175,7 @@ func TestHeadPodTemplate_WithAutoscalingEnabled(t *testing.T) { func TestDefaultHeadPodTemplate_Autoscaling(t *testing.T) { clusterNoAutoscaling := instance.DeepCopy() + clusterAutoscalingVersionNotSet := instance.DeepCopy() clusterAutoscalingVersionNotSet.Spec.EnableInTreeAutoscaling = new(true) clusterAutoscalingV1 := instance.DeepCopy() @@ -1182,12 +1183,35 @@ func TestDefaultHeadPodTemplate_Autoscaling(t *testing.T) { clusterAutoscalingV1.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ Version: ptr.To(rayv1.AutoscalerVersionV1), } + + // clusterAutoscalingV2 has no RayVersion set, so IsRayVersionAtLeast + // returns false: env var is injected AND RestartPolicy is set to Never. clusterAutoscalingV2 := instance.DeepCopy() clusterAutoscalingV2.Spec.EnableInTreeAutoscaling = new(true) clusterAutoscalingV2.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ Version: ptr.To(rayv1.AutoscalerVersionV2), } + // clusterAutoscalingV2OldRay uses a Ray version strictly below MinAutoscalerRestartValidVersion + // (2.55.0): env var is injected AND RestartPolicy is set to Never. + clusterAutoscalingV2OldRay := instance.DeepCopy() + clusterAutoscalingV2OldRay.Spec.EnableInTreeAutoscaling = new(true) + clusterAutoscalingV2OldRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + } + clusterAutoscalingV2OldRay.Spec.RayVersion = "2.54.0" + + // clusterAutoscalingV2NewRay uses a Ray version >= MinAutoscalerRestartValidVersion (2.55.0). + // With the updated logic, setAutoscalerV2EnvVars is always called when AutoscalerV2 is enabled, + // so the env var IS injected. Only RestartPolicy=Never is skipped for newer versions. + clusterAutoscalingV2NewRay := instance.DeepCopy() + clusterAutoscalingV2NewRay.Spec.EnableInTreeAutoscaling = new(true) + clusterAutoscalingV2NewRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + } + clusterAutoscalingV2NewRay.Spec.RayVersion = "2.55.0" + clusterAutoscalingV2NewRay.Spec.HeadGroupSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + ctx := context.Background() podName := strings.ToLower(instance.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0)) @@ -1212,20 +1236,35 @@ func TestDefaultHeadPodTemplate_Autoscaling(t *testing.T) { expectedAutoscalerV1EnvVar: false, expectedRestartPolicy: "", }, - "Pod template with autoscaling v1 enabled should the correct autoscaler v1 fields": { + "Pod template with autoscaling v1 enabled should have the correct autoscaler v1 fields": { cluster: *clusterAutoscalingV1, expectedHeadContainers: 2, expectedAutoscalerV2EnvVar: false, expectedAutoscalerV1EnvVar: true, expectedRestartPolicy: "", }, - "Pod template with autoscaling v2 enabled should the correct autoscaler v2 fields": { + "Pod template with autoscaling v2 enabled and unparseable Ray image should inject env var and set RestartPolicy to Never": { cluster: *clusterAutoscalingV2, expectedHeadContainers: 2, expectedAutoscalerV2EnvVar: true, expectedAutoscalerV1EnvVar: false, expectedRestartPolicy: corev1.RestartPolicyNever, }, + "Pod template with autoscaling v2 enabled and Ray version below MinAutoscalerRestartValidVersion should inject env var and set RestartPolicy to Never": { + cluster: *clusterAutoscalingV2OldRay, + expectedHeadContainers: 2, + expectedAutoscalerV2EnvVar: true, + expectedRestartPolicy: corev1.RestartPolicyNever, + }, + // setAutoscalerV2EnvVars is now always called when AutoscalerV2 is enabled regardless of + // the Ray version, so the env var is injected even for new Ray versions. Only + // RestartPolicy=Never is version-gated (skipped when version >= MinAutoscalerRestartValidVersion). + "Pod template with autoscaling v2 enabled and Ray version at or above MinAutoscalerRestartValidVersion should inject env var but not set RestartPolicy to Never": { + cluster: *clusterAutoscalingV2NewRay, + expectedHeadContainers: 2, + expectedAutoscalerV2EnvVar: true, + expectedRestartPolicy: corev1.RestartPolicyOnFailure, + }, } for name, tc := range tests { @@ -1443,12 +1482,34 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) { clusterNoAutoscaling := instance.DeepCopy() clusterAutoscalingV1 := instance.DeepCopy() clusterAutoscalingV1.Spec.EnableInTreeAutoscaling = new(true) + + // clusterAutoscalingV2 has no RayVersion set, so IsRayVersionAtLeast + // returns false and RestartPolicy should be set to Never. clusterAutoscalingV2 := instance.DeepCopy() clusterAutoscalingV2.Spec.EnableInTreeAutoscaling = new(true) clusterAutoscalingV2.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ Version: ptr.To(rayv1.AutoscalerVersionV2), } + // clusterAutoscalingV2OldRay uses a Ray version strictly below MinAutoscalerRestartValidVersion + // (2.55.0), so RestartPolicy should still be set to Never. + clusterAutoscalingV2OldRay := instance.DeepCopy() + clusterAutoscalingV2OldRay.Spec.EnableInTreeAutoscaling = new(true) + clusterAutoscalingV2OldRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + } + clusterAutoscalingV2OldRay.Spec.RayVersion = "2.54.0" + + // clusterAutoscalingV2NewRay uses a Ray version >= MinAutoscalerRestartValidVersion (2.55.0), + // so autoscalerRestartValid is true and RestartPolicy should NOT be set to Never. + clusterAutoscalingV2NewRay := instance.DeepCopy() + clusterAutoscalingV2NewRay.Spec.EnableInTreeAutoscaling = new(true) + clusterAutoscalingV2NewRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + } + clusterAutoscalingV2NewRay.Spec.RayVersion = "2.55.0" + clusterAutoscalingV2NewRay.Spec.WorkerGroupSpecs[0].Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + ctx := context.Background() podName := strings.ToLower(instance.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + utils.FormatInt32(0)) fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace) @@ -1461,14 +1522,22 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) { cluster: *clusterNoAutoscaling, expectedRestartPolicy: "", }, - "Pod template with autoscaling v1 enabled should the correct autoscaler v1 fields": { + "Pod template with autoscaling v1 enabled should have the correct autoscaler v1 fields": { cluster: *clusterAutoscalingV1, expectedRestartPolicy: "", }, - "Pod template with autoscaling v2 enabled should the correct autoscaler v2 fields": { + "Pod template with autoscaling v2 enabled and unparseable Ray image should set RestartPolicy to Never": { cluster: *clusterAutoscalingV2, expectedRestartPolicy: corev1.RestartPolicyNever, }, + "Pod template with autoscaling v2 enabled and Ray version below MinAutoscalerRestartValidVersion should set RestartPolicy to Never": { + cluster: *clusterAutoscalingV2OldRay, + expectedRestartPolicy: corev1.RestartPolicyNever, + }, + "Pod template with autoscaling v2 enabled and Ray version at or above MinAutoscalerRestartValidVersion should not set RestartPolicy to Never": { + cluster: *clusterAutoscalingV2NewRay, + expectedRestartPolicy: corev1.RestartPolicyOnFailure, + }, } for name, tc := range tests { diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 7a185557eec..9c9999e73d5 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -299,6 +299,9 @@ const ( // MaxRayJobNameLength is the maximum RayJob name to make sure it pass the RayCluster validation // Minus 6 since we append 6 characters to the RayJob name to create the cluster (GenerateRayClusterName). MaxRayJobNameLength = MaxRayClusterNameLength - 6 + + // MinAutoscalerRestartValidVersion is the minimum Ray image version that supports the feature of autoscaler restart. + MinAutoscalerRestartValidVersion = "2.55.0" ) type ServiceType string diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index d98ef6a1569..396d1fa0e40 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/version" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -1095,6 +1096,30 @@ func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool { return true } +// IsRayVersionAtLeast reports whether the given rayVersion (from RayClusterSpec.RayVersion) +// is at least targetVersion. +// +// Both rayVersion and targetVersion are parsed with k8s.io/apimachinery/pkg/util/version, +// which accepts standard semver strings (e.g. "2.46.0"). +// Returns (false, error) if rayVersion is empty or either version string cannot be parsed. +func IsRayVersionAtLeast(rayVersion string, targetVersion string) (bool, error) { + if rayVersion == "" { + return false, fmt.Errorf("rayVersion is empty") + } + + v, err := version.ParseGeneric(rayVersion) + if err != nil { + return false, fmt.Errorf("failed to parse Ray version %q: %w", rayVersion, err) + } + + minVersion, err := version.ParseGeneric(targetVersion) + if err != nil { + return false, fmt.Errorf("failed to parse minimum version %q: %w", targetVersion, err) + } + + return v.AtLeast(minVersion), nil +} + // IsGatewayEqual checks if the existing Gateway matches the desired Gateway. // This check only compares the fields explicitly managed by the RayService controller. // If the controller starts managing additional Gateway fields in the future, diff --git a/ray-operator/controllers/ray/utils/util_test.go b/ray-operator/controllers/ray/utils/util_test.go index 587a3590a43..8db0d90e49e 100644 --- a/ray-operator/controllers/ray/utils/util_test.go +++ b/ray-operator/controllers/ray/utils/util_test.go @@ -2310,3 +2310,92 @@ func TestIsGatewayEqual(t *testing.T) { }) } } + +func TestIsRayVersionAtLeast(t *testing.T) { + tests := []struct { + name string + rayVersion string + targetVersion string + want bool + wantErr bool + }{ + { + name: "version equal to target", + rayVersion: "2.9.0", + targetVersion: "2.9.0", + want: true, + }, + { + name: "version greater than target (minor)", + rayVersion: "2.10.0", + targetVersion: "2.9.0", + want: true, + }, + { + name: "version greater than target (major)", + rayVersion: "3.0.0", + targetVersion: "2.9.0", + want: true, + }, + { + name: "version greater than target (patch)", + rayVersion: "2.9.1", + targetVersion: "2.9.0", + want: true, + }, + { + name: "version with no patch component", + rayVersion: "2.9", + targetVersion: "2.9.0", + want: true, + }, + { + name: "version less than target (minor)", + rayVersion: "2.8.0", + targetVersion: "2.9.0", + want: false, + }, + { + name: "version less than target (major)", + rayVersion: "1.13.0", + targetVersion: "2.0.0", + want: false, + }, + { + name: "version at least target", + rayVersion: "2.46.0", + targetVersion: "2.46.0", + want: true, + }, + { + name: "version below target", + rayVersion: "2.45.0", + targetVersion: "2.46.0", + want: false, + }, + { + name: "empty rayVersion", + rayVersion: "", + targetVersion: "2.9.0", + wantErr: true, + }, + { + name: "unparseable rayVersion", + rayVersion: "latest", + targetVersion: "2.9.0", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := IsRayVersionAtLeast(tt.rayVersion, tt.targetVersion) + if tt.wantErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.want, got) + } + }) + } +} diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 41b5855a4ee..f4da5c3b64a 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -216,12 +216,16 @@ func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]s } if IsAutoscalingV2Enabled(spec) { - if spec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && spec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever { + // The error is ignored here because the function will return false if there's an error parsing the version. + // For example, if rayVersion is empty or unparseable, it considers the feature is not valid. + autoscalerRestartValid, _ := IsRayVersionAtLeast(spec.RayVersion, MinAutoscalerRestartValidVersion) + + if !autoscalerRestartValid && spec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && spec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever { return fmt.Errorf("restartPolicy for head Pod should be Never or unset when using autoscaler V2") } for _, workerGroup := range spec.WorkerGroupSpecs { - if workerGroup.Template.Spec.RestartPolicy != "" && workerGroup.Template.Spec.RestartPolicy != corev1.RestartPolicyNever { + if !autoscalerRestartValid && workerGroup.Template.Spec.RestartPolicy != "" && workerGroup.Template.Spec.RestartPolicy != corev1.RestartPolicyNever { return fmt.Errorf("restartPolicy for worker group %s should be Never or unset when using autoscaler V2", workerGroup.GroupName) } } diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 9a481176996..a9d57c28d9b 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -667,6 +667,85 @@ func TestValidateRayClusterSpecAutoscaler(t *testing.T) { }, }, }, + // Tests for version-gated restartPolicy validation (MinAutoscalerRestartValidVersion = 2.55.0) + "should return error if autoscaler v2 is enabled, Ray version < 2.55.0, and head Pod has non-Never restartPolicy": { + spec: rayv1.RayClusterSpec{ + RayVersion: "2.54.0", + EnableInTreeAutoscaling: new(true), + AutoscalerOptions: &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + }, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyAlways)), + }, + }, + expectedErr: "restartPolicy for head Pod should be Never or unset when using autoscaler V2", + }, + "should return error if autoscaler v2 is enabled, Ray version < 2.55.0, and a worker group has non-Never restartPolicy": { + spec: rayv1.RayClusterSpec{ + RayVersion: "2.54.0", + EnableInTreeAutoscaling: new(true), + AutoscalerOptions: &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + }, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyNever)), + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + GroupName: "worker-group-1", + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyOnFailure)), + }, + }, + }, + expectedErr: "restartPolicy for worker group worker-group-1 should be Never or unset when using autoscaler V2", + }, + "should not return error if autoscaler v2 is enabled, Ray version >= 2.55.0, and head Pod has non-Never restartPolicy": { + spec: rayv1.RayClusterSpec{ + RayVersion: "2.55.0", + EnableInTreeAutoscaling: new(true), + AutoscalerOptions: &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + }, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyAlways)), + }, + }, + }, + "should not return error if autoscaler v2 is enabled, Ray version >= 2.55.0, and worker groups have non-Never restartPolicy": { + spec: rayv1.RayClusterSpec{ + RayVersion: "2.55.0", + EnableInTreeAutoscaling: new(true), + AutoscalerOptions: &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + }, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyAlways)), + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + GroupName: "worker-group-1", + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyOnFailure)), + }, + { + GroupName: "worker-group-2", + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyAlways)), + }, + }, + }, + }, + "should not return error if autoscaler v2 is enabled, Ray version > 2.55.0, and head Pod has non-Never restartPolicy": { + spec: rayv1.RayClusterSpec{ + RayVersion: "2.60.0", + EnableInTreeAutoscaling: new(true), + AutoscalerOptions: &rayv1.AutoscalerOptions{ + Version: ptr.To(rayv1.AutoscalerVersionV2), + }, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyOnFailure)), + }, + }, + }, } features.SetFeatureGateDuringTest(t, features.RayJobDeletionPolicy, true)