ensure metrics refresh time <= refreshMetricsInterval #207
File 1 of 2 in this diff: the backend metrics Provider (package backend).

@@ -2,6 +2,7 @@ package backend
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"time"

@@ -10,10 +11,6 @@ import (
 	klog "k8s.io/klog/v2"
 )

-const (
-	fetchMetricsTimeout = 5 * time.Second
-)
-
 func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
 	p := &Provider{
 		podMetrics: sync.Map{},

@@ -35,23 +32,46 @@ type PodMetricsClient interface {
 	FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
 }

+func isPodMetricsStale(pm *PodMetrics) bool {
+	// TODO: make it configurable
+	return time.Since(pm.UpdatedTime) > 5*time.Second
+}
+
 func (p *Provider) AllPodMetrics() []*PodMetrics {
+	return p.allPodMetrics(false)
+}
+
+func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
+	return p.allPodMetrics(true)
+}
+
+func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
 	res := []*PodMetrics{}
 	fn := func(k, v any) bool {
-		res = append(res, v.(*PodMetrics))
+		m := v.(*PodMetrics)
+
+		if !staleIncluded && isPodMetricsStale(m) {
+			// exclude stale metrics for scheduler
+			klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
+			return true
+		}
+
+		res = append(res, m)
 		return true
 	}
 	p.podMetrics.Range(fn)
 	return res
 }

 func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
+	pm.UpdatedTime = time.Now()
 	p.podMetrics.Store(pod, pm)
 }

 func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
 	val, ok := p.podMetrics.Load(pod)
 	if ok {
+		// For now, we don't exclude stale metrics with GET operation.
Review comment: Can you create an issue to track this? We may want to deprioritize or skip a pod if its metrics are stale in the future.
 		return val.(*PodMetrics), true
 	}
 	return nil, false
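One possible shape for the behavior the reviewer suggests tracking in an issue is a staleness-aware lookup that tells the caller whether the entry is fresh, so a scheduler could deprioritize or skip the pod. This is only a sketch, not part of the PR: GetFreshPodMetrics is a hypothetical name, and it reuses the isPodMetricsStale helper and podMetrics map from the diff above.

```go
// Hypothetical variant of GetPodMetrics: report staleness instead of silently
// returning old numbers, so the scheduler can deprioritize or skip the pod.
func (p *Provider) GetFreshPodMetrics(pod Pod) (pm *PodMetrics, fresh bool, ok bool) {
	val, loaded := p.podMetrics.Load(pod)
	if !loaded {
		return nil, false, false
	}
	m := val.(*PodMetrics)
	if isPodMetricsStale(m) {
		// Stale entries are still returned; the caller decides how to weigh them.
		return m, false, true
	}
	return m, true, true
}
```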
@@ -60,11 +80,11 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
 func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
 	p.refreshPodsOnce()

-	if err := p.refreshMetricsOnce(); err != nil {
+	if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
 		klog.Errorf("Failed to init metrics: %v", err)
 	}

-	klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
+	klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetricsIncludingStale())

 	// periodically refresh pods
 	go func() {

@@ -76,10 +96,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio

 	// periodically refresh metrics
 	go func() {
+		time.Sleep(refreshMetricsInterval)
 		for {
-			time.Sleep(refreshMetricsInterval)
-			if err := p.refreshMetricsOnce(); err != nil {
-				klog.V(4).Infof("Failed to refresh metrics: %v", err)
+			start := time.Now()
+
+			if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
+				klog.Errorf("Failed to refresh metrics: %v", err)
 			}
+
+			now := time.Now()
+			used := now.Sub(start)
+			if used < refreshMetricsInterval {
+				time.Sleep(refreshMetricsInterval - used)
+			}
 		}
 	}()
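This loop is the heart of the PR: it measures how long refreshMetricsOnce took and sleeps only for the remaining time, so a new refresh starts roughly every refreshMetricsInterval instead of every interval plus refresh duration. For comparison, a time.Ticker gives similar fixed-rate pacing, since ticks are dropped rather than queued when a refresh overruns. This is not what the PR does; it is just an equivalent idiom sketched under the same assumptions (p and refreshMetricsInterval in scope as in Init above).

```go
// Alternative pacing sketch (not the PR's implementation): a Ticker fires at a
// fixed rate and silently drops ticks while refreshMetricsOnce is still running.
go func() {
	ticker := time.NewTicker(refreshMetricsInterval)
	defer ticker.Stop()
	for range ticker.C {
		if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
			klog.Errorf("Failed to refresh metrics: %v", err)
		}
	}
}()
```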
@@ -89,7 +117,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 	go func() {
 		for {
 			time.Sleep(5 * time.Second)
-			klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
+			klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetricsIncludingStale())
Review comment: Can you also add another debug line for just the stale metrics? It can be a klog.Warning.
 		}
 	}()
 }
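Following up on the review comment above, here is a minimal sketch of what the requested stale-metrics debug line might look like. The AllStalePodMetrics helper is hypothetical and not part of this PR; it assumes the isPodMetricsStale helper and the podMetrics map shown in the diff.

```go
// Hypothetical helper: collect only the entries whose metrics have gone stale.
func (p *Provider) AllStalePodMetrics() []*PodMetrics {
	stale := []*PodMetrics{}
	p.podMetrics.Range(func(k, v any) bool {
		if m := v.(*PodMetrics); isPodMetricsStale(m) {
			stale = append(stale, m)
		}
		return true
	})
	return stale
}

// Inside the existing debug goroutine in Init, next to the Infof line:
if stale := p.AllStalePodMetrics(); len(stale) > 0 {
	klog.Warningf("===DEBUG: Stale pod metrics: %+v", stale)
}
```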
@@ -127,8 +155,8 @@ func (p *Provider) refreshPodsOnce() {
 	p.datastore.pods.Range(addNewPods)
 }

-func (p *Provider) refreshMetricsOnce() error {
-	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
+func (p *Provider) refreshMetricsOnce(interval time.Duration) error {
+	ctx, cancel := context.WithTimeout(context.Background(), interval)
Review comment: I think we should give a longer timeout than the interval, to allow some slow backends to return metrics. Otherwise we need to pick a reasonable interval very carefully for various environments. I think the fix should be: each pod has its own independent refresh loop.

Reply: I previously considered having each pod obtain metrics independently, but couldn't figure out how to adapt it to the fact that pods are added dynamically. Since pods are added dynamically and metrics are collected at fixed intervals, the metrics collected for all pods (AllPodMetrics) at a given point in time won't be simultaneous. Consider the following scenario, where metrics are collected once per time unit and the queue length increases by one every half time unit: Pod A is added at time 0; its metrics are collected at 0, 1, and 2, and by time 2 its queue length is 4. Pod B is added at time 0.5; its metrics are collected at 0.5 and 1.5, and by time 2 its queue length is 3. If we make a decision at time 2 using the metrics collected at 2 and 1.5, we get values of 4 and 2, which doesn't reflect the actual situation. For slow backends the two approaches are equivalent, because metrics obtained from slow backends are delayed either way. While collecting metrics independently for each pod could prevent slow backends from slowing down overall progress, it would sacrifice the freshness of the metrics from normal backends. Since the vast majority of backends are normal, it's better to stick with the existing approach.

Reply: IMO there are two things: 1. The metrics refresh interval, which is supposed to refresh metrics as fast as possible and is optimistic; it's OK if some backends are sometimes slower than the interval. 2. The metrics API call timeout, which needs to be long enough to cover non-ideal situations when the metrics API call takes longer than normal. A slow API response is better than no response. My concern with the current implementation is that it will fail the metrics call if the call times out at the interval. This can happen, for example, in an environment with slow networking, and it requires the operator to pick the interval carefully. Even then, during degraded networking conditions this can lead to no metrics being refreshed at all. I don't think it's a big issue that pod metrics are not refreshed synchronously; even if they were, there would always be a delay between the real-time state and what the ext proc sees.

Reply: @liu-cong So we have a stale limit to filter out pods that don't have fresh data, and for slow backends we can avoid an abnormal abort and still have a chance to retry.

Reply: We should add some metrics on the metrics API latency to make a more educated guess, but your proposed numbers look good to me for now. And we should make them configurable (adding them to the startup flags is OK to begin with). Thanks again for the discussion!
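To make the outcome of this thread concrete, here is a minimal sketch of decoupling the scrape timeout from the refresh interval and exposing both (plus the staleness threshold) as startup flags, as the reviewer suggests. The flag names and default values below are placeholders, not the numbers agreed in this thread, which are not shown here.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Placeholder flags: the thread only agrees these values should become
// configurable; the names and defaults here are illustrative, not from the PR.
var (
	refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond,
		"how often a metrics refresh is started")
	fetchMetricsTimeout = flag.Duration("fetchMetricsTimeout", 1*time.Second,
		"per-scrape timeout; deliberately longer than the interval so slow backends can still respond")
	metricsStaleThreshold = flag.Duration("metricsStaleThreshold", 2*time.Second,
		"pods whose metrics are older than this are skipped by the scheduler")
)

func main() {
	flag.Parse()
	fmt.Println(*refreshMetricsInterval, *fetchMetricsTimeout, *metricsStaleThreshold)
}
```

With this split, refreshMetricsOnce would pass *fetchMetricsTimeout to context.WithTimeout, the loop in Init would keep pacing itself with *refreshMetricsInterval, and isPodMetricsStale would compare against *metricsStaleThreshold.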
 	defer cancel()
 	start := time.Now()
 	defer func() {

@@ -147,7 +175,12 @@ func (p *Provider) refreshMetricsOnce() error {
 			defer wg.Done()
 			updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
 			if err != nil {
-				errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
+				// handle timeout error as less severe error
+				if errors.Is(err, context.Canceled) {
+					klog.V(4).Infof("Timeout fetching metrics for pod %s", pod)
+				} else {
+					errCh <- fmt.Errorf("failed to fetch metrics from %s: %v", pod, err)
+				}
 				return
 			}
 			p.UpdatePodMetrics(pod, updated)
File 2 of 2 in this diff: the pod metrics client (PodMetricsClientImpl.FetchMetrics).
@@ -49,22 +49,23 @@ func (p *PodMetricsClientImpl) FetchMetrics(
 	}
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
Review comment: Can you change this to klog.V(4)? See https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/docs/dev.md
-		return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
+		// Because we use a short fetch timeout to keep the metrics up-to-date, there will be a lot
+		// of timeout errors even if only 0.1% of requests time out.
+		// Return the raw error so that the caller can filter it out via errors.Is(err, context.Canceled).
Review comment: %w works with errors.Is, right? I don't think there is a need to change this.

Reply: Thanks for pointing that out. I will fix it in the next commit.
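For reference, a small self-contained example (not from the PR) confirming the reviewer's point: wrapping with %w keeps the error chain intact, so errors.Is still matches the underlying context error. It also shows that a pure timeout surfaces as context.DeadlineExceeded rather than context.Canceled, which may be worth double-checking against the errors.Is(err, context.Canceled) branch in refreshMetricsOnce above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done() // wait for the deadline to expire

	// Wrapping with %w preserves the chain, so errors.Is can still see the
	// context error behind the extra message.
	wrapped := fmt.Errorf("failed to fetch metrics from pod-a: %w", ctx.Err())

	fmt.Println(errors.Is(wrapped, context.DeadlineExceeded)) // true: the deadline expired
	fmt.Println(errors.Is(wrapped, context.Canceled))         // false: Canceled only when cancel() fires first
}
```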
+		return nil, err
 	}
 	defer func() {
 		_ = resp.Body.Close()
 	}()

 	if resp.StatusCode != http.StatusOK {
-		klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
Review comment: Can you change this to klog.V(4)? According to https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/docs/dev.md

Reply: The returned error will be gathered by the caller, so I removed the error log here to avoid logging it twice.
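To illustrate the "gather errors and log them once" pattern the author is relying on, here is a stand-alone sketch under stated assumptions: the PR does not show how errCh is drained, so the function name, signature, and aggregation below are illustrative rather than the project's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fetchAll fans out one fetch per pod, collects failures on a channel, and
// returns them as a single aggregated error for the caller to log once.
func fetchAll(ctx context.Context, pods []string, fetch func(context.Context, string) error) error {
	var wg sync.WaitGroup
	errCh := make(chan error, len(pods)) // buffered so workers never block on send
	for _, pod := range pods {
		wg.Add(1)
		go func(pod string) {
			defer wg.Done()
			if err := fetch(ctx, pod); err != nil {
				errCh <- fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
			}
		}(pod)
	}
	wg.Wait()
	close(errCh)

	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	// nil when every fetch succeeded; otherwise one combined error, so the
	// fetch path itself does not need its own klog call.
	return errors.Join(errs...)
}

func main() {
	err := fetchAll(context.Background(), []string{"pod-a", "pod-b"},
		func(ctx context.Context, pod string) error {
			if pod == "pod-b" {
				return errors.New("connection refused")
			}
			return nil
		})
	fmt.Println(err) // failed to fetch metrics from pod-b: connection refused
}
```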
 		return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
 	}

 	parser := expfmt.TextParser{}
 	metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to parse metrics from %s: %w", pod, err)
 	}
 	return promToPodMetrics(metricFamilies, existing)
 }
Review comment: nit: Can we make the 5-second value a constant?
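A minimal sketch of that nit, assuming the constant lives next to isPodMetricsStale in the backend package; the name metricsStaleThreshold is illustrative, not from the PR.

```go
// metricsStaleThreshold bounds how old a pod's metrics may be before the
// scheduler skips them. Still hard-coded for now; the TODO about making it
// configurable stays in place.
const metricsStaleThreshold = 5 * time.Second

func isPodMetricsStale(pm *PodMetrics) bool {
	// TODO: make it configurable
	return time.Since(pm.UpdatedTime) > metricsStaleThreshold
}
```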