Skip to content
Open
22 changes: 20 additions & 2 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra

// DefaultHeadPodTemplate sets the config values
func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec {
log := ctrl.LoggerFrom(ctx)

// TODO (Dmitri) The argument headPort is essentially unused;
// headPort is passed into setMissingRayStartParams but unused there for the head pod.
// To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic.
Expand Down Expand Up @@ -213,9 +215,16 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head
mergeAutoscalerOverrides(&autoscalerContainer, instance.Spec.AutoscalerOptions)
podTemplate.Spec.Containers = append(podTemplate.Spec.Containers, autoscalerContainer)

// The error is ignored here because the function will return false if there's an error parsing the version.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we keep err here, i think we should also re-word the comment?

// For example, if rayVersion is empty or unparseable, it considers the feature is not valid.
if utils.IsAutoscalingV2Enabled(&instance.Spec) {
setAutoscalerV2EnvVars(&podTemplate)
podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever
if autoscalerRestartValid, err := utils.IsRayVersionAtLeast(instance.Spec.RayVersion, utils.MinAutoscalerRestartValidVersion); !autoscalerRestartValid {
if err != nil {
log.Info("Parsing ray version failed, fall back the restart policy to Never.", "rayVersion", instance.Spec.RayVersion, "error", err)
}
podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever
}
} else if utils.IsAutoscalingV1Enabled(&instance.Spec) {
setAutoscalerV1EnvVars(&podTemplate)
}
Expand Down Expand Up @@ -364,6 +373,8 @@ func getEnableProbesInjection() bool {

// DefaultWorkerPodTemplate sets the config values
func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec {
log := ctrl.LoggerFrom(ctx)

podTemplate := workerSpec.Template
podTemplate.GenerateName = podName
// Pods created by RayCluster should be restricted to the namespace of the RayCluster.
Expand Down Expand Up @@ -466,7 +477,14 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo
podTemplate.Spec.Containers[utils.RayContainerIndex].Ports = append(podTemplate.Spec.Containers[utils.RayContainerIndex].Ports, metricsPort)
}

if utils.IsAutoscalingEnabled(&instance.Spec) && utils.IsAutoscalingV2Enabled(&instance.Spec) {
// Use the RayVersion and autoscaler version to determine whether the RestartPolicy should be Never or not, since the head pod is the one that runs the autoscaler.
// The error is ignored here because the function will return false if there's an error parsing the version.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed at 565d9a8 along with the above.

// For example, if rayVersion is empty or unparseable, it considers the feature is not valid.
autoscalerRestartValid, err := utils.IsRayVersionAtLeast(instance.Spec.RayVersion, utils.MinAutoscalerRestartValidVersion)
if err != nil {
log.Info("Parsing ray version failed, fallback the decision of restart policy.", "rayVersion", instance.Spec.RayVersion, "error", err)
}
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
if !autoscalerRestartValid && utils.IsAutoscalingEnabled(&instance.Spec) && utils.IsAutoscalingV2Enabled(&instance.Spec) {
podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever
}

Expand Down
77 changes: 73 additions & 4 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,19 +1175,43 @@ func TestHeadPodTemplate_WithAutoscalingEnabled(t *testing.T) {

func TestDefaultHeadPodTemplate_Autoscaling(t *testing.T) {
clusterNoAutoscaling := instance.DeepCopy()

clusterAutoscalingVersionNotSet := instance.DeepCopy()
clusterAutoscalingVersionNotSet.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV1 := instance.DeepCopy()
clusterAutoscalingV1.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV1.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV1),
}

// clusterAutoscalingV2 has no RayVersion set, so IsRayVersionAtLeast
// returns false: env var is injected AND RestartPolicy is set to Never.
clusterAutoscalingV2 := instance.DeepCopy()
clusterAutoscalingV2.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}

// clusterAutoscalingV2OldRay uses a Ray version strictly below MinAutoscalerRestartValidVersion
// (2.55.0): env var is injected AND RestartPolicy is set to Never.
clusterAutoscalingV2OldRay := instance.DeepCopy()
clusterAutoscalingV2OldRay.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2OldRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}
clusterAutoscalingV2OldRay.Spec.RayVersion = "2.54.0"

// clusterAutoscalingV2NewRay uses a Ray version >= MinAutoscalerRestartValidVersion (2.55.0).
// With the updated logic, setAutoscalerV2EnvVars is always called when AutoscalerV2 is enabled,
// so the env var IS injected. Only RestartPolicy=Never is skipped for newer versions.
clusterAutoscalingV2NewRay := instance.DeepCopy()
clusterAutoscalingV2NewRay.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2NewRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}
clusterAutoscalingV2NewRay.Spec.RayVersion = "2.55.0"
clusterAutoscalingV2NewRay.Spec.HeadGroupSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure

ctx := context.Background()
podName := strings.ToLower(instance.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))

