From 00c54e033443275091e75a9093ff7dd18d3054ff Mon Sep 17 00:00:00 2001
From: jeffluoo
Date: Thu, 13 Feb 2025 02:48:52 +0000
Subject: [PATCH] [Metrics] Add streaming support for metrics

Addresses https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178
---
 pkg/epp/handlers/request.go       | 29 ++++++++++++
 pkg/epp/handlers/response.go      | 75 +++++++++++++++++++++++++------
 pkg/epp/handlers/response_test.go | 50 ++++++++++++++++++++-
 pkg/epp/handlers/server.go        | 15 ++++++-
 pkg/epp/metrics/README.md         | 41 ++++++++++++++---
 5 files changed, 188 insertions(+), 22 deletions(-)

diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go
index b9ffd0b0..5b18d77d 100644
--- a/pkg/epp/handlers/request.go
+++ b/pkg/epp/handlers/request.go
@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
 	loggerVerbose.Info("Model requested", "model", model)
 	modelName := model
 
+	// Resolve streaming options.
+
+	streaming, ok := rb["stream"].(bool)
+	if !ok {
+		// "stream" is not set; leave reqCtx.Streaming at its default (false).
+	} else {
+		reqCtx.Streaming = streaming
+	}
+
+	type Usage struct {
+		IncludeUsage string `json:"include_usage,omitempty"`
+	}
+	if streamOption, ok := rb["stream_options"]; ok {
+		includeUsage := Usage{}
+		optionJson, err := json.Marshal(streamOption)
+		if err != nil {
+			return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshaling stream_options: %v", err)}
+		}
+		if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
+			return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
+		}
+		usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
+		if err != nil {
+			return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
+		}
+
+		reqCtx.StreamingIncludeUsage = usageEnabled
+	}
+
 	// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
 	// This might be a security risk in the future where adapters not registered in the InferenceModel
 	// are able to be requested by using their distinct name.
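For reference, the resolution above accepts `stream` as a plain JSON boolean, while `include_usage` arrives as a string and is parsed with `strconv.ParseBool`. A minimal, self-contained sketch of that behavior follows; the sample payload is illustrative only, not code from this patch:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical request body mirroring the fields the handler inspects.
	raw := []byte(`{"model":"tweet-summary","stream":true,"stream_options":{"include_usage":"true"}}`)

	var rb map[string]interface{}
	if err := json.Unmarshal(raw, &rb); err != nil {
		panic(err)
	}

	// "stream" decodes as a JSON boolean.
	if streaming, ok := rb["stream"].(bool); ok {
		fmt.Println("streaming:", streaming) // streaming: true
	}

	// "stream_options.include_usage" is carried as a string and parsed with
	// strconv.ParseBool, matching the handler logic above.
	type usage struct {
		IncludeUsage string `json:"include_usage,omitempty"`
	}
	if streamOption, ok := rb["stream_options"]; ok {
		optionJSON, err := json.Marshal(streamOption)
		if err != nil {
			panic(err)
		}
		var u usage
		if err := json.Unmarshal(optionJSON, &u); err != nil {
			panic(err)
		}
		usageEnabled, err := strconv.ParseBool(u.IncludeUsage)
		if err != nil {
			panic(err)
		}
		fmt.Println("include_usage:", usageEnabled) // include_usage: true
	}
}
```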
diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go
index f9396acf..f1be1cc0 100644
--- a/pkg/epp/handlers/response.go
+++ b/pkg/epp/handlers/response.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strings"
 
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -76,6 +78,10 @@ func (s *Server) HandleResponseHeaders(
 		}
 	}
 
+	if h.ResponseHeaders.EndOfStream {
+		reqCtx.StreamingCompleted = true
+		loggerVerbose.Info("Header indicates streaming complete")
+	}
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
 			ResponseHeaders: &extProcPb.HeadersResponse{
@@ -132,22 +138,65 @@ func (s *Server) HandleResponseBody(
 ) (*extProcPb.ProcessingResponse, error) {
 	logger := log.FromContext(ctx)
 	loggerVerbose := logger.V(logutil.VERBOSE)
-	loggerVerbose.Info("Processing HandleResponseBody")
 
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
 
-	res := Response{}
-	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-		return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+	if reqCtx.Streaming {
+		logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
+
+		responseText := string(body.ResponseBody.Body)
+		// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+		// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+		// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+		//
+		// data: [DONE]
+		// Note that vLLM returns both entries in a single response; we strip the `data:`
+		// prefix and the trailing `data: [DONE]` from the message to extract the JSON payload.
+		//
+		// If include_usage is not included in the request, `data: [DONE]` is returned
+		// separately, which indicates the end of streaming.
+		if strings.Contains(responseText, "data: [DONE]") {
+			response := Response{}
+
+			if reqCtx.StreamingIncludeUsage {
+				re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match a JSON object
+				match := re.FindString(responseText)
+				if match == "" {
+					return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
+				}
+				byteSlice := []byte(match)
+				if err := json.Unmarshal(byteSlice, &response); err != nil {
+					return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+				}
+			} else {
+				// ResponseBody.EndOfStream is only set when include_usage is enabled.
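+				// Without usage stats there is no final EndOfStream body to key off,
+				// so the `data: [DONE]` sentinel itself marks the response as complete.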
+				reqCtx.ResponseComplete = true
+				loggerVerbose.Info("Streaming is completed")
+			}
+
+			reqCtx.Response = response
+		}
+
+		if body.ResponseBody.EndOfStream {
+			loggerVerbose.Info("Streaming is completed")
+			reqCtx.ResponseComplete = true
+		} else {
+			reqCtx.ResponseSize += len(body.ResponseBody.Body)
+		}
+
+	} else {
+		loggerVerbose.Info("Processing HandleResponseBody")
+
+		res := Response{}
+		if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+			return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+		}
+		reqCtx.Response = res
+		reqCtx.ResponseSize = len(body.ResponseBody.Body)
+		reqCtx.ResponseComplete = true
+
+		loggerVerbose.Info("Response generated", "response", res)
 	}
-	reqCtx.Response = res
-	reqCtx.ResponseSize = len(body.ResponseBody.Body)
-	// ResponseComplete is to indicate the response is complete. In the non-streaming
-	// case, it will be set to true once the response is processed; in the
-	// streaming case, it will be set to true once the last chunk is processed.
-	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-	// will add the processing for the streaming case.
-	reqCtx.ResponseComplete = true
-
-	loggerVerbose.Info("Response generated", "response", res)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
diff --git a/pkg/epp/handlers/response_test.go b/pkg/epp/handlers/response_test.go
index 01f02d09..9697ec06 100644
--- a/pkg/epp/handlers/response_test.go
+++ b/pkg/epp/handlers/response_test.go
@@ -49,6 +49,16 @@ const (
 		}
 	}
 	`
+
+	streamingBodyWithoutUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
+	`
+
+	streamingBodyWithUsage = `
+data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+
+data: [DONE]
+	`
 )
 
 func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
 	tests := []struct {
 		name    string
 		req     *extProcPb.ProcessingRequest_ResponseBody
+		reqCtx  *RequestContext
 		want    Response
 		wantErr bool
 	}{
@@ -84,12 +95,49 @@ func TestHandleResponseBody(t *testing.T) {
 			},
 			wantErr: true,
 		},
+		{
+			name: "streaming request without usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithoutUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming:             true,
+				StreamingIncludeUsage: false,
+			},
+			wantErr: false,
+			// In the middle of a streaming response, so the request context response is not set yet.
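+			// want is intentionally left as the zero-value Response.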
+		},
+		{
+			name: "streaming request with usage",
+			req: &extProcPb.ProcessingRequest_ResponseBody{
+				ResponseBody: &extProcPb.HttpBody{
+					Body: []byte(streamingBodyWithUsage),
+				},
+			},
+			reqCtx: &RequestContext{
+				Streaming:             true,
+				StreamingIncludeUsage: true,
+			},
+			wantErr: false,
+			want: Response{
+				Usage: Usage{
+					PromptTokens:     7,
+					TotalTokens:      17,
+					CompletionTokens: 10,
+				},
+			},
+		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			server := &Server{}
-			reqCtx := &RequestContext{}
+			reqCtx := test.reqCtx
+			if reqCtx == nil {
+				reqCtx = &RequestContext{}
+			}
 			_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
 			if err != nil {
 				if !test.wantErr {
diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go
index 2c61118c..90616b48 100644
--- a/pkg/epp/handlers/server.go
+++ b/pkg/epp/handlers/server.go
@@ -121,7 +121,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 			}
-			loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			if reqCtx.Streaming {
+				logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
+			} else {
+				loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			}
 		default:
 			logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
 			return status.Error(codes.Unknown, "unknown request type")
@@ -179,7 +183,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 		}
 
-		loggerVerbose.Info("Response generated", "response", resp)
+		if !reqCtx.Streaming {
+			loggerVerbose.Info("Response generated", "response", resp)
+		} else {
+			logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
+		}
 		if err := srv.Send(resp); err != nil {
 			logger.V(logutil.DEFAULT).Error(err, "Send failed")
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -200,4 +208,7 @@ type RequestContext struct {
 	ResponseSize          int
 	ResponseComplete      bool
 	ResponseStatusCode    string
+	// Streaming indicates whether the request opted into response streaming.
+	Streaming             bool
+	// StreamingCompleted is set once the response headers signal end of stream.
+	StreamingCompleted    bool
+	// StreamingIncludeUsage mirrors stream_options.include_usage from the request.
+	StreamingIncludeUsage bool
 }
diff --git a/pkg/epp/metrics/README.md b/pkg/epp/metrics/README.md
index 1f68a0bd..df24820f 100644
--- a/pkg/epp/metrics/README.md
+++ b/pkg/epp/metrics/README.md
@@ -8,13 +8,29 @@ This documentation is the current state of exposed metrics.
 
 ## Requirements
 
-Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
-
-Currently there are two options:
-- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
-
-- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
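+The response body processing mode in `EnvoyExtensionPolicy` determines whether response bodies are forwarded to the endpoint picker, and therefore whether response metrics can be reported.
+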
+For non-streaming requests, enable `Buffered` mode for the response in `EnvoyExtensionPolicy`; this buffers the response body at the proxy and forwards it to the endpoint picker, which allows it to report response metrics:
+```
+apiVersion: gateway.envoyproxy.io/v1alpha1
+kind: EnvoyExtensionPolicy
+metadata:
+  name: ext-proc-policy
+  namespace: default
+spec:
+  extProc:
+    - backendRefs:
+      - group: ""
+        kind: Service
+        name: inference-gateway-ext-proc
+        port: 9002
+      processingMode:
+        request:
+          body: Buffered
+        response:
+          body: Buffered
+```
+
+For streaming requests, enable `Streamed` mode for the response in `EnvoyExtensionPolicy`:
 
 ```
 apiVersion: gateway.envoyproxy.io/v1alpha1
 kind: EnvoyExtensionPolicy
@@ -33,7 +49,20 @@ spec:
         request:
           body: Buffered
         response:
-          body: Buffered
+          body: Streamed
 ```
+
+To include usage metrics from the vLLM model server, send the request with `stream_options.include_usage` set:
+
+```
+curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
+"model": "tweet-summary",
+"prompt": "whats your fav movie?",
+"max_tokens": 10,
+"temperature": 0,
+"stream": true,
+"stream_options": {"include_usage": "true"}
+}'
+```
+
+With `include_usage` enabled, the final chunk before `data: [DONE]` carries a usage object (e.g. `"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}`) that the endpoint picker parses to record token metrics.
 
 ## Exposed metrics