Skip to content

Commit

Permalink
xdsclient: include xds node ID in errors from the WatchResource API (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Feb 15, 2025
1 parent 91eb6aa commit b524c08
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 10 deletions.
16 changes: 9 additions & 7 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,9 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
a.logger.Infof("New watch for type %q, resource name %q", rType.TypeName(), resourceName)
}

xdsChannel := a.xdsChannelToUse()
if xdsChannel == nil {
xdsChannel, err := a.xdsChannelToUse()
if err != nil {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
return
}

Expand Down Expand Up @@ -739,22 +740,23 @@ func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string,
// Otherwise, it creates a new channel using the first server configuration in
// the list of configurations, and returns that.
//
// A non-nil error is returned if the channel creation fails.
//
// Only executed in the context of a serializer callback.
func (a *authority) xdsChannelToUse() *xdsChannelWithConfig {
func (a *authority) xdsChannelToUse() (*xdsChannelWithConfig, error) {
if a.activeXDSChannel != nil {
return a.activeXDSChannel
return a.activeXDSChannel, nil
}

sc := a.xdsChannelConfigs[0].serverConfig
xc, cleanup, err := a.getChannelForADS(sc, a)
if err != nil {
a.logger.Warningf("Failed to create xDS channel: %v", err)
return nil
return nil, err
}
a.xdsChannelConfigs[0].channel = xc
a.xdsChannelConfigs[0].cleanup = cleanup
a.activeXDSChannel = a.xdsChannelConfigs[0]
return a.activeXDSChannel
return a.activeXDSChannel, nil
}

// closeXDSChannels closes all the xDS channels associated with this authority,
Expand Down
20 changes: 17 additions & 3 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// wrappingWatcher is a wrapper around an xdsresource.ResourceWatcher that adds
// the node ID to the error messages reported to the watcher.
type wrappingWatcher struct {
xdsresource.ResourceWatcher
nodeID string
}

func (w *wrappingWatcher) OnError(err error, done xdsresource.OnDoneFunc) {
w.ResourceWatcher.OnError(fmt.Errorf("[xDS node id: %v]: %v", w.nodeID, err), done)
}

// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS responses
// are are deserialized and validated, as received from the xDS management
Expand All @@ -43,6 +54,11 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
return func() {}
}

watcher = &wrappingWatcher{
ResourceWatcher: watcher,
nodeID: c.config.Node().GetId(),
}

if err := c.resourceTypes.maybeRegister(rType); err != nil {
logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName)
c.serializer.TrySchedule(func(context.Context) { watcher.OnError(err, func() {}) })
Expand All @@ -53,9 +69,7 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
a := c.getAuthorityForResource(n)
if a == nil {
logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority)
c.serializer.TrySchedule(func(context.Context) {
watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {})
})
watcher.OnError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {})
return func() {}
}
// The watchResource method on the authority is invoked with n.String()
Expand Down
135 changes: 135 additions & 0 deletions xds/internal/xdsclient/tests/misc_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
xdsclientinternal "google.golang.org/grpc/xds/internal/xdsclient/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -371,3 +374,135 @@ func readDiscoveryResponseAndCheckForNonEmptyNodeProto(ctx context.Context, reqC
}
return nil
}

type testRouteConfigResourceType struct{}

func (testRouteConfigResourceType) TypeURL() string { return version.V3RouteConfigURL }
func (testRouteConfigResourceType) TypeName() string { return "RouteConfigResource" }
func (testRouteConfigResourceType) AllResourcesRequiredInSotW() bool { return false }
func (testRouteConfigResourceType) Decode(*xdsresource.DecodeOptions, *anypb.Any) (*xdsresource.DecodeResult, error) {
return nil, nil
}

// Tests that the errors returned by the xDS client when watching a resource
// contain the node ID that was used to create the client. This test covers two
// scenarios:
//
// 1. When a watch is registered for an already registered resource type, but
// this time with a different implementation,
// 2. When a watch is registered for a resource name whose authority is not
// found in the bootstrap configuration.
func (s) TestWatchErrorsContainNodeID(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Create an xDS client with the above bootstrap contents.
config, err := bootstrap.NewConfigFromContents(bc)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
}
pool := xdsclient.NewPool(config)
client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

t.Run("Multiple_ResourceType_Implementations", func(t *testing.T) {
const routeConfigName = "route-config-name"
watcher := xdstestutils.NewTestResourceWatcher()
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)

sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
select {
case <-sCtx.Done():
case <-watcher.UpdateCh:
t.Fatal("Unexpected resource update")
case <-watcher.ErrorCh:
t.Fatal("Unexpected resource error")
case <-watcher.ResourceDoesNotExistCh:
t.Fatal("Unexpected resource does not exist")
}

client.WatchResource(testRouteConfigResourceType{}, routeConfigName, watcher)
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for error callback to be invoked")
case err := <-watcher.ErrorCh:
if err == nil || !strings.Contains(err.Error(), nodeID) {
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
}
}
})

t.Run("Missing_Authority", func(t *testing.T) {
const routeConfigName = "xdstp://nonexistant-authority/envoy.config.route.v3.RouteConfiguration/route-config-name"
watcher := xdstestutils.NewTestResourceWatcher()
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)

select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for error callback to be invoked")
case err := <-watcher.ErrorCh:
if err == nil || !strings.Contains(err.Error(), nodeID) {
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
}
}
})
}

// Tests that the errors returned by the xDS client when watching a resource
// contain the node ID when channel creation to the management server fails.
func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Create an xDS client with the above bootstrap contents.
config, err := bootstrap.NewConfigFromContents(bc)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
}
pool := xdsclient.NewPool(config)
client, close, err := pool.NewClientForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Override the xDS channel dialer with one that always fails.
origDialer := xdsclientinternal.GRPCNewClient
xdsclientinternal.GRPCNewClient = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, fmt.Errorf("failed to create channel")
}
defer func() { xdsclientinternal.GRPCNewClient = origDialer }()

const routeConfigName = "route-config-name"
watcher := xdstestutils.NewTestResourceWatcher()
client.WatchResource(routeConfigResourceType, routeConfigName, watcher)

select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for error callback to be invoked")
case err := <-watcher.ErrorCh:
if err == nil || !strings.Contains(err.Error(), nodeID) {
t.Fatalf("Unexpected error: %v, want error with node ID: %q", err, nodeID)
}
}
}

0 comments on commit b524c08

Please sign in to comment.