From 1d66114df4ed1a189d7bb71c39fd23180524545b Mon Sep 17 00:00:00 2001
From: jeffluoo
Date: Thu, 13 Feb 2025 02:48:52 +0000
Subject: [PATCH] WIP: [Metrics] Add streaming support for metrics

---
 pkg/ext-proc/handlers/request.go            |  7 +++
 pkg/ext-proc/handlers/response.go           | 63 ++++++++++++++++++----
 pkg/ext-proc/handlers/server.go             | 12 +++-
 pkg/manifests/gateway/extension_policy.yaml |  1 +
 4 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go
index 17278025..9d280dac 100644
--- a/pkg/ext-proc/handlers/request.go
+++ b/pkg/ext-proc/handlers/request.go
@@ -38,6 +38,13 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 	klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
 	modelName := model
 
+	// Resolve streaming options: the optional "stream" boolean in the request body
+	// indicates whether the client asked for a streamed response. When it is absent,
+	// reqCtx.Streaming keeps its zero value (false).
+	if streaming, ok := rb["stream"].(bool); ok {
+		reqCtx.Streaming = streaming
+	}
+
 	// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
 	// This might be a security risk in the future where adapters not registered in the InferenceModel
 	// are able to be requested by using their distinct name.
diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go
index 34a7219a..2f745190 100644
--- a/pkg/ext-proc/handlers/response.go
+++ b/pkg/ext-proc/handlers/response.go
@@ -3,6 +3,8 @@ package handlers
 import (
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strings"
 
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -16,6 +18,10 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
 	h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
 	klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
 
+	if h.ResponseHeaders.EndOfStream {
+		reqCtx.StreamingCompleted = true
+		klog.V(logutil.VERBOSE).Info("Response is completed")
+	}
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
 			ResponseHeaders: &extProcPb.HeadersResponse{
@@ -66,22 +72,53 @@
 	}
 }*/
 func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
 
-	res := Response{}
-	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-		return nil, fmt.Errorf("unmarshaling response body: %v", err)
+	if reqCtx.Streaming {
+		responseText := string(reqCtx.prevResponse)
+		if strings.Contains(responseText, "[DONE]") {
+			lastResponse := Response{}
+
+			// Example of the final messages in a stream:
+			// data: {"id":"cmpl-d6392493-b56c-4d81-9f11-995a0dc93c5d","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+			//
+			// data: [DONE]
+			// We need to strip the `data:` prefix and ignore the trailing `data: [DONE]` message.
+			re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // extract the JSON object from the data line
+			match := re.FindString(responseText)
+
+			if err := json.Unmarshal([]byte(match), &lastResponse); err != nil {
+				return nil, fmt.Errorf("unmarshaling response body: %v", err)
+			}
+			klog.V(logutil.VERBOSE).Infof("[DONE] previous response is: %+v", lastResponse)
+
+			reqCtx.Response = lastResponse
+		}
+
+		// Update prevResponse only after the [DONE] check above: the usage stats arrive in
+		// the chunk just before the [DONE] marker, so it is the saved chunk that gets parsed.
+		reqCtx.prevResponse = body.ResponseBody.Body
+
+		if reqCtx.StreamingCompleted || body.ResponseBody.EndOfStream {
+			klog.V(logutil.VERBOSE).Info("Streaming is completed")
+			reqCtx.ResponseComplete = true
+		} else {
+			reqCtx.ResponseSize += len(body.ResponseBody.Body)
+		}
+
+	} else {
+		klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
+
+		res := Response{}
+		if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+			return nil, fmt.Errorf("unmarshaling response body: %v", err)
+		}
+		reqCtx.Response = res
+		reqCtx.ResponseSize = len(body.ResponseBody.Body)
+		reqCtx.ResponseComplete = true
+
+		klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
 	}
-	reqCtx.Response = res
-	reqCtx.ResponseSize = len(body.ResponseBody.Body)
-	// ResponseComplete is to indicate the response is complete. In non-streaming
-	// case, it will be set to be true once the response is processed; in
-	// streaming case, it will be set to be true once the last chunk is processed.
-	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-	// will add the processing for streaming case.
-	reqCtx.ResponseComplete = true
-	klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go
index f27c9a15..ec2d0481 100644
--- a/pkg/ext-proc/handlers/server.go
+++ b/pkg/ext-proc/handlers/server.go
@@ -95,12 +95,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			resp, err = s.HandleResponseBody(reqCtx, req)
 			if err == nil && reqCtx.ResponseComplete {
 				reqCtx.ResponseCompleteTimestamp = time.Now()
+				metrics.RecordRequestLatencies(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
 				metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
 				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 			}
 
-			klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx)
+			klog.V(logutil.DEBUG).Infof("Request context after HandleResponseBody: %+v", reqCtx)
 		default:
 			klog.Errorf("Unknown Request type %+v", v)
 			return status.Error(codes.Unknown, "unknown request type")
@@ -125,7 +126,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 		}
 	}
 
-	klog.V(logutil.VERBOSE).Infof("response: %v", resp)
+	if !reqCtx.Streaming {
+		klog.V(logutil.VERBOSE).Infof("response: %v", resp)
+	} else {
+		klog.V(logutil.DEBUG).Infof("response: %v", resp)
+	}
 	if err := srv.Send(resp); err != nil {
 		klog.Errorf("send error %v", err)
 		return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -144,4 +149,7 @@ type RequestContext struct {
 	Response                  Response
 	ResponseSize              int
 	ResponseComplete          bool
+	Streaming                 bool
+	StreamingCompleted        bool
+	prevResponse              []byte // in streaming mode, the previous body chunk is kept so it can be parsed once the [DONE] message arrives
 }
diff --git a/pkg/manifests/gateway/extension_policy.yaml b/pkg/manifests/gateway/extension_policy.yaml
index a8105d6d..698686f0 100644
--- a/pkg/manifests/gateway/extension_policy.yaml
+++ b/pkg/manifests/gateway/extension_policy.yaml
@@ -14,6 +14,7 @@ spec:
        request:
          body: Buffered
        response:
+         body: Streamed
      # The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
      # The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
      messageTimeout: 1000s
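
Note on the SSE parsing in HandleResponseBody above: as an alternative to the broad regexp, the final usage payload can be recovered by walking the chunk line by line, trimming the `data:` prefix, and skipping the `[DONE]` sentinel. The sketch below is illustrative only and is not part of the patch; it assumes the OpenAI-style framing shown in the code comment, and the `response`/`usage`/`parseLastSSEChunk` names are hypothetical stand-ins for the handler's own `Response` type.

// sse_usage_sketch.go - standalone illustration.
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// usage mirrors the "usage" object carried by the final data chunk.
type usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	TotalTokens      int `json:"total_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

// response keeps only the field the metrics pipeline needs.
type response struct {
	Usage usage `json:"usage"`
}

// parseLastSSEChunk scans an SSE chunk line by line, strips the "data:" prefix,
// skips the "[DONE]" sentinel, and unmarshals the last JSON payload it finds.
func parseLastSSEChunk(chunk []byte) (*response, error) {
	var last *response
	for _, line := range strings.Split(string(chunk), "\n") {
		payload := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(line), "data:"))
		if payload == "" || payload == "[DONE]" || !strings.HasPrefix(payload, "{") {
			continue
		}
		res := &response{}
		if err := json.Unmarshal([]byte(payload), res); err != nil {
			return nil, fmt.Errorf("unmarshaling SSE payload: %v", err)
		}
		last = res
	}
	return last, nil
}

func main() {
	chunk := []byte("data: {\"usage\":{\"prompt_tokens\":7,\"total_tokens\":17,\"completion_tokens\":10}}\n\ndata: [DONE]\n")
	res, err := parseLastSSEChunk(chunk)
	if err != nil {
		panic(err)
	}
	if res != nil {
		// Prints: usage: {PromptTokens:7 TotalTokens:17 CompletionTokens:10}
		fmt.Printf("usage: %+v\n", res.Usage)
	}
}

Parsing per line also naturally picks up the last JSON payload when a single body chunk carries several `data:` lines.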