Commit

Provide sanity check test of RayJob with TAS
mszadkow committed Feb 27, 2025
1 parent 49308a8 commit f229159
Showing 2 changed files with 244 additions and 0 deletions.
239 changes: 239 additions & 0 deletions test/e2e/tas/rayjob_test.go
@@ -0,0 +1,239 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tase2e

import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/testing"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
"sigs.k8s.io/kueue/test/util"
)

var _ = ginkgo.Describe("TopologyAwareScheduling for RayJob", ginkgo.Ordered, func() {
	var (
		ns           *corev1.Namespace
		topology     *kueuealpha.Topology
		tasFlavor    *kueue.ResourceFlavor
		clusterQueue *kueue.ClusterQueue
		localQueue   *kueue.LocalQueue
	)

	ginkgo.BeforeEach(func() {
		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "e2e-tas-rayjob-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())

		topology = testing.MakeDefaultThreeLevelTopology("datacenter")
		gomega.Expect(k8sClient.Create(ctx, topology)).Should(gomega.Succeed())

		tasFlavor = testing.MakeResourceFlavor("tas-flavor").
			NodeLabel(tasNodeGroupLabel, instanceType).
			TopologyName(topology.Name).
			Obj()
		gomega.Expect(k8sClient.Create(ctx, tasFlavor)).Should(gomega.Succeed())

		clusterQueue = testing.MakeClusterQueue("cluster-queue").
			ResourceGroup(
				*testing.MakeFlavorQuotas(tasFlavor.Name).
					Resource(corev1.ResourceCPU, "1").
					Resource(corev1.ResourceMemory, "8Gi").
					Resource(extraResource, "8").
					Obj(),
			).
			Obj()
		gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
		util.ExpectClusterQueuesToBeActive(ctx, k8sClient, clusterQueue)

		localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
		gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
		util.ExpectLocalQueuesToBeActive(ctx, k8sClient, localQueue)
	})
	ginkgo.AfterEach(func() {
		gomega.Expect(util.DeleteAllRayJobsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
		util.ExpectObjectToBeDeleted(ctx, k8sClient, clusterQueue, true)
		util.ExpectObjectToBeDeleted(ctx, k8sClient, tasFlavor, true)
		util.ExpectObjectToBeDeleted(ctx, k8sClient, topology, true)
	})

ginkgo.When("Creating a RayJob", func() {
ginkgo.It("Should place pods based on the ranks-ordering", func() {
const (
headReplicas = 1
workerReplicas = 3
submitterReplica = 1
)
numPods := headReplicas + workerReplicas + submitterReplica
kuberayTestImage := util.GetKuberayTestImage()
rayjob := testingrayjob.MakeJob("ranks-ray", ns.Name).
Queue(localQueue.Name).
WithSubmissionMode(rayv1.K8sJobMode).
Entrypoint("python -c \"import ray; ray.init(); print(ray.cluster_resources())\"").
WithHeadGroupSpec(
rayv1.HeadGroupSpec{
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "head-container",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("100m"),
},
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetPreferredTopologyAnnotation: testing.DefaultRackTopologyLevel,
},
},
},
},
).
WithWorkerGroups(
rayv1.WorkerGroupSpec{
GroupName: "workers-group-0",
Replicas: ptr.To[int32](workerReplicas),
MinReplicas: ptr.To[int32](workerReplicas),
MaxReplicas: ptr.To[int32](10),
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "head-container",
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
extraResource: resource.MustParse("1"),
},
Limits: corev1.ResourceList{
extraResource: resource.MustParse("1"),
},
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetPreferredTopologyAnnotation: testing.DefaultBlockTopologyLevel,
},
},
},
},
).
WithSubmitterPodTemplate(corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ranks-ray-submitter",
Image: kuberayTestImage,
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("50m"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("50m"),
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
kueuealpha.PodSetRequiredTopologyAnnotation: testing.DefaultBlockTopologyLevel,
},
},
}).
Image(rayv1.HeadNode, kuberayTestImage, []string{}).
Image(rayv1.WorkerNode, kuberayTestImage, []string{}).
Obj()

			gomega.Expect(k8sClient.Create(ctx, rayjob)).Should(gomega.Succeed())

			ginkgo.By("RayJob is unsuspended", func() {
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rayjob), rayjob)).To(gomega.Succeed())
					g.Expect(rayjob.Spec.Suspend).Should(gomega.BeFalse())
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})

			pods := &corev1.PodList{}
			ginkgo.By("ensure all pods are created", func() {
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(rayjob.Namespace))).To(gomega.Succeed())
					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
					// The timeout is long to ensure all cluster pods are up and running.
					// This is because the Ray image takes a long time (around 170s on the CI)
					// to load and then to sync with the head.
				}, util.StartUpTimeout, util.Interval).Should(gomega.Succeed())
			})

			ginkgo.By("ensure all pods are scheduled", func() {
				listOpts := &client.ListOptions{
					FieldSelector: fields.OneTermNotEqualSelector("spec.nodeName", ""),
				}
				gomega.Eventually(func(g gomega.Gomega) {
					g.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name), listOpts)).To(gomega.Succeed())
					g.Expect(pods.Items).Should(gomega.HaveLen(numPods))
				}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
			})

			ginkgo.By("verify the assignment of pods is as expected with rank-based ordering", func() {
				gomega.Expect(k8sClient.List(ctx, pods, client.InNamespace(ns.Name))).To(gomega.Succeed())
				workersAssignedNodes := readWorkersAssignedNodes(pods.Items)
				// Expect the worker pods to be spread across workerReplicas distinct nodes.
				gomega.Expect(workersAssignedNodes).Should(gomega.HaveLen(workerReplicas))
			})
		})
	})
})

// readWorkersAssignedNodes returns, for each node, the number of Ray worker
// pods assigned to it.
func readWorkersAssignedNodes(pods []corev1.Pod) map[string]int {
	assignment := make(map[string]int, len(pods))
	for _, pod := range pods {
		if pod.Labels["ray.io/node-type"] == "worker" {
			assignment[pod.Spec.NodeName]++
		}
	}
	return assignment
}
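Note: the block and rack levels referenced through the PodSet topology annotations above come from the default three-level topology created in BeforeEach. As a rough sketch of what testing.MakeDefaultThreeLevelTopology("datacenter") is expected to build (the exact node labels are an assumption based on Kueue's usual TAS examples, not taken from this commit), the object corresponds to something like:

// Hypothetical sketch only; reuses the imports of the test file above.
topology := &kueuealpha.Topology{
	ObjectMeta: metav1.ObjectMeta{Name: "datacenter"},
	Spec: kueuealpha.TopologySpec{
		Levels: []kueuealpha.TopologyLevel{
			{NodeLabel: "cloud.provider.com/topology-block"}, // assumed value of testing.DefaultBlockTopologyLevel
			{NodeLabel: "cloud.provider.com/topology-rack"},  // assumed value of testing.DefaultRackTopologyLevel
			{NodeLabel: corev1.LabelHostname},
		},
	},
}

In the test itself, the submitter pod requires block-level placement, while the head and workers only express preferred placement (rack and block, respectively).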
5 changes: 5 additions & 0 deletions test/util/util.go
@@ -35,6 +35,7 @@ import (
"github.com/onsi/gomega"
awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/prometheus/client_golang/prometheus"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
appsv1 "k8s.io/api/apps/v1"
@@ -161,6 +162,10 @@ func DeleteAllAppWrappersInNamespace(ctx context.Context, c client.Client, ns *c
	return deleteAllObjectsInNamespace(ctx, c, ns, &awv1beta2.AppWrapper{})
}

func DeleteAllRayJobsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
	return deleteAllObjectsInNamespace(ctx, c, ns, &rayv1.RayJob{})
}

func DeleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
	return deleteAllPodsInNamespace(ctx, c, ns, 2)
}

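For context, the new DeleteAllRayJobsInNamespace helper mirrors the existing AppWrapper variant shown in the same hunk by delegating to the package's unexported deleteAllObjectsInNamespace. A minimal sketch of what such a generic helper can look like, assuming it wraps controller-runtime's DeleteAllOf (an assumption for illustration; the actual implementation in util.go may differ), with apierrors aliasing k8s.io/apimachinery/pkg/api/errors and meta being k8s.io/apimachinery/pkg/api/meta:

// Hypothetical sketch only; not the code from test/util/util.go.
func deleteAllObjectsInNamespaceSketch(ctx context.Context, c client.Client, ns *corev1.Namespace, obj client.Object) error {
	// Delete every object of the given kind in the namespace; tolerate the kind
	// being absent (e.g. a cluster without the RayJob CRD) or already cleaned up.
	err := c.DeleteAllOf(ctx, obj, client.InNamespace(ns.Name))
	if err != nil && !apierrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
		return err
	}
	return nil
}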