Skip to content

Commit

Permalink
create pods during integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kuromesi <[email protected]>
  • Loading branch information
Kuromesi committed Feb 28, 2025
1 parent d2c6e7a commit 6e51c02
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/epp/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ func GenerateRequest(logger logr.Logger, prompt, model string) *extProcPb.Proces
}

func FakePodMetrics(index int, metrics datastore.Metrics) *datastore.PodMetrics {
address := fmt.Sprintf("address-%v", index)
address := fmt.Sprintf("192.168.1.%d", index+1)
pod := datastore.PodMetrics{
Pod: datastore.Pod{
NamespacedName: types.NamespacedName{Name: fmt.Sprintf("pod-%v", index)},
NamespacedName: types.NamespacedName{Name: fmt.Sprintf("pod-%v", index), Namespace: "default"},
Address: address,
},
Metrics: metrics,
Expand Down
14 changes: 14 additions & 0 deletions pkg/epp/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func MakePod(podName string) *PodWrapper {
}
}

// Complete sets necessary fields for a Pod to make it not denied by the apiserver
func (p *PodWrapper) Complete() *PodWrapper {
if p.Pod.Namespace == "" {
p.Namespace("default")
}
p.Spec.Containers = []corev1.Container{
{
Name: "mock-vllm",
Image: "mock-vllm:latest",
},
}
return p
}

func (p *PodWrapper) Namespace(ns string) *PodWrapper {
p.ObjectMeta.Namespace = ns
return p
Expand Down
67 changes: 48 additions & 19 deletions test/integration/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-1:8000"),
RawValue: []byte("192.168.1.2:8000"),
},
},
{
Expand All @@ -122,7 +122,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: makeMetadata("address-1:8000"),
wantMetadata: makeMetadata("192.168.1.2:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
wantMetrics: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-1:8000"),
RawValue: []byte("192.168.1.2:8000"),
},
},
{
Expand All @@ -175,7 +175,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: makeMetadata("address-1:8000"),
wantMetadata: makeMetadata("192.168.1.2:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
wantMetrics: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-2:8000"),
RawValue: []byte("192.168.1.3:8000"),
},
},
{
Expand All @@ -229,7 +229,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: makeMetadata("address-2:8000"),
wantMetadata: makeMetadata("192.168.1.3:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
wantMetrics: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-0:8000"),
RawValue: []byte("192.168.1.1:8000"),
},
},
{
Expand All @@ -326,7 +326,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: makeMetadata("address-0:8000"),
wantMetadata: makeMetadata("192.168.1.1:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
wantMetrics: `
# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
Expand Down Expand Up @@ -397,23 +397,42 @@ func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.E
pmc := &backend.FakePodMetricsClient{Res: pms}

serverCtx, stopServer := context.WithCancel(context.Background())
go func() {
serverRunner.Datastore.PodDeleteAll()
for _, pm := range podMetrics {
pod := utiltesting.MakePod(pm.NamespacedName.Name).
Namespace(pm.NamespacedName.Namespace).
ReadyCondition().
IP(pm.Address).
ObjRef()
serverRunner.Datastore.PodUpdateOrAddIfNotExist(pod)
serverRunner.Datastore.PodUpdateMetricsIfExist(pm.NamespacedName, &pm.Metrics)

// TODO: this should be consistent with the inference pool
podLabels := map[string]string{
"app": "vllm-llama2-7b-pool",
}

for _, pm := range podMetrics {
pod := utiltesting.MakePod(pm.NamespacedName.Name).
Namespace(pm.NamespacedName.Namespace).
ReadyCondition().
Labels(podLabels).
IP(pm.Address).
Complete().
ObjRef()

copy := pod.DeepCopy()
if err := k8sClient.Create(context.Background(), copy); err != nil {
logutil.Fatal(logger, err, "Failed to create pod", "pod", pm.NamespacedName)
}

// since no pod controllers deployed in fake environment, we manually update pod status
copy.Status = pod.Status
if err := k8sClient.Status().Update(context.Background(), copy); err != nil {
logutil.Fatal(logger, err, "Failed to update pod status", "pod", pm.NamespacedName)
}
serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
}
serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
go func() {
if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
logutil.Fatal(logger, err, "Failed to start ext-proc server")
}
}()

// sleep 5 seconds to wait for datastore to be synced
time.Sleep(5 * time.Second)

address := fmt.Sprintf("localhost:%v", port)
// Create a grpc connection
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -430,6 +449,16 @@ func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.E
cancel()
conn.Close()
stopServer()

// clear created pods
for _, pm := range podMetrics {
pod := utiltesting.MakePod(pm.NamespacedName.Name).
Namespace(pm.NamespacedName.Namespace).Complete().ObjRef()

if err := k8sClient.Delete(context.Background(), pod); err != nil {
logutil.Fatal(logger, err, "Failed to delete pod", "pod", pm.NamespacedName)
}
}
// wait a little until the goroutines actually exit
time.Sleep(5 * time.Second)
}
Expand Down

0 comments on commit 6e51c02

Please sign in to comment.