Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 194 additions & 1 deletion balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpcsync"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
Expand Down Expand Up @@ -975,9 +977,43 @@ func (s) TestDataCachePurging(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// wrappingConnectivityStateSubscriber wraps a grpcsync.Subscriber and forwards
// connectivity state updates to both the delegate and a channel for testing.
type wrappingConnectivityStateSubscriber struct {
delegate grpcsync.Subscriber
connStateCh *buffer.Unbounded
}

func (w *wrappingConnectivityStateSubscriber) OnMessage(msg any) {
w.delegate.OnMessage(msg)
w.connStateCh.Put(msg.(connectivity.State))
}

// waitForConnectivityState waits for the specified connectivity state to appear
// on the channel. It skips intermediate states and fails the test if the
// context expires before the desired state is reached.
func waitForConnectivityState(ctx context.Context, t *testing.T, ch *buffer.Unbounded, want connectivity.State) {
t.Helper()
for {
select {
case gotState := <-ch.Get():
ch.Load()
if gotState.(connectivity.State) == want {
return
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", want)
}
}
}

// TestControlChannelConnectivityStateMonitoring tests the scenario where the
// control channel goes down and comes back up again and verifies that backoff
// state is reset for cache entries in this scenario.
// state is reset for cache entries in this scenario. It also verifies that:
// - Backoff is NOT reset when the control channel first becomes READY (i.e.,
// the initial CONNECTING → READY transition should not trigger a backoff reset)
// - Backoff is NOT reset for READY → IDLE → READY transitions (benign state changes)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not seem to be testing point 2 here i.e.
- Backoff is NOT reset for READY → IDLE → READY transitions (benign state changes)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this case is covered by TestControlChannelIdleTransitionNoBackoffReset, removed the confusing point in the comment

// - Backoff IS reset for READY → TRANSIENT_FAILURE → READY transitions
func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
Expand All @@ -1004,6 +1040,16 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
defer func() { defaultBackoffStrategy = origBackoffStrategy }()

// Override the connectivity state subscriber to wrap the original and
// make connectivity state changes visible to the test.
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: buffer.NewUnbounded()}
origConnectivityStateSubscriber := newConnectivityStateSubscriber
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
wrappedSubscriber.delegate = delegate
return wrappedSubscriber
}
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()

// Register an LB policy to act as the child policy for RLS LB policy.
childPolicyName := "test-child-policy" + t.Name()
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
Expand Down Expand Up @@ -1038,6 +1084,30 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh, true)

// Verify that the control channel moves to READY.
wantStates := []connectivity.State{connectivity.Connecting, connectivity.Ready}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we not using waitForConnectivityState here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was meant to wait for a sequence of Connecting followed by Ready state, replaced with two subsequent calls of waitForConnectivityState

for _, wantState := range wantStates {
select {
case gotState := <-wrappedSubscriber.connStateCh.Get():
wrappedSubscriber.connStateCh.Load()
if gotState.(connectivity.State) != wantState {
t.Fatalf("Unexpected connectivity state: got %v, want %v", gotState, wantState)
}
case <-ctx.Done():
t.Fatalf("Timeout waiting for RLS control channel to become %q", wantState)
}
}

// Verify that the initial READY state of the control channel did NOT trigger
// a backoff reset. The resetBackoffHook should only be called when
// transitioning from TRANSIENT_FAILURE to READY, not for the initial
// CONNECTING → READY transition.
select {
case <-resetBackoffDone:
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
case <-time.After(10 * time.Millisecond):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace this with the already defined const defaultTestShortTimeout here and below?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// Stop the RLS server.
lis.Stop()

Expand All @@ -1049,6 +1119,11 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
// of the test.
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)

// Wait for the control channel to move to TRANSIENT_FAILURE. When the server
// is stopped, we expect the control channel to go through Connecting and
// eventually reach TransientFailure.
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.TransientFailure)

// Restart the RLS server.
lis.Restart()

Expand All @@ -1066,6 +1141,10 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)

// Wait for the control channel to move back to READY.
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)

// Verify that backoff was reset when transitioning from TRANSIENT_FAILURE to READY.
select {
case <-ctx.Done():
t.Fatalf("Timed out waiting for resetBackoffDone")
Expand All @@ -1081,6 +1160,120 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// TestControlChannelIdleTransitionNoBackoffReset tests that READY → IDLE → READY
// transitions do not trigger backoff resets. This is a benign state change that
// should not affect cache entry backoff state.
func (s) TestControlChannelIdleTransitionNoBackoffReset(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Start an RLS server with the restartable listener and set the throttler to
// never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Override the reset backoff hook to get notified.
resetBackoffCalled := make(chan struct{}, 1)
origResetBackoffHook := resetBackoffHook
resetBackoffHook = func() { resetBackoffCalled <- struct{}{} }
defer func() { resetBackoffHook = origResetBackoffHook }()

// Override the backoff strategy to return a large backoff which
// will make sure the data cache entry remains in backoff for the
// duration of the test.
origBackoffStrategy := defaultBackoffStrategy
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
defer func() { defaultBackoffStrategy = origBackoffStrategy }()

// Override the connectivity state subscriber to wrap the original and
// make connectivity state changes visible to the test.
wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: buffer.NewUnbounded()}
origConnectivityStateSubscriber := newConnectivityStateSubscriber
newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber {
wrappedSubscriber.delegate = delegate
return wrappedSubscriber
}
defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }()

// Register an LB policy to act as the child policy for RLS LB policy.
childPolicyName := "test-child-policy" + t.Name()
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
t.Logf("Registered child policy with name %q", childPolicyName)

// Build RLS service config with header matchers, and a very low value for
// maxAge to ensure that cache entries become invalid very soon.
rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)
rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)

// Start a test backend, and set up the fake RLS server to return this as a
// target in the RLS response.
backendCh, backendAddress := startBackend(t)
rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}}
})

// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create gRPC client: %v", err)
}
defer cc.Close()

// Make an RPC and ensure it gets routed to the test backend.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh)

// Make sure an RLS request is sent out.
verifyRLSRequest(t, rlsReqCh, true)

// Wait for the control channel to move to READY.
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)

// Verify that the initial READY state did NOT trigger a backoff reset.
select {
case <-resetBackoffCalled:
t.Fatal("Backoff reset was triggered for initial READY state, want no reset")
case <-time.After(10 * time.Millisecond):
}

// Stop the RLS server to force the control channel to go IDLE. Stop()
// closes all existing connections on the listener. Since there are no active
// RPCs on the control channel, the subchannel transitions to IDLE instead of
// TRANSIENT_FAILURE.
lis.Stop()

// Wait for the control channel to move to IDLE.
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Idle)

// Restart the RLS server.
lis.Restart()

// Make another RPC to trigger reconnection. Use different headers to create
// a new cache entry.
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh)

// Wait for the control channel to move back to READY.
waitForConnectivityState(ctx, t, wrappedSubscriber.connStateCh, connectivity.Ready)

// Verify that the READY → IDLE → READY transition did NOT trigger a backoff reset.
// This is the key assertion of this test.
select {
case <-resetBackoffCalled:
t.Fatal("Backoff reset was triggered for READY → IDLE → READY transition, want no reset")
case <-time.After(10 * time.Millisecond):
// Good - no backoff reset was triggered for this benign transition.
}
}

// testCCWrapper wraps a balancer.ClientConn and overrides UpdateState and
// stores all state updates pushed by the RLS LB policy.
type testCCWrapper struct {
Expand Down
Loading
Loading