Skip to content

Commit

Permalink
[Metrics] Add request error metrics (#269)
Browse files Browse the repository at this point in the history
This change defines some general error types; the list might grow in the future if finer-grained error types are needed.
  • Loading branch information
JeffLuoo authored Feb 19, 2025
1 parent 9f34673 commit 6130ee0
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 25 deletions.
14 changes: 7 additions & 7 deletions pkg/ext-proc/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package handlers
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"

Expand All @@ -29,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

Expand All @@ -49,14 +49,14 @@ func (s *Server) HandleRequestBody(
var rb map[string]interface{}
if err := json.Unmarshal(v.RequestBody.Body, &rb); err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
return nil, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Sprintf("error unmarshaling request body: %v", err)}
}
loggerVerbose.Info("Request body unmarshalled", "body", rb)

// Resolve target models.
model, ok := rb["model"].(string)
if !ok {
return nil, errors.New("model not found in request")
return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
}
loggerVerbose.Info("Model requested", "model", model)
modelName := model
Expand All @@ -66,12 +66,12 @@ func (s *Server) HandleRequestBody(
// are able to be requested by using their distinct name.
modelObj, exist := s.datastore.ModelGet(model)
if !exist {
return nil, fmt.Errorf("error finding a model object in InferenceModel for input %v", model)
return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
}
if len(modelObj.Spec.TargetModels) > 0 {
modelName = datastore.RandomWeightedDraw(logger, modelObj, 0)
if modelName == "" {
return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name)
return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
}
}
llmReq := &scheduling.LLMRequest{
Expand All @@ -89,14 +89,14 @@ func (s *Server) HandleRequestBody(
requestBody, err = json.Marshal(rb)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
return nil, fmt.Errorf("error marshaling request body: %v", err)
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
}
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBody))
}

targetPod, err := s.scheduler.Schedule(ctx, llmReq)
if err != nil {
return nil, fmt.Errorf("failed to find target pod: %w", err)
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}

logger.V(logutil.DEFAULT).Info("Request handled",
Expand Down
40 changes: 39 additions & 1 deletion pkg/ext-proc/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"sigs.k8s.io/controller-runtime/pkg/log"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

Expand All @@ -38,6 +39,43 @@ func (s *Server) HandleResponseHeaders(
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
loggerVerbose.Info("Headers before", "headers", h)

// Example header
// {
// "ResponseHeaders": {
// "headers": [
// {
// "key": ":status",
// "raw_value": "200"
// },
// {
// "key": "date",
// "raw_value": "Thu, 30 Jan 2025 18:50:48 GMT"
// },
// {
// "key": "server",
// "raw_value": "uvicorn"
// },
// {
// "key": "content-type",
// "raw_value": "text/event-stream; charset=utf-8"
// },
// {
// "key": "transfer-encoding",
// "raw_value": "chunked"
// }
// ]
// }
// }
for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
if header.Key == "status" {
code := header.RawValue[0]
if string(code) != "200" {
reqCtx.ResponseStatusCode = errutil.ModelServerError
}
break
}
}

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
ResponseHeaders: &extProcPb.HeadersResponse{
Expand Down Expand Up @@ -99,7 +137,7 @@ func (s *Server) HandleResponseBody(

res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
return nil, fmt.Errorf("unmarshaling response body: %v", err)
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
}
reqCtx.Response = res
reqCtx.ResponseSize = len(body.ResponseBody.Body)
Expand Down
58 changes: 53 additions & 5 deletions pkg/ext-proc/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

Expand Down Expand Up @@ -65,18 +66,30 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
// See https://github.com/envoyproxy/envoy/issues/17540.
reqCtx := &RequestContext{}

// Create a variable for error handling, as each request should report the
// error metric at most once. This doesn't cover the "Cannot receive stream
// request" error, because that error can occur even after the response has
// been processed.
var err error
defer func(error) {
if reqCtx.ResponseStatusCode != "" {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
} else if err != nil {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
}
}(err)

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

req, err := srv.Recv()
if err == io.EOF || errors.Is(err, context.Canceled) {
req, recvErr := srv.Recv()
if recvErr == io.EOF || errors.Is(recvErr, context.Canceled) {
return nil
}
if err != nil {
if recvErr != nil {
// This error occurs very frequently, though it doesn't seem to have any impact.
// TODO Figure out if we can remove this noise.
loggerVerbose.Error(err, "Cannot receive stream request")
Expand Down Expand Up @@ -113,12 +126,13 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
return status.Error(codes.Unknown, "unknown request type")
}

if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
switch status.Code(err) {
switch errutil.CanonicalCode(err) {
// This code can be returned by scheduler when there is no capacity for sheddable
// requests.
case codes.ResourceExhausted:
case errutil.InferencePoolResourceExhausted:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Expand All @@ -128,6 +142,38 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
},
},
}
// This code can be returned when the EPP processes the request and runs into server-side errors.
case errutil.Internal:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_InternalServerError,
},
},
},
}
// This code can be returned when users provide invalid json request.
case errutil.BadRequest:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_BadRequest,
},
},
},
}
case errutil.BadConfiguration:
resp = &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
ImmediateResponse: &extProcPb.ImmediateResponse{
Status: &envoyTypePb.HttpStatus{
Code: envoyTypePb.StatusCode_NotFound,
},
},
},
}
default:
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
}
Expand All @@ -139,6 +185,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
}

}

