[RayJob] Support deletion policies based on job status by weizhaowz · Pull Request #3731 · ray-project/kuberay · GitHub

[RayJob] Support deletion policies based on job status #3731


Merged · merged 18 commits · Jun 17, 2025

Changes from all commits
37 changes: 35 additions & 2 deletions docs/reference/api.md
@@ -57,15 +57,48 @@ _Appears in:_

#### DeletionPolicy

_Appears in:_
- [DeletionStrategy](#deletionstrategy)

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `policy` _[DeletionPolicyType](#deletionpolicytype)_ | Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'. | | |


#### DeletionPolicyType

_Underlying type:_ _string_





_Appears in:_
- [DeletionPolicy](#deletionpolicy)



#### DeletionStrategy

_Appears in:_
- [RayJobSpec](#rayjobspec)

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `onSuccess` _[DeletionPolicy](#deletionpolicy)_ | | | |
| `onFailure` _[DeletionPolicy](#deletionpolicy)_ | | | |

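To make the manifest shape concrete, here is a small, self-contained Go sketch (not part of this PR) that marshals the new strategy shape to JSON, i.e. the keys a RayJob manifest carries under `spec.deletionStrategy`. The local type declarations mirror the ones this PR adds in `rayjob_types.go` below, so the snippet runs standalone.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the types added in rayjob_types.go, redeclared
// here so the sketch compiles without importing the operator.
type DeletionPolicyType string

type DeletionPolicy struct {
	Policy *DeletionPolicyType `json:"policy"`
}

type DeletionStrategy struct {
	OnSuccess DeletionPolicy `json:"onSuccess"`
	OnFailure DeletionPolicy `json:"onFailure"`
}

func main() {
	deleteCluster := DeletionPolicyType("DeleteCluster")
	deleteNone := DeletionPolicyType("DeleteNone")
	s := DeletionStrategy{
		OnSuccess: DeletionPolicy{Policy: &deleteCluster}, // clean up fully on success
		OnFailure: DeletionPolicy{Policy: &deleteNone},    // keep everything for debugging
	}
	out, _ := json.MarshalIndent(s, "", "  ")
	fmt.Println(string(out))
	// {
	//   "onSuccess": {
	//     "policy": "DeleteCluster"
	//   },
	//   "onFailure": {
	//     "policy": "DeleteNone"
	//   }
	// }
}
```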


@@ -209,7 +242,7 @@ _Appears in:_
| `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | |
| `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.<br />The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.<br />The kuberay-operator reconciles a RayJob which doesn't have this field at all or<br />the field value is the reserved string 'ray.io/kuberay-operator',<br />but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.<br />The field is immutable. | | |
| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | DeletionPolicy indicates what resources of the RayJob are deleted upon job completion.<br />Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.<br />If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.<br />This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `deletionStrategy` _[DeletionStrategy](#deletionstrategy)_ | DeletionStrategy indicates which resources of the RayJob are deleted upon job completion, and how they are deleted.<br />If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.<br />This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `entrypoint` _string_ | Entrypoint represents the command to start execution. | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
@@ -377,7 +410,7 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.<br />A suspended worker group will have all pods deleted.<br />This is not a user-facing API and is only used by RayJob DeletionPolicy. | | |
| `suspend` _boolean_ | Suspend indicates whether a worker group should be suspended.<br />A suspended worker group will have all pods deleted.<br />This is not a user-facing API and is only used by RayJob DeletionStrategy. | | |
| `groupName` _string_ | we can have multiple worker groups, we distinguish them by name | | |
| `replicas` _integer_ | Replicas is the number of desired Pods for this worker group. See https://github.com/ray-project/kuberay/pull/1443 for more details about the reason for making this field optional. | 0 | |
| `minReplicas` _integer_ | MinReplicas denotes the minimum number of desired Pods for this worker group. | 0 | |
36 changes: 30 additions & 6 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ray-operator/apis/ray/v1/raycluster_types.go
@@ -87,7 +87,7 @@ type HeadGroupSpec struct {
type WorkerGroupSpec struct {
// Suspend indicates whether a worker group should be suspended.
// A suspended worker group will have all pods deleted.
// This is not a user-facing API and is only used by RayJob DeletionPolicy.
// This is not a user-facing API and is only used by RayJob DeletionStrategy.
// +optional
Suspend *bool `json:"suspend,omitempty"`
// we can have multiple worker groups, we distinguish them by name
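For context on how this flag is consumed, the following is a hedged sketch of the `DeleteWorkers` path: flip `Suspend` on every worker group and update the RayCluster, letting the RayCluster controller remove the worker pods. It assumes controller-runtime's `client.Client` and `k8s.io/utils/ptr`; it is an illustration, not the PR's actual `suspendWorkerGroups` implementation (which appears only by name in the controller diff below).

```go
package sketch

import (
	"context"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// suspendAllWorkerGroups marks every worker group as suspended and
// persists the change; the RayCluster controller then deletes the
// worker pods while the head pod stays up.
func suspendAllWorkerGroups(ctx context.Context, c client.Client, cluster *rayv1.RayCluster) error {
	for i := range cluster.Spec.WorkerGroupSpecs {
		cluster.Spec.WorkerGroupSpecs[i].Suspend = ptr.To(true)
	}
	return c.Update(ctx, cluster)
}
```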
27 changes: 18 additions & 9 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -84,13 +84,24 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type DeletionPolicy string
type DeletionPolicyType string

type DeletionStrategy struct {
OnSuccess DeletionPolicy `json:"onSuccess"`
OnFailure DeletionPolicy `json:"onFailure"`
}

type DeletionPolicy struct {
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the policy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
Policy *DeletionPolicyType `json:"policy"`
}

const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" // Deletion policy to delete the entire RayCluster custom resource on job completion.
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers" // Deletion policy to delete only the workers on job completion.
DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf" // Deletion policy to delete the RayJob custom resource (and all associated resources) on job completion.
DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone" // Deletion policy to delete no resources on job completion.
DeleteCluster DeletionPolicyType = "DeleteCluster" // To delete the entire RayCluster custom resource on job completion.
DeleteWorkers DeletionPolicyType = "DeleteWorkers" // To delete only the workers on job completion.
DeleteSelf DeletionPolicyType = "DeleteSelf" // To delete the RayJob custom resource (and all associated resources) on job completion.
DeleteNone DeletionPolicyType = "DeleteNone" // To delete no resources on job completion.
)

type SubmitterConfig struct {
@@ -144,13 +155,11 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
// +optional
ManagedBy *string `json:"managedBy,omitempty"`
// DeletionPolicy indicates what resources of the RayJob are deleted upon job completion.
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
// DeletionStrategy indicates which resources of the RayJob are deleted upon job completion, and how they are deleted.
// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
// +optional
DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
DeletionStrategy *DeletionStrategy `json:"deletionStrategy,omitempty"`
// Entrypoint represents the command to start execution.
// +optional
Entrypoint string `json:"entrypoint,omitempty"`
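Putting the types above together: a minimal sketch (not from the PR) of how Go code built against `rayv1` might populate the new field, assuming the `k8s.io/utils/ptr` helper. Note the asymmetry it enables, which the old single `deletionPolicy` field could not express: aggressive cleanup on success, nothing deleted on failure.

```go
package sketch

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"k8s.io/utils/ptr"
)

// newRayJobSpec returns a spec whose RayJob (and everything it owns)
// is deleted after a successful run, but left intact after a failure
// so the cluster can be inspected. Requires the RayJobDeletionPolicy
// feature gate.
func newRayJobSpec(entrypoint string) rayv1.RayJobSpec {
	return rayv1.RayJobSpec{
		Entrypoint: entrypoint,
		DeletionStrategy: &rayv1.DeletionStrategy{
			OnSuccess: rayv1.DeletionPolicy{Policy: ptr.To(rayv1.DeleteSelf)},
			OnFailure: rayv1.DeletionPolicy{Policy: ptr.To(rayv1.DeleteNone)},
		},
	}
}
```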
45 changes: 41 additions & 4 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

36 changes: 30 additions & 6 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

36 changes: 28 additions & 8 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -382,24 +382,44 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
"ShutdownTime", shutdownTime)

if features.Enabled(features.RayJobDeletionPolicy) &&
rayJobInstance.Spec.DeletionPolicy != nil &&
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
rayJobInstance.Spec.DeletionStrategy != nil &&
len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info("Shutdown behavior is defined by the deletion policy", "deletionPolicy", rayJobInstance.Spec.DeletionPolicy)

if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}

policy := rayv1.DeleteNone
if rayJobInstance.Status.JobStatus == rayv1.JobStatusSucceeded {
[Review comment · @andrewsykim (Member) · Jun 10, 2025]
What's the expected behavior if only onSuccess or onFailure is set? What should be the default deletion policy in that case?

[Reply · @weizhaowz (Contributor, Author)]
In ray-operator/controllers/ray/utils/validation.go, it's validated that both onSuccess and onFailure are set; otherwise validation fails.

policy = *rayJobInstance.Spec.DeletionStrategy.OnSuccess.Policy
} else if rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed {
policy = *rayJobInstance.Spec.DeletionStrategy.OnFailure.Policy
} else {
logger.Info("jobStatus not valid for deletion", "jobStatus", rayJobInstance.Status.JobStatus)
}
[Review comment · Member]
Add an else here with error logging; there are other job statuses to consider:

// https://docs.ray.io/en/latest/cluster/running-applications/job-submission/jobs-package-ref.html#jobstatus
const (
	JobStatusNew       JobStatus = ""
	JobStatusPending   JobStatus = "PENDING"
	JobStatusRunning   JobStatus = "RUNNING"
	JobStatusStopped   JobStatus = "STOPPED"
	JobStatusSucceeded JobStatus = "SUCCEEDED"
	JobStatusFailed    JobStatus = "FAILED"
)

[Reply · @weizhaowz (Contributor, Author)]
Added.


// no need to continue as the selected policy is DeleteNone
if policy == rayv1.DeleteNone {
break
}

logger.Info("Shutdown behavior is defined by the deletion policy", "deletionPolicy", rayJobInstance.Spec.DeletionStrategy)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}

switch *rayJobInstance.Spec.DeletionPolicy {
case rayv1.DeleteClusterDeletionPolicy:
switch policy {
case rayv1.DeleteCluster:
logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
case rayv1.DeleteWorkersDeletionPolicy:
case rayv1.DeleteWorkers:
logger.Info("Suspending all worker groups", "RayCluster", rayJobInstance.Status.RayClusterName)
err = r.suspendWorkerGroups(ctx, rayJobInstance)
case rayv1.DeleteSelfDeletionPolicy:
case rayv1.DeleteSelf:
logger.Info("Deleting RayJob")
err = r.Client.Delete(ctx, rayJobInstance)
default:
@@ -409,7 +429,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
}

if (!features.Enabled(features.RayJobDeletionPolicy) || rayJobInstance.Spec.DeletionPolicy == nil) && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
if (!features.Enabled(features.RayJobDeletionPolicy) || rayJobInstance.Spec.DeletionStrategy == nil) && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info("Shutdown behavior is defined by the 506D `ShutdownAfterJobFinishes` flag", "shutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
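The review thread above points at ray-operator/controllers/ray/utils/validation.go requiring both `onSuccess` and `onFailure` to be set; that file's diff is not rendered on this page, so the following is only a sketch of the rule as described, with an illustrative function name and error wording rather than the PR's actual code.

```go
package sketch

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// validateDeletionStrategy sketches the check described in the review
// thread: when spec.deletionStrategy is set, both branches must carry
// a policy. Illustrative only.
func validateDeletionStrategy(spec *rayv1.RayJobSpec) error {
	if spec.DeletionStrategy == nil {
		return nil // fall back to spec.shutdownAfterJobFinishes behavior
	}
	if spec.DeletionStrategy.OnSuccess.Policy == nil {
		return fmt.Errorf("deletionStrategy.onSuccess.policy must be set")
	}
	if spec.DeletionStrategy.OnFailure.Policy == nil {
		return fmt.Errorf("deletionStrategy.onFailure.policy must be set")
	}
	return nil
}
```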