[RayJob] Support deletion policies based on job status by weizhaowz · Pull Request #3731 · ray-project/kuberay · GitHub

[RayJob] Support deletion policies based on job status #3731


Merged
merged 18 commits into ray-project:master from enrich-deletion-policy on Jun 17, 2025

Conversation

weizhaowz
Contributor
@weizhaowz weizhaowz commented Jun 2, 2025

Why are these changes needed?

Support different deletion policies when a RayJob ends in different statuses.
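
For reference, a sketch of the API shape this PR converges on after review (type and field names follow the discussion later in this thread; details may differ from the merged code):

type DeletionPolicyType string

const (
    DeleteCluster DeletionPolicyType = "DeleteCluster" // delete the entire RayCluster
    DeleteWorkers DeletionPolicyType = "DeleteWorkers" // delete only the worker pods
    DeleteSelf    DeletionPolicyType = "DeleteSelf"    // delete the RayJob itself
    DeleteNone    DeletionPolicyType = "DeleteNone"    // delete nothing
)

// DeletionStrategy selects a policy per terminal job status.
type DeletionStrategy struct {
    OnSuccess DeletionPolicy `json:"onSuccess"`
    OnFailure DeletionPolicy `json:"onFailure"`
}

type DeletionPolicy struct {
    Policy *DeletionPolicyType `json:"policy"`
}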

Related issue number

[Feature] Update RayJob DeletionPolicy API to differentiate success/failure scenarios #3714

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@weizhaowz weizhaowz force-pushed the enrich-deletion-policy branch from 0ef4ba4 to 4b19d17 on June 3, 2025 04:22
type DeleteResource string

type DeletionPolicy struct {
OnSuccess *DeletionConfig `json:"on_success"`
Member

The convention for json tags is to use camelCase: onSuccess, onFailure, etc.

Contributor Author

updated

type DeletionConfig struct {
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deleteResource field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
DeleteResource *DeleteResource `json:"delete_resource"`
Member

Based on discussions in #3714, I think we want this field to be policy or something similar.


Contributor Author

changed

@weizhaowz weizhaowz marked this pull request as ready for review June 4, 2025 00:19
@weizhaowz weizhaowz requested a review from andrewsykim June 4, 2025 00:19
Member
@andrewsykim andrewsykim left a comment

@weizhaowz can you test it manually as well and share results?

policy = *rayJobInstance.Spec.DeletionPolicy.OnSuccess.Policy
} else if rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed {
policy = *rayJobInstance.Spec.DeletionPolicy.OnFailure.Policy
}
Member

Add an else here with error logging; there are other job statuses to consider:

// https://docs.ray.io/en/latest/cluster/running-applications/job-submission/jobs-package-ref.html#jobstatus
const (
JobStatusNew JobStatus = ""
JobStatusPending JobStatus = "PENDING"
JobStatusRunning JobStatus = "RUNNING"
JobStatusStopped JobStatus = "STOPPED"
JobStatusSucceeded JobStatus = "SUCCEEDED"
JobStatusFailed JobStatus = "FAILED"

Contributor Author

added

} else if rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed {
policy = *rayJobInstance.Spec.DeletionPolicy.OnFailure.Policy
} else {
logger.Error(errors.NewBadRequest("the job's status is unrecognized for deletion"), "job status", string(rayJobInstance.Status.JobStatus))
Member

Log info instead and requeue since the job status could change in the next reconcile:

logger.Info("jobStatus not valid for deletion", "jobStatus", rayJobInstance.Status.JobStatus)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil

Contributor Author

Thanks for the context; it's updated. If the current time is already past the shutdown time, we don't need to wait for a status change, is that correct?

len(rayJobInstance.Spec.ClusterSelector) == 0 {

policy := rayv1.DeleteNone
if rayJobInstance.Status.JobStatus == rayv1.JobStatusSucceeded {
Member
@andrewsykim andrewsykim Jun 10, 2025

What's the expected behavior if only onSuccess or onFailure is set? What should be the default deletion policy in that case?

Contributor Author

In ray-operator/controllers/ray/utils/validation.go, it's validated that both onSuccess and onFailure are set; otherwise it returns an error.
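
A minimal sketch of what that check could look like (the function name and error wording here are illustrative; the real code in validation.go may differ):

import (
    "fmt"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// validateDeletionStrategy is a hypothetical helper mirroring the check
// described above: both branches must be set when a strategy is given.
func validateDeletionStrategy(rayJob *rayv1.RayJob) error {
    ds := rayJob.Spec.DeletionStrategy
    if ds == nil {
        // No strategy set; behavior falls back to shutdownAfterJobFinishes.
        return nil
    }
    if ds.OnSuccess.Policy == nil || ds.OnFailure.Policy == nil {
        return fmt.Errorf("both onSuccess.policy and onFailure.policy must be set when deletionStrategy is specified")
    }
    return nil
}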

@@ -84,13 +84,24 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type DeletionPolicy string
type Policy string
Member

Just type Policy string is too generic I think, should update to:

type DeletionPolicyType string 

Or something like that

Contributor Author

changed

type DeletionPolicy string
type Policy string

type DeletionPolicy struct {
Member

What do you think about this naming instead:

type DeletionStrategy struct {
  OnSuccess DeletionPolicy `json:"onSuccess"`
  OnFailure DeletionPolicy `json:"onFailure"`
}

Contributor Author

Yes, I think strategy is good since we already use policy to name the field.


type DeletionPolicy struct {
OnSuccess *DeletionConfig `json:"onSuccess"`
OnFailure *DeletionConfig `json:"onFailure"`
Member

DeletionConfig doesn't need to be pointer since it's a required field if DeletionPolicy is set

Contributor Author

changed

OnFailure *DeletionConfig `json:"onFailure"`
}

type DeletionConfig struct {
Member
@andrewsykim andrewsykim Jun 10, 2025

type DeletionPolicy struct {
  Policy DeletionPolicyType `json:"policy"`
}

Contributor Author

changed

// +optional
DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
DeletionStrategy *DeletionStrategy `json:"deletionPolicy,omitempty"`
Member

change json tag to deletionStrategy

Contributor Author

changed

@@ -23,7 +23,7 @@ const (
// alpha: v1.3
//
// Enables new deletion policy API in RayJob
RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"
Member

I don't think we can change the feature gate name, leave it as RayJobDeletionPolicy
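
For context, the controller only honors the new field when that gate is enabled; a minimal sketch, assuming KubeRay's features helper package:

import "github.com/ray-project/kuberay/ray-operator/pkg/features"

// Only consult spec.deletionStrategy when the alpha gate is on; otherwise
// fall back to shutdownAfterJobFinishes / ttlSecondsAfterFinished.
if features.Enabled(features.RayJobDeletionPolicy) && rayJobInstance.Spec.DeletionStrategy != nil {
    // resolve the per-status policy here
}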

Contributor Author

reverted

@davidxia
Contributor

I'm looking forward to trying this out. Can we add or update a sample RayJob YAML showing how to use with the various options? Might be helpful for e2e test if you didn't write one already.

@weizhaowz
Contributor Author

I'm looking forward to trying this out. Can we add or update a sample RayJob YAML showing how to use with the various options? Might be helpful for e2e test if you didn't write one already.

Hi, I've shared the manual test doc with you. The RayJob is from ray-operator/config/samples/ray-job.sample.yaml, and I added the deletionStrategy to the Spec.

@weizhaowz weizhaowz requested a review from andrewsykim June 11, 2025 03:58
@andrewsykim andrewsykim changed the title Enrich deletion policy [RayJob] Support deletion policies based on job status Jun 11, 2025
@weizhaowz
Contributor Author

@weizhaowz can you test it manually as well and share results?

Tested; here is the doc, and the results look good.

Manually Test RayJob Deletion Policy
Expect: RayJob succeeds and the cluster is deleted

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  deletionStrategy:
    onSuccess:
      policy: DeleteCluster
    onFailure:
      policy: DeleteWorkers
  # submissionMode specifies how RayJob submits the Ray job to the RayCluster.
  # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
  # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
  # submissionMode: "K8sJobMode"
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  # shutdownAfterJobFinishes: false

  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  # ttlSecondsAfterFinished: 10

  # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before
  # KubeRay actively tries to terminate the RayJob; value must be positive integer.
  # activeDeadlineSeconds: 120

  # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
  # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
  # (New in KubeRay version 1.0.)
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  # suspend: false

  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.46.0' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      #pod template
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265 # Ray dashboard
              name: dashboard
            - containerPort: 10001
              name: client
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
            volumeMounts:
            - mountPath: /home/ray/samples
              name: code-sample
          volumes:
          # You set volumes at the Pod level, then mount them into containers inside that Pod
          - name: code-sample
            configMap:
              # Provide the name of the ConfigMap you want to mount.
              name: ray-job-code-sample
              # An array of keys from the ConfigMap to create as files
              items:
              - key: sample_code.py
                path: sample_code.py
    workerGroupSpecs:
    # the pod replicas in this group typed worker
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      # logical group name, for this called small-group, also can be functional
      groupName: small-group
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      #pod template
      template:
        spec:
          containers:
          - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc'
            image: rayproject/ray:2.46.0
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
  # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
  # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
  # submitterPodTemplate:
  #   spec:
  #     restartPolicy: Never
  #     containers:
  #       - name: my-custom-rayjob-submitter-pod
  #         image: rayproject/ray:2.46.0
  #         # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
  #         # Specifying Command is not recommended.
  #         # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]


######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

Create a cluster, enable RayJobDeletionPolicy in helm-chart/kuberay-operator/values.yaml, and then build the KubeRay operator:
https://github.com/ray-project/kuberay/blob/master/ray-operator/DEVELOPMENT.md

Confirm the feature is enabled:
kubectl logs deployments/kuberay-operator | grep "feature gates"

Run the above RayJob, rayjob-sample.yaml:
kubectl apply -f rayjob-sample.yaml

Observe the job:
kubectl get rayjob
(screenshot)

kubectl get raycluster
(screenshot)

kubectl get pod
(screenshot)

Confirm the cluster is deleted when the job succeeds:
kubectl get rayjob
(screenshot)

kubectl get pod
(screenshot)

kubectl get raycluster
(screenshot)

kubectl get pod
(screenshot)

Expect: RayJob fails and the workers are deleted

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample-failure
spec:
  deletionStrategy:
    onSuccess:
      policy: DeleteCluster
    onFailure:
      policy: DeleteWorkers
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  rayClusterSpec:
    rayVersion: '2.46.0'
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265
              name: dashboard
            - containerPort: 10001
              name: client
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
            volumeMounts:
            - mountPath: /home/ray/samples
              name: code-sample
          volumes:
          - name: code-sample
            configMap:
              name: ray-job-code-sample
              items:
              - key: sample_code.py
                path: sample_code.py
    workerGroupSpecs:
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      groupName: small-group
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray:2.46.0
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

    # --- INTENTIONAL FAILURE START ---
    # This line will cause the script to fail.
    raise ValueError("Intentional failure for RayJob testing!")
    # --- INTENTIONAL FAILURE END ---

Repeat steps #1, #2, and #3 from the previous section to deploy the job.

Observe the job:
kubectl get rayjob
(screenshot)

kubectl get raycluster
(screenshot)

kubectl get pod
(screenshot)

Confirm the workers are deleted when the job fails:
kubectl get rayjob
(screenshot)

kubectl get pod
(screenshot)

kubectl get pod
(screenshot)

kubectl get raycluster
(screenshot)

Member
@andrewsykim andrewsykim left a comment

Overall LGTM, just some minor comments. @kevin85421 @davidxia PTAL if you have a chance

// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
// This field requires the RayJobDeletionStrategy feature gate to be enabled.
Member

This field requires the RayJobDeletionPolicy feature gate to be enabled

Contributor Author

changed

} else if rayJobInstance.Status.JobStatus == rayv1.JobStatusFailed {
policy = *rayJobInstance.Spec.DeletionStrategy.OnFailure.Policy
} else {
if shutdownTime.After(nowTime) {
Member

What's the reason for this if? Shouldn't we always check for an invalid job status once we've reached the TTL?

Member

This condition should probably go at the top of the condition on line 387.

Contributor Author

Moved it up to line 387, but then the condition on line 407 becomes redundant, so we should remove it. The logic is now: 1. if the shutdown time hasn't been reached, requeue the RayJob; otherwise, 2. check the job status: if it's an invalid status for deletion, the default policy is DeleteNone, so it breaks at line 402, and we no longer need to check whether the shutdown time has been reached on line 407. Is that correct? Thanks.
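
In other words, a sketch of the reordered flow being described (nowTime, shutdownTime, and rayJobInstance mirror names in the diff excerpts above; the rest is illustrative):

if shutdownTime.After(nowTime) {
    // 1. Shutdown time not reached: requeue and re-evaluate next reconcile.
    return ctrl.Result{RequeueAfter: shutdownTime.Sub(nowTime)}, nil
}

// 2. Shutdown time reached: pick the policy for the terminal status.
policy := rayv1.DeleteNone
switch rayJobInstance.Status.JobStatus {
case rayv1.JobStatusSucceeded:
    policy = *rayJobInstance.Spec.DeletionStrategy.OnSuccess.Policy
case rayv1.JobStatusFailed:
    policy = *rayJobInstance.Spec.DeletionStrategy.OnFailure.Policy
default:
    // Any other status keeps DeleteNone, so the later shutdown-time check
    // becomes unnecessary.
    logger.Info("jobStatus not valid for deletion", "jobStatus", rayJobInstance.Status.JobStatus)
}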

@davidxia
Contributor

Will do. Busy last week. Will try to test manually today or tomorrow.

@davidxia
Contributor

Manually tested onSuccess with DeleteWorkers, DeleteCluster, and DeleteSelf. They work as expected!

@andrewsykim
Member

Thanks @davidxia !

@weizhaowz weizhaowz requested a review from andrewsykim June 17, 2025 04:45
@andrewsykim andrewsykim merged commit f5d7131 into ray-project:master Jun 17, 2025
10 checks passed
@kevin85421
Member

Sorry for the late response; I'm still working on the release process. Thank you for the contribution!
