From 91fe439e52c6a325cc34bc31006d3b9d8e884ee7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?=
Date: Fri, 24 Jan 2025 02:48:16 +0000
Subject: [PATCH 1/7] refresh metrics per pod periodically
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 罗泽轩
---
 Makefile                                      |   2 +-
 pkg/ext-proc/backend/fake.go                  |   2 +-
 pkg/ext-proc/backend/pod_metrics_refresher.go |  79 ++++++++++
 pkg/ext-proc/backend/provider.go              | 140 ++++++++----------
 pkg/ext-proc/backend/provider_test.go         |  72 +++++++--
 pkg/ext-proc/backend/suite_test.go            |  20 +++
 pkg/ext-proc/backend/types.go                 |  12 +-
 7 files changed, 236 insertions(+), 91 deletions(-)
 create mode 100644 pkg/ext-proc/backend/pod_metrics_refresher.go
 create mode 100644 pkg/ext-proc/backend/suite_test.go

diff --git a/Makefile b/Makefile
index 44759e45..20907888 100644
--- a/Makefile
+++ b/Makefile
@@ -103,7 +103,7 @@ vet: ## Run go vet against code.
 
 .PHONY: test
 test: manifests generate fmt vet envtest ## Run tests.
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out
 
 # Utilize Kind or modify the e2e tests to load the image locally, enabling compatibility with other vendors.
 .PHONY: test-e2e  # Run the e2e tests against a Kind k8s instance that is spun up.
diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go
index c4545497..56c14752 100644
--- a/pkg/ext-proc/backend/fake.go
+++ b/pkg/ext-proc/backend/fake.go
@@ -17,7 +17,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
 		return nil, err
 	}
 	klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
-	return f.Res[pod], nil
+	return f.Res[pod].Clone(), nil
 }
 
 type FakeDataStore struct {
diff --git a/pkg/ext-proc/backend/pod_metrics_refresher.go b/pkg/ext-proc/backend/pod_metrics_refresher.go
new file mode 100644
index 00000000..1ff08c26
--- /dev/null
+++ b/pkg/ext-proc/backend/pod_metrics_refresher.go
@@ -0,0 +1,79 @@
+package backend
+
+import (
+	"context"
+	"time"
+
+	klog "k8s.io/klog/v2"
+)
+
+const (
+	// TODO: make it configurable
+	fetchMetricsTimeout = 1 * time.Second
+)
+
+type PodMetricsRefresher struct {
+	done     chan struct{}
+	interval time.Duration
+
+	// refresher holds provider & pod so it can update the metrics in a concurrency-safe way.
+	pod      Pod
+	provider *Provider
+}
+
+func NewPodMetricsRefresher(provider *Provider, pod Pod, interval time.Duration) *PodMetricsRefresher {
+	return &PodMetricsRefresher{
+		done:     make(chan struct{}),
+		interval: interval,
+		pod:      pod,
+		provider: provider,
+	}
+}
+
+func (r *PodMetricsRefresher) start() {
+	go func() {
+		klog.V(2).Infof("Starting refresher for pod %v", r.pod)
+		for {
+			select {
+			case <-r.done:
+				return
+			default:
+			}
+
+			err := r.refreshMetrics()
+			if err != nil {
+				klog.Errorf("Failed to refresh metrics for pod %s: %v", r.pod, err)
+			}
+
+			time.Sleep(r.interval)
+		}
+	}()
+}
+
+func (r *PodMetricsRefresher) refreshMetrics() error {
+	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
+	defer cancel()
+
+	pod := r.pod
+	existing, found := r.provider.GetPodMetrics(pod)
+	if !found {
+		// As the refresher runs in the background, it's possible that the pod is deleted but
+		// the refresh goroutine hasn't read the done channel yet. In this case, we just return nil.
+		// The refresher will be stopped after this interval.
+		return nil
+	}
+
+	klog.V(4).Infof("Processing pod %v and metric %v", pod, existing.Metrics)
+	updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing)
+	if err != nil {
+		return err
+	}
+
+	r.provider.UpdatePodMetrics(pod, updated)
+	return nil
+}
+
+func (r *PodMetricsRefresher) stop() {
+	klog.V(2).Infof("Stopping refresher for pod %v", r.pod)
+	close(r.done)
+}
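A note on the start loop above: it checks the done channel with a non-blocking select and then sleeps, so a stopped refresher may still sleep out one final interval before its goroutine exits. A minimal sketch of the same lifecycle using time.Ticker, which reacts to stop() immediately; this is only an illustration against the types in this patch, not part of the series:

    func (r *PodMetricsRefresher) startWithTicker() {
        go func() {
            ticker := time.NewTicker(r.interval)
            defer ticker.Stop() // release the ticker when the goroutine exits
            for {
                select {
                case <-r.done: // stop() closed the channel: exit right away
                    return
                case <-ticker.C: // one tick per interval
                    if err := r.refreshMetrics(); err != nil {
                        klog.Errorf("Failed to refresh metrics for pod %s: %v", r.pod, err)
                    }
                }
            }
        }()
    }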
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index f37a3878..93ab9afb 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -2,23 +2,25 @@ package backend
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"time"
 
-	"go.uber.org/multierr"
 	klog "k8s.io/klog/v2"
 )
 
 const (
-	fetchMetricsTimeout = 5 * time.Second
+	// TODO: make it configurable. One idea is to provide a configuration singleton
+	// and put fields like refreshMetricsInterval in it. So far, we have to pass these
+	// fields across several layers.
+	metricsValidityPeriod = 5 * time.Second
 )
 
 func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
 	p := &Provider{
-		podMetrics: sync.Map{},
-		pmc:        pmc,
-		datastore:  datastore,
+		podMetrics:          sync.Map{},
+		podMetricsRefresher: sync.Map{},
+		pmc:                 pmc,
+		datastore:           datastore,
 	}
 	return p
 }
@@ -27,18 +29,43 @@ func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
 type Provider struct {
 	// key: Pod, value: *PodMetrics
 	podMetrics sync.Map
-	pmc        PodMetricsClient
-	datastore  *K8sDatastore
+	// key: Pod, value: *PodMetricsRefresher
+	podMetricsRefresher sync.Map
+	pmc                 PodMetricsClient
+	datastore           *K8sDatastore
 }
 
 type PodMetricsClient interface {
+	// FetchMetrics fetches metrics for the given pod.
+	// The returned PodMetrics and the existing one should not be the same object.
+	// Otherwise, there will be a race.
 	FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
 }
 
+func isPodMetricsStale(pm *PodMetrics) bool {
+	return time.Since(pm.UpdatedTime) > metricsValidityPeriod
+}
+
 func (p *Provider) AllPodMetrics() []*PodMetrics {
+	return p.allPodMetrics(false)
+}
+
+func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
+	return p.allPodMetrics(true)
+}
+
+func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
 	res := []*PodMetrics{}
 	fn := func(k, v any) bool {
-		res = append(res, v.(*PodMetrics))
+		m := v.(*PodMetrics)
+
+		if !staleIncluded && isPodMetricsStale(m) {
+			// exclude stale metrics for scheduler
+			klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
+			return true
+		}
+
+		res = append(res, m)
 		return true
 	}
 	p.podMetrics.Range(fn)
@@ -46,41 +73,29 @@ func (p *Provider) AllPodMetrics() []*PodMetrics {
 }
 
 func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
+	pm.UpdatedTime = time.Now()
 	p.podMetrics.Store(pod, pm)
+	klog.V(4).Infof("Updated metrics for pod %s: %v", pod, pm.Metrics)
 }
 
 func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
 	val, ok := p.podMetrics.Load(pod)
 	if ok {
+		// For now, the only caller of GetPodMetrics is the refresher, so we
+		// don't need to exclude the stale metrics.
 		return val.(*PodMetrics), true
 	}
 	return nil, false
 }
 
 func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
-	p.refreshPodsOnce()
-
-	if err := p.refreshMetricsOnce(); err != nil {
-		klog.Errorf("Failed to init metrics: %v", err)
-	}
-
-	klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
+	p.refreshPodsOnce(refreshMetricsInterval)
 
 	// periodically refresh pods
 	go func() {
 		for {
 			time.Sleep(refreshPodsInterval)
-			p.refreshPodsOnce()
-		}
-	}()
-
-	// periodically refresh metrics
-	go func() {
-		for {
-			time.Sleep(refreshMetricsInterval)
-			if err := p.refreshMetricsOnce(); err != nil {
-				klog.V(4).Infof("Failed to refresh metrics: %v", err)
-			}
+			p.refreshPodsOnce(refreshMetricsInterval)
 		}
 	}()
 
@@ -89,7 +104,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 	go func() {
 		for {
 			time.Sleep(5 * time.Second)
-			klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
+			podMetrics := p.AllPodMetricsIncludingStale()
+			stalePodMetrics := make([]*PodMetrics, 0)
+			freshPodMetrics := make([]*PodMetrics, 0, len(podMetrics))
+			for _, pm := range podMetrics {
+				if isPodMetricsStale(pm) {
+					stalePodMetrics = append(stalePodMetrics, pm)
+				} else {
+					freshPodMetrics = append(freshPodMetrics, pm)
+				}
+			}
+			klog.Infof("===DEBUG: Current Pods and metrics: %+v", freshPodMetrics)
+			klog.Infof("===DEBUG: Stale Pods and metrics: %+v", stalePodMetrics)
 		}
 	}()
 }
@@ -99,7 +125,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 
 // refreshPodsOnce lists pods and updates keys in the podMetrics map.
 // Note this function doesn't update the PodMetrics value, it's done separately.
-func (p *Provider) refreshPodsOnce() {
+func (p *Provider) refreshPodsOnce(refreshMetricsInterval time.Duration) {
 	// merge new pods with cached ones.
 	// add new pod to the map
 	addNewPods := func(k, v any) bool {
@@ -110,8 +136,13 @@ func (p *Provider) refreshPodsOnce() {
 				Metrics: Metrics{
 					ActiveModels: make(map[string]int),
 				},
+				// Metrics are considered stale until they are first refreshed.
 			}
 			p.podMetrics.Store(pod, new)
+
+			refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval)
+			refresher.start()
+			p.podMetricsRefresher.Store(pod, refresher)
 		}
 		return true
 	}
@@ -120,56 +151,13 @@ func (p *Provider) refreshPodsOnce() {
 		pod := k.(Pod)
 		if _, ok := p.datastore.pods.Load(pod); !ok {
 			p.podMetrics.Delete(pod)
+			if v, ok := p.podMetrics.LoadAndDelete(pod); ok {
+				refresher := v.(*PodMetricsRefresher)
+				refresher.stop()
+			}
 		}
 		return true
 	}
 	p.podMetrics.Range(mergeFn)
 	p.datastore.pods.Range(addNewPods)
 }
-
-func (p *Provider) refreshMetricsOnce() error {
-	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
-	defer cancel()
-	start := time.Now()
-	defer func() {
-		d := time.Since(start)
-		// TODO: add a metric instead of logging
-		klog.V(4).Infof("Refreshed metrics in %v", d)
-	}()
-	var wg sync.WaitGroup
-	errCh := make(chan error)
-	processOnePod := func(key, value any) bool {
-		klog.V(4).Infof("Processing pod %v and metric %v", key, value)
-		pod := key.(Pod)
-		existing := value.(*PodMetrics)
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
-			if err != nil {
-				errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
-				return
-			}
-			p.UpdatePodMetrics(pod, updated)
-			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
-		}()
-		return true
-	}
-	p.podMetrics.Range(processOnePod)
-
-	// Wait for metric collection for all pods to complete and close the error channel in a
-	// goroutine so this is unblocking, allowing the code to proceed to the error collection code
-	// below.
-	// Note we couldn't use a buffered error channel with a size because the size of the podMetrics
-	// sync.Map is unknown beforehand.
-	go func() {
-		wg.Wait()
-		close(errCh)
-	}()
-
-	var errs error
-	for err := range errCh {
-		errs = multierr.Append(errs, err)
-	}
-	return errs
-}
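The new contract on FetchMetrics (return a different object than existing) is what makes the per-pod refreshers safe: the scheduler may be reading a stored *PodMetrics while a refresh is in flight. A hedged sketch of a conforming implementation; the scraping step is elided and someClient is a made-up name, not a type from this repository:

    type someClient struct{}

    func (c *someClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
        updated := existing.Clone() // copy first; never mutate `existing` in place
        // ... fetch the pod's metrics endpoint using ctx and fill updated.Metrics ...
        return updated, nil
    }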
diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go
index ad231f57..bf644f09 100644
--- a/pkg/ext-proc/backend/provider_test.go
+++ b/pkg/ext-proc/backend/provider_test.go
@@ -12,7 +12,7 @@ import (
 
 var (
 	pod1 = &PodMetrics{
-		Pod: Pod{Name: "pod1"},
+		Pod: Pod{Name: "pod1", Address: "127.0.0.1"},
 		Metrics: Metrics{
 			WaitingQueueSize:    0,
 			KVCacheUsagePercent: 0.2,
@@ -24,7 +24,7 @@ var (
 		},
 	}
 	pod2 = &PodMetrics{
-		Pod: Pod{Name: "pod2"},
+		Pod: Pod{Name: "pod2", Address: "127.0.0.2"},
 		Metrics: Metrics{
 			WaitingQueueSize:    1,
 			KVCacheUsagePercent: 0.2,
@@ -39,11 +39,12 @@ var (
 
 func TestProvider(t *testing.T) {
 	tests := []struct {
-		name      string
-		pmc       PodMetricsClient
-		datastore *K8sDatastore
-		initErr   bool
-		want      []*PodMetrics
+		name               string
+		pmc                PodMetricsClient
+		datastore          *K8sDatastore
+		initErr            bool
+		want               []*PodMetrics
+		wantIncludingStale []*PodMetrics
 	}{
 		{
 			name: "Init success",
@@ -56,7 +57,8 @@ func TestProvider(t *testing.T) {
 					pod2.Pod: pod2,
 				},
 			},
-			want: []*PodMetrics{pod1, pod2},
+			want:               []*PodMetrics{pod1, pod2},
+			wantIncludingStale: []*PodMetrics{pod1, pod2},
 		},
 		{
 			name: "Fetch metrics error",
@@ -72,10 +74,15 @@ func TestProvider(t *testing.T) {
 				pods: populateMap(pod1.Pod, pod2.Pod),
 			},
 			want: []*PodMetrics{
+				pod1,
+				// Failed to fetch pod2 metrics so it remains the default values,
+				// which is stale.
+			},
+			wantIncludingStale: []*PodMetrics{
 				pod1,
 				// Failed to fetch pod2 metrics so it remains the default values.
 				{
-					Pod: Pod{Name: "pod2"},
+					Pod: Pod{Name: "pod2", Address: "127.0.0.2"},
 					Metrics: Metrics{
 						WaitingQueueSize:        0,
 						KVCacheUsagePercent:     0,
@@ -90,17 +97,60 @@ func TestProvider(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := NewProvider(test.pmc, test.datastore)
-			err := p.Init(time.Millisecond, time.Millisecond)
+			err := p.Init(10*time.Millisecond, time.Millisecond)
 			if test.initErr != (err != nil) {
 				t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
 			}
+
+			// do some turns of refreshing...
+			time.Sleep(3 * time.Millisecond)
+
 			metrics := p.AllPodMetrics()
+			metricsCopy := make([]*PodMetrics, len(metrics))
+			for i, pm := range metrics {
+				if pm.UpdatedTime.IsZero() {
+					t.Errorf("Expected non-zero UpdatedTime, got %v", pm.UpdatedTime)
+				}
+				cp := pm.Clone()
+				// reset the UpdatedTime for comparison
+				cp.UpdatedTime = time.Time{}
+				metricsCopy[i] = cp
+			}
+
 			lessFunc := func(a, b *PodMetrics) bool { return a.String() < b.String() }
-			if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
+			if diff := cmp.Diff(test.want, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" {
 				t.Errorf("Unexpected output (-want +got): %v", diff)
 			}
+
+			// then check for AllPodMetricsIncludingStale
+			if len(test.wantIncludingStale) > 0 {
+				metricsIncludingStale := p.AllPodMetricsIncludingStale()
+				metricsCopy := make([]*PodMetrics, len(metricsIncludingStale))
+				for i, pm := range metricsIncludingStale {
+					cp := pm.Clone()
+					// reset the UpdatedTime for comparison
+					cp.UpdatedTime = time.Time{}
+					metricsCopy[i] = cp
+				}
+
+				if diff := cmp.Diff(test.wantIncludingStale, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" {
+					t.Errorf("Unexpected output (-want +got): %v", diff)
+				}
+			}
+
+			// simulate pod deletion
+			p.datastore.pods.Range(func(k, v any) bool {
+				p.datastore.pods.Delete(k)
+				return true
+			})
+			time.Sleep(20 * time.Millisecond)
+			metrics = p.AllPodMetrics()
+			// ensure no update is writing to the PodMetrics by background refreshing
+			if len(metrics) != 0 {
+				t.Errorf("Expected no metrics, got %v", metrics)
+			}
 		})
 	}
 }
diff --git a/pkg/ext-proc/backend/suite_test.go b/pkg/ext-proc/backend/suite_test.go
new file mode 100644
index 00000000..9b7a9dc0
--- /dev/null
+++ b/pkg/ext-proc/backend/suite_test.go
@@ -0,0 +1,20 @@
+package backend
+
+import (
+	"flag"
+	"os"
+	"testing"
+
+	klog "k8s.io/klog/v2"
+)
+
+// Turn on klog in test so we can observe the behavior, which is good
+// for debugging and catching errors.
+func TestMain(m *testing.M) {
+	klog.InitFlags(nil)
+	flag.Set("logtostderr", "true") // Log to stderr instead of files
+	flag.Set("v", "4")              // Log level
+	flag.Parse()
+
+	os.Exit(m.Run())
+}
diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go
index 7e399fed..4becf7d7 100644
--- a/pkg/ext-proc/backend/types.go
+++ b/pkg/ext-proc/backend/types.go
@@ -1,7 +1,10 @@
 // Package backend is a library to interact with backend model servers such as probing metrics.
 package backend
 
-import "fmt"
+import (
+	"fmt"
+	"time"
+)
 
 type PodSet map[Pod]bool
 
@@ -28,10 +31,13 @@ type Metrics struct {
 type PodMetrics struct {
 	Pod
 	Metrics
+
+	// UpdatedTime records the time when the metrics are updated.
+	UpdatedTime time.Time
 }
 
 func (pm *PodMetrics) String() string {
-	return fmt.Sprintf("Pod: %+v; Metrics: %+v", pm.Pod, pm.Metrics)
+	return fmt.Sprintf("Pod: %+v; Metrics: %+v; UpdatedTime: %s", pm.Pod, pm.Metrics, pm.UpdatedTime)
 }
 
 func (pm *PodMetrics) Clone() *PodMetrics {
@@ -43,11 +49,13 @@ func (pm *PodMetrics) Clone() *PodMetrics {
 		Pod: pm.Pod,
 		Metrics: Metrics{
 			ActiveModels:            cm,
+			MaxActiveModels:         pm.MaxActiveModels,
 			RunningQueueSize:        pm.RunningQueueSize,
 			WaitingQueueSize:        pm.WaitingQueueSize,
 			KVCacheUsagePercent:     pm.KVCacheUsagePercent,
 			KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
 		},
+		UpdatedTime: pm.UpdatedTime,
 	}
 	return clone
 }
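Taken together, patch 1 keys staleness off PodMetrics.UpdatedTime: UpdatePodMetrics stamps it, and isPodMetricsStale compares it against metricsValidityPeriod (5s). A toy in-package illustration of the resulting semantics; this is not test code from the series:

    pm := &PodMetrics{Pod: Pod{Name: "pod1"}}
    // A zero UpdatedTime makes time.Since effectively infinite, so a pod
    // counts as stale until its first successful refresh.
    _ = isPodMetricsStale(pm) // true
    pm.UpdatedTime = time.Now()
    _ = isPodMetricsStale(pm) // false for the next metricsValidityPeriod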
From 2301c0ea80261e5babd6b5d3b9b01171aaec3379 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?=
Date: Sun, 26 Jan 2025 02:37:41 +0000
Subject: [PATCH 2/7] fix lint

---
 pkg/ext-proc/backend/suite_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/ext-proc/backend/suite_test.go b/pkg/ext-proc/backend/suite_test.go
index 9b7a9dc0..2174b2b3 100644
--- a/pkg/ext-proc/backend/suite_test.go
+++ b/pkg/ext-proc/backend/suite_test.go
@@ -12,8 +12,8 @@ import (
 // for debugging and catching errors.
 func TestMain(m *testing.M) {
 	klog.InitFlags(nil)
-	flag.Set("logtostderr", "true") // Log to stderr instead of files
-	flag.Set("v", "4")              // Log level
+	_ = flag.Set("logtostderr", "true") // Log to stderr instead of files
+	_ = flag.Set("v", "4")              // Log level
 	flag.Parse()
 
 	os.Exit(m.Run())

From 71ec51fb7a00971d154c382f1c1e406b56ab8411 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?=
Date: Fri, 7 Feb 2025 17:34:13 +0800
Subject: [PATCH 3/7] Update pkg/ext-proc/backend/provider.go

Co-authored-by: Cong Liu
---
 pkg/ext-proc/backend/provider.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 93ab9afb..75f2c0b6 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -106,7 +106,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 			time.Sleep(5 * time.Second)
 			podMetrics := p.AllPodMetricsIncludingStale()
 			stalePodMetrics := make([]*PodMetrics, 0)
-			freshPodMetrics := make([]*PodMetrics, 0, len(podMetrics))
+			freshPodMetrics := make([]*PodMetrics, 0)
 			for _, pm := range podMetrics {
 				if isPodMetricsStale(pm) {
 					stalePodMetrics = append(stalePodMetrics, pm)
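Patch 2 exists because flag.Set returns an error and the errcheck linter flags a discarded return value; assigning to the blank identifier documents that the error is intentionally ignored. The three spellings, for comparison:

    flag.Set("v", "4")                         // errcheck: return value not checked
    _ = flag.Set("v", "4")                     // explicitly discarded: linter is satisfied
    if err := flag.Set("v", "4"); err != nil { // or actually handle it
        panic(err)
    }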
From e048b6f1fcc98215193fba3899ccb061570b2151 Mon Sep 17 00:00:00 2001
From: spacewander
Date: Fri, 7 Feb 2025 18:14:27 +0800
Subject: [PATCH 4/7] update according to the review

Signed-off-by: spacewander
---
 pkg/ext-proc/backend/pod_metrics_refresher.go | 11 ++--
 pkg/ext-proc/backend/provider.go              | 53 ++++++++++---------
 pkg/ext-proc/backend/provider_test.go         | 45 ++++++++--------
 pkg/ext-proc/backend/types.go                 | 10 ++--
 pkg/ext-proc/main.go                          |  5 ++
 pkg/ext-proc/scheduling/scheduler.go          |  6 +--
 pkg/ext-proc/server/runserver.go              |  4 +-
 pkg/ext-proc/test/benchmark/benchmark.go      |  3 +-
 pkg/ext-proc/test/utils.go                    |  4 +-
 9 files changed, 74 insertions(+), 67 deletions(-)

diff --git a/pkg/ext-proc/backend/pod_metrics_refresher.go b/pkg/ext-proc/backend/pod_metrics_refresher.go
index 1ff08c26..9ffeb1d0 100644
--- a/pkg/ext-proc/backend/pod_metrics_refresher.go
+++ b/pkg/ext-proc/backend/pod_metrics_refresher.go
@@ -7,24 +7,21 @@ import (
 	klog "k8s.io/klog/v2"
 )
 
-const (
-	// TODO: make it configurable
-	fetchMetricsTimeout = 1 * time.Second
-)
-
 type PodMetricsRefresher struct {
 	done     chan struct{}
 	interval time.Duration
+	timeout  time.Duration
 
 	// refresher holds provider & pod so it can update the metrics in a concurrency-safe way.
 	pod      Pod
 	provider *Provider
 }
 
-func NewPodMetricsRefresher(provider *Provider, pod Pod, interval time.Duration) *PodMetricsRefresher {
+func NewPodMetricsRefresher(provider *Provider, pod Pod, interval, timeout time.Duration) *PodMetricsRefresher {
 	return &PodMetricsRefresher{
 		done:     make(chan struct{}),
 		interval: interval,
+		timeout:  timeout,
 		pod:      pod,
 		provider: provider,
 	}
@@ -51,7 +48,7 @@ func (r *PodMetricsRefresher) start() {
 }
 
 func (r *PodMetricsRefresher) refreshMetrics() error {
-	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
 	defer cancel()
 
 	pod := r.pod
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 57de9f75..c7df475c 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -5,7 +5,6 @@ import (
 	"sync"
 	"time"
 
-	"go.uber.org/multierr"
 	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
@@ -45,29 +44,36 @@ type PodMetricsClient interface {
 }
 
 func isPodMetricsStale(pm *PodMetrics) bool {
-	return time.Since(pm.UpdatedTime) > metricsValidityPeriod
+	return time.Since(pm.Metrics.UpdatedTime) > metricsValidityPeriod
 }
 
-func (p *Provider) AllPodMetrics() []*PodMetrics {
+func (p *Provider) AllFreshPodMetrics() []*PodMetrics {
 	return p.allPodMetrics(false)
 }
 
-func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
+func (p *Provider) AllStalePodMetrics() []*PodMetrics {
 	return p.allPodMetrics(true)
 }
 
-func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
+func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
 	res := []*PodMetrics{}
 	fn := func(k, v any) bool {
 		m := v.(*PodMetrics)
 
-		if !staleIncluded && isPodMetricsStale(m) {
-			// exclude stale metrics for scheduler
-			klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
-			return true
+		if !stale {
+			if isPodMetricsStale(m) {
+				// exclude stale metrics for scheduler
+				klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
+			} else {
+				res = append(res, m)
+			}
+
+		} else {
+			if isPodMetricsStale(m) {
+				res = append(res, m)
+			}
 		}
 
-		res = append(res, m)
 		return true
 	}
 	p.podMetrics.Range(fn)
@@ -75,7 +81,7 @@ func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
 }
 
 func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
-	pm.UpdatedTime = time.Now()
+	pm.Metrics.UpdatedTime = time.Now()
 	p.podMetrics.Store(pod, pm)
 	klog.V(4).Infof("Updated metrics for pod %s: %v", pod, pm.Metrics)
 }
@@ -90,14 +96,12 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
 	return nil, false
 }
 
-func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
-	p.refreshPodsOnce(refreshMetricsInterval)
-
+func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout time.Duration) error {
 	// periodically refresh pods
 	go func() {
 		for {
+			p.refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout)
 			time.Sleep(refreshPodsInterval)
-			p.refreshPodsOnce(refreshMetricsInterval)
 		}
 	}()
 
@@ -106,17 +110,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 	go func() {
 		for {
 			time.Sleep(5 * time.Second)
-			podMetrics := p.AllPodMetricsIncludingStale()
-			stalePodMetrics := make([]*PodMetrics, 0)
+			podMetrics := p.AllFreshPodMetrics()
 			freshPodMetrics := make([]*PodMetrics, 0)
 			for _, pm := range podMetrics {
-				if isPodMetricsStale(pm) {
-					stalePodMetrics = append(stalePodMetrics, pm)
-				} else {
-					freshPodMetrics = append(freshPodMetrics, pm)
-				}
+				freshPodMetrics = append(freshPodMetrics, pm)
 			}
 			klog.Infof("===DEBUG: Current Pods and metrics: %+v", freshPodMetrics)
+
+			podMetrics = p.AllStalePodMetrics()
+			stalePodMetrics := make([]*PodMetrics, 0)
+			for _, pm := range podMetrics {
+				stalePodMetrics = append(stalePodMetrics, pm)
+			}
 			klog.Infof("===DEBUG: Stale Pods and metrics: %+v", stalePodMetrics)
 		}
 	}()
@@ -127,7 +132,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 
 // refreshPodsOnce lists pods and updates keys in the podMetrics map.
 // Note this function doesn't update the PodMetrics value, it's done separately.
-func (p *Provider) refreshPodsOnce(refreshMetricsInterval time.Duration) {
+func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout time.Duration) {
 	// merge new pods with cached ones.
 	// add new pod to the map
 	addNewPods := func(k, v any) bool {
@@ -142,7 +147,7 @@ func (p *Provider) refreshPodsOnce(refreshMetricsInterval time.Duration) {
 			}
 			p.podMetrics.Store(pod, new)
 
-			refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval)
+			refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval, refreshMetricsTimeout)
 			refresher.start()
 			p.podMetricsRefresher.Store(pod, refresher)
 		}
diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go
index bf644f09..34e66a1b 100644
--- a/pkg/ext-proc/backend/provider_test.go
+++ b/pkg/ext-proc/backend/provider_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/stretchr/testify/assert"
 )
 
 var (
@@ -39,12 +40,12 @@ var (
 
 func TestProvider(t *testing.T) {
 	tests := []struct {
-		name               string
-		pmc                PodMetricsClient
-		datastore          *K8sDatastore
-		initErr            bool
-		want               []*PodMetrics
-		wantIncludingStale []*PodMetrics
+		name      string
+		pmc       PodMetricsClient
+		datastore *K8sDatastore
+		initErr   bool
+		want      []*PodMetrics
+		wantStale []*PodMetrics
 	}{
 		{
 			name: "Init success",
@@ -56,8 +57,7 @@ func TestProvider(t *testing.T) {
 					pod2.Pod: pod2,
 				},
 			},
-			want:               []*PodMetrics{pod1, pod2},
-			wantIncludingStale: []*PodMetrics{pod1, pod2},
+			want: []*PodMetrics{pod1, pod2},
 		},
 		{
 			name: "Fetch metrics error",
@@ -78,8 +78,7 @@ func TestProvider(t *testing.T) {
 				// Failed to fetch pod2 metrics so it remains the default values,
 				// which is stale.
 			},
-			wantIncludingStale: []*PodMetrics{
-				pod1,
+			wantStale: []*PodMetrics{
 				// Failed to fetch pod2 metrics so it remains the default values.
 				{
 					Pod: Pod{Name: "pod2", Address: "127.0.0.2"},
@@ -96,7 +96,7 @@ func TestProvider(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			p := NewProvider(test.pmc, test.datastore)
-			err := p.Init(10*time.Millisecond, time.Millisecond)
+			err := p.Init(10*time.Millisecond, time.Millisecond, time.Second)
 			if test.initErr != (err != nil) {
 				t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
 			}
@@ -105,7 +104,7 @@ func TestProvider(t *testing.T) {
 			// do some turns of refreshing...
 			time.Sleep(3 * time.Millisecond)
 
-			metrics := p.AllPodMetrics()
+			metrics := p.AllFreshPodMetrics()
 			metricsCopy := make([]*PodMetrics, len(metrics))
 			for i, pm := range metrics {
 				if pm.UpdatedTime.IsZero() {
@@ -124,18 +123,18 @@ func TestProvider(t *testing.T) {
 				t.Errorf("Unexpected output (-want +got): %v", diff)
 			}
 
-			// then check for AllPodMetricsIncludingStale
-			if len(test.wantIncludingStale) > 0 {
-				metricsIncludingStale := p.AllPodMetricsIncludingStale()
-				metricsCopy := make([]*PodMetrics, len(metricsIncludingStale))
-				for i, pm := range metricsIncludingStale {
+			// then check for AllStalePodMetrics
+			if len(test.wantStale) > 0 {
+				staleMetrics := p.AllStalePodMetrics()
+				metricsCopy := make([]*PodMetrics, len(staleMetrics))
+				for i, pm := range staleMetrics {
 					cp := pm.Clone()
 					// reset the UpdatedTime for comparison
 					cp.UpdatedTime = time.Time{}
 					metricsCopy[i] = cp
 				}
 
-				if diff := cmp.Diff(test.wantIncludingStale, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" {
+				if diff := cmp.Diff(test.wantStale, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" {
 					t.Errorf("Unexpected output (-want +got): %v", diff)
 				}
 			}
@@ -145,12 +144,10 @@ func TestProvider(t *testing.T) {
 				p.datastore.pods.Delete(k)
 				return true
 			})
-			time.Sleep(20 * time.Millisecond)
-			metrics = p.AllPodMetrics()
-			// ensure no update is writing to the PodMetrics by background refreshing
-			if len(metrics) != 0 {
-				t.Errorf("Expected no metrics, got %v", metrics)
-			}
+			assert.Eventuallyf(t, func() bool {
+				// ensure no update is writing to the PodMetrics by background refreshing
+				return len(p.AllFreshPodMetrics()) == 0
+			}, 100*time.Millisecond, 10*time.Millisecond, "Expected no metrics, got %v", p.AllFreshPodMetrics())
 		})
 	}
 }
diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go
index 4becf7d7..23089fdd 100644
--- a/pkg/ext-proc/backend/types.go
+++ b/pkg/ext-proc/backend/types.go
@@ -26,18 +26,18 @@ type Metrics struct {
 	WaitingQueueSize        int
 	KVCacheUsagePercent     float64
 	KvCacheMaxTokenCapacity int
+
+	// UpdatedTime records the time when the metrics are updated.
+	UpdatedTime time.Time
 }
 
 type PodMetrics struct {
 	Pod
 	Metrics
-
-	// UpdatedTime records the time when the metrics are updated.
-	UpdatedTime time.Time
 }
 
 func (pm *PodMetrics) String() string {
-	return fmt.Sprintf("Pod: %+v; Metrics: %+v; UpdatedTime: %s", pm.Pod, pm.Metrics, pm.UpdatedTime)
+	return fmt.Sprintf("Pod: %+v; Metrics: %+v", pm.Pod, pm.Metrics)
 }
 
 func (pm *PodMetrics) Clone() *PodMetrics {
@@ -54,8 +54,8 @@ func (pm *PodMetrics) Clone() *PodMetrics {
 			WaitingQueueSize:        pm.WaitingQueueSize,
 			KVCacheUsagePercent:     pm.KVCacheUsagePercent,
 			KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
+			UpdatedTime:             pm.UpdatedTime,
 		},
-		UpdatedTime: pm.UpdatedTime,
 	}
 	return clone
 }
diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index a783aa2c..f71e5dd0 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -69,6 +69,10 @@ var (
 		"refreshMetricsInterval",
 		runserver.DefaultRefreshMetricsInterval,
 		"interval to refresh metrics")
+	refreshMetricsTimeout = flag.Duration(
+		"refreshMetricsTimeout",
+		runserver.DefaultRefreshMetricsTimeout,
+		"timeout of metrics refreshing")
 
 	scheme = runtime.NewScheme()
 )
@@ -110,6 +114,7 @@ func main() {
 		Zone:                   *zone,
 		RefreshPodsInterval:    *refreshPodsInterval,
 		RefreshMetricsInterval: *refreshMetricsInterval,
+		RefreshMetricsTimeout:  *refreshMetricsTimeout,
 		Scheme:                 scheme,
 		Config:                 ctrl.GetConfigOrDie(),
 		Datastore:              datastore,
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index 9fc3e663..984ba433 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -107,13 +107,13 @@ type Scheduler struct {
 // PodMetricsProvider is an interface to provide set of pods in the backend and information such as
 // metrics.
 type PodMetricsProvider interface {
-	AllPodMetrics() []*backend.PodMetrics
+	AllFreshPodMetrics() []*backend.PodMetrics
 }
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
 func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
-	klog.V(logutil.VERBOSE).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
-	pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
+	klog.V(logutil.VERBOSE).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllFreshPodMetrics())
+	pods, err := s.filter.Filter(req, s.podMetricsProvider.AllFreshPodMetrics())
 	if err != nil || len(pods) == 0 {
 		return backend.Pod{}, fmt.Errorf(
 			"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
index 1c9c1b2e..4a20e1b1 100644
--- a/pkg/ext-proc/server/runserver.go
+++ b/pkg/ext-proc/server/runserver.go
@@ -27,6 +27,7 @@ type ExtProcServerRunner struct {
 	Zone                   string
 	RefreshPodsInterval    time.Duration
 	RefreshMetricsInterval time.Duration
+	RefreshMetricsTimeout  time.Duration
 	Scheme                 *runtime.Scheme
 	Config                 *rest.Config
 	Datastore              *backend.K8sDatastore
@@ -43,6 +44,7 @@ const (
 	DefaultZone                   = ""                    // default for --zone
 	DefaultRefreshPodsInterval    = 10 * time.Second      // default for --refreshPodsInterval
 	DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
+	DefaultRefreshMetricsTimeout  = 1 * time.Second       // default for --refreshMetricsTimeout
 )
 
 func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
@@ -123,7 +125,7 @@ func (r *ExtProcServerRunner) Start(
 
 	// Initialize backend provider
 	pp := backend.NewProvider(podMetricsClient, podDatastore)
-	if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
+	if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshMetricsTimeout); err != nil {
 		klog.Fatalf("Failed to initialize backend provider: %v", err)
 	}
diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go
index 9ff61d8b..93f063ff 100644
--- a/pkg/ext-proc/test/benchmark/benchmark.go
+++ b/pkg/ext-proc/test/benchmark/benchmark.go
@@ -26,6 +26,7 @@ var (
 	localServer            = flag.Bool("local_server", true, "whether to start a local ext proc server")
 	refreshPodsInterval    = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
 	refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
+	refreshMetricsTimeout  = flag.Duration("refreshMetricsTimeout", 1*time.Second, "timeout of metrics refreshing")
 )
 
 const (
@@ -37,7 +38,7 @@ func main() {
 	flag.Parse()
 
 	if *localServer {
-		test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods(), fakeModels())
+		test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshMetricsTimeout, fakePods(), fakeModels())
 		time.Sleep(time.Second) // wait until server is up
 		klog.Info("Server started")
 	}
diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go
index 63972849..660734b9 100644
--- a/pkg/ext-proc/test/utils.go
+++ b/pkg/ext-proc/test/utils.go
@@ -16,7 +16,7 @@ import (
 	klog "k8s.io/klog/v2"
 )
 
-func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
+func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
 	ps := make(backend.PodSet)
 	pms := make(map[backend.Pod]*backend.PodMetrics)
 	for _, pod := range pods {
@@ -25,7 +25,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
 	}
 	pmc := &backend.FakePodMetricsClient{Res: pms}
 	pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods)))
-	if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+	if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout); err != nil {
 		klog.Fatalf("failed to initialize: %v", err)
 	}
 	return startExtProc(port, pp, models)
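Patch 4 also swaps the test's fixed 20ms sleep for testify's assert.Eventuallyf, which re-evaluates a condition on a tick until a deadline instead of guessing how long the background goroutines need. The general shape, with a hypothetical condition function:

    ok := assert.Eventually(t, func() bool {
        return conditionUnderTest() // re-checked every tick (10ms here)
    }, 100*time.Millisecond, 10*time.Millisecond, "condition never became true")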
From 17983582040a50fd7e3854bdc7c79950e1862aa7 Mon Sep 17 00:00:00 2001
From: spacewander
Date: Fri, 7 Feb 2025 19:13:57 +0800
Subject: [PATCH 5/7] make linter happy

Signed-off-by: spacewander
---
 pkg/ext-proc/backend/provider.go | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index c7df475c..872b2a9c 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -111,18 +111,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshMetr
 		for {
 			time.Sleep(5 * time.Second)
 			podMetrics := p.AllFreshPodMetrics()
-			freshPodMetrics := make([]*PodMetrics, 0)
-			for _, pm := range podMetrics {
-				freshPodMetrics = append(freshPodMetrics, pm)
-			}
-			klog.Infof("===DEBUG: Current Pods and metrics: %+v", freshPodMetrics)
+			klog.Infof("===DEBUG: Current Pods and metrics: %+v", podMetrics)
 
 			podMetrics = p.AllStalePodMetrics()
-			stalePodMetrics := make([]*PodMetrics, 0)
-			for _, pm := range podMetrics {
-				stalePodMetrics = append(stalePodMetrics, pm)
-			}
-			klog.Infof("===DEBUG: Stale Pods and metrics: %+v", stalePodMetrics)
+			klog.Infof("===DEBUG: Stale Pods and metrics: %+v", podMetrics)
 		}
 	}()
 }
From 0359982109b11dcb143e48a93f28561c930df920 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?=
Date: Tue, 11 Feb 2025 10:09:22 +0000
Subject: [PATCH 6/7] make fmt happy

---
 pkg/ext-proc/backend/provider.go         | 2 +-
 pkg/ext-proc/backend/provider_test.go    | 2 +-
 pkg/ext-proc/main.go                     | 2 +-
 pkg/ext-proc/server/runserver.go         | 2 +-
 pkg/ext-proc/test/benchmark/benchmark.go | 2 +-
 pkg/ext-proc/test/utils.go               | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 4aecc8c3..612d2b15 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -195,4 +195,4 @@ func (p *Provider) flushPrometheusMetricsOnce() {
 	podTotalCount := len(podMetrics)
 	metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
 	metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
-}
\ No newline at end of file
+}
diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go
index 2543a1f5..0ac2ebe2 100644
--- a/pkg/ext-proc/backend/provider_test.go
+++ b/pkg/ext-proc/backend/provider_test.go
@@ -158,4 +158,4 @@ func populateMap(pods ...Pod) *sync.Map {
 		newMap.Store(pod, true)
 	}
 	return newMap
-}
\ No newline at end of file
+}
diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index 8efe7f80..729488e8 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -231,4 +231,4 @@ func validateFlags() error {
 	}
 
 	return nil
-}
\ No newline at end of file
+}
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
index 77ddf228..dfb9ed65 100644
--- a/pkg/ext-proc/server/runserver.go
+++ b/pkg/ext-proc/server/runserver.go
@@ -159,4 +159,4 @@ func (r *ExtProcServerRunner) StartManager() {
 		klog.Fatalf("Error starting controller manager: %v", err)
 	}
 	klog.Info("Controller manager shutting down")
-}
\ No newline at end of file
+}
diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go
index 7d81cb30..52faaca2 100644
--- a/pkg/ext-proc/test/benchmark/benchmark.go
+++ b/pkg/ext-proc/test/benchmark/benchmark.go
@@ -109,4 +109,4 @@ func fakeMetrics(podNumber int) backend.Metrics {
 
 func modelName(i int) string {
 	return fmt.Sprintf("adapter-%v", i)
-}
\ No newline at end of file
+}
diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go
index 16669a4d..a61ac577 100644
--- a/pkg/ext-proc/test/utils.go
+++ b/pkg/ext-proc/test/utils.go
@@ -80,4 +80,4 @@ func FakePod(index int) backend.Pod {
 		Address: address,
 	}
 	return pod
-}
\ No newline at end of file
+}
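Patch 7 below moves the logging to klog's structured API. For reference, InfoS takes a message followed by alternating key/value pairs, and ErrorS takes the error as its first argument; verbosity still comes from klog.V:

    klog.V(2).InfoS("Starting refresher", "pod", pod)          // structured: msg + key/value pairs
    klog.ErrorS(err, "Failed to refresh metrics", "pod", pod)  // error first, then msg and pairs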
From 23e4c1770fb8b178279fb9305c065b781baa2e4b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?=
Date: Fri, 14 Feb 2025 10:46:38 +0000
Subject: [PATCH 7/7] resolve comments

---
 pkg/ext-proc/backend/pod_metrics_refresher.go |  9 +++++----
 pkg/ext-proc/backend/provider.go              | 10 ++++------
 pkg/ext-proc/scheduling/scheduler.go          |  1 -
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/pkg/ext-proc/backend/pod_metrics_refresher.go b/pkg/ext-proc/backend/pod_metrics_refresher.go
index 9ffeb1d0..461cfa45 100644
--- a/pkg/ext-proc/backend/pod_metrics_refresher.go
+++ b/pkg/ext-proc/backend/pod_metrics_refresher.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	klog "k8s.io/klog/v2"
+	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 )
 
 type PodMetricsRefresher struct {
@@ -29,7 +30,7 @@ func NewPodMetricsRefresher(provider *Provider, pod Pod, interval, timeout time.
 
 func (r *PodMetricsRefresher) start() {
 	go func() {
-		klog.V(2).Infof("Starting refresher for pod %v", r.pod)
+		klog.V(logutil.DEFAULT).InfoS("Starting refresher", "pod", r.pod)
 		for {
 			select {
 			case <-r.done:
@@ -39,7 +40,7 @@ func (r *PodMetricsRefresher) start() {
 
 			err := r.refreshMetrics()
 			if err != nil {
-				klog.Errorf("Failed to refresh metrics for pod %s: %v", r.pod, err)
+				klog.ErrorS(err, "Failed to refresh metrics", "pod", r.pod)
 			}
 
 			time.Sleep(r.interval)
@@ -60,7 +61,7 @@ func (r *PodMetricsRefresher) refreshMetrics() error {
 		return nil
 	}
 
-	klog.V(4).Infof("Processing pod %v and metric %v", pod, existing.Metrics)
+	klog.V(logutil.DEBUG).InfoS("Refresh metrics", "pod", pod, "metrics", existing.Metrics)
 	updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing)
 	if err != nil {
 		return err
@@ -71,6 +72,6 @@ func (r *PodMetricsRefresher) refreshMetrics() error {
 }
 
 func (r *PodMetricsRefresher) stop() {
-	klog.V(2).Infof("Stopping refresher for pod %v", r.pod)
+	klog.V(logutil.DEFAULT).InfoS("Stopping refresher", "pod", r.pod)
 	close(r.done)
 }
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 674eae19..1827fceb 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -11,9 +11,7 @@ import (
 )
 
 const (
-	// TODO: make it configurable. One idea is to provide a configuration singleton
-	// and put fields like refreshMetricsInterval in it. So far, we have to pass these
-	// fields across several layers.
+	// TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336
 	metricsValidityPeriod = 5 * time.Second
 )
 
@@ -64,7 +62,7 @@ func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
 		if !stale {
 			if isPodMetricsStale(m) {
 				// exclude stale metrics for scheduler
-				klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
+				klog.V(logutil.DEBUG).InfoS("Pod metrics is stale, skipping", "pod", m.Pod)
 			} else {
 				res = append(res, m)
 			}
@@ -84,7 +82,7 @@ func (p *Provider) allPodMetrics(stale bool) []*PodMetrics {
 func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
 	pm.Metrics.UpdatedTime = time.Now()
 	p.podMetrics.Store(pod, pm)
-	klog.V(4).Infof("Updated metrics for pod %s: %v", pod, pm.Metrics)
+	klog.V(logutil.DEBUG).InfoS("Updated metrics", "pod", pod, "metrics", pm.Metrics)
 }
 
 func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
@@ -156,7 +154,7 @@ func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout
 		pod := k.(Pod)
 		if _, ok := p.datastore.pods.Load(pod); !ok {
 			p.podMetrics.Delete(pod)
-			if v, ok := p.podMetrics.LoadAndDelete(pod); ok {
+			if v, ok := p.podMetricsRefresher.LoadAndDelete(pod); ok {
 				refresher := v.(*PodMetricsRefresher)
 				refresher.stop()
 			}
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index c3447c98..f20aecad 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -111,7 +111,6 @@ type PodMetricsProvider interface {
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
 func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
-	klog.V(logutil.VERBOSE).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllFreshPodMetrics())
 	pods, err := s.filter.Filter(req, s.podMetricsProvider.AllFreshPodMetrics())
 	if err != nil || len(pods) == 0 {
 		return backend.Pod{}, fmt.Errorf(