Skip to content

Commit

Permalink
ensure metrics refresh time <= refreshMetricsInterval
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Jan 20, 2025
1 parent fa40dc5 commit d3f5765
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 25 deletions.
61 changes: 47 additions & 14 deletions pkg/ext-proc/backend/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -10,10 +11,6 @@ import (
klog "k8s.io/klog/v2"
)

const (
fetchMetricsTimeout = 5 * time.Second
)

func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
p := &Provider{
podMetrics: sync.Map{},
Expand All @@ -35,23 +32,46 @@ type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
}

func isPodMetricsStale(pm *PodMetrics) bool {
// TODO: make it configurable
return time.Since(pm.UpdatedTime) > 5*time.Second
}

func (p *Provider) AllPodMetrics() []*PodMetrics {
return p.allPodMetrics(false)
}

func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
return p.allPodMetrics(true)
}

func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
res := []*PodMetrics{}
fn := func(k, v any) bool {
res = append(res, v.(*PodMetrics))
m := v.(*PodMetrics)

if !staleIncluded && isPodMetricsStale(m) {
// exclude stale metrics for scheduler
klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
return true
}

res = append(res, m)
return true
}
p.podMetrics.Range(fn)
return res
}

func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
pm.UpdatedTime = time.Now()
p.podMetrics.Store(pod, pm)
}

func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
val, ok := p.podMetrics.Load(pod)
if ok {
// For now, we don't exclude stale metrics with GET operation.
return val.(*PodMetrics), true
}
return nil, false
Expand All @@ -60,11 +80,11 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
p.refreshPodsOnce()

if err := p.refreshMetricsOnce(); err != nil {
if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
klog.Errorf("Failed to init metrics: %v", err)
}

klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetricsIncludingStale())

// periodically refresh pods
go func() {
Expand All @@ -76,10 +96,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio

// periodically refresh metrics
go func() {
time.Sleep(refreshMetricsInterval)
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(4).Infof("Failed to refresh metrics: %v", err)
start := time.Now()

if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
klog.Errorf("Failed to refresh metrics: %v", err)
}

now := time.Now()
used := now.Sub(start)
if used < refreshMetricsInterval {
time.Sleep(refreshMetricsInterval - used)
}
}
}()
Expand All @@ -89,7 +117,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
go func() {
for {
time.Sleep(5 * time.Second)
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetricsIncludingStale())
}
}()
}
Expand Down Expand Up @@ -127,8 +155,8 @@ func (p *Provider) refreshPodsOnce() {
p.datastore.pods.Range(addNewPods)
}

func (p *Provider) refreshMetricsOnce() error {
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
func (p *Provider) refreshMetricsOnce(interval time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
start := time.Now()
defer func() {
Expand All @@ -147,7 +175,12 @@ func (p *Provider) refreshMetricsOnce() error {
defer wg.Done()
updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
if err != nil {
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
// handle timeout error as less severe error
if errors.Is(err, context.Canceled) {
klog.V(4).Infof("Timeout fetching metrics for pod %s", pod)
} else {
errCh <- fmt.Errorf("failed to fetch metrics from %s: %v", pod, err)
}
return
}
p.UpdatePodMetrics(pod, updated)
Expand Down
28 changes: 22 additions & 6 deletions pkg/ext-proc/backend/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ var (

func TestProvider(t *testing.T) {
tests := []struct {
name string
pmc PodMetricsClient
datastore *K8sDatastore
initErr bool
want []*PodMetrics
name string
pmc PodMetricsClient
datastore *K8sDatastore
initErr bool
want []*PodMetrics
wantIncludingStale []*PodMetrics
}{
{
name: "Init success",
Expand All @@ -56,7 +57,8 @@ func TestProvider(t *testing.T) {
pod2.Pod: pod2,
},
},
want: []*PodMetrics{pod1, pod2},
want: []*PodMetrics{pod1, pod2},
wantIncludingStale: []*PodMetrics{pod1, pod2},
},
{
name: "Fetch metrics error",
Expand All @@ -72,6 +74,11 @@ func TestProvider(t *testing.T) {
pods: populateMap(pod1.Pod, pod2.Pod),
},
want: []*PodMetrics{
pod1,
// Failed to fetch pod2 metrics so it remains the default values,
// which is stale.
},
wantIncludingStale: []*PodMetrics{
pod1,
// Failed to fetch pod2 metrics so it remains the default values.
{
Expand Down Expand Up @@ -101,6 +108,15 @@ func TestProvider(t *testing.T) {
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}

// Then check for AllPodMetricsIncludingStale
if len(test.wantIncludingStale) > 0 {
metricsIncludingStale := p.AllPodMetricsIncludingStale()
if diff := cmp.Diff(test.wantIncludingStale, metricsIncludingStale, cmpopts.SortSlices(lessFunc)); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}

}
})
}
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/ext-proc/backend/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Package backend is a library to interact with backend model servers such as probing metrics.
package backend

import "fmt"
import (
"fmt"
"time"
)

type PodSet map[Pod]bool

Expand All @@ -28,6 +31,9 @@ type Metrics struct {
type PodMetrics struct {
Pod
Metrics

// UpdatedTime record the time when the metrics are updated.
UpdatedTime time.Time
}

func (pm *PodMetrics) String() string {
Expand All @@ -48,6 +54,7 @@ func (pm *PodMetrics) Clone() *PodMetrics {
KVCacheUsagePercent: pm.KVCacheUsagePercent,
KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
},
UpdatedTime: pm.UpdatedTime,
}
return clone
}
9 changes: 5 additions & 4 deletions pkg/ext-proc/backend/vllm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,23 @@ func (p *PodMetricsClientImpl) FetchMetrics(
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
// As we use a short fetch timeout to ensure the metrics are up-to-date, there will be a lot
// of timeout of error even only 0.1% requests are timed out.
// Return the raw error so that the caller can filter out it via errors.Is(err, context.Canceled)
return nil, err
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
}

parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to parse metrics from %s: %w", pod, err)
}
return promToPodMetrics(metricFamilies, existing)
}
Expand Down

0 comments on commit d3f5765

Please sign in to comment.