diff --git a/Makefile b/Makefile index b7654ed7..a6ca9c4a 100644 --- a/Makefile +++ b/Makefile @@ -105,7 +105,7 @@ vet: ## Run go vet against code. .PHONY: test test: manifests generate fmt vet envtest ## Run tests. - KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -coverprofile cover.out + KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test $$(go list ./... | grep -v /e2e) -race -coverprofile cover.out .PHONY: test-integration test-integration: manifests generate fmt vet envtest ## Run tests. diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go index 7ab8a464..74efdcef 100644 --- a/pkg/ext-proc/backend/fake.go +++ b/pkg/ext-proc/backend/fake.go @@ -18,7 +18,7 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi return nil, err } klog.V(logutil.VERBOSE).InfoS("Fetching metrics for pod", "pod", pod, "existing", existing, "new", f.Res[pod]) - return f.Res[pod], nil + return f.Res[pod].Clone(), nil } type FakeDataStore struct { diff --git a/pkg/ext-proc/backend/pod_metrics_refresher.go b/pkg/ext-proc/backend/pod_metrics_refresher.go new file mode 100644 index 00000000..461cfa45 --- /dev/null +++ b/pkg/ext-proc/backend/pod_metrics_refresher.go @@ -0,0 +1,77 @@ +package backend + +import ( + "context" + "time" + + klog "k8s.io/klog/v2" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" +) + +type PodMetricsRefresher struct { + done chan struct{} + interval time.Duration + timeout time.Duration + + // refresher holds provider & pod so it can update the metrics concurrent-safely. + pod Pod + provider *Provider +} + +func NewPodMetricsRefresher(provider *Provider, pod Pod, interval, timeout time.Duration) *PodMetricsRefresher { + return &PodMetricsRefresher{ + done: make(chan struct{}), + interval: interval, + timeout: timeout, + pod: pod, + provider: provider, + } +} + +func (r *PodMetricsRefresher) start() { + go func() { + klog.V(logutil.DEFAULT).InfoS("Starting refresher", "pod", r.pod) + for { + select { + case <-r.done: + return + default: + } + + err := r.refreshMetrics() + if err != nil { + klog.ErrorS(err, "Failed to refresh metrics", "pod", r.pod) + } + + time.Sleep(r.interval) + } + }() +} + +func (r *PodMetricsRefresher) refreshMetrics() error { + ctx, cancel := context.WithTimeout(context.Background(), r.timeout) + defer cancel() + + pod := r.pod + existing, found := r.provider.GetPodMetrics(pod) + if !found { + // As refresher is running in the background, it's possible that the pod is deleted but + // the refresh goroutine doesn't read the done channel yet. In this case, we just return nil. + // The refresher will be stopped after this interval. 
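
For readers less familiar with this pattern, the refresher above is a per-pod, stoppable poller: start() launches a goroutine that refreshes on an interval, and stop() closes done so the goroutine exits on its next check. Below is a minimal, self-contained sketch of the same idea with hypothetical names (it uses a time.Ticker and a blocking select, whereas the refresher above sleeps and checks done non-blockingly). The race the comment describes — a pod being deleted before the goroutine observes the closed done channel — is why one extra refresh attempt may still run and has to tolerate a missing entry.

package main

import (
    "fmt"
    "time"
)

// poller stands in for PodMetricsRefresher; names are illustrative only.
type poller struct {
    done     chan struct{}
    interval time.Duration
    fn       func() error
}

func newPoller(interval time.Duration, fn func() error) *poller {
    return &poller{done: make(chan struct{}), interval: interval, fn: fn}
}

func (p *poller) start() {
    go func() {
        ticker := time.NewTicker(p.interval)
        defer ticker.Stop()
        for {
            select {
            case <-p.done:
                return // stop() was called
            case <-ticker.C:
                if err := p.fn(); err != nil {
                    fmt.Println("refresh failed:", err)
                }
            }
        }
    }()
}

func (p *poller) stop() { close(p.done) }

func main() {
    p := newPoller(10*time.Millisecond, func() error { fmt.Println("tick"); return nil })
    p.start()
    time.Sleep(35 * time.Millisecond)
    p.stop()
    time.Sleep(20 * time.Millisecond) // give the goroutine time to observe done
}
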
+ return nil + } + + klog.V(logutil.DEBUG).InfoS("Refresh metrics", "pod", pod, "metrics", existing.Metrics) + updated, err := r.provider.pmc.FetchMetrics(ctx, r.pod, existing) + if err != nil { + return err + } + + r.provider.UpdatePodMetrics(pod, updated) + return nil +} + +func (r *PodMetricsRefresher) stop() { + klog.V(logutil.DEFAULT).InfoS("Stopping refresher", "pod", r.pod) + close(r.done) +} diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index d64b80b3..1827fceb 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -2,25 +2,25 @@ package backend import ( "context" - "fmt" "sync" "time" - "go.uber.org/multierr" klog "k8s.io/klog/v2" "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) const ( - fetchMetricsTimeout = 5 * time.Second + // TODO: https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/336 + metricsValidityPeriod = 5 * time.Second ) func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider { p := &Provider{ - podMetrics: sync.Map{}, - pmc: pmc, - datastore: datastore, + podMetrics: sync.Map{}, + podMetricsRefresher: sync.Map{}, + pmc: pmc, + datastore: datastore, } return p } @@ -29,18 +29,50 @@ func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider { type Provider struct { // key: Pod, value: *PodMetrics podMetrics sync.Map - pmc PodMetricsClient - datastore *K8sDatastore + // key: Pod, value: *PodMetricsRefresher + podMetricsRefresher sync.Map + pmc PodMetricsClient + datastore *K8sDatastore } type PodMetricsClient interface { + // FetchMetrics fetches metrics for the given pod. + // The returned PodMetrics and the existing one should not be the same object. + // Otherwise, there will be race. FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) } -func (p *Provider) AllPodMetrics() []*PodMetrics { +func isPodMetricsStale(pm *PodMetrics) bool { + return time.Since(pm.Metrics.UpdatedTime) > metricsValidityPeriod +} + +func (p *Provider) AllFreshPodMetrics() []*PodMetrics { + return p.allPodMetrics(false) +} + +func (p *Provider) AllStalePodMetrics() []*PodMetrics { + return p.allPodMetrics(true) +} + +func (p *Provider) allPodMetrics(stale bool) []*PodMetrics { res := []*PodMetrics{} fn := func(k, v any) bool { - res = append(res, v.(*PodMetrics)) + m := v.(*PodMetrics) + + if !stale { + if isPodMetricsStale(m) { + // exclude stale metrics for scheduler + klog.V(logutil.DEBUG).InfoS("Pod metrics is stale, skipping", "pod", m.Pod) + } else { + res = append(res, m) + } + + } else { + if isPodMetricsStale(m) { + res = append(res, m) + } + } + return true } p.podMetrics.Range(fn) @@ -48,41 +80,27 @@ func (p *Provider) AllPodMetrics() []*PodMetrics { } func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) { + pm.Metrics.UpdatedTime = time.Now() p.podMetrics.Store(pod, pm) + klog.V(logutil.DEBUG).InfoS("Updated metrics", "pod", pod, "metrics", pm.Metrics) } func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { val, ok := p.podMetrics.Load(pod) if ok { + // For now, the only caller of GetPodMetrics is the refresher, so we + // don't need to exclude the stale metrics. 
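
The freshness gate that replaces the synchronous refresh is just a timestamp check: UpdatePodMetrics stamps UpdatedTime, and readers compare time.Since(UpdatedTime) against metricsValidityPeriod. Below is a small sketch of the fresh/stale split that AllFreshPodMetrics and AllStalePodMetrics perform, using illustrative types (a zero UpdatedTime keeps an entry stale until its first successful refresh; stale entries are only used for logging and the Prometheus flush).

package main

import (
    "fmt"
    "time"
)

const validity = 5 * time.Second // plays the role of metricsValidityPeriod

type sample struct {
    name    string
    updated time.Time
}

func isStale(s sample, now time.Time) bool {
    return now.Sub(s.updated) > validity
}

// partition splits entries the way allPodMetrics does: fresh ones go to the
// scheduler, stale ones are kept aside for observability.
func partition(in []sample, now time.Time) (fresh, stale []sample) {
    for _, s := range in {
        if isStale(s, now) {
            stale = append(stale, s)
        } else {
            fresh = append(fresh, s)
        }
    }
    return fresh, stale
}

func main() {
    now := time.Now()
    in := []sample{
        {name: "pod1", updated: now.Add(-time.Second)},
        {name: "pod2", updated: now.Add(-10 * time.Second)},
        {name: "pod3"}, // zero time: stale until first refresh
    }
    fresh, stale := partition(in, now)
    fmt.Println("fresh:", len(fresh), "stale:", len(stale)) // fresh: 1 stale: 2
}
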
return val.(*PodMetrics), true } return nil, false } -func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { - p.refreshPodsOnce() - - if err := p.refreshMetricsOnce(); err != nil { - klog.ErrorS(err, "Failed to init metrics") - } - - klog.InfoS("Initialized pods and metrics", "metrics", p.AllPodMetrics()) - +func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout, refreshPrometheusMetricsInterval time.Duration) error { // periodically refresh pods go func() { for { + p.refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout) time.Sleep(refreshPodsInterval) - p.refreshPodsOnce() - } - }() - - // periodically refresh metrics - go func() { - for { - time.Sleep(refreshMetricsInterval) - if err := p.refreshMetricsOnce(); err != nil { - klog.V(logutil.TRACE).ErrorS(err, "Failed to refresh metrics") - } } }() @@ -99,7 +117,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm go func() { for { time.Sleep(5 * time.Second) - klogV.InfoS("Current Pods and metrics gathered", "metrics", p.AllPodMetrics()) + klogV.InfoS("Current Pods and metrics gathered", "fresh metrics", p.AllFreshPodMetrics(), + "stale metrics", p.AllStalePodMetrics()) } }() } @@ -109,7 +128,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshProm // refreshPodsOnce lists pods and updates keys in the podMetrics map. // Note this function doesn't update the PodMetrics value, it's done separately. -func (p *Provider) refreshPodsOnce() { +func (p *Provider) refreshPodsOnce(refreshMetricsInterval, refreshMetricsTimeout time.Duration) { // merge new pods with cached ones. // add new pod to the map addNewPods := func(k, v any) bool { @@ -120,8 +139,13 @@ func (p *Provider) refreshPodsOnce() { Metrics: Metrics{ ActiveModels: make(map[string]int), }, + // Metrics are considered stale until they are first refreshed. 
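
refreshPodsOnce now also reconciles the set of refresher goroutines against the datastore: every new pod gets a refresher started and stored, and every removed pod has its refresher stopped via LoadAndDelete. A stripped-down sketch of that start/stop reconciliation over a sync.Map, with a hypothetical worker type standing in for the refresher:

package main

import (
    "fmt"
    "sync"
)

// worker stands in for a PodMetricsRefresher; illustrative only.
type worker struct{ done chan struct{} }

func startWorker() *worker { return &worker{done: make(chan struct{})} }
func (w *worker) stop()    { close(w.done) }

// reconcile makes the running workers match the desired keys: start one for
// each new key, stop and forget the ones whose key disappeared.
func reconcile(desired map[string]bool, running *sync.Map) {
    for key := range desired {
        if _, ok := running.Load(key); !ok {
            running.Store(key, startWorker())
        }
    }
    running.Range(func(k, _ any) bool {
        if !desired[k.(string)] {
            if w, ok := running.LoadAndDelete(k); ok {
                w.(*worker).stop()
            }
        }
        return true
    })
}

func main() {
    var running sync.Map
    reconcile(map[string]bool{"pod1": true, "pod2": true}, &running)
    reconcile(map[string]bool{"pod1": true}, &running) // pod2's worker is stopped

    count := 0
    running.Range(func(_, _ any) bool { count++; return true })
    fmt.Println("running workers:", count) // 1
}
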
} p.podMetrics.Store(pod, new) + + refresher := NewPodMetricsRefresher(p, pod, refreshMetricsInterval, refreshMetricsTimeout) + refresher.start() + p.podMetricsRefresher.Store(pod, refresher) } return true } @@ -130,6 +154,10 @@ func (p *Provider) refreshPodsOnce() { pod := k.(Pod) if _, ok := p.datastore.pods.Load(pod); !ok { p.podMetrics.Delete(pod) + if v, ok := p.podMetricsRefresher.LoadAndDelete(pod); ok { + refresher := v.(*PodMetricsRefresher) + refresher.stop() + } } return true } @@ -137,54 +165,6 @@ func (p *Provider) refreshPodsOnce() { p.datastore.pods.Range(addNewPods) } -func (p *Provider) refreshMetricsOnce() error { - klogV := klog.V(logutil.TRACE) - ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) - defer cancel() - start := time.Now() - defer func() { - d := time.Since(start) - // TODO: add a metric instead of logging - klogV.InfoS("Metrics refreshed", "duration", d) - }() - var wg sync.WaitGroup - errCh := make(chan error) - processOnePod := func(key, value any) bool { - klogV.InfoS("Pod and metric being processed", "pod", key, "metric", value) - pod := key.(Pod) - existing := value.(*PodMetrics) - wg.Add(1) - go func() { - defer wg.Done() - updated, err := p.pmc.FetchMetrics(ctx, pod, existing) - if err != nil { - errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err) - return - } - p.UpdatePodMetrics(pod, updated) - klogV.InfoS("Updated metrics for pod", "pod", pod, "metrics", updated.Metrics) - }() - return true - } - p.podMetrics.Range(processOnePod) - - // Wait for metric collection for all pods to complete and close the error channel in a - // goroutine so this is unblocking, allowing the code to proceed to the error collection code - // below. - // Note we couldn't use a buffered error channel with a size because the size of the podMetrics - // sync.Map is unknown beforehand. - go func() { - wg.Wait() - close(errCh) - }() - - var errs error - for err := range errCh { - errs = multierr.Append(errs, err) - } - return errs -} - func (p *Provider) flushPrometheusMetricsOnce() { klog.V(logutil.DEBUG).InfoS("Flushing Prometheus Metrics") @@ -197,7 +177,7 @@ func (p *Provider) flushPrometheusMetricsOnce() { var kvCacheTotal float64 var queueTotal int - podMetrics := p.AllPodMetrics() + podMetrics := append(p.AllFreshPodMetrics(), p.AllStalePodMetrics()...) if len(podMetrics) == 0 { return } diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index ddd7f0d6..0ac2ebe2 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -8,11 +8,12 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" ) var ( pod1 = &PodMetrics{ - Pod: Pod{Name: "pod1"}, + Pod: Pod{Name: "pod1", Address: "127.0.0.1"}, Metrics: Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0.2, @@ -24,7 +25,7 @@ var ( }, } pod2 = &PodMetrics{ - Pod: Pod{Name: "pod2"}, + Pod: Pod{Name: "pod2", Address: "127.0.0.2"}, Metrics: Metrics{ WaitingQueueSize: 1, KVCacheUsagePercent: 0.2, @@ -44,6 +45,7 @@ func TestProvider(t *testing.T) { datastore *K8sDatastore initErr bool want []*PodMetrics + wantStale []*PodMetrics }{ { name: "Init success", @@ -73,9 +75,13 @@ func TestProvider(t *testing.T) { }, want: []*PodMetrics{ pod1, + // Failed to fetch pod2 metrics so it remains the default values, + // which is stale. 
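
For contrast with the per-pod refreshers, the deleted refreshMetricsOnce used a scatter-gather: one goroutine per pod, errors funneled through an unbuffered channel that is closed after wg.Wait() in a separate goroutine, so the collecting loop can finish without knowing the pod count up front. A compact, self-contained sketch of that shape (illustrative code; errors are gathered into a slice here rather than with multierr):

package main

import (
    "fmt"
    "sync"
)

func fetchAll(keys []string, fetch func(string) error) []error {
    var wg sync.WaitGroup
    errCh := make(chan error) // unbuffered: the size of the key set is unknown beforehand
    for _, k := range keys {
        wg.Add(1)
        go func(k string) {
            defer wg.Done()
            if err := fetch(k); err != nil {
                errCh <- fmt.Errorf("fetch %s: %w", k, err)
            }
        }(k)
    }
    // Close the channel only after every worker is done, in its own goroutine,
    // so the range loop below is the only thing blocking this function.
    go func() {
        wg.Wait()
        close(errCh)
    }()
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }
    return errs
}

func main() {
    errs := fetchAll([]string{"pod1", "pod2"}, func(k string) error {
        if k == "pod2" {
            return fmt.Errorf("connection refused")
        }
        return nil
    })
    fmt.Println("errors:", errs)
}
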
+ }, + wantStale: []*PodMetrics{ // Failed to fetch pod2 metrics so it remains the default values. { - Pod: Pod{Name: "pod2"}, + Pod: Pod{Name: "pod2", Address: "127.0.0.2"}, Metrics: Metrics{ WaitingQueueSize: 0, KVCacheUsagePercent: 0, @@ -90,17 +96,58 @@ func TestProvider(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := NewProvider(test.pmc, test.datastore) - err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond) + err := p.Init(10*time.Millisecond, time.Millisecond, time.Second, 10*time.Millisecond) if test.initErr != (err != nil) { t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr) } - metrics := p.AllPodMetrics() + + // do some turns of refreshing... + time.Sleep(3 * time.Millisecond) + + metrics := p.AllFreshPodMetrics() + metricsCopy := make([]*PodMetrics, len(metrics)) + for i, pm := range metrics { + if pm.UpdatedTime.IsZero() { + t.Errorf("Expected non-zero UpdatedTime, got %v", pm.UpdatedTime) + } + cp := pm.Clone() + // reset the UpdatedTime for comparison + cp.UpdatedTime = time.Time{} + metricsCopy[i] = cp + } + lessFunc := func(a, b *PodMetrics) bool { return a.String() < b.String() } - if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" { + if diff := cmp.Diff(test.want, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" { t.Errorf("Unexpected output (-want +got): %v", diff) } + + // then check for AllStalePodMetrics + if len(test.wantStale) > 0 { + staleMetrics := p.AllStalePodMetrics() + metricsCopy := make([]*PodMetrics, len(staleMetrics)) + for i, pm := range staleMetrics { + cp := pm.Clone() + // reset the UpdatedTime for comparison + cp.UpdatedTime = time.Time{} + metricsCopy[i] = cp + } + + if diff := cmp.Diff(test.wantStale, metricsCopy, cmpopts.SortSlices(lessFunc)); diff != "" { + t.Errorf("Unexpected output (-want +got): %v", diff) + } + } + + // simulate pod deletion + p.datastore.pods.Range(func(k, v any) bool { + p.datastore.pods.Delete(k) + return true + }) + assert.Eventuallyf(t, func() bool { + // ensure no update is writing to the PodMetrics by background refreshing + return len(p.AllFreshPodMetrics()) == 0 + }, 100*time.Millisecond, 10*time.Millisecond, "Expected no metrics, got %v", p.AllFreshPodMetrics()) }) } } diff --git a/pkg/ext-proc/backend/suite_test.go b/pkg/ext-proc/backend/suite_test.go new file mode 100644 index 00000000..2174b2b3 --- /dev/null +++ b/pkg/ext-proc/backend/suite_test.go @@ -0,0 +1,20 @@ +package backend + +import ( + "flag" + "os" + "testing" + + klog "k8s.io/klog/v2" +) + +// Turn on klog in test so we can observe the behavior, which is good +// for debugging and catching error. +func TestMain(m *testing.M) { + klog.InitFlags(nil) + _ = flag.Set("logtostderr", "true") // Log to stderr instead of files + _ = flag.Set("v", "4") // Log level + flag.Parse() + + os.Exit(m.Run()) +} diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go index 7e399fed..23089fdd 100644 --- a/pkg/ext-proc/backend/types.go +++ b/pkg/ext-proc/backend/types.go @@ -1,7 +1,10 @@ // Package backend is a library to interact with backend model servers such as probing metrics. package backend -import "fmt" +import ( + "fmt" + "time" +) type PodSet map[Pod]bool @@ -23,6 +26,9 @@ type Metrics struct { WaitingQueueSize int KVCacheUsagePercent float64 KvCacheMaxTokenCapacity int + + // UpdatedTime record the time when the metrics are updated. 
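
Because UpdatedTime is now stamped with time.Now() on every refresh, the test above clones each PodMetrics and zeroes UpdatedTime before handing it to cmp.Diff. An equivalent option is cmpopts.IgnoreFields; the sketch below shows it on illustrative types and is not what this PR's test does:

package main

import (
    "fmt"
    "time"

    "github.com/google/go-cmp/cmp"
    "github.com/google/go-cmp/cmp/cmpopts"
)

type metrics struct {
    WaitingQueueSize int
    UpdatedTime      time.Time
}

func main() {
    want := metrics{WaitingQueueSize: 1}
    got := metrics{WaitingQueueSize: 1, UpdatedTime: time.Now()}

    // Ignore the volatile timestamp instead of zeroing it on a copy.
    if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(metrics{}, "UpdatedTime")); diff != "" {
        fmt.Println("unexpected diff:", diff)
    } else {
        fmt.Println("equal, ignoring UpdatedTime")
    }
}
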
+ UpdatedTime time.Time } type PodMetrics struct { @@ -43,10 +49,12 @@ func (pm *PodMetrics) Clone() *PodMetrics { Pod: pm.Pod, Metrics: Metrics{ ActiveModels: cm, + MaxActiveModels: pm.MaxActiveModels, RunningQueueSize: pm.RunningQueueSize, WaitingQueueSize: pm.WaitingQueueSize, KVCacheUsagePercent: pm.KVCacheUsagePercent, KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity, + UpdatedTime: pm.UpdatedTime, }, } return clone diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 06c77af3..9220012b 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -67,6 +67,10 @@ var ( "refreshMetricsInterval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics") + refreshMetricsTimeout = flag.Duration( + "refreshMetricsTimeout", + runserver.DefaultRefreshMetricsTimeout, + "timeout of metrics refreshing") refreshPrometheusMetricsInterval = flag.Duration( "refreshPrometheusMetricsInterval", runserver.DefaultRefreshPrometheusMetricsInterval, @@ -131,6 +135,7 @@ func run() error { PoolNamespace: *poolNamespace, RefreshPodsInterval: *refreshPodsInterval, RefreshMetricsInterval: *refreshMetricsInterval, + RefreshMetricsTimeout: *refreshMetricsTimeout, RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, Datastore: datastore, } diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index 50564898..f20aecad 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -106,13 +106,12 @@ type Scheduler struct { // PodMetricsProvider is an interface to provide set of pods in the backend and information such as // metrics. type PodMetricsProvider interface { - AllPodMetrics() []*backend.PodMetrics + AllFreshPodMetrics() []*backend.PodMetrics } // Schedule finds the target pod based on metrics and the requested lora adapter. 
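
The Clone additions (and FetchMetrics in the fake client now returning a Clone) matter because PodMetrics carries a map: sharing one object between the refresher goroutine and readers would be a data race, which the -race flag added to the Makefile would surface. A minimal sketch of the deep-copy idea with hypothetical types:

package main

import "fmt"

type stats struct {
    ActiveModels map[string]int
    Queue        int
}

// clone gives the copy its own backing map, so a reader holding the clone
// never observes a concurrent writer mutating the original. Forgetting a
// newly added scalar field here silently drops it from every copy, which is
// why the diff adds MaxActiveModels and UpdatedTime to Clone.
func (s *stats) clone() *stats {
    cm := make(map[string]int, len(s.ActiveModels))
    for k, v := range s.ActiveModels {
        cm[k] = v
    }
    return &stats{ActiveModels: cm, Queue: s.Queue}
}

func main() {
    orig := &stats{ActiveModels: map[string]int{"llama": 1}, Queue: 3}
    cp := orig.clone()
    orig.ActiveModels["llama"] = 2
    fmt.Println(orig.ActiveModels["llama"], cp.ActiveModels["llama"]) // 2 1
}
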
func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) { - klog.V(logutil.VERBOSE).InfoS("Scheduling a request", "request", req, "metrics", s.podMetricsProvider.AllPodMetrics()) - pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) + pods, err := s.filter.Filter(req, s.podMetricsProvider.AllFreshPodMetrics()) if err != nil || len(pods) == 0 { return backend.Pod{}, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 2d92e412..5913848a 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -25,6 +25,7 @@ type ExtProcServerRunner struct { PoolNamespace string RefreshPodsInterval time.Duration RefreshMetricsInterval time.Duration + RefreshMetricsTimeout time.Duration RefreshPrometheusMetricsInterval time.Duration Datastore *backend.K8sDatastore } @@ -37,6 +38,7 @@ const ( DefaultPoolNamespace = "default" // default for --poolNamespace DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval + DefaultRefreshMetricsTimeout = 1 * time.Second // default for --refreshMetricsTimeout DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval ) @@ -48,6 +50,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { PoolNamespace: DefaultPoolNamespace, RefreshPodsInterval: DefaultRefreshPodsInterval, RefreshMetricsInterval: DefaultRefreshMetricsInterval, + RefreshMetricsTimeout: DefaultRefreshMetricsTimeout, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, // Datastore can be assigned later. 
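
The new RefreshMetricsTimeout bounds each individual FetchMetrics attempt rather than the refresh loop as a whole; refreshMetrics builds a context.WithTimeout per call. A small standalone sketch of that per-attempt pattern (hypothetical helper, not code from this repo):

package main

import (
    "context"
    "fmt"
    "time"
)

// fetchWithTimeout bounds a single attempt and always releases the timer.
func fetchWithTimeout(parent context.Context, timeout time.Duration, fetch func(context.Context) error) error {
    ctx, cancel := context.WithTimeout(parent, timeout)
    defer cancel()
    return fetch(ctx)
}

func main() {
    slow := func(ctx context.Context) error {
        select {
        case <-time.After(2 * time.Second):
            return nil
        case <-ctx.Done():
            return ctx.Err() // context.DeadlineExceeded
        }
    }
    err := fetchWithTimeout(context.Background(), 100*time.Millisecond, slow)
    fmt.Println("slow fetch:", err)
}
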
} @@ -102,7 +105,7 @@ func (r *ExtProcServerRunner) AsRunnable( return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error { // Initialize backend provider pp := backend.NewProvider(podMetricsClient, podDatastore) - if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil { + if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshMetricsTimeout, r.RefreshPrometheusMetricsInterval); err != nil { klog.ErrorS(err, "Failed to initialize backend provider") return err } diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go index c83dbcb9..e92c0e9b 100644 --- a/pkg/ext-proc/test/benchmark/benchmark.go +++ b/pkg/ext-proc/test/benchmark/benchmark.go @@ -27,6 +27,7 @@ var ( localServer = flag.Bool("local_server", true, "whether to start a local ext proc server") refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods") refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics via polling pods") + refreshMetricsTimeout = flag.Duration("refreshMetricsTimeout", 1*time.Second, "timeout of metrics refreshing") refreshPrometheusMetricsInterval = flag.Duration("refreshPrometheusMetricsInterval", 5*time.Second, "interval to flush prometheus metrics") ) @@ -45,7 +46,7 @@ func run() error { flag.Parse() if *localServer { - test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) + test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshMetricsTimeout, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) time.Sleep(time.Second) // wait until server is up klog.InfoS("Server started") } diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go index 4c000722..7068ae1e 100644 --- a/pkg/ext-proc/test/utils.go +++ b/pkg/ext-proc/test/utils.go @@ -17,7 +17,7 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) -func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server { +func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server { ps := make(backend.PodSet) pms := make(map[backend.Pod]*backend.PodMetrics) for _, pod := range pods { @@ -26,7 +26,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refresh } pmc := &backend.FakePodMetricsClient{Res: pms} pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods))) - if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil { + if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshMetricsTimeout, refreshPrometheusMetricsInterval); err != nil { logutil.Fatal(err, "Failed to initialize") } return startExtProc(port, pp, models)
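
Since refreshing is now fully asynchronous, the provider test polls for convergence with testify's assert.Eventually after deleting pods, instead of sleeping a fixed amount and hoping the background goroutines have caught up. A standalone sketch of that polling-assertion style (hypothetical test, not taken from this repo):

package backend_test

import (
    "sync/atomic"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
)

func TestBackgroundEffectBecomesVisible(t *testing.T) {
    var n atomic.Int64
    go func() {
        time.Sleep(20 * time.Millisecond)
        n.Store(1) // the background effect we wait for
    }()

    // Poll every 10ms for up to 200ms instead of a fixed sleep.
    assert.Eventually(t, func() bool {
        return n.Load() == 1
    }, 200*time.Millisecond, 10*time.Millisecond, "background effect never observed")
}
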