Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduling changes for lora affinity load balancing #423

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions config/manifests/vllm/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Deployment
metadata:
name: vllm-llama2-7b-pool
spec:
replicas: 3
replicas: 6
selector:
matchLabels:
app: vllm-llama2-7b-pool
Expand All @@ -24,15 +24,23 @@ spec:
- "1"
- "--port"
- "8000"
- "--compilation-config"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may not need this if using V0. It outputs the CUDA graph for optimization.

- "3"
- "--max-num-seqs"
- "2048"
- "--enable-lora"
- "--max-loras"
- "4"
- "--max-cpu-loras"
- "12"
- "15"
- "--max-lora-rank"
- "16"
- "--lora-modules"
- '{"name": "tweet-summary-0", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
- '{"name": "tweet-summary-1", "path": "vineetsharma/qlora-adapter-Llama-2-7b-hf-TweetSumm", "base_model_name": "llama-2"}'
env:
- name: VLLM_USE_V1
value: "1"
Copy link
Contributor

@ahg-g ahg-g Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The released vllm version doesn't support our metrics yet, right? if so, then we can't use it now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is why the tests are failing. I will switch back to V0

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that is, the integration test doesn't use this deployment yaml.

I think the test is failing because this PR introduces some randomness to the selection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- name: PORT
value: "8000"
- name: HUGGING_FACE_HUB_TOKEN
Expand Down
37 changes: 36 additions & 1 deletion pkg/epp/backend/vllm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
const (
LoraRequestInfoMetricName = "vllm:lora_requests_info"
LoraRequestInfoRunningAdaptersMetricName = "running_lora_adapters"
LoraRequestInfoWaitingAdaptersMetricName = "waiting_lora_adapters"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On one hand, I can see why considering waiting is useful, because waiting loras are going to be served next. However, I have concerns of this weakening the lora affinity. running is bound by the max lora, waiting is not bound. If we enter an unstable state with a long waiting, we can lose the affinity benefit.

An improvement algorithm could be we prioritize waiting over running, what do you think?

Copy link
Contributor Author

@kaushikmitr kaushikmitr Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so using waiting + running for affinity is always superior to using just running. Because adapters get served in First come first serve basis. So we know for sure thar waiting if not available will get loaded for sure. But yes, within waiting + running prioritizing waiting over running makes sense I think, but need to test first.

LoraRequestInfoMaxAdaptersMetricName = "max_lora"
// TODO: Replace these with the num_tokens_running/waiting below once we add those to the fork.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clean up the TODOs and the metrics that are not currently used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the TODOs are still relevant. I will remove max token in KV Cache because its not being used.

RunningQueueSizeMetricName = "vllm:num_requests_running"
Expand Down Expand Up @@ -136,6 +137,14 @@ func promToPodMetrics(
}
}
}
if label.GetName() == LoraRequestInfoWaitingAdaptersMetricName {
if label.GetValue() != "" {
adapterList := strings.Split(label.GetValue(), ",")
for _, adapter := range adapterList {
updated.ActiveModels[adapter] = 0
}
}
}
if label.GetName() == LoraRequestInfoMaxAdaptersMetricName {
if label.GetValue() != "" {
updated.MaxActiveModels, err = strconv.Atoi(label.GetValue())
Expand All @@ -161,14 +170,40 @@ func getLatestLoraMetric(logger logr.Logger, metricFamilies map[string]*dto.Metr
logger.V(logutil.DEFAULT).Error(nil, "Metric family not found", "name", LoraRequestInfoMetricName)
return nil, time.Time{}, fmt.Errorf("metric family %q not found", LoraRequestInfoMetricName)
}
var latestTs float64

var latest *dto.Metric
var latestTs float64

// Iterate over all metrics in the family.
for _, m := range loraRequests.GetMetric() {
var running, waiting string
// Read the label values for running and waiting adapters.
for _, lp := range m.GetLabel() {
switch lp.GetName() {
case LoraRequestInfoRunningAdaptersMetricName:
running = lp.GetValue()
case LoraRequestInfoWaitingAdaptersMetricName:
waiting = lp.GetValue()
}
}

// Ignore metrics with both labels empty.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Ignore metrics with both labels empty.
// Ignore metrics with both labels empty. This happens when there are no running or waiting requests on
// the server, in this case it is best to use the last set of active adapters.

if running == "" && waiting == "" {
// continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented out code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a bug.

}

// Select the metric with the latest creation timestamp.
if m.GetGauge().GetValue() > latestTs {
latestTs = m.GetGauge().GetValue()
latest = m
}
}

if latest == nil {
return nil, time.Time{}, fmt.Errorf("no valid metric found")
}

// Convert the gauge value (creation timestamp) to time.Time.
return latest, time.Unix(0, int64(latestTs*1000)), nil
}

Expand Down
59 changes: 55 additions & 4 deletions pkg/epp/scheduling/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package scheduling
import (
"errors"
"math"
"math/rand"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
Expand Down Expand Up @@ -183,10 +185,59 @@ func lowLoRACostPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
return ok || len(pod.ActiveModels) < pod.MaxActiveModels
}

// loRAAffinityPredicate is a filter function to check whether a pod has affinity to the lora requested.
func loRAAffinityPredicate(req *LLMRequest, pod *datastore.PodMetrics) bool {
_, ok := pod.ActiveModels[req.ResolvedTargetModel]
return ok
// loRASoftAffinityPredicate implements a pod selection strategy that prioritizes pods
// with existing LoRA model affinity while allowing for load balancing through randomization.
//
// The function works by:
// 1. Separating pods into two groups: those with target model affinity and those with available capacity
// 2. Using a probability threshold to sometimes select from non-affinity pods to enable load balancing
// 3. Falling back to whatever group has pods if one group is empty
//
// Parameters:
// - logger: Logger interface for diagnostic output
// - req: LLM request containing the resolved target model
// - pods: Slice of pod metrics to filter
//
// Returns:
// - Filtered slice of pod metrics based on affinity and availability
// - Error if any issues occur during filtering
func loRASoftAffinityPredicate(logger logr.Logger, req *LLMRequest, pods []*datastore.PodMetrics) ([]*datastore.PodMetrics, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a predicate, this is a filter, according to the current filter and predicate interface definition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed


// Pre-allocate slices with estimated capacity
filtered_affinity := make([]*datastore.PodMetrics, 0, len(pods))
filtered_available := make([]*datastore.PodMetrics, 0, len(pods))

// Categorize pods based on affinity and availability
for _, pod := range pods {
if pod == nil {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add a warning log here and state that this should never happen

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this, as this scenario is captured already upstream

}

if _, exists := pod.ActiveModels[req.ResolvedTargetModel]; exists {
filtered_affinity = append(filtered_affinity, pod)
} else if len(pod.ActiveModels) < pod.MaxActiveModels {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is essentially the canAcceptNewLoraPredicate function below, are we still using canAcceptNewLoraPredicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not using canAcceptNewLoraPredicate any more. But would be good to keep I think.

filtered_available = append(filtered_available, pod)
}
}

// Use crypto/rand for better randomization in production environments
randSource := rand.NewSource(time.Now().UnixNano())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a follow up, but it sounds like we can extend the current filter framework to support such probability based filtering. So instead of having one base filter, we have a list of filters with weights. This way we can keep each filter very focused, and make them more reusable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

randGen := rand.New(randSource)

// If both groups have pods, use probability to select which group to return
if len(filtered_affinity) > 0 && len(filtered_available) > 0 {
if randGen.Float64() < loraAffinityThreshold {
return filtered_affinity, nil
}
return filtered_available, nil
}

// Return whichever group has pods
if len(filtered_affinity) > 0 {
return filtered_affinity, nil
}

return filtered_available, nil
}

// canAcceptNewLoraPredicate is a filter function to check whether a pod has room to load the adapter.
Expand Down
20 changes: 9 additions & 11 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ const (
queueThresholdCritical = 5
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// the threshold for queued requests to be considered low below which we can prioritize LoRA affinity.
// The value of 50 is arrived heuristicically based on experiments.
queueingThresholdLoRA = 50
// The value of 128 is arrived heuristicically based on experiments.
queueingThresholdLoRA = 128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should make this configurable perhaps via a flag for now. Different environments will likely need different thresholds.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather levarage this to make this configurable. #16

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have time to do API change for the next release. Given we already had to change it on different accelerator types, it's important to have this knob configurable. Exposing it as a flag seems straightforward and gives us time to gather feedback on this before making an API change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took at look, iiuc, adding this flag is not straightforward, the way scheduler is written. If its needed for next release would rather have it in another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining a flag for each parameter is tedious, we can use a versioned configuration file, this is called ComponentConfig, ideally we do that for #383

Here is JobSet's config file as an example: https://github.com/kubernetes-sigs/jobset/tree/main/api/config/v1alpha1

// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16) Make this configurable.
// loraAffinityThreshold indicates the probability with which we prefer a pod with LoRA affinity over a pod without but having room to fit more LoRA adapters.
loraAffinityThreshold = 0.999
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have some insights to show why this is needed and why this value is picked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I picked it after some trail and error. This value worked well when we had skewed traffic for different adapters, helped spread out high QPS adapters while keeping low QPS adapters less spread out

)

var (
Expand All @@ -54,7 +57,7 @@ var (
filter: leastQueuingFilterFunc,
nextOnSuccessOrFailure: &filter{
name: "low cost LoRA",
filter: toFilterFunc(lowLoRACostPredicate),
filter: loRASoftAffinityPredicate,
nextOnSuccessOrFailure: &filter{
name: "least KV cache percent",
filter: leastKVCacheFilterFunc,
Expand All @@ -76,14 +79,9 @@ var (
name: "low queueing filter",
filter: toFilterFunc((lowQueueingPodPredicate)),
nextOnSuccess: &filter{
name: "affinity LoRA",
filter: toFilterFunc(loRAAffinityPredicate),
nextOnSuccess: queueAndKVCacheFilter,
nextOnFailure: &filter{
name: "can accept LoRA Adapter",
filter: toFilterFunc(canAcceptNewLoraPredicate),
nextOnSuccessOrFailure: queueAndKVCacheFilter,
},
name: "affinity LoRA",
filter: loRASoftAffinityPredicate,
nextOnSuccessOrFailure: queueAndKVCacheFilter,
},
nextOnFailure: queueLoRAAndKVCacheFilter,
}
Expand Down