Expand All @@ -1212,20 +1236,35 @@ func TestDefaultHeadPodTemplate_Autoscaling(t *testing.T) {
expectedAutoscalerV1EnvVar: false,
expectedRestartPolicy: "",
},
"Pod template with autoscaling v1 enabled should the correct autoscaler v1 fields": {
"Pod template with autoscaling v1 enabled should have the correct autoscaler v1 fields": {
cluster: *clusterAutoscalingV1,
expectedHeadContainers: 2,
expectedAutoscalerV2EnvVar: false,
expectedAutoscalerV1EnvVar: true,
expectedRestartPolicy: "",
},
"Pod template with autoscaling v2 enabled should the correct autoscaler v2 fields": {
"Pod template with autoscaling v2 enabled and unparseable Ray image should inject env var and set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2,
expectedHeadContainers: 2,
expectedAutoscalerV2EnvVar: true,
expectedAutoscalerV1EnvVar: false,
expectedRestartPolicy: corev1.RestartPolicyNever,
},
"Pod template with autoscaling v2 enabled and Ray version below MinAutoscalerRestartValidVersion should inject env var and set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2OldRay,
expectedHeadContainers: 2,
expectedAutoscalerV2EnvVar: true,
expectedRestartPolicy: corev1.RestartPolicyNever,
},
// setAutoscalerV2EnvVars is now always called when AutoscalerV2 is enabled regardless of
// the Ray version, so the env var is injected even for new Ray versions. Only
// RestartPolicy=Never is version-gated (skipped when version >= MinAutoscalerRestartValidVersion).
"Pod template with autoscaling v2 enabled and Ray version at or above MinAutoscalerRestartValidVersion should inject env var but not set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2NewRay,
expectedHeadContainers: 2,
expectedAutoscalerV2EnvVar: true,
expectedRestartPolicy: corev1.RestartPolicyOnFailure,
},
}

for name, tc := range tests {
Expand Down Expand Up @@ -1443,12 +1482,34 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) {
clusterNoAutoscaling := instance.DeepCopy()
clusterAutoscalingV1 := instance.DeepCopy()
clusterAutoscalingV1.Spec.EnableInTreeAutoscaling = new(true)

// clusterAutoscalingV2 has no RayVersion set, so IsRayVersionAtLeast
// returns false and RestartPolicy should be set to Never.
clusterAutoscalingV2 := instance.DeepCopy()
clusterAutoscalingV2.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}

// clusterAutoscalingV2OldRay uses a Ray version strictly below MinAutoscalerRestartValidVersion
// (2.55.0), so RestartPolicy should still be set to Never.
clusterAutoscalingV2OldRay := instance.DeepCopy()
clusterAutoscalingV2OldRay.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2OldRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}
clusterAutoscalingV2OldRay.Spec.RayVersion = "2.54.0"

// clusterAutoscalingV2NewRay uses a Ray version >= MinAutoscalerRestartValidVersion (2.55.0),
// so autoscalerRestartValid is true and RestartPolicy should NOT be set to Never.
clusterAutoscalingV2NewRay := instance.DeepCopy()
clusterAutoscalingV2NewRay.Spec.EnableInTreeAutoscaling = new(true)
clusterAutoscalingV2NewRay.Spec.AutoscalerOptions = &rayv1.AutoscalerOptions{
Version: ptr.To(rayv1.AutoscalerVersionV2),
}
clusterAutoscalingV2NewRay.Spec.RayVersion = "2.55.0"
clusterAutoscalingV2NewRay.Spec.WorkerGroupSpecs[0].Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure

ctx := context.Background()
podName := strings.ToLower(instance.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + utils.FormatInt32(0))
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, instance, instance.Namespace)
Expand All @@ -1461,14 +1522,22 @@ func TestDefaultWorkerPodTemplate_Autoscaling(t *testing.T) {
cluster: *clusterNoAutoscaling,
expectedRestartPolicy: "",
},
"Pod template with autoscaling v1 enabled should the correct autoscaler v1 fields": {
"Pod template with autoscaling v1 enabled should the the correct autoscaler v1 fields": {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor typo: "have"

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix at f3458fd.

cluster: *clusterAutoscalingV1,
expectedRestartPolicy: "",
},
"Pod template with autoscaling v2 enabled should the correct autoscaler v2 fields": {
"Pod template with autoscaling v2 enabled and unparseable Ray image should set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2,
expectedRestartPolicy: corev1.RestartPolicyNever,
},
"Pod template with autoscaling v2 enabled and Ray version below MinAutoscalerRestartValidVersion should set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2OldRay,
expectedRestartPolicy: corev1.RestartPolicyNever,
},
"Pod template with autoscaling v2 enabled and Ray version at or above MinAutoscalerRestartValidVersion should not set RestartPolicy to Never": {
cluster: *clusterAutoscalingV2NewRay,
expectedRestartPolicy: corev1.RestartPolicyOnFailure,
},
}

