From 0af1655d0257475aaf0156705baf720ad3608f65 Mon Sep 17 00:00:00 2001
From: Kuromesi
Date: Fri, 28 Feb 2025 14:45:03 +0800
Subject: [PATCH] create pods during integration tests

Signed-off-by: Kuromesi
---
 pkg/epp/test/utils.go             |  4 +-
 pkg/epp/util/testing/wrappers.go  | 14 +++++++
 test/integration/hermetic_test.go | 67 ++++++++++++++++++++++---------
 3 files changed, 64 insertions(+), 21 deletions(-)

diff --git a/pkg/epp/test/utils.go b/pkg/epp/test/utils.go
index a916bda2..b18b0919 100644
--- a/pkg/epp/test/utils.go
+++ b/pkg/epp/test/utils.go
@@ -114,10 +114,10 @@ func GenerateRequest(logger logr.Logger, prompt, model string) *extProcPb.Proces
 }
 
 func FakePodMetrics(index int, metrics datastore.Metrics) *datastore.PodMetrics {
-	address := fmt.Sprintf("address-%v", index)
+	address := fmt.Sprintf("192.168.1.%d", index+1)
 	pod := datastore.PodMetrics{
 		Pod: datastore.Pod{
-			NamespacedName: types.NamespacedName{Name: fmt.Sprintf("pod-%v", index)},
+			NamespacedName: types.NamespacedName{Name: fmt.Sprintf("pod-%v", index), Namespace: "default"},
 			Address:        address,
 		},
 		Metrics: metrics,
diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go
index bfcf2690..a63b942b 100644
--- a/pkg/epp/util/testing/wrappers.go
+++ b/pkg/epp/util/testing/wrappers.go
@@ -40,6 +40,20 @@ func MakePod(podName string) *PodWrapper {
 	}
 }
 
+// Complete fills in the fields a Pod needs so that the apiserver does not reject it.
+func (p *PodWrapper) Complete() *PodWrapper {
+	if p.Pod.Namespace == "" {
+		p.Namespace("default")
+	}
+	p.Spec.Containers = []corev1.Container{
+		{
+			Name:  "mock-vllm",
+			Image: "mock-vllm:latest",
+		},
+	}
+	return p
+}
+
 func (p *PodWrapper) Namespace(ns string) *PodWrapper {
 	p.ObjectMeta.Namespace = ns
 	return p
diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go
index b4355539..42ba0af5 100644
--- a/test/integration/hermetic_test.go
+++ b/test/integration/hermetic_test.go
@@ -112,7 +112,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-1:8000"),
+						RawValue: []byte("192.168.1.2:8000"),
 					},
 				},
 				{
@@ -122,7 +122,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-1:8000"),
+			wantMetadata: makeMetadata("192.168.1.2:8000"),
 			wantBody:     []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
 			wantMetrics: `
 				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -165,7 +165,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-1:8000"),
+						RawValue: []byte("192.168.1.2:8000"),
 					},
 				},
 				{
@@ -175,7 +175,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-1:8000"),
+			wantMetadata: makeMetadata("192.168.1.2:8000"),
 			wantBody:     []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
 			wantMetrics: `
 				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -219,7 +219,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-2:8000"),
+						RawValue: []byte("192.168.1.3:8000"),
 					},
 				},
 				{
@@ -229,7 +229,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-2:8000"),
+			wantMetadata: makeMetadata("192.168.1.3:8000"),
 			wantBody:     []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
 			wantMetrics: `
 				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -316,7 +316,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-0:8000"),
+						RawValue: []byte("192.168.1.1:8000"),
 					},
 				},
 				{
@@ -326,7 +326,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-0:8000"),
+			wantMetadata: makeMetadata("192.168.1.1:8000"),
 			wantBody:     []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
 			wantMetrics: `
 				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -397,23 +397,42 @@ func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.E
 	pmc := &backend.FakePodMetricsClient{Res: pms}
 
 	serverCtx, stopServer := context.WithCancel(context.Background())
-	go func() {
-		serverRunner.Datastore.PodDeleteAll()
-		for _, pm := range podMetrics {
-			pod := utiltesting.MakePod(pm.NamespacedName.Name).
-				Namespace(pm.NamespacedName.Namespace).
-				ReadyCondition().
-				IP(pm.Address).
-				ObjRef()
-			serverRunner.Datastore.PodUpdateOrAddIfNotExist(pod)
-			serverRunner.Datastore.PodUpdateMetricsIfExist(pm.NamespacedName, &pm.Metrics)
+
+	// TODO: this should be consistent with the inference pool
+	podLabels := map[string]string{
+		"app": "vllm-llama2-7b-pool",
+	}
+
+	for _, pm := range podMetrics {
+		pod := utiltesting.MakePod(pm.NamespacedName.Name).
+			Namespace(pm.NamespacedName.Namespace).
+			ReadyCondition().
+			Labels(podLabels).
+			IP(pm.Address).
+			Complete().
+			ObjRef()
+
+		created := pod.DeepCopy()
+		if err := k8sClient.Create(context.Background(), created); err != nil {
+			logutil.Fatal(logger, err, "Failed to create pod", "pod", pm.NamespacedName)
+		}
+
+		// No pod controllers run in the fake environment, so the pod status is updated manually.
+		created.Status = pod.Status
+		if err := k8sClient.Status().Update(context.Background(), created); err != nil {
+			logutil.Fatal(logger, err, "Failed to update pod status", "pod", pm.NamespacedName)
 		}
-		serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
+	}
+	serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
+	go func() {
 		if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
 			logutil.Fatal(logger, err, "Failed to start ext-proc server")
 		}
 	}()
 
+	// Sleep 5 seconds to give the datastore time to sync.
+	time.Sleep(5 * time.Second)
+
 	address := fmt.Sprintf("localhost:%v", port)
 	// Create a grpc connection
 	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
@@ -430,6 +449,16 @@ func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.E
 		cancel()
 		conn.Close()
 		stopServer()
+
+		// Delete the pods created during setup.
+		for _, pm := range podMetrics {
+			pod := utiltesting.MakePod(pm.NamespacedName.Name).
+				Namespace(pm.NamespacedName.Namespace).Complete().ObjRef()
+
+			if err := k8sClient.Delete(context.Background(), pod); err != nil {
+				logutil.Fatal(logger, err, "Failed to delete pod", "pod", pm.NamespacedName)
+			}
+		}
 		// wait a little until the goroutines actually exit
 		time.Sleep(5 * time.Second)
 	}