refresh metrics per pod periodically #223

Open · wants to merge 10 commits into base: main
2 changes: 1 addition & 1 deletion Makefile
@@ -105,7 +105,7 @@ vet: ## Run go vet against code.

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out

.PHONY: test-integration
test-integration: manifests generate fmt vet envtest ## Run tests.
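For context: the -race flag enables Go's race detector during unit tests. The short, illustrative program below (not part of this PR) shows the kind of unsynchronized sharing the detector reports, which is the hazard the Clone() change in fake.go below avoids.

package main

import "sync"

// metrics mirrors only the ActiveModels field visible in this PR; everything else is omitted.
type metrics struct {
	ActiveModels map[string]int
}

func main() {
	shared := &metrics{ActiveModels: map[string]int{}}
	var wg sync.WaitGroup
	wg.Add(2)
	// Two goroutines write to the same map without synchronization;
	// `go run -race` (or `go test -race`) reports this as a data race.
	go func() { defer wg.Done(); shared.ActiveModels["model-a"]++ }()
	go func() { defer wg.Done(); shared.ActiveModels["model-b"]++ }()
	wg.Wait()
}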
2 changes: 1 addition & 1 deletion pkg/ext-proc/backend/fake.go
@@ -18,7 +18,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
return nil, err
}
klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod])
return f.Res[pod], nil
return f.Res[pod].Clone(), nil
}

type FakeDataStore struct {
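Note: the Clone() implementation itself is not shown in this PR view. A minimal sketch of what such a deep copy might look like, assuming PodMetrics carries only the Pod plus the Metrics fields visible in this diff (ActiveModels, UpdatedTime):

// Illustrative sketch only; the actual Clone method is not shown in this diff,
// and PodMetrics may carry more fields than the ones visible here.
func (pm *PodMetrics) Clone() *PodMetrics {
	if pm == nil {
		return nil
	}
	activeModels := make(map[string]int, len(pm.Metrics.ActiveModels))
	for k, v := range pm.Metrics.ActiveModels {
		activeModels[k] = v
	}
	clone := *pm                              // copies value fields such as Pod and UpdatedTime
	clone.Metrics.ActiveModels = activeModels // deep-copy the map so callers never share it
	return &clone
}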
77 changes: 77 additions & 0 deletions pkg/ext-proc/backend/pod_metrics_refresher.go
@@ -0,0 +1,77 @@
package backend

import (
"context"
"time"

klog "k8s.io/klog/v2"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

type PodMetricsRefresher struct {
done chan struct{}
interval time.Duration
timeout time.Duration

// The refresher holds the provider and pod so it can update the metrics in a concurrency-safe way.
pod Pod
provider *Provider
}

func NewPodMetricsRefresher(provider *Provider, pod Pod, interval, timeout time.Duration) *PodMetricsRefresher {
return &PodMetricsRefresher{
done: make(chan struct{}),
interval: interval,
timeout: timeout,
pod: pod,
provider: provider,
}
}

func (r *PodMetricsRefresher) start() {
go func() {
klog.V(logutil.DEFAULT).InfoS("Starting refresher", "pod", r.pod)
for {
select {
case <-r.done:
return
default:
}

err := r.refreshMetrics()
if err != nil {
klog.ErrorS(err, "Failed to refresh metrics", "pod", r.pod)
}

time.Sleep(r.interval)
}
}()
}

func (r *PodMetricsRefresher) refreshMetrics() error {
ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
defer cancel()

pod := r.pod
existing, found := r.provider.GetPodMetrics(pod)
if !found {
// As the refresher runs in the background, the pod may already have been deleted before
// the refresh goroutine reads the done channel. In that case, just return nil;
// the refresher will be stopped after this interval.
return nil
}

klog.V(logutil.DEBUG).InfoS("Refresh metrics", "pod", pod, "metrics", existing.Metrics)
updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing)
if err != nil {
return err
}

r.provider.UpdatePodMetrics(pod, updated)
return nil
}

func (r *PodMetricsRefresher) stop() {
klog.V(logutil.DEFAULT).InfoS("Stopping refresher", "pod", r.pod)
close(r.done)
}
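For orientation, a minimal sketch (illustrative, not part of the PR) of the lifecycle that provider.go drives below; the interval and timeout values are placeholders, and since start and stop are unexported this only works inside the backend package:

// Illustrative only: provider.go below creates one refresher per pod and stops it
// when the pod disappears from the datastore. Values here are placeholders.
func runRefresherFor(p *Provider, pod Pod) (stop func()) {
	r := NewPodMetricsRefresher(p, pod, 50*time.Millisecond, 5*time.Second)
	r.start()     // spawns the background refresh goroutine
	return r.stop // closing the done channel makes the loop exit on its next pass
}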
142 changes: 61 additions & 81 deletions pkg/ext-proc/backend/provider.go
@@ -2,25 +2,25 @@ package backend

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/multierr"
klog "k8s.io/klog/v2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

const (
fetchMetricsTimeout = 5 * time.Second
// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
metricsValidityPeriod = 5 * time.Second
)

func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
p := &Provider{
podMetrics: sync.Map{},
pmc: pmc,
datastore: datastore,
podMetrics: sync.Map{},
podMetricsRefresher: sync.Map{},
pmc: pmc,
datastore: datastore,
}
return p
}
@@ -29,60 +29,78 @@ func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
type Provider struct {
// key: Pod, value: *PodMetrics
podMetrics sync.Map
pmc PodMetricsClient
datastore *K8sDatastore
// key: Pod, value: *PodMetricsRefresher
podMetricsRefresher sync.Map
pmc PodMetricsClient
datastore *K8sDatastore
}

type PodMetricsClient interface {
// FetchMetrics fetches metrics for the given pod.
// The returned PodMetrics and the existing one should not be the same object.
// Otherwise, there will be a data race.
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
}

func (p *Provider) AllPodMetrics() []*PodMetrics {
func isPodMetricsStale(pm *PodMetrics) bool {
return time.Since(pm.Metrics.UpdatedTime) > metricsValidityPeriod
}

func (p *Provider) AllFreshPodMetrics() []*PodMetrics {
return p.allPodMetrics(false)
}

func (p *Provider) AllStalePodMetrics() []*PodMetrics {
return p.allPodMetrics(true)
}

func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
res := []*PodMetrics{}
fn := func(k, v any) bool {
res = append(res, v.(*PodMetrics))
m := v.(*PodMetrics)

if !stale {
if isPodMetricsStale(m) {
// exclude stale metrics for scheduler
klog.V(logutil.DEBUG).InfoS("Pod metrics is stale, skipping", "pod", m.Pod)
} else {
res = append(res, m)
}

} else {
if isPodMetricsStale(m) {
res = append(res, m)
}
}

return true
}
p.podMetrics.Range(fn)
return res
}

func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
pm.Metrics.UpdatedTime = time.Now()
p.podMetrics.Store(pod, pm)
klog.V(logutil.DEBUG).InfoS("Updated metrics", "pod", pod, "metrics", pm.Metrics)
}

func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
val, ok := p.podMetrics.Load(pod)
if ok {
// For now, the only caller of GetPodMetrics is the refresher, so we
// don't need to exclude the stale metrics.
return val.(*PodMetrics), true
}
return nil, false
}

func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
p.refreshPodsOnce()

if err := p.refreshMetricsOnce(); err != nil {
klog.ErrorS(err, "Failed to init metrics")
}

klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics())

func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout, refreshPrometheusMetricsInterval time.Duration) error {
// periodically refresh pods
go func() {
for {
p.refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout)
time.Sleep(refreshPodsInterval)
p.refreshPodsOnce()
}
}()

// periodically refresh metrics
go func() {
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics")
}
}
}()

@@ -99,7 +117,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm
go func() {
for {
time.Sleep(5 * time.Second)
klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics())
klogV.InfoS("Current Pods and metrics gathered", "fresh metrics", p.AllFreshPodMetrics(),
"stale metrics", p.AllStalePodMetrics())
}
}()
}
@@ -109,7 +128,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm

// refreshPodsOnce lists pods and updates keys in the podMetrics map.
// Note this function doesn't update the PodMetrics value, it's done separately.
func (p *Provider) refreshPodsOnce() {
func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout time.Duration) {
// merge new pods with cached ones.
// add new pod to the map
addNewPods := func(k, v any) bool {
@@ -120,8 +139,13 @@
Metrics: Metrics{
ActiveModels: make(map[string]int),
},
// Metrics are considered stale until they are first refreshed.
}
p.podMetrics.Store(pod, new)

refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval, refreshMetricsTimeout)
refresher.start()
p.podMetricsRefresher.Store(pod, refresher)
}
return true
}
@@ -130,61 +154,17 @@
pod := k.(Pod)
if _, ok := p.datastore.pods.Load(pod); !ok {
p.podMetrics.Delete(pod)
Contributor
Should we remove this, given the LoadAndDelete below?

Contributor Author
The map that loads the pod is different from the map that deletes the pod.

Contributor
I am confused: line 161 is p.podMetrics.Delete(pod), and then line 162 is p.podMetrics.LoadAndDelete(pod). Won't line 162 always return false, since the pod is already deleted in line 161?

Contributor Author (@spacewander, Feb 14, 2025)
Thanks for catching that! Sorry for misunderstanding you at the beginning. The p.podMetrics.LoadAndDelete(pod) should be p.podMetricsRefresher.LoadAndDelete(pod). I will fix it soon.

Contributor
Why do we need another map in the first place? Can't we put the PodMetricsRefresher in the PodMetrics struct?

Contributor Author
@ahg-g I tried that before, but if we put PodMetricsRefresher under PodMetrics, it's not easy to make it concurrency-safe without a big refactor, since PodMetricsRefresher needs to change PodMetrics on the fly.

Contributor
I believe we should be able to do that. I sent out #350, which does a major refactor of the datastore/provider layer. It consolidates storage in one place under the datastore and does in-place updates to the metrics. Please take a look and advise if there are any concurrency issues I may have missed.

if v, ok := p.podMetricsRefresher.LoadAndDelete(pod); ok {
refresher := v.(*PodMetricsRefresher)
refresher.stop()
}
}
return true
}
p.podMetrics.Range(mergeFn)
p.datastore.pods.Range(addNewPods)
}

func (p *Provider) refreshMetricsOnce() error {
klogV := klog.V(logutil.TRACE)
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
defer cancel()
start := time.Now()
defer func() {
d := time.Since(start)
// TODO: add a metric instead of logging
klogV.InfoS("Metrics refreshed", "duration", d)
}()
var wg sync.WaitGroup
errCh := make(chan error)
processOnePod := func(key, value any) bool {
klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value)
pod := key.(Pod)
existing := value.(*PodMetrics)
wg.Add(1)
go func() {
defer wg.Done()
updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
if err != nil {
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
return
}
p.UpdatePodMetrics(pod, updated)
klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics)
}()
return true
}
p.podMetrics.Range(processOnePod)

// Wait for metric collection for all pods to complete and close the error channel in a
// goroutine so this is unblocking, allowing the code to proceed to the error collection code
// below.
// Note we couldn't use a buffered error channel with a size because the size of the podMetrics
// sync.Map is unknown beforehand.
go func() {
wg.Wait()
close(errCh)
}()

var errs error
for err := range errCh {
errs = multierr.Append(errs, err)
}
return errs
}

func (p *Provider) flushPrometheusMetricsOnce() {
klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics")

@@ -197,7 +177,7 @@ func (p *Provider) flushPrometheusMetricsOnce() {
var kvCacheTotal float64
var queueTotal int

podMetrics := p.AllPodMetrics()
podMetrics := append(p.AllFreshPodMetrics(), p.AllStalePodMetrics()...)
if len(podMetrics) == 0 {
return
}
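To summarize how the two views introduced above are meant to be consumed: scheduling reads only fresh metrics, while the Prometheus flush aggregates over both fresh and stale pods. A minimal sketch follows, assuming the import path matches the file locations in this PR:

package example

import (
	backend "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
)

// podsForScheduling returns only metrics refreshed within metricsValidityPeriod (5s above).
func podsForScheduling(p *backend.Provider) []*backend.PodMetrics {
	return p.AllFreshPodMetrics()
}

// podsForPrometheusFlush aggregates over every known pod, fresh or stale,
// mirroring what flushPrometheusMetricsOnce does in the diff above.
func podsForPrometheusFlush(p *backend.Provider) []*backend.PodMetrics {
	return append(p.AllFreshPodMetrics(), p.AllStalePodMetrics()...)
}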