ensure metrics refresh time <= refreshMetricsInterval #207

Closed · wants to merge 1 commit
61 changes: 47 additions & 14 deletions pkg/ext-proc/backend/provider.go
@@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"
"sync"
"time"
@@ -10,10 +11,6 @@ import (
klog "k8s.io/klog/v2"
)

const (
fetchMetricsTimeout = 5 * time.Second
)

func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
p := &Provider{
podMetrics: sync.Map{},
@@ -35,23 +32,46 @@ type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
}

func isPodMetricsStale(pm *PodMetrics) bool {
// TODO: make it configurable
return time.Since(pm.UpdatedTime) > 5*time.Second
Contributor: nit: Can we make the 5 second a constant?
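For illustration, a minimal sketch of that nit, assuming a hypothetical constant name (not part of this PR):

// Hypothetical: name the staleness threshold instead of hard-coding 5s.
const defaultPodMetricsStaleThreshold = 5 * time.Second

func isPodMetricsStale(pm *PodMetrics) bool {
	// TODO: make the threshold configurable (e.g. via a startup flag).
	return time.Since(pm.UpdatedTime) > defaultPodMetricsStaleThreshold
}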

}

func (p *Provider) AllPodMetrics() []*PodMetrics {
return p.allPodMetrics(false)
}

func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
return p.allPodMetrics(true)
}

func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
res := []*PodMetrics{}
fn := func(k, v any) bool {
res = append(res, v.(*PodMetrics))
m := v.(*PodMetrics)

if !staleIncluded && isPodMetricsStale(m) {
// exclude stale metrics for scheduler
klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
return true
}

res = append(res, m)
return true
}
p.podMetrics.Range(fn)
return res
}

func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
pm.UpdatedTime = time.Now()
p.podMetrics.Store(pod, pm)
}

func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
val, ok := p.podMetrics.Load(pod)
if ok {
// For now, we don't exclude stale metrics with GET operation.
Contributor: Can you create an issue to track this? We may want to deprioritize or skip a pod if its metrics are stale in the future.
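As a rough illustration of that possible follow-up (the method name below is hypothetical and not in this PR), GetPodMetrics could be extended to also report staleness so callers may deprioritize such pods:

// Hypothetical variant: also report whether the stored metrics are fresh.
func (p *Provider) GetPodMetricsWithFreshness(pod Pod) (*PodMetrics, bool, bool) {
	val, ok := p.podMetrics.Load(pod)
	if !ok {
		return nil, false, false
	}
	pm := val.(*PodMetrics)
	// Third return value reports whether the metrics are still fresh.
	return pm, true, !isPodMetricsStale(pm)
}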

return val.(*PodMetrics), true
}
return nil, false
@@ -60,11 +80,11 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
p.refreshPodsOnce()

if err := p.refreshMetricsOnce(); err != nil {
if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
klog.Errorf("Failed to init metrics: %v", err)
}

klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetricsIncludingStale())

// periodically refresh pods
go func() {
@@ -76,10 +96,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio

// periodically refresh metrics
go func() {
time.Sleep(refreshMetricsInterval)
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(4).Infof("Failed to refresh metrics: %v", err)
start := time.Now()

if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
klog.Errorf("Failed to refresh metrics: %v", err)
}

now := time.Now()
used := now.Sub(start)
if used < refreshMetricsInterval {
time.Sleep(refreshMetricsInterval - used)
}
}
}()
@@ -89,7 +117,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
go func() {
for {
time.Sleep(5 * time.Second)
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetricsIncludingStale())
Contributor: Can you also add another debug line for the stale metrics? It can be a klog.Warning.
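For example, the debug goroutine could additionally log stale entries at warning level; a sketch of that extra block (the filtering shown here is an assumption, not code from this PR):

// Sketch: collect and warn about stale pod metrics inside the debug loop.
stale := []*PodMetrics{}
for _, pm := range p.AllPodMetricsIncludingStale() {
	if isPodMetricsStale(pm) {
		stale = append(stale, pm)
	}
}
if len(stale) > 0 {
	klog.Warningf("===DEBUG: Stale pod metrics: %+v", stale)
}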

}
}()
}
@@ -127,8 +155,8 @@ func (p *Provider) refreshPodsOnce() {
p.datastore.pods.Range(addNewPods)
}

func (p *Provider) refreshMetricsOnce() error {
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
func (p *Provider) refreshMetricsOnce(interval time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), interval)
Contributor: I think we should give a longer timeout than the interval, to allow some slow backends to return metrics. Otherwise we need to really carefully pick a reasonable interval in various environments.

I think the fix should be: each pod has its own independent refresh loop.
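A rough sketch of that per-pod idea, with hypothetical wiring (startPodRefreshLoop and the per-pod context are not part of this PR):

// Hypothetical: start one refresh goroutine per pod when it is added;
// cancel ctx when the pod is removed to stop the loop.
func (p *Provider) startPodRefreshLoop(ctx context.Context, pod Pod, interval, timeout time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				existing, ok := p.GetPodMetrics(pod)
				if !ok {
					existing = &PodMetrics{Pod: pod}
				}
				fetchCtx, cancel := context.WithTimeout(ctx, timeout)
				if updated, err := p.pmc.FetchMetrics(fetchCtx, pod, existing); err == nil {
					p.UpdatePodMetrics(pod, updated)
				}
				cancel()
			}
		}
	}()
}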

Contributor Author:
I previously considered having each pod obtain metrics independently, but couldn't figure out how to adapt to the fact that pods are dynamically added.

Since pods are added dynamically and metrics are collected at fixed intervals, the samples for different pods won't be simultaneous when we collect all pod metrics (AllPodMetrics) at a given point in time.

Consider the following scenario, where metrics are collected once per time unit, and the queue length increases by one every half-time unit:

Pod A is added starting at 0, metrics are collected at 0, 1, and 2, and by point 2, its queue length is 4.

Pod B is added starting at 0.5, metrics are collected at 0.5 and 1.5, and by point 2, its queue length is 3.

However, if we make a decision at point 2, using metrics collected at points 2 and 1.5, we get values of 4 and 2, which doesn't reflect the actual situation.

For slow backends, both approaches are the same because metrics obtained from slow backends have latency. While collecting metrics independently for each pod could prevent slow backends from slowing down the overall progress, it would sacrifice metric freshness for normal backends. Since the vast majority of backends are normal, it's better to stick with the existing approach.

Contributor: IMO there are 2 things: 1. the metric refresh interval, which is supposed to refresh metrics as fast as possible, and is optimistic. It's OK if some backends sometimes can't keep up with the interval. 2. The metrics API call timeout. This needs to be long enough to cover non-ideal situations where the metrics API call takes longer than normal. A slow API response is better than no response.

My concern with the current implementation is that it will fail the metrics call if the call times out at the interval. This can happen, for example, in an environment with slow networking, and it requires the operator to carefully pick an interval. Even then, during degraded networking conditions, this can lead to no metric refresh at all.

I don't think it's a big issue that pod metrics are not refreshed synchronously. Even if they are refreshed synchronously, there is always a delay between the real-time state and what the ext proc sees.

Contributor Author: @liu-cong
I agree with you. What do you think about the following duration settings?
Refresh interval: 50ms
Fetch timeout: 1s
Stale time: 5s

So we have a stale limit to filter out pods that don't have fresh data, and for slow backends we can avoid aborting abnormally and still have a chance to retry.

Contributor:
We should add some metrics on the metrics API latency to make a more educated guess, but your proposed numbers look good to me for now. And we should make them configurable (adding to the startup flag is OK to begin with). Thanks again for the discussion!
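A sketch of how the three durations discussed above could be exposed as startup flags; the flag names here are placeholders, not flags that exist in the repo:

// Sketch: make the refresh interval, fetch timeout, and staleness threshold configurable.
var (
	refreshMetricsInterval   = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval between metrics refreshes")
	fetchMetricsTimeout      = flag.Duration("fetchMetricsTimeout", time.Second, "timeout for a single metrics fetch; longer than the interval to tolerate slow backends")
	podMetricsStaleThreshold = flag.Duration("podMetricsStaleThreshold", 5*time.Second, "metrics older than this are treated as stale and skipped by the scheduler")
)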

defer cancel()
start := time.Now()
defer func() {
@@ -147,7 +175,12 @@ func (p *Provider) refreshMetricsOnce() error {
defer wg.Done()
updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
if err != nil {
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
// Handle timeout errors as less severe.
if errors.Is(err, context.Canceled) {
klog.V(4).Infof("Timeout fetching metrics for pod %s", pod)
} else {
errCh <- fmt.Errorf("failed to fetch metrics from %s: %v", pod, err)
}
return
}
p.UpdatePodMetrics(pod, updated)
28 changes: 22 additions & 6 deletions pkg/ext-proc/backend/provider_test.go
@@ -39,11 +39,12 @@

func TestProvider(t *testing.T) {
tests := []struct {
name string
pmc PodMetricsClient
datastore *K8sDatastore
initErr bool
want []*PodMetrics
name string
pmc PodMetricsClient
datastore *K8sDatastore
initErr bool
want []*PodMetrics
wantIncludingStale []*PodMetrics
}{
{
name: "Init success",
@@ -56,7 +57,8 @@ func TestProvider(t *testing.T) {
pod2.Pod: pod2,
},
},
want: []*PodMetrics{pod1, pod2},
want: []*PodMetrics{pod1, pod2},
wantIncludingStale: []*PodMetrics{pod1, pod2},
},
{
name: "Fetch metrics error",
@@ -72,6 +74,11 @@
pods: populateMap(pod1.Pod, pod2.Pod),
},
want: []*PodMetrics{
pod1,
// Failed to fetch pod2 metrics, so it remains at the default values,
// which are stale.
},
wantIncludingStale: []*PodMetrics{
pod1,
// Failed to fetch pod2 metrics so it remains the default values.
{
@@ -101,6 +108,15 @@
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}

// Then check for AllPodMetricsIncludingStale
if len(test.wantIncludingStale) > 0 {
metricsIncludingStale := p.AllPodMetricsIncludingStale()
if diff := cmp.Diff(test.wantIncludingStale, metricsIncludingStale, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}

}
})
}
}
9 changes: 8 additions & 1 deletion pkg/ext-proc/backend/types.go
@@ -1,7 +1,10 @@
// Package backend is a library to interact with backend model servers such as probing metrics.
package backend

import "fmt"
import (
"fmt"
"time"
)

type PodSet map[Pod]bool

@@ -28,6 +31,9 @@ type Metrics struct {
type PodMetrics struct {
Pod
Metrics

// UpdatedTime records the time when the metrics were last updated.
UpdatedTime time.Time
}

func (pm *PodMetrics) String() string {
@@ -48,6 +54,7 @@ func (pm *PodMetrics) Clone() *PodMetrics {
KVCacheUsagePercent: pm.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
},
UpdatedTime: pm.UpdatedTime,
}
return clone
}
9 changes: 5 additions & 4 deletions pkg/ext-proc/backend/vllm/metrics.go
@@ -49,22 +49,23 @@ func (p *PodMetricsClientImpl) FetchMetrics(
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
// As we use a short fetch timeout to keep the metrics up-to-date, there will be a lot
// of timeout errors even if only 0.1% of requests time out.
// Return the raw error so that the caller can filter it out via errors.Is(err, context.Canceled).
Contributor: The %w works for errors.Is right? I don't think there is a need to change this.

Contributor Author: Thanks for pointing that out. Will fix it in the next commit.
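For reference, %w does keep the error chain intact, so errors.Is still matches the underlying context error; a small self-contained illustration:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done() // wait for the deadline to pass

	wrapped := fmt.Errorf("failed to fetch metrics: %w", ctx.Err())
	fmt.Println(errors.Is(wrapped, context.DeadlineExceeded)) // true: %w preserves the chain
}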

return nil, err
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
Contributor Author: The returned error will be gathered into errCh and logged at the call site:

errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)

I removed the error log so it won't be logged twice.

return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
}

parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to parse metrics from %s: %w", pod, err)
}
return promToPodMetrics(metricFamilies, existing)
}