TAS cohorts and preemption
mimowo committed Feb 27, 2025
1 parent 54ec1bb commit a6a9a7f
Showing 15 changed files with 707 additions and 71 deletions.
10 changes: 2 additions & 8 deletions pkg/cache/clusterqueue.go
@@ -274,10 +274,6 @@ func (c *clusterQueue) inactiveReason() (string, string) {
}

if features.Enabled(features.TopologyAwareScheduling) && len(c.tasFlavors) > 0 {
if c.HasParent() {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported for cohorts")
}
if len(c.multiKueueAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling)
messages = append(messages, "TAS is not supported with MultiKueue admission check")
@@ -312,9 +308,7 @@ func (c *clusterQueue) isTASViolated() bool {
return true
}
}
return c.HasParent() ||
len(c.multiKueueAdmissionChecks) > 0 ||
len(c.provisioningAdmissionChecks) > 0
return len(c.multiKueueAdmissionChecks) > 0 || len(c.provisioningAdmissionChecks) > 0
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
@@ -514,7 +508,7 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
removeUsage(c, fr, q)
}
}
if features.Enabled(features.TopologyAwareScheduling) && wi.IsUsingTAS() {
if features.Enabled(features.TopologyAwareScheduling) {
for tasFlavor, tasUsage := range wi.TASUsage() {
if tasFlvCache := c.tasFlavorCache(tasFlavor); tasFlvCache != nil {
if m == 1 {
30 changes: 20 additions & 10 deletions pkg/cache/clusterqueue_snapshot.go
@@ -68,37 +68,47 @@ func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *Resou
return nil
}

func (c *ClusterQueueSnapshot) AddUsage(frq resources.FlavorResourceQuantities) {
for fr, q := range frq {
func (c *ClusterQueueSnapshot) AddUsage(usage workload.Usage) {
for fr, q := range usage.Quota {
addUsage(c, fr, q)
}
c.updateTASUsage(usage.TAS, add)
}

func (c *ClusterQueueSnapshot) removeUsage(frq resources.FlavorResourceQuantities) {
for fr, q := range frq {
func (c *ClusterQueueSnapshot) removeUsage(usage workload.Usage) {
for fr, q := range usage.Quota {
removeUsage(c, fr, q)
}
c.updateTASUsage(usage.TAS, subtract)
}

func (c *ClusterQueueSnapshot) updateTASUsage(wi *workload.Info, op usageOp) {
if features.Enabled(features.TopologyAwareScheduling) && wi.IsUsingTAS() {
for tasFlavor, tasUsage := range wi.TASUsage() {
func (c *ClusterQueueSnapshot) updateTASUsage(usage workload.TASUsage, op usageOp) {
if features.Enabled(features.TopologyAwareScheduling) {
for tasFlavor, tasUsage := range usage {
if tasFlvCache := c.TASFlavors[tasFlavor]; tasFlvCache != nil {
for _, tr := range tasUsage {
domainID := utiltas.DomainID(tr.Values)
tasFlvCache.updateTASUsage(domainID, tr.Requests, op)
tasFlvCache.updateTASUsage(domainID, tr.TotalRequests(), op)
}
}
}
}
}

func (c *ClusterQueueSnapshot) Fits(frq resources.FlavorResourceQuantities) bool {
for fr, q := range frq {
func (c *ClusterQueueSnapshot) Fits(usage workload.Usage) bool {
for fr, q := range usage.Quota {
if c.Available(fr) < q {
return false
}
}
for tasFlavor, flvUsage := range usage.TAS {
// We assume the `tasFlavor` is already in the snapshot as this was
// already checked earlier during flavor assignment, and the set of
// flavors is immutable in snapshot.
if !c.TASFlavors[tasFlavor].Fits(flvUsage) {
return false
}
}
return true
}

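For reference, a minimal sketch of how the workload.Usage value consumed by AddUsage, removeUsage, and Fits above appears to be shaped, inferred only from the call sites in this diff. The real definitions live in pkg/workload; the map key type and the TotalRequests implementation are assumptions.

```go
// Sketch only, not the actual pkg/workload source.
package workload

import (
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

	"sigs.k8s.io/kueue/pkg/resources"
)

// Usage pairs plain quota usage with topology-aware (TAS) usage.
type Usage struct {
	Quota resources.FlavorResourceQuantities
	TAS   TASUsage
}

// TASUsage is keyed by resource flavor; the key type is an assumption.
type TASUsage map[kueue.ResourceFlavorReference]TASFlavorUsage

// TASFlavorUsage lists the requests a workload places on each topology domain.
type TASFlavorUsage []TopologyDomainRequests

// TopologyDomainRequests carries the fields referenced in the diff: the domain
// values, the per-pod requests, and the number of pods placed in that domain.
type TopologyDomainRequests struct {
	Values            []string
	SinglePodRequests resources.Requests
	Count             int32
}

// TotalRequests is assumed to scale the per-pod requests by the pod count.
func (t TopologyDomainRequests) TotalRequests() resources.Requests {
	return t.SinglePodRequests.ScaledUp(int64(t.Count))
}
```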
4 changes: 2 additions & 2 deletions pkg/cache/clusterqueue_test.go
@@ -655,8 +655,8 @@ func TestClusterQueueReadinessWithTAS(t *testing.T) {
ResourceQuotaWrapper("example.com/gpu").NominalQuota("5").Append().
FlavorQuotas,
).Cohort("some-cohort").Obj(),
wantReason: kueue.ClusterQueueActiveReasonNotSupportedWithTopologyAwareScheduling,
wantMessage: "Can't admit new workloads: TAS is not supported for cohorts.",
wantReason: kueue.ClusterQueueActiveReasonReady,
wantMessage: "Can admit new workloads",
},
{
name: "TAS do not support Preemption",
5 changes: 3 additions & 2 deletions pkg/cache/resource_test.go
@@ -26,6 +26,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/resources"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
)

func TestAvailable(t *testing.T) {
@@ -401,7 +402,7 @@ func TestAvailable(t *testing.T) {
// add usage
{
for cqName, usage := range tc.usage {
snapshot.ClusterQueues[cqName].AddUsage(usage)
snapshot.ClusterQueues[cqName].AddUsage(workload.Usage{Quota: usage})
}
gotAvailable := make(map[string]resources.FlavorResourceQuantities, len(snapshot.ClusterQueues))
gotPotentiallyAvailable := make(map[string]resources.FlavorResourceQuantities, len(snapshot.ClusterQueues))
@@ -425,7 +426,7 @@
// remove usage
{
for cqName, usage := range tc.usage {
snapshot.ClusterQueues[cqName].removeUsage(usage)
snapshot.ClusterQueues[cqName].removeUsage(workload.Usage{Quota: usage})
}
gotAvailable := make(map[string]resources.FlavorResourceQuantities, len(snapshot.ClusterQueues))
gotPotentiallyAvailable := make(map[string]resources.FlavorResourceQuantities, len(snapshot.ClusterQueues))
6 changes: 2 additions & 4 deletions pkg/cache/snapshot.go
@@ -44,17 +44,15 @@ type Snapshot struct {
func (s *Snapshot) RemoveWorkload(wl *workload.Info) {
cq := s.ClusterQueues[wl.ClusterQueue]
delete(cq.Workloads, workload.Key(wl.Obj))
cq.removeUsage(wl.FlavorResourceUsage())
cq.updateTASUsage(wl, subtract)
cq.removeUsage(wl.Usage())
}

// AddWorkload adds a workload from its corresponding ClusterQueue and
// updates resource usage.
func (s *Snapshot) AddWorkload(wl *workload.Info) {
cq := s.ClusterQueues[wl.ClusterQueue]
cq.Workloads[workload.Key(wl.Obj)] = wl
cq.AddUsage(wl.FlavorResourceUsage())
cq.updateTASUsage(wl, add)
cq.AddUsage(wl.Usage())
}

func (s *Snapshot) Log(log logr.Logger) {
4 changes: 2 additions & 2 deletions pkg/cache/tas_flavor.go
@@ -162,10 +162,10 @@ func (c *TASFlavorCache) updateUsage(topologyRequests []workload.TopologyDomainR
c.usage[domainID] = resources.Requests{}
}
if op == subtract {
c.usage[domainID].Sub(tr.Requests)
c.usage[domainID].Sub(tr.TotalRequests())
c.usage[domainID].Sub(resources.Requests{corev1.ResourcePods: int64(tr.Count)})
} else {
c.usage[domainID].Add(tr.Requests)
c.usage[domainID].Add(tr.TotalRequests())
c.usage[domainID].Add(resources.Requests{corev1.ResourcePods: int64(tr.Count)})
}
}
17 changes: 17 additions & 0 deletions pkg/cache/tas_flavor_snapshot.go
@@ -31,6 +31,7 @@ import (
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
"sigs.k8s.io/kueue/pkg/workload"
)

var (
@@ -276,6 +277,22 @@ type tasPodSetAssignmentResult struct {

type FlavorTASRequests []TASPodSetRequests

func (s *TASFlavorSnapshot) Fits(flavorUsage workload.TASFlavorUsage) bool {
for _, domainUsage := range flavorUsage {
domainID := utiltas.DomainID(domainUsage.Values)
leaf, found := s.leaves[domainID]
if !found {
return false
}
remainingCapacity := leaf.freeCapacity.Clone()
remainingCapacity.Sub(leaf.tasUsage)
if !domainUsage.SinglePodRequests.FitsIn(remainingCapacity) {
return false
}
}
return true
}

// FindTopologyAssignmentsForFlavor returns TAS assignment, if possible, for all
// the TAS requests in the flavor handled by the snapshot.
// The simulateEmpty parameter allows to look for the assignment under the
16 changes: 16 additions & 0 deletions pkg/resources/requests.go
@@ -43,6 +43,18 @@ func (r Requests) Clone() Requests {
return maps.Clone(r)
}

func (r Requests) ScaledUp(f int64) Requests {
ret := r.Clone()
ret.Mul(f)
return ret
}

func (r Requests) ScaledDown(f int64) Requests {
ret := r.Clone()
ret.Divide(f)
return ret
}

func (r Requests) Divide(f int64) {
for k := range r {
r[k] /= f
@@ -118,3 +130,7 @@ func (req Requests) CountIn(capacity Requests) int32 {
}
return ptr.Deref(result, 0)
}

func (req Requests) FitsIn(capacity Requests) bool {
return req.CountIn(capacity) >= 1
}
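A short usage sketch of the new ScaledUp, ScaledDown, and FitsIn helpers, assuming resources.Requests stores int64 quantities with CPU in milli-units; the values below are illustrative only.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/kueue/pkg/resources"
)

func main() {
	// Total requests of a 2-pod podset: 2 CPUs and 4Gi of memory.
	total := resources.Requests{corev1.ResourceCPU: 2000, corev1.ResourceMemory: 4 << 30}

	// ScaledDown clones and divides: the requests of a single pod.
	perPod := total.ScaledDown(2)

	// ScaledUp clones and multiplies: the total for a hypothetical 5-pod placement.
	fivePods := perPod.ScaledUp(5)

	// FitsIn reports whether at least one such request fits in the capacity
	// (it is defined above as CountIn(capacity) >= 1).
	capacity := resources.Requests{corev1.ResourceCPU: 16000, corev1.ResourceMemory: 64 << 30}
	fmt.Println(fivePods.FitsIn(capacity)) // true
}
```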
8 changes: 4 additions & 4 deletions pkg/scheduler/flavorassigner/flavorassigner_test.go
@@ -1967,15 +1967,15 @@ func TestAssignFlavors(t *testing.T) {
t.Fatalf("Failed to create CQ snapshot")
}
if tc.clusterQueueUsage != nil {
clusterQueue.AddUsage(tc.clusterQueueUsage)
clusterQueue.AddUsage(workload.Usage{Quota: tc.clusterQueueUsage})
}

if tc.secondaryClusterQueue != nil {
secondaryClusterQueue := snapshot.ClusterQueues[tc.secondaryClusterQueue.Name]
if secondaryClusterQueue == nil {
t.Fatalf("Failed to create secondary CQ snapshot")
}
secondaryClusterQueue.AddUsage(tc.secondaryClusterQueueUsage)
secondaryClusterQueue.AddUsage(workload.Usage{Quota: tc.secondaryClusterQueueUsage})
}

flvAssigner := New(wlInfo, clusterQueue, resourceFlavors, tc.enableFairSharing, &testOracle{})
@@ -2125,10 +2125,10 @@ func TestReclaimBeforePriorityPreemption(t *testing.T) {
t.Fatalf("unexpected error while building snapshot: %v", err)
}
otherClusterQueue := snapshot.ClusterQueues["other-clusterqueue"]
otherClusterQueue.AddUsage(tc.otherClusterQueueUsage)
otherClusterQueue.AddUsage(workload.Usage{Quota: tc.otherClusterQueueUsage})

testClusterQueue := snapshot.ClusterQueues["test-clusterqueue"]
testClusterQueue.AddUsage(tc.testClusterQueueUsage)
testClusterQueue.AddUsage(workload.Usage{Quota: tc.testClusterQueueUsage})

flvAssigner := New(wlInfo, testClusterQueue, resourceFlavors, false, &testOracle{})
log := testr.NewWithOptions(t, testr.Options{Verbosity: 2})
3 changes: 1 addition & 2 deletions pkg/scheduler/flavorassigner/tas_flavorassigner.go
@@ -56,8 +56,7 @@ func podSetTopologyRequest(psAssignment *PodSetAssignment,
return nil, errors.New("workload requires Topology, but there is no TAS cache information")
}
psResources := wl.TotalRequests[podSetIndex]
singlePodRequests := psResources.Requests.Clone()
singlePodRequests.Divide(int64(psResources.Count))
singlePodRequests := psResources.SinglePodRequests()
podCount := psAssignment.Count
tasFlvr, err := onlyFlavor(psAssignment.Flavors)
if err != nil {
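The SinglePodRequests() call replaces the explicit Clone-and-Divide shown above. A hedged sketch of what such a helper presumably does; the PodSetResources shape below is a trimmed-down assumption, and the real type in pkg/workload has more fields.

```go
// Sketch only, not the actual pkg/workload source.
package workload

import "sigs.k8s.io/kueue/pkg/resources"

// PodSetResources is an assumed, trimmed-down shape of the podset resources
// referenced above (wl.TotalRequests[podSetIndex]).
type PodSetResources struct {
	Requests resources.Requests
	Count    int32
}

// SinglePodRequests is assumed to scale the podset total down to one pod,
// mirroring the Clone+Divide code it replaces in this diff.
func (p PodSetResources) SinglePodRequests() resources.Requests {
	return p.Requests.ScaledDown(int64(p.Count))
}
```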
45 changes: 42 additions & 3 deletions pkg/scheduler/scheduler.go
@@ -330,10 +330,17 @@ type entry struct {
preemptionTargets []*preemption.Target
}

// netUsage returns how much capacity this entry will require from the ClusterQueue/Cohort.
func (e *entry) netUsage() workload.Usage {
return workload.Usage{
Quota: e.netQuotaUsage(),
TAS: e.tasUsage(),
}
}

// netQuotaUsage returns how much capacity this entry will require from the ClusterQueue/Cohort.
// When a workload is preempting, it subtracts the preempted resources from the resources
// required, as the remaining quota is all we need from the CQ/Cohort.
func (e *entry) netUsage() resources.FlavorResourceQuantities {
func (e *entry) netQuotaUsage() resources.FlavorResourceQuantities {
if e.assignment.RepresentativeMode() == flavorassigner.Fit {
return e.assignment.Usage
}
@@ -350,6 +357,31 @@ func (e *entry) netUsage() resources.FlavorResourceQuantities {
return usage
}

func (e *entry) tasUsage() workload.TASUsage {
if !features.Enabled(features.TopologyAwareScheduling) || !e.IsUsingTAS() {
return nil
}
result := make(workload.TASUsage)
for _, psa := range e.assignment.PodSets {
if features.Enabled(features.TopologyAwareScheduling) && psa.TopologyAssignment != nil {
singlePodRequests := resources.NewRequests(psa.Requests).ScaledDown(int64(psa.Count))
for _, flv := range psa.Flavors {
if _, ok := result[flv.Name]; !ok {
result[flv.Name] = make(workload.TASFlavorUsage, 0)
}
for _, domain := range psa.TopologyAssignment.Domains {
result[flv.Name] = append(result[flv.Name], workload.TopologyDomainRequests{
Values: domain.Values,
SinglePodRequests: singlePodRequests.Clone(),
Count: domain.Count,
})
}
}
}
}
return result
}

// nominate returns the workloads with their requirements (resource flavors, borrowing) if
// they were admitted by the clusterQueues in the snapshot.
func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, snap *cache.Snapshot) []entry {
@@ -392,7 +424,7 @@
}

// resourcesToReserve calculates how much of the available resources in cq/cohort assignment should be reserved.
func resourcesToReserve(e *entry, cq *cache.ClusterQueueSnapshot) resources.FlavorResourceQuantities {
func resourcesToReserve(e *entry, cq *cache.ClusterQueueSnapshot) workload.Usage {
return workload.Usage{
Quota: quotaResourcesToReserve(e, cq),
TAS: e.TASUsage(),
}
}

func quotaResourcesToReserve(e *entry, cq *cache.ClusterQueueSnapshot) resources.FlavorResourceQuantities {
if e.assignment.RepresentativeMode() != flavorassigner.Preempt {
return e.assignment.Usage
}
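For concreteness, a hedged illustration of the value tasUsage() above would build for a 4-pod podset placed 2/2 across two rack domains of a flavor named "tas-flavor"; the flavor name, domain values, and requests are made up, and the flavor key is assumed to be a string-like type.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/kueue/pkg/resources"
	"sigs.k8s.io/kueue/pkg/workload"
)

func main() {
	perPod := resources.Requests{corev1.ResourceCPU: 500} // 500m CPU per pod

	// One TopologyDomainRequests entry per assigned domain, mirroring the
	// append loop in tasUsage().
	usage := workload.TASUsage{
		"tas-flavor": workload.TASFlavorUsage{
			{Values: []string{"rack-a"}, SinglePodRequests: perPod.Clone(), Count: 2},
			{Values: []string{"rack-b"}, SinglePodRequests: perPod.Clone(), Count: 2},
		},
	}
	fmt.Println(usage)
}
```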