scheduling changes for lora affinity load balancing #423

Open · wants to merge 7 commits into main
2 changes: 1 addition & 1 deletion config/manifests/vllm/deployment.yaml
@@ -120,4 +120,4 @@ data:
- base-model: meta-llama/Llama-2-7b-hf
id: tweet-summary-1
source: vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm
1 change: 1 addition & 0 deletions docs/proposals/003-model-server-protocol/README.md
@@ -47,3 +47,4 @@ The model server MUST expose the following LoRA adapter metrics via the same Pro
requested adapter. Example: `"max_lora": "8"`.
* `running_lora_adapters`: A comma separated list of adapters that are currently loaded in GPU
memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"`
* `waiting_lora_adapters`: A comma separated list of adapters that are waiting to be served. Example: `"waiting_lora_adapters": "adapter1, adapter2"`
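For illustration, a scrape of a server with two adapters loaded and one queued could expose a sample of this shape (the adapter names and the gauge value, which carries the sample's creation timestamp, are made up):

```
vllm:lora_requests_info{max_lora="8",running_lora_adapters="adapter1,adapter2",waiting_lora_adapters="adapter3"} 1.7409e+09
```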
44 changes: 41 additions & 3 deletions pkg/epp/backend/vllm/metrics.go
@@ -34,9 +34,13 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// Metric names used in the vLLM metrics implementation.
// Refer to the protocol doc for more details:
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/tree/main/docs/proposals/003-model-server-protocol
const (
LoraRequestInfoMetricName = "vllm:lora_requests_info"
LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters"
LoraRequestInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
Contributor:

On one hand, I can see why considering waiting adapters is useful, because waiting LoRAs are going to be served next. However, I have concerns about this weakening the LoRA affinity: running is bounded by max_lora, while waiting is not bounded. If we enter an unstable state with a long waiting queue, we can lose the affinity benefit.

An improved algorithm could prioritize waiting over running. What do you think?

Contributor Author (@kaushikmitr, Feb 28, 2025):

Using waiting + running for affinity is always superior to using just running, because adapters are served on a first-come, first-served basis, so we know for sure that a waiting adapter will get loaded if it is not already available. But yes, within waiting + running, prioritizing waiting over running makes sense I think; it needs testing first.
LoraRequestInfoMaxAdaptersMetricName = "max_lora"
// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
Contributor:

Can you clean up the TODOs and the metrics that are not currently used?

Contributor Author:

I think the TODOs are still relevant. I will remove the KV cache max token capacity metric because it's not being used.

RunningQueueSizeMetricName = "vllm:num_requests_running"
@@ -45,8 +49,7 @@ const (
RunningQueueSizeMetricName = "vllm:num_tokens_running"
WaitingQueueSizeMetricName = "vllm:num_tokens_waiting"
*/
KVCacheUsagePercentMetricName = "vllm:gpu_cache_usage_perc"
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
KVCacheUsagePercentMetricName = "vllm:gpu_cache_usage_perc"
)

type PodMetricsClientImpl struct{}
@@ -136,6 +139,14 @@ func promToPodMetrics(
}
}
}
if label.GetName() == LoraRequestInfoWaitingAdaptersMetricName {
if label.GetValue() != "" {
adapterList := strings.Split(label.GetValue(), ",")
for _, adapter := range adapterList {
updated.ActiveModels[adapter] = 0
}
}
}
if label.GetName() == LoraRequestInfoMaxAdaptersMetricName {
if label.GetValue() != "" {
updated.MaxActiveModels, err = strconv.Atoi(label.GetValue())
@@ -161,14 +172,41 @@ func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.Metr
logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
}
var latestTs float64

var latest *dto.Metric
var latestTs float64

// Iterate over all metrics in the family.
for _, m := range loraRequests.GetMetric() {
var running, waiting string
// Read the label values for running and waiting adapters.
for _, lp := range m.GetLabel() {
switch lp.GetName() {
case LoraRequestInfoRunningAdaptersMetricName:
running = lp.GetValue()
case LoraRequestInfoWaitingAdaptersMetricName:
waiting = lp.GetValue()
}
}

// Ignore metrics with both labels empty.
Contributor:

Suggested change:
// Ignore metrics with both labels empty.
// Ignore metrics with both labels empty. This happens when there are no running or waiting requests on
// the server, in this case it is best to use the last set of active adapters.
if running == "" && waiting == "" {
continue
Contributor:

Does this happen in practice? In what cases?

Contributor Author:

When there are no running requests, that is, QPS = 0 for the pod. In this case it's good to use the last adapters that ran to determine affinity (as a proxy for which LoRAs are already loaded).
}

// Select the metric with the latest creation timestamp.
if m.GetGauge().GetValue() > latestTs {
latestTs = m.GetGauge().GetValue()
latest = m
}
}

if latest == nil {
logger.V(logutil.TRACE).Info("Metric value Empty", "value", latest, "metric", LoraRequestInfoMetricName)
return nil, time.Time{}, nil
}

// Convert the gauge value (creation timestamp) to time.Time.
return latest, time.Unix(0, int64(latestTs*1000)), nil
}
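As a toy, standalone sketch of the selection rule above (sample values are made up): samples whose running and waiting labels are both empty are skipped, and among the rest the one with the largest gauge value, i.e. the latest creation timestamp, wins.

```go
package main

import "fmt"

// sample stands in for one vllm:lora_requests_info series: the gauge value is
// the creation timestamp and the labels carry the adapter lists.
type sample struct {
	createdAt float64 // gauge value, seconds since the epoch
	running   string
	waiting   string
}

// latestSample skips samples with both label values empty and returns the
// remaining sample with the largest creation timestamp (nil if none qualify).
func latestSample(samples []sample) *sample {
	var latest *sample
	var latestTs float64
	for i := range samples {
		s := &samples[i]
		if s.running == "" && s.waiting == "" {
			continue
		}
		if s.createdAt > latestTs {
			latestTs = s.createdAt
			latest = s
		}
	}
	return latest
}

func main() {
	samples := []sample{
		{createdAt: 1700000100, running: "adapter1"},
		{createdAt: 1700000200}, // idle sample, ignored
		{createdAt: 1700000150, running: "adapter1,adapter2", waiting: "adapter3"},
	}
	fmt.Printf("picked sample created at %.0f\n", latestSample(samples).createdAt) // 1700000150
}
```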

61 changes: 52 additions & 9 deletions pkg/epp/scheduling/filter.go
@@ -19,6 +19,8 @@ package scheduling
import (
"errors"
"math"
"math/rand"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -183,18 +185,59 @@ func lowLoRACostPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
return ok || len(pod.ActiveModels) < pod.MaxActiveModels
}

// loRAAffinityPredicate is a filter function to check whether a pod has affinity to the lora requested.
func loRAAffinityPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
_, ok := pod.ActiveModels[req.ResolvedTargetModel]
return ok
}
// loRASoftAffinityPredicate implements a pod selection strategy that prioritizes pods
// with existing LoRA model affinity while allowing for load balancing through randomization.
//
// The function works by:
// 1. Separating pods into two groups: those with target model affinity and those with available capacity
// 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing
// 3. Falling back to whatever group has pods if one group is empty
//
// Parameters:
// - logger: Logger interface for diagnostic output
// - req: LLM request containing the resolved target model
// - pods: Slice of pod metrics to filter
//
// Returns:
// - Filtered slice of pod metrics based on affinity and availability
// - Error if any issues occur during filtering
func loRASoftAffinityFilter(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {

// Pre-allocate slices with estimated capacity
filtered_affinity := make([]*datastore.PodMetrics, 0, len(pods))
filtered_available := make([]*datastore.PodMetrics, 0, len(pods))

// Categorize pods based on affinity and availability
for _, pod := range pods {

if _, exists := pod.ActiveModels[req.ResolvedTargetModel]; exists {
filtered_affinity = append(filtered_affinity, pod)
} else if len(pod.ActiveModels) < pod.MaxActiveModels {
Contributor:

This is essentially the canAcceptNewLoraPredicate function below; are we still using canAcceptNewLoraPredicate?

Contributor Author:

We are not using canAcceptNewLoraPredicate any more, but I think it would be good to keep.

filtered_available = append(filtered_available, pod)
}
}

// Seed a local math/rand generator for the probabilistic selection below
randSource := rand.NewSource(time.Now().UnixNano())
Contributor:

This can be a follow-up, but it sounds like we can extend the current filter framework to support such probability-based filtering. So instead of having one base filter, we would have a list of filters with weights. This way we can keep each filter very focused and make them more reusable.

Contributor Author:

Agreed.
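A minimal sketch of the weighted-filter idea floated here; it is not part of this PR, and the names (weightedFilter, pickFilter) and weights are hypothetical:

```go
package main

import (
	"fmt"
	"math/rand"
)

// weightedFilter pairs a focused filter with a selection weight; the filter
// body itself is elided because only the selection mechanism matters here.
type weightedFilter struct {
	name   string
	weight float64
}

// pickFilter samples one filter with probability proportional to its weight.
func pickFilter(r *rand.Rand, filters []weightedFilter) weightedFilter {
	total := 0.0
	for _, f := range filters {
		total += f.weight
	}
	x := r.Float64() * total
	for _, f := range filters {
		if x < f.weight {
			return f
		}
		x -= f.weight
	}
	return filters[len(filters)-1]
}

func main() {
	r := rand.New(rand.NewSource(42))
	filters := []weightedFilter{
		{name: "lora affinity", weight: 0.999},
		{name: "can fit new lora", weight: 0.001},
	}
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[pickFilter(r, filters).name]++
	}
	fmt.Println(counts) // roughly 99,900 affinity picks vs ~100 capacity picks
}
```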

randGen := rand.New(randSource)

// If both groups have pods, use probability to select which group to return
if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
if randGen.Float64() < loraAffinityThreshold {
return filtered_affinity, nil
}
return filtered_available, nil
}

// Return whichever group has pods
if len(filtered_affinity) > 0 {
return filtered_affinity, nil
}

// canAcceptNewLoraPredicate is a filter function to check whether a pod has room to load the adapter.
func canAcceptNewLoraPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
return len(pod.ActiveModels) < pod.MaxActiveModels
return filtered_available, nil
}

func criticalRequestPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
func criticalRequestPredicate(req *LLMRequest, _ *datastore.PodMetrics) bool {
return req.Critical
}

20 changes: 9 additions & 11 deletions pkg/epp/scheduling/scheduler.go
@@ -36,8 +36,11 @@ const (
queueThresholdCritical = 5
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// the threshold for queued requests to be considered low below which we can prioritize LoRA affinity.
// The value of 50 was arrived at heuristically based on experiments.
queueingThresholdLoRA = 50
// The value of 128 was arrived at heuristically based on experiments.
queueingThresholdLoRA = 128
Contributor:

I think we should make this configurable, perhaps via a flag for now. Different environments will likely need different thresholds.

Contributor Author:

I would rather leverage #16 to make this configurable.

Contributor:

I don't think we have time to do an API change for the next release. Given we already had to change it on different accelerator types, it's important to have this knob configurable. Exposing it as a flag seems straightforward and gives us time to gather feedback before making an API change.

Contributor Author:

I took a look; as far as I understand, adding this flag is not straightforward given the way the scheduler is written. If it's needed for the next release, I would rather do it in a separate PR.

Contributor:

Defining a flag for each parameter is tedious; we can use a versioned configuration file instead, which is called ComponentConfig. Ideally we do that for #383. Here is JobSet's config file as an example: https://github.com/kubernetes-sigs/jobset/tree/main/api/config/v1alpha1
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// loraAffinityThreshold indicates the probability with which we prefer a pod with LoRA affinity over a pod without but having room to fit more LoRA adapters.
loraAffinityThreshold = 0.999
Contributor:

Do you have some insights to show why this is needed and why this value was picked?

Contributor Author:

I picked it after some trial and error. This value worked well when we had skewed traffic across adapters: it helped spread out high-QPS adapters while keeping low-QPS adapters less spread out.
)

var (
@@ -54,7 +57,7 @@ var (
filter: leastQueuingFilterFunc,
nextOnSuccessOrFailure: &filter{
name: "low cost LoRA",
filter: toFilterFunc(lowLoRACostPredicate),
filter: loRASoftAffinityFilter,
nextOnSuccessOrFailure: &filter{
name: "least KV cache percent",
filter: leastKVCacheFilterFunc,
@@ -76,14 +79,9 @@ var (
name: "low queueing filter",
filter: toFilterFunc((lowQueueingPodPredicate)),
nextOnSuccess: &filter{
name: "affinity LoRA",
filter: toFilterFunc(loRAAffinityPredicate),
nextOnSuccess: queueAndKVCacheFilter,
nextOnFailure: &filter{
name: "can accept LoRA Adapter",
filter: toFilterFunc(canAcceptNewLoraPredicate),
nextOnSuccessOrFailure: queueAndKVCacheFilter,
},
name: "affinity LoRA",
filter: loRASoftAffinityFilter,
nextOnSuccessOrFailure: queueAndKVCacheFilter,
},
nextOnFailure: queueLoRAAndKVCacheFilter,
}
2 changes: 1 addition & 1 deletion test/integration/hermetic_test.go
@@ -179,7 +179,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
}),
extprocutils.FakePodMetrics(1, datastore.Metrics{
WaitingQueueSize: 50,
WaitingQueueSize: 200,
KVCacheUsagePercent: 0.1,
ActiveModels: map[string]int{
"foo": 1,