for name, tc := range tests {
Expand Down
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ const (
// MaxRayJobNameLength is the maximum RayJob name to make sure it pass the RayCluster validation
// Minus 6 since we append 6 characters to the RayJob name to create the cluster (GenerateRayClusterName).
MaxRayJobNameLength = MaxRayClusterNameLength - 6

// MinAutoscalerRestartValidVersion is the minimum Ray image version that supports the feature of autoscaler restart.
MinAutoscalerRestartValidVersion = "2.55.0"
)

type ServiceType string
Expand Down
25 changes: 25 additions & 0 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -1095,6 +1096,30 @@ func IsHTTPRouteEqual(existing, desired *gwv1.HTTPRoute) bool {
return true
}

// IsRayVersionAtLeast reports whether the given rayVersion (from RayClusterSpec.RayVersion)
// is at least targetVersion.
//
// Both rayVersion and targetVersion are parsed with k8s.io/apimachinery/pkg/util/version,
// which accepts standard semver strings (e.g. "2.46.0").
// Returns (false, error) if rayVersion is empty or either version string cannot be parsed.
func IsRayVersionAtLeast(rayVersion string, targetVersion string) (bool, error) {
if rayVersion == "" {
return false, fmt.Errorf("rayVersion is empty")
}

v, err := version.ParseGeneric(rayVersion)
if err != nil {
return false, fmt.Errorf("failed to parse Ray version %q: %w", rayVersion, err)
}

minVersion, err := version.ParseGeneric(targetVersion)
if err != nil {
return false, fmt.Errorf("failed to parse minimum version %q: %w", targetVersion, err)
}

return v.AtLeast(minVersion), nil
}

// IsGatewayEqual checks if the existing Gateway matches the desired Gateway.
// This check only compares the fields explicitly managed by the RayService controller.
// If the controller starts managing additional Gateway fields in the future,
Expand Down
89 changes: 89 additions & 0 deletions ray-operator/controllers/ray/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2310,3 +2310,92 @@ func TestIsGatewayEqual(t *testing.T) {
})
}
}

func TestIsRayVersionAtLeast(t *testing.T) {
tests := []struct {
name string
rayVersion string
targetVersion string
want bool
wantErr bool
}{
{
name: "version equal to target",
rayVersion: "2.9.0",
targetVersion: "2.9.0",
want: true,
},
{
name: "version greater than target (minor)",
rayVersion: "2.10.0",
targetVersion: "2.9.0",
want: true,
},
{
name: "version greater than target (major)",
rayVersion: "3.0.0",
targetVersion: "2.9.0",
want: true,
},
{
name: "version greater than target (patch)",
rayVersion: "2.9.1",
targetVersion: "2.9.0",
want: true,
},
{
name: "version with no patch component",
rayVersion: "2.9",
targetVersion: "2.9.0",
want: true,
},
{
name: "version less than target (minor)",
rayVersion: "2.8.0",
targetVersion: "2.9.0",
want: false,
},
{
name: "version less than target (major)",
rayVersion: "1.13.0",
targetVersion: "2.0.0",
want: false,
},
{
name: "version at least target",
rayVersion: "2.46.0",
targetVersion: "2.46.0",
want: true,
},
{
name: "version below target",
rayVersion: "2.45.0",
targetVersion: "2.46.0",
want: false,
},
{
name: "empty rayVersion",
rayVersion: "",
targetVersion: "2.9.0",
wantErr: true,
},
{
name: "unparseable rayVersion",
rayVersion: "latest",
targetVersion: "2.9.0",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := IsRayVersionAtLeast(tt.rayVersion, tt.targetVersion)
if tt.wantErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}
8 changes: 6 additions & 2 deletions ray-operator/controllers/ray/utils/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,16 @@ func ValidateRayClusterSpec(spec *rayv1.RayClusterSpec, annotations map[string]s
}

if IsAutoscalingV2Enabled(spec) {
if spec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && spec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
// The error is ignored here because the function will return false if there's an error parsing the version.
// For example, if rayVersion is empty or unparseable, it considers the feature is not valid.
autoscalerRestartValid, _ := IsRayVersionAtLeast(spec.RayVersion, MinAutoscalerRestartValidVersion)

if !autoscalerRestartValid && spec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && spec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
return fmt.Errorf("restartPolicy for head Pod should be Never or unset when using autoscaler V2")
}

for _, workerGroup := range spec.WorkerGroupSpecs {
if workerGroup.Template.Spec.RestartPolicy != "" && workerGroup.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
if !autoscalerRestartValid && workerGroup.Template.Spec.RestartPolicy != "" && workerGroup.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
return fmt.Errorf("restartPolicy for worker group %s should be Never or unset when using autoscaler V2", workerGroup.GroupName)
}
}
Expand Down
Loading
Loading