[Metrics] Add streaming support for metrics #329

Open: wants to merge 1 commit into main.
29 changes: 29 additions & 0 deletions pkg/epp/handlers/request.go
@@ -61,6 +61,35 @@ func (s *Server) HandleRequestBody(
loggerVerbose.Info("Model requested", "model", model)
modelName := model

// Resolve streaming options

streaming, ok := rb["stream"].(bool)
Reviewer comment (Member):

How about defining a struct instead of unmarshaling many times?

{
    Stream bool
    Model string
   ...
}
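For illustration, here is a minimal sketch of that suggestion, decoding the body once into a typed struct instead of asserting individual map entries (the struct and helper names are assumptions; only `stream`, `model`, and the string-typed `include_usage` come from this PR, and the `encoding/json` and `fmt` imports already present in the file are assumed):

```go
// completionRequest captures only the request fields the handler needs.
type completionRequest struct {
	Model         string `json:"model"`
	Stream        bool   `json:"stream"`
	StreamOptions struct {
		// This PR treats include_usage as a string ("true"/"false").
		IncludeUsage string `json:"include_usage,omitempty"`
	} `json:"stream_options,omitempty"`
}

// parseCompletionRequest unmarshals the raw body a single time, so the
// handler no longer needs to re-marshal individual map values.
func parseCompletionRequest(raw []byte) (*completionRequest, error) {
	var cr completionRequest
	if err := json.Unmarshal(raw, &cr); err != nil {
		return nil, fmt.Errorf("unmarshaling request body: %w", err)
	}
	return &cr, nil
}
```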

if !ok {
// streaming not set, no-op
} else {
reqCtx.Streaming = streaming
}

type Usage struct {
IncludeUsage string `json:"include_usage,omitempty"`
}
if streamOption, ok := rb["stream_options"]; ok {
Reviewer comment (Member):

This logic should be under streaming == true.

includeUsage := Usage{}
optionJson, err := json.Marshal(streamOption)
if err != nil {
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("marshalling stream_options: %v", err)}
}
if err := json.Unmarshal(optionJson, &includeUsage); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling stream_options: %v", err)}
}
usageEnabled, err := strconv.ParseBool(includeUsage.IncludeUsage)
if err != nil {
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("invalid include_usage: %v", includeUsage.IncludeUsage)}
}

reqCtx.StreamingIncludeUsage = usageEnabled
}
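As a sketch of that second suggestion, the same parsing could be guarded so that `stream_options` is only inspected when the request actually sets `"stream": true` (the logic is taken from this PR; the helper function and plain `fmt` error wrapping, rather than the `errutil` errors used above, are assumptions):

```go
// resolveStreaming records the streaming flags on the request context.
// stream_options is only parsed when the request really is a streaming one.
func resolveStreaming(rb map[string]interface{}, reqCtx *RequestContext) error {
	streaming, ok := rb["stream"].(bool)
	if !ok || !streaming {
		return nil // not a streaming request; nothing to record
	}
	reqCtx.Streaming = true

	streamOption, ok := rb["stream_options"]
	if !ok {
		return nil
	}
	type usage struct {
		IncludeUsage string `json:"include_usage,omitempty"`
	}
	var u usage
	optionJSON, err := json.Marshal(streamOption)
	if err != nil {
		return fmt.Errorf("marshaling stream_options: %w", err)
	}
	if err := json.Unmarshal(optionJSON, &u); err != nil {
		return fmt.Errorf("unmarshaling stream_options: %w", err)
	}
	usageEnabled, err := strconv.ParseBool(u.IncludeUsage)
	if err != nil {
		return fmt.Errorf("invalid include_usage: %q", u.IncludeUsage)
	}
	reqCtx.StreamingIncludeUsage = usageEnabled
	return nil
}
```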

// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
// This might be a security risk in the future where adapters not registered in the InferenceModel
// are able to be requested by using their distinct name.
71 changes: 58 additions & 13 deletions pkg/epp/handlers/response.go
@@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -132,22 +134,65 @@ func (s *Server) HandleResponseBody(
) (*extProcPb.ProcessingResponse, error) {
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing HandleResponseBody")
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
if reqCtx.Streaming {
logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")

responseText := string(body.ResponseBody.Body)
// Example message if "stream_options": {"include_usage": "true"} is included in the request:
// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
//
// data: [DONE]
// Note that vLLM returns two entries in one response.
// We need to strip the `data:` prefix and the trailing `data: [DONE]` marker from the message to fetch the response data.
//
// If include_usage is not included in the request, `data: [DONE]` is returned separately, which
// indicates end of streaming.
if strings.Contains(responseText, "data: [DONE]") {
response := Response{}

if reqCtx.StreamingIncludeUsage {

re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`) // match for JSON object
match := re.FindString(responseText)
if match == "" {
return nil, errutil.Error{Code: errutil.ModelServerError, Msg: fmt.Sprintf("model server returned invalid response: %v", responseText)}
}
byteSlice := []byte(match)
if err := json.Unmarshal(byteSlice, &response); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
} else {
// ResponseBody.EndOfStream is only set if include_usage is set to true.
reqCtx.ResponseComplete = true
loggerVerbose.Info("Streaming is completed")
}

reqCtx.Response = response
}

if body.ResponseBody.EndOfStream {
loggerVerbose.Info("Streaming is completed")
reqCtx.ResponseComplete = true
} else {
reqCtx.ResponseSize += len(body.ResponseBody.Body)
}

} else {
loggerVerbose.Info("Processing HandleResponseBody")

res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
reqCtx.Response = res
reqCtx.ResponseSize = len(body.ResponseBody.Body)
reqCtx.ResponseComplete = true

loggerVerbose.Info("Response generated", "response", res)
}
reqCtx.Response = res
reqCtx.ResponseSize = len(body.ResponseBody.Body)
// ResponseComplete is to indicate the response is complete. In non-streaming
// case, it will be set to be true once the response is processed; in
// streaming case, it will be set to be true once the last chunk is processed.
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
// will add the processing for streaming case.
reqCtx.ResponseComplete = true
loggerVerbose.Info("Response generated", "response", res)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
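As a standalone illustration of what the new streaming branch extracts, the following program applies the same regular expression to the sample chunk used in this PR's tests and decodes the usage object (the wrapper program and the anonymous struct are illustrative only):

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

func main() {
	chunk := `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`

	// Same pattern as HandleResponseBody: pull the JSON object out of the
	// "data: ..." line that precedes the "data: [DONE]" terminator.
	re := regexp.MustCompile(`\{.*(?:\{.*\}|[^\{]*)\}`)
	match := re.FindString(chunk)

	var resp struct {
		Usage struct {
			PromptTokens     int `json:"prompt_tokens"`
			CompletionTokens int `json:"completion_tokens"`
			TotalTokens      int `json:"total_tokens"`
		} `json:"usage"`
	}
	if err := json.Unmarshal([]byte(match), &resp); err != nil {
		panic(err)
	}
	fmt.Printf("prompt=%d completion=%d total=%d\n",
		resp.Usage.PromptTokens, resp.Usage.CompletionTokens, resp.Usage.TotalTokens)
	// Prints: prompt=7 completion=10 total=17
}
```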
50 changes: 49 additions & 1 deletion pkg/epp/handlers/response_test.go
@@ -49,6 +49,16 @@ const (
}
}
`

streamingBodyWithoutUsage = `
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":null}
`

streamingBodyWithUsage = `
data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`
)

func TestHandleResponseBody(t *testing.T) {
@@ -57,6 +67,7 @@ func TestHandleResponseBody(t *testing.T) {
tests := []struct {
name string
req *extProcPb.ProcessingRequest_ResponseBody
reqCtx *RequestContext
want Response
wantErr bool
}{
@@ -84,12 +95,49 @@ func TestHandleResponseBody(t *testing.T) {
},
wantErr: true,
},
{
name: "streaming request without usage",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(streamingBodyWithoutUsage),
},
},
reqCtx: &RequestContext{
Streaming: true,
StreamingIncludeUsage: false,
},
wantErr: false,
// In the middle of streaming response, so request context response is not set yet.
},
{
name: "streaming request with usage",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(streamingBodyWithUsage),
},
},
reqCtx: &RequestContext{
Streaming: true,
StreamingIncludeUsage: true,
},
wantErr: false,
want: Response{
Usage: Usage{
PromptTokens: 7,
TotalTokens: 17,
CompletionTokens: 10,
},
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := &Server{}
reqCtx := &RequestContext{}
reqCtx := test.reqCtx
if reqCtx == nil {
reqCtx = &RequestContext{}
}
_, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
if err != nil {
if !test.wantErr {
14 changes: 12 additions & 2 deletions pkg/epp/handlers/server.go
@@ -125,7 +125,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
}
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
if reqCtx.Streaming {
logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
} else {
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
}
default:
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
return status.Error(codes.Unknown, "unknown request type")
@@ -183,7 +187,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
}
}

loggerVerbose.Info("Response generated", "response", resp)
if !reqCtx.Streaming {
loggerVerbose.Info("Response generated", "response", resp)
} else {
logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
}
if err := srv.Send(resp); err != nil {
logger.V(logutil.DEFAULT).Error(err, "Send failed")
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -204,4 +212,6 @@ type RequestContext struct {
ResponseSize int
ResponseComplete bool
ResponseStatusCode string
Streaming bool
StreamingIncludeUsage bool
}
41 changes: 35 additions & 6 deletions site-src/guides/metrics.md
@@ -4,13 +4,29 @@ This guide describes the current state of exposed metrics and how to scrape them

## Requirements

Response metrics are only supported in non-streaming mode, with a follow-up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
For non-streaming requests, enable `Buffered` mode for the response in the `EnvoyExtensionPolicy`:

Currently there are two options:
- If requests don't use response streaming, you can enable `Buffered` mode for the response in the `EnvoyExtensionPolicy`. This buffers the response body at the proxy and forwards it to the endpoint picker, which allows the endpoint picker to report response metrics.

- If requests use response streaming, it is not recommended to enable `Buffered` mode; the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (the default). In this case response bodies are not forwarded to the endpoint picker, and therefore response metrics are not reported.
```
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
metadata:
  name: ext-proc-policy
  namespace: default
spec:
  extProc:
    - backendRefs:
      - group: ""
        kind: Service
        name: inference-gateway-ext-proc
        port: 9002
      processingMode:
        request:
          body: Buffered
        response:
          body: Buffered
```

For streaming requests, enable `Streamed` mode for the response in the `EnvoyExtensionPolicy`:

```
apiVersion: gateway.envoyproxy.io/v1alpha1
@@ -29,7 +45,20 @@ spec:
        request:
          body: Buffered
        response:
          body: Streamed
```

If you want usage metrics from the vLLM model server, send the request with `include_usage` enabled in `stream_options`:

```
curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
"model": "tweet-summary",
"prompt": "whats your fav movie?",
"max_tokens": 10,
"temperature": 0,
"stream": true,
"stream_options": {"include_usage": "true"}
}'
```

## Exposed metrics