Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DELETE_RAYJOB_CR_AFTER_JOB_FINISHES doesn't work with Configuration API #2260

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -67,11 +64,3 @@ type Configuration struct {
// DeleteRayJobAfterJobFinishes deletes the RayJob CR itself if shutdownAfterJobFinishes is set to true.
DeleteRayJobAfterJobFinishes bool `json:"deleteRayJobAfterJobFinishes,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func() utils.RayDashboardClientInterface {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func() utils.RayHttpProxyClientInterface {
return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy)
}
21 changes: 12 additions & 9 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package ray
import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -26,6 +24,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

Expand All @@ -41,17 +40,19 @@ type RayJobReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder

dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func() utils.RayDashboardClientInterface
deleteRayJobAfterJobFinishes bool
}

// NewRayJobReconciler returns a new reconcile.Reconciler
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayJobReconciler {
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider, config configapi.Configuration) *RayJobReconciler {
dashboardClientFunc := provider.GetDashboardClient(mgr)
return &RayJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
deleteRayJobAfterJobFinishes: config.DeleteRayJobAfterJobFinishes,
}
}

Expand Down Expand Up @@ -335,7 +336,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info(fmt.Sprintf("shutdownTime not reached, requeue this RayJob for %d seconds", delta))
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
if s := os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES); strings.ToLower(s) == "true" {

if r.deleteRayJobAfterJobFinishes {
err = r.Client.Delete(ctx, rayJobInstance)
logger.Info("RayJob is deleted")
} else {
Expand All @@ -344,6 +346,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
logger.Info("RayCluster is deleted", "RayCluster", rayJobInstance.Status.RayClusterName)
}

if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
Expand Down
8 changes: 5 additions & 3 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package ray

import (
"context"
"os"
"time"

"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -423,8 +422,11 @@ var _ = Context("RayJob in K8sJobMode", func() {
})

It("If DELETE_RAYJOB_CR_AFTER_JOB_FINISHES environement variable is set, RayJob should be deleted.", func() {
os.Setenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES, "true")
defer os.Unsetenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES)
rayJobReconciler.deleteRayJobAfterJobFinishes = true
defer func() {
rayJobReconciler.deleteRayJobAfterJobFinishes = false
}()

Eventually(
func() bool {
return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob)())
Expand Down
14 changes: 11 additions & 3 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/manager"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

Expand Down Expand Up @@ -51,6 +52,10 @@ var (

fakeRayDashboardClient *utils.FakeRayDashboardClient
fakeRayHttpProxyClient *utils.FakeRayHttpProxyClient

rayClusterReconciler *RayClusterReconciler
rayJobReconciler *RayJobReconciler
rayServiceReconciler *RayServiceReconciler
)

type TestClientProvider struct{}
Expand Down Expand Up @@ -120,14 +125,17 @@ var _ = BeforeSuite(func(ctx SpecContext) {
},
},
}
err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1)
rayClusterReconciler = NewReconciler(ctx, mgr, options)
err = rayClusterReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller")

testClientProvider := TestClientProvider{}
err = NewRayServiceReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1)
rayServiceReconciler = NewRayServiceReconciler(ctx, mgr, testClientProvider)
err = rayServiceReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayService controller")

err = NewRayJobReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1)
rayJobReconciler = NewRayJobReconciler(ctx, mgr, testClientProvider, configapi.Configuration{})
err = rayJobReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayJob controller")

go func() {
Expand Down
14 changes: 14 additions & 0 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/util/json"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

Expand All @@ -30,6 +32,18 @@ var (
JobPath = "/api/jobs/"
)

type RayClientProvider struct {
configapi.Configuration
}

func (r *RayClientProvider) GetDashboardClient(mgr manager.Manager) func() RayDashboardClientInterface {
return GetRayDashboardClientFunc(mgr, r.UseKubernetesProxy)
}

func (r *RayClientProvider) GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface {
return GetRayHttpProxyClientFunc(mgr, r.UseKubernetesProxy)
}

Comment on lines +35 to +46
Copy link
Contributor

@MortalHappiness MortalHappiness Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a new type RayClientProvider is introduced here? Is this for refactoring purpose to make sure that the type Configuration has no other methods except for deepcopy? If yes it might be better to put this in a separate PR or at least mentioned this in the PR description.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactor was needed because otherwise in NewRayJobReconciler we pass in two copies of the Configuration, one implementing util.ClientProvider and one containing the actual type. Also as you mentioned, Kubernetes API types should not implement other methods outside standrdard DeepCopy and other generated methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks.

type RayDashboardClientInterface interface {
InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error
UpdateDeployments(ctx context.Context, configJson []byte) error
Expand Down
7 changes: 4 additions & 3 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func main() {
config.LogStdoutEncoder = logStdoutEncoder
config.EnableBatchScheduler = ray.EnableBatchScheduler
config.UseKubernetesProxy = useKubernetesProxy
config.DeleteRayJobAfterJobFinishes = os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES) == "true"
config.DeleteRayJobAfterJobFinishes = strings.ToLower(os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES)) == "true"
}

stdoutEncoder, err := newLogEncoder(logStdoutEncoder)
Expand Down Expand Up @@ -220,11 +220,12 @@ func main() {
WorkerSidecarContainers: config.WorkerSidecarContainers,
}
ctx := ctrl.SetupSignalHandler()
rayClientProvider := &utils.RayClientProvider{Configuration: config}
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayCluster")
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency),
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, rayClientProvider).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayService")
exitOnError(ray.NewRayJobReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency),
exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayClientProvider, config).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayJob")

if os.Getenv("ENABLE_WEBHOOKS") == "true" {
Expand Down
Loading