// RequestContext stores context information during the life time of an HTTP request.
Expand All @@ -153,4 +200,5 @@ type RequestContext struct {
Response Response
ResponseSize int
ResponseComplete bool
ResponseStatusCode string
}
1 change: 1 addition & 0 deletions pkg/ext-proc/metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ spec:
| Metric name | Metric Type | Description | Labels | Status |
| ------------|--------------| ----------- | ------ | ------ |
| inference_model_request_total | Counter | The counter of requests broken out for each model. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_model_request_error_total | Counter | The counter of request errors broken out for each model. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; <br> `error_code`=&lt;error-code&gt; | ALPHA |
| inference_model_request_duration_seconds | Distribution | Distribution of response latency. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_model_request_sizes | Distribution | Distribution of request size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
Expand Down
18 changes: 18 additions & 0 deletions pkg/ext-proc/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ var (
[]string{"model_name", "target_model_name"},
)

requestErrCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: InferenceModelComponent,
Name: "request_error_total",
Help: "Counter of inference model requests errors broken out for each model and target model.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"model_name", "target_model_name", "error_code"},
)

requestLatencies = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: InferenceModelComponent,
Expand Down Expand Up @@ -139,6 +149,7 @@ var registerMetrics sync.Once
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(requestCounter)
legacyregistry.MustRegister(requestErrCounter)
legacyregistry.MustRegister(requestLatencies)
legacyregistry.MustRegister(requestSizes)
legacyregistry.MustRegister(responseSizes)
Expand All @@ -155,6 +166,13 @@ func RecordRequestCounter(modelName, targetModelName string) {
requestCounter.WithLabelValues(modelName, targetModelName).Inc()
}

// RecordRequestErrCounter records the number of request errors, labeled by
// model name, target model name, and the error code.
// An empty code is ignored so callers can pass through a possibly-empty
// canonical code without emitting a bogus unlabeled series.
func RecordRequestErrCounter(modelName, targetModelName, code string) {
	if code == "" {
		return
	}
	requestErrCounter.WithLabelValues(modelName, targetModelName, code).Inc()
}

// RecordRequestSizes records the request sizes.
func RecordRequestSizes(modelName, targetModelName string, reqSize int) {
requestSizes.WithLabelValues(modelName, targetModelName).Observe(float64(reqSize))
Expand Down
77 changes: 69 additions & 8 deletions pkg/ext-proc/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@ import (

"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
)

const (
RequestTotalMetric = InferenceModelComponent + "_request_total"
RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
RequestSizesMetric = InferenceModelComponent + "_request_sizes"
ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
InputTokensMetric = InferenceModelComponent + "_input_tokens"
OutputTokensMetric = InferenceModelComponent + "_output_tokens"
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
RequestTotalMetric = InferenceModelComponent + "_request_total"
RequestErrorTotalMetric = InferenceModelComponent + "_request_error_total"
RequestLatenciesMetric = InferenceModelComponent + "_request_duration_seconds"
RequestSizesMetric = InferenceModelComponent + "_request_sizes"
ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
InputTokensMetric = InferenceModelComponent + "_input_tokens"
OutputTokensMetric = InferenceModelComponent + "_output_tokens"
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
)

func TestRecordRequestCounterandSizes(t *testing.T) {
Expand Down Expand Up @@ -107,6 +109,65 @@ func TestRecordRequestCounterandSizes(t *testing.T) {
}
}

// TestRecordRequestErrorCounter verifies that RecordRequestErrCounter
// increments inference_model_request_error_total with the expected
// model_name/target_model_name/error_code label sets, comparing the gathered
// metrics against the golden file in testdata.
func TestRecordRequestErrorCounter(t *testing.T) {
	type requests struct {
		modelName       string
		targetModelName string
		error           string
	}
	scenarios := []struct {
		name string
		reqs []requests
	}{{
		name: "multiple requests",
		reqs: []requests{
			{
				modelName:       "m10",
				targetModelName: "t10",
				error:           errutil.Internal,
			},
			{
				modelName:       "m10",
				targetModelName: "t10",
				error:           errutil.Internal,
			},
			{
				modelName:       "m10",
				targetModelName: "t11",
				error:           errutil.ModelServerError,
			},
			{
				modelName:       "m20",
				targetModelName: "t20",
				error:           errutil.InferencePoolResourceExhausted,
			},
		},
	}}
	Register()
	for _, scenario := range scenarios {
		t.Run(scenario.name, func(t *testing.T) {
			for _, req := range scenario.reqs {
				RecordRequestErrCounter(req.modelName, req.targetModelName, req.error)
			}

			// Check the Open error before registering the deferred Close: the
			// original deferred the Close first, which would call Close on a
			// nil *os.File when the open fails.
			wantRequestErrorCounter, err := os.Open("testdata/request_error_total_metric")
			if err != nil {
				t.Fatal(err)
			}
			defer func() {
				if err := wantRequestErrorCounter.Close(); err != nil {
					t.Error(err)
				}
			}()
			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil {
				t.Error(err)
			}
		})
	}
}

func TestRecordRequestLatencies(t *testing.T) {
ctx := logutil.NewTestLoggerIntoContext(context.Background())
timeBaseline := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions pkg/ext-proc/metrics/testdata/request_error_total_metric
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# HELP inference_model_request_error_total [ALPHA] Counter of inference model requests errors broken out for each model and target model.
# TYPE inference_model_request_error_total counter
inference_model_request_error_total{error_code="Internal", model_name="m10",target_model_name="t10"} 2
inference_model_request_error_total{error_code="ModelServerError", model_name="m10",target_model_name="t11"} 1
inference_model_request_error_total{error_code="InferencePoolResourceExhausted", model_name="m20",target_model_name="t20"} 1
Loading

0 comments on commit 6130ee0

Please sign in to comment.