stats/opentelemetry: add trace event for name resolution delay #8074

Open: wants to merge 30 commits into base: master

Commits (30, all by vinothkumarr227, 2025)

d773e44  Added add trace event for name resolution delay (Feb 10)
126b42f  small tweaks (Feb 10)
dad6621  Removed unused parameter (Feb 10)
37e1803  Removed unused parameter (Feb 10)
9ae5cc8  Fixed the duplicate import issues (Feb 11)
f9b00e9  refactor: simplify tests, fix SubConn deprecation, and add docs (Feb 11)
3ce7799  refactor: add function documentation and update variable names (Feb 12)
76dc143  Add test for 'Name resolution completed with delay' event check (Feb 14)
82f3ede  small tweaks (Feb 14)
2ec49a9  small tweaks (Feb 14)
3af52f4  Fixed the test cases (Feb 14)
c0affc5  Added add trace event for name resolution delay (Feb 14)
323b663  small tweaks (Feb 14)
b76ed24  small tweaks (Feb 14)
bc99c93  small tweaks (Feb 14)
e86b142  Fixed the name resolutions delay event test issues (Feb 15)
27914a2  small tweaks (Feb 15)
63bb794  Refactor the integration of nameResolutionDelay in RPCInfo and fix a … (Feb 19)
a4b6281  small tweaks (Feb 19)
89a2bfa  Fixed the test cases (Feb 21)
797db0a  small tweaks (Feb 21)
099d604  cleanup code (Feb 24)
7493d5f  Fixed the review changes (Feb 25)
50a187f  small tweaks (Feb 25)
67bd066  Fixed the review changes (Feb 25)
0a7df7e  small tweaks (Feb 25)
442ff47  small tweaks (Feb 27)
783b826  Fixed the review changes (Feb 27)
cd6e768  small tweaks (Feb 27)
ec567ff  updated logs and fixed formatting (Feb 27)
20 changes: 12 additions & 8 deletions clientconn.go
@@ -675,22 +675,26 @@
 	cc.mu.Unlock()
 }
 
-// waitForResolvedAddrs blocks until the resolver has provided addresses or the
-// context expires. Returns nil unless the context expires first; otherwise
-// returns a status error based on the context.
-func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
+// waitForResolvedAddrs blocks until the resolver provides addresses or the
+// context expires, whichever happens first. The error is nil unless the
+// context expires first; otherwise it is a status error based on the context.
+// The returned boolean indicates whether the call blocked. If resolution has
+// already happened once before, it returns false without blocking. Otherwise,
+// it waits for the resolution and returns true if resolution succeeded, or
+// false along with an error if resolution failed.
+func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) (bool, error) {
 	// This is on the RPC path, so we use a fast path to avoid the
 	// more-expensive "select" below after the resolver has returned once.
 	if cc.firstResolveEvent.HasFired() {
-		return nil
+		return false, nil
 	}
 	select {
 	case <-cc.firstResolveEvent.Done():
-		return nil
+		return true, nil
 	case <-ctx.Done():
-		return status.FromContextError(ctx.Err()).Err()
+		return false, status.FromContextError(ctx.Err()).Err()
 	case <-cc.ctx.Done():
-		return ErrClientConnClosing
+		return false, ErrClientConnClosing
 	}
 }

Codecov / codecov/patch warning: added lines clientconn.go#L682-L684 were not covered by tests.
3 changes: 3 additions & 0 deletions stats/handlers.go
@@ -38,6 +38,9 @@ type RPCTagInfo struct {
 	// FailFast indicates if this RPC is failfast.
 	// This field is only valid on client side, it's always false on server side.
 	FailFast bool
+	// NameResolutionDelay indicates if there was a delay in the name resolution.
+	// This field is only valid on client side, it's always false on server side.
+	NameResolutionDelay bool
 }
 
 // Handler defines the interface for the related stats handling (e.g., RPCs, connections).
1 change: 1 addition & 0 deletions stats/opentelemetry/client_metrics.go
@@ -190,6 +190,7 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
 		xdsLabels: labels.TelemetryLabels,
 		method:    removeLeadingSlash(info.FullMethodName),
 	}
+	ai.nameResolutionDelayed = info.NameResolutionDelay
 	if h.options.isTracingEnabled() {
 		ctx, ai = h.traceTagRPC(ctx, ai)
 	}
3 changes: 3 additions & 0 deletions stats/opentelemetry/opentelemetry.go
@@ -211,6 +211,9 @@ type attemptInfo struct {
 	countSentMsg uint32
 	countRecvMsg uint32
 	previousRPCAttempts uint32
+	// nameResolutionDelayed indicates if there was a delay in the name resolution.
+	// This field is only valid on client side, it's always false on server side.
+	nameResolutionDelayed bool
 }
 
 type clientMetrics struct {
3 changes: 3 additions & 0 deletions stats/opentelemetry/trace.go
@@ -52,6 +52,9 @@
 		)
 		// increment previous rpc attempts applicable for next attempt
 		atomic.AddUint32(&ai.previousRPCAttempts, 1)
+		if ai.nameResolutionDelayed {
+			span.AddEvent("Delayed name resolution complete")
+		}
 	case *stats.PickerUpdated:
 		span.AddEvent("Delayed LB pick complete")
 	case *stats.InPayload:

Contributor review comment on the added if ai.nameResolutionDelayed block:

this is wrong. As per https://github.com/grpc/proposal/blob/master/A72-open-telemetry-tracing.md#tracing-information, "Delayed name resolution" should be an event in the call span, not the attempt span.

The right place is https://github.com/grpc/grpc-go/blob/master/stats/opentelemetry/client_tracing.go#L34. Before creating the attempt span, you need to retrieve the current call span using trace.SpanFromContext and add an event to that span. Before that, also check whether that event already exists, and only add it if it does not.

Codecov / codecov/patch warning: added lines stats/opentelemetry/trace.go#L56-L57 were not covered by tests.
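For readers unfamiliar with the OpenTelemetry calls the reviewer mentions, below is a minimal Go sketch of the suggested approach: before the per-attempt span is created in the client stats handler, fetch the current call span with trace.SpanFromContext and add the event to it at most once per call. The recordDelayedResolutionEvent helper, the otelsketch package name, and the alreadyRecorded flag are illustrative assumptions, not code from this PR or from grpc-go.

package otelsketch

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// recordDelayedResolutionEvent adds the "Delayed name resolution complete"
// event to the call span carried in ctx. It does nothing when the event has
// already been recorded for this call or when ctx carries no valid span, and
// it reports whether the event has been recorded after this call.
func recordDelayedResolutionEvent(ctx context.Context, alreadyRecorded bool) bool {
	if alreadyRecorded {
		return true
	}
	callSpan := trace.SpanFromContext(ctx) // returns a no-op span if ctx carries none
	if !callSpan.SpanContext().IsValid() {
		return false
	}
	callSpan.AddEvent("Delayed name resolution complete")
	return true
}

In the terms of this PR, the client tracing code would call something like ai.nameResolutionEventRecorded = recordDelayedResolutionEvent(ctx, ai.nameResolutionEventRecorded) before starting the attempt span; the nameResolutionEventRecorded field is hypothetical.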
39 changes: 22 additions & 17 deletions stream.go
@@ -212,14 +212,15 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 	}
 	// Provide an opportunity for the first RPC to see the first service config
 	// provided by the resolver.
-	if err := cc.waitForResolvedAddrs(ctx); err != nil {
+	nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
+	if err != nil {
 		return nil, err
 	}
 
 	var mc serviceconfig.MethodConfig
 	var onCommit func()
 	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
-		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
+		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
 	}
 
 	rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
@@ -257,7 +258,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 	return newStream(ctx, func() {})
 }
 
-func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
 	callInfo := defaultCallInfo()
 	if mc.WaitForReady != nil {
 		callInfo.failFast = !*mc.WaitForReady
@@ -321,19 +322,20 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
 	}
 
 	cs := &clientStream{
-		callHdr:      callHdr,
-		ctx:          ctx,
-		methodConfig: &mc,
-		opts:         opts,
-		callInfo:     callInfo,
-		cc:           cc,
-		desc:         desc,
-		codec:        callInfo.codec,
-		compressorV0: compressorV0,
-		compressorV1: compressorV1,
-		cancel:       cancel,
-		firstAttempt: true,
-		onCommit:     onCommit,
+		callHdr:             callHdr,
+		ctx:                 ctx,
+		methodConfig:        &mc,
+		opts:                opts,
+		callInfo:            callInfo,
+		cc:                  cc,
+		desc:                desc,
+		codec:               callInfo.codec,
+		compressorV0:        compressorV0,
+		compressorV1:        compressorV1,
+		cancel:              cancel,
+		firstAttempt:        true,
+		onCommit:            onCommit,
+		NameResolutionDelay: nameResolutionDelayed,
 	}
 	if !cc.dopts.disableRetry {
 		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
@@ -417,7 +419,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
 	var beginTime time.Time
 	shs := cs.cc.dopts.copts.StatsHandlers
 	for _, sh := range shs {
-		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
+		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast, NameResolutionDelay: cs.NameResolutionDelay})
 		beginTime = time.Now()
 		begin := &stats.Begin{
 			Client: true,
@@ -573,6 +575,9 @@ type clientStream struct {
 	onCommit func()
 	replayBuffer []replayOp // operations to replay on retry
 	replayBufferSize int // current size of replayBuffer
+	// NameResolutionDelay indicates if there was a delay in the name resolution.
+	// This field is only valid on client side, it's always false on server side.
+	NameResolutionDelay bool
 }
 
 type replayOp struct {
132 changes: 132 additions & 0 deletions test/clientconn_test.go
@@ -29,9 +29,12 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

@@ -83,3 +86,132 @@ func (s) TestClientConnClose_WithPendingRPC(t *testing.T) {
t.Fatal(err)
}
}

type testStatsHandler struct {
nameResolutionDelayed bool
}

// TagRPC is called when an RPC is initiated and allows adding metadata to the
// context. It checks if the RPC experienced a name resolution delay and
// updates the handler's state.
func (h *testStatsHandler) TagRPC(ctx context.Context, rpcInfo *stats.RPCTagInfo) context.Context {
h.nameResolutionDelayed = rpcInfo.NameResolutionDelay
return ctx
}

// HandleRPC exists to satisfy stats.Handler.
func (h *testStatsHandler) HandleRPC(_ context.Context, _ stats.RPCStats) {}

// TagConn exists to satisfy stats.Handler.
func (h *testStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn exists to satisfy stats.Handler.
func (h *testStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {}

// startStubServer starts a stub gRPC server and returns it along with a cleanup function.
func startStubServer(t *testing.T) (*stubserver.StubServer, func()) {
t.Helper()
stub := &stubserver.StubServer{
EmptyCallF: func(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
t.Log("EmptyCall received and processed")
return &testpb.Empty{}, nil
},
}
if err := stub.Start(nil); err != nil {
t.Fatalf("Failed to start StubServer: %v", err)
}
return stub, func() { stub.Stop() }
}

// createTestClient sets up a gRPC client connection with a manual resolver.
func createTestClient(t *testing.T, scheme string, statsHandler *testStatsHandler) (*grpc.ClientConn, *manual.Resolver) {
t.Helper()
rb := manual.NewBuilderWithScheme(scheme)
cc, err := grpc.NewClient(
scheme+":///test.server",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(rb),
grpc.WithStatsHandler(statsHandler),
)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
return cc, rb
}

// TestClientConnRPC_WithoutNameResolutionDelay verifies that if resolution has
// already happened once before the RPC is made, the name resolution delay flag
// is not set, indicating there was no delay in name resolution.
func (s) TestClientConnRPC_WithoutNameResolutionDelay(t *testing.T) {
stub, cleanup := startStubServer(t)
defer cleanup()

statsHandler := &testStatsHandler{}
rb := manual.NewBuilderWithScheme("instant")
rb.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: stub.Address}}})
cc, err := grpc.NewClient(rb.Scheme()+":///test.server",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(rb),
grpc.WithStatsHandler(statsHandler),
)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
// Verify that the RPC succeeds.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("First RPC failed unexpectedly: %v", err)
}
// Verify that name resolution did not happen.
Contributor review comment on the line above: this doesn't mean "name resolution did not happen". Either it should be "name resolution did not happen again" or "Verify that RPC was not blocked on waiting for resolver to return addresses indicating no name resolution delay". I prefer the latter.

if statsHandler.nameResolutionDelayed {
t.Fatalf("Expected nameResolutionDelayed to be false, but got %v", statsHandler.nameResolutionDelayed)
}
}

// TestClientConnRPC_WithNameResolutionDelay verifies that if this is the first
// time resolution is happening when the RPC is made, the nameResolutionDelayed
// flag is set, indicating there was a delay in name resolution while waiting
// for the resolver to return addresses.
func (s) TestClientConnRPC_WithNameResolutionDelay(t *testing.T) {
stub, cleanup := startStubServer(t)
defer cleanup()

statsHandler := &testStatsHandler{}
cc, rb := createTestClient(t, "delayed", statsHandler)
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
rpcError := make(chan error, 1)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

go func() {
t.Log("RPC waiting for resolved addresses")
_, err := client.EmptyCall(ctx, &testpb.Empty{})
rpcError <- err
}()

timer := time.AfterFunc(100*time.Millisecond, func() {
rb.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: stub.Address}}})
t.Log("Resolver updated to return addresses.")
})
defer timer.Stop()

select {
case err := <-rpcError:
if err != nil {
t.Fatalf("RPC failed after resolution: %v", err)
}
if !statsHandler.nameResolutionDelayed {
Contributor review comment on the line above: add a comment about verifying that the RPC was blocked on the resolver returning addresses, indicating a name resolution delay.

t.Fatalf("Expected statsHandler.nameResolutionDelayed to be true, but got %v", statsHandler.nameResolutionDelayed)
}
case <-ctx.Done():
t.Fatal("Test setup timed out unexpectedly.")
}
}