-
Notifications
You must be signed in to change notification settings - Fork 4.7k
xds/googlec2p: enable DirectPath over Interconnect support for on-prem clients #9133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -136,24 +136,39 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts | |||||
| return nil, fmt.Errorf("google-c2p URI scheme does not support authorities") | ||||||
| } | ||||||
|
|
||||||
| if !runDirectPath() { | ||||||
| isGCE := onGCE() | ||||||
| _, forceXds := t.URL.Query()["force-xds"] | ||||||
| if !forceXds && !isGCE { | ||||||
| // If not xDS, fallback to DNS. | ||||||
| t.URL.Scheme = dnsName | ||||||
| return resolver.Get(dnsName).Build(t, cc, opts) | ||||||
| } | ||||||
|
|
||||||
| // Note that the following calls to getZone() and getIPv6Capable() does I/O, | ||||||
| // and has 10 seconds timeout each. | ||||||
| // | ||||||
| // This should be fine in most of the cases. In certain error cases, this | ||||||
| // could block Dial() for up to 10 seconds (each blocking call has its own | ||||||
| // goroutine). | ||||||
| zoneCh, ipv6CapableCh := make(chan string), make(chan bool) | ||||||
| go func() { zoneCh <- getZone(httpReqTimeout) }() | ||||||
| go func() { ipv6CapableCh <- getIPv6Capable(httpReqTimeout) }() | ||||||
| var zone string | ||||||
| var ipv6Capable bool | ||||||
| if isGCE { | ||||||
| // Note that the following calls to getZone() and getIPv6Capable() does I/O, | ||||||
| // and has 10 seconds timeout each. | ||||||
| // | ||||||
| // This should be fine in most of the cases. In certain error cases, this | ||||||
| // could block Dial() for up to 10 seconds (each blocking call has its own | ||||||
| // goroutine). | ||||||
| zoneCh, ipv6CapableCh := make(chan string), make(chan bool) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a best practice in Go to use buffered channels when a goroutine is expected to send a single value and then terminate. This prevents the goroutine from leaking if the receiver (the main thread in
Suggested change
|
||||||
| go func() { zoneCh <- getZone(httpReqTimeout) }() | ||||||
| go func() { ipv6CapableCh <- getIPv6Capable(httpReqTimeout) }() | ||||||
| zone, ipv6Capable = <-zoneCh, <-ipv6CapableCh | ||||||
| } else { | ||||||
| // When running off-GCP (for DirectPath over GCI), the GCE metadata | ||||||
| // server is not available. | ||||||
| // - Set zone to empty. | ||||||
| // - Set ipv6Capable to true because DirectPath over GCI supports | ||||||
| // IPv6-capable on-premise clients. | ||||||
| zone = "" | ||||||
| ipv6Capable = true | ||||||
| } | ||||||
|
|
||||||
| xdsServerURI := getXdsServerURI() | ||||||
| nodeCfg := newNodeConfig(<-zoneCh, <-ipv6CapableCh) | ||||||
| nodeCfg := newNodeConfig(zone, ipv6Capable, isGCE) | ||||||
| xdsServerCfg := newXdsServerConfig(xdsServerURI) | ||||||
| authoritiesCfg := newAuthoritiesConfig(xdsServerCfg) | ||||||
|
|
||||||
|
|
@@ -203,10 +218,16 @@ func (b c2pResolverBuilder) Scheme() string { | |||||
| return c2pScheme | ||||||
| } | ||||||
|
|
||||||
| func newNodeConfig(zone string, ipv6Capable bool) map[string]any { | ||||||
| func newNodeConfig(zone string, ipv6Capable bool, isGCE bool) map[string]any { | ||||||
| prefix := "C2P" | ||||||
| if !isGCE { | ||||||
| prefix = "C2P-non-gcp" | ||||||
| } | ||||||
| node := map[string]any{ | ||||||
| "id": fmt.Sprintf("C2P-%d", randInt()), | ||||||
| "locality": map[string]any{"zone": zone}, | ||||||
| "id": fmt.Sprintf("%s-%d", prefix, randInt()), | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The pull request description mentions that the xDS Client Node ID should use a UUID as a suffix (e.g., |
||||||
| } | ||||||
| if zone != "" { | ||||||
| node["locality"] = map[string]any{"zone": zone} | ||||||
| } | ||||||
| // Enable dualstack endpoints in TD. | ||||||
| if ipv6Capable { | ||||||
|
|
@@ -228,10 +249,3 @@ func newXdsServerConfig(uri string) map[string]any { | |||||
| "server_features": []any{"ignore_resource_deletion"}, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // runDirectPath returns whether this resolver should use direct path. | ||||||
| // | ||||||
| // direct path is enabled if this client is running on GCE. | ||||||
| func runDirectPath() bool { | ||||||
| return onGCE() | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -655,3 +655,175 @@ func (s) TestCreateMultipleXDSClients(t *testing.T) { | |
| c2pXDSTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: c2pTarget.URL.Path}} | ||
| verifyXDSClientBootstrapConfig(t, xdsClientPool, c2pXDSTarget.String(), c2pConfig) | ||
| } | ||
|
|
||
| // TestBuildXDSClientNotOnGCEWithForceXDS validates that the C2P resolver | ||
| // handles not on GCP (Google Cloud Interconnect) clients correctly when the | ||
| // force-xds query parameter is provided in various valid formats. | ||
| // It verifies that: | ||
| // - GCE metadata server queries for zone and ipv6 capability are bypassed | ||
| // to prevent dial timeouts. | ||
| // - The node ID is formatted using the prefix ("C2P-non-gcp"). | ||
| // - Zone information is completely omitted from the node config. | ||
| // - Dualstack IPv6 capability (TRAFFICDIRECTOR_DIRECTPATH_C2P_IPV6_CAPABLE) | ||
| // is set to true. | ||
| func (s) TestBuildXDSClientNotOnGCEWithForceXDS(t *testing.T) { | ||
| tests := []struct { | ||
| desc string | ||
| rawQuery string | ||
| }{ | ||
| { | ||
| desc: "query_param_key_only", | ||
| rawQuery: "force-xds", | ||
| }, | ||
| { | ||
| desc: "query_param_trailing_equals_value_empty", | ||
| rawQuery: "force-xds=", | ||
| }, | ||
| { | ||
| desc: "query_param_value_true", | ||
| rawQuery: "force-xds=true", | ||
| }, | ||
| { | ||
| desc: "query_param_value_false", | ||
| rawQuery: "force-xds=false", | ||
| }, | ||
|
Comment on lines
+687
to
+689
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| { | ||
| desc: "query_param_multiple_query_params", | ||
| rawQuery: "foo=bar&force-xds", | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.desc, func(t *testing.T) { | ||
| replaceResolvers(t) | ||
| simulateRunningOnGCE(t, false) | ||
| useCleanUniverseDomain(t) | ||
|
|
||
| // Override the random func used in the node ID. | ||
| origRandInd := randInt | ||
| randInt = func() int { return 666 } | ||
| defer func() { randInt = origRandInd }() | ||
|
|
||
| // Override xDS client pool. | ||
| oldXdsClientPool := xdsClientPool | ||
| xdsClientPool = xdsclient.NewPool(nil) | ||
| defer func() { xdsClientPool = oldXdsClientPool }() | ||
|
|
||
| // Target URI with force-xds query parameter. | ||
| builder := resolver.Get(c2pScheme) | ||
| target := resolver.Target{URL: url.URL{ | ||
| Scheme: c2pScheme, | ||
| Path: "test-path", | ||
| RawQuery: tt.rawQuery, | ||
| }} | ||
|
|
||
| // Build the google-c2p resolver. | ||
| res, err := builder.Build(target, nil, resolver.BuildOptions{}) | ||
| if err != nil { | ||
| t.Fatalf("failed to build resolver: %v", err) | ||
| } | ||
| defer res.Close() | ||
|
|
||
| // Query parameters are omitted since the downstream xDS target is configured using only active fields. | ||
| xdsTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: target.URL.Path}} | ||
| wantBootstrapConfig := bootstrapConfig(t, bootstrap.ConfigOptionsForTesting{ | ||
| Servers: []byte(`[{ | ||
| "server_uri": "dns:///directpath-pa.googleapis.com", | ||
| "channel_creds": [{"type": "google_default"}], | ||
| "server_features": ["ignore_resource_deletion"] | ||
| }]`), | ||
| Authorities: map[string]json.RawMessage{ | ||
| "traffic-director-c2p.xds.googleapis.com": []byte(`{ | ||
| "xds_servers": [ | ||
| { | ||
| "server_uri": "dns:///directpath-pa.googleapis.com", | ||
| "channel_creds": [{"type": "google_default"}], | ||
| "server_features": ["ignore_resource_deletion"] | ||
| } | ||
| ] | ||
| }`), | ||
| }, | ||
| Node: []byte(`{ | ||
| "id": "C2P-non-gcp-666", | ||
| "metadata": { | ||
| "TRAFFICDIRECTOR_DIRECTPATH_C2P_IPV6_CAPABLE": true | ||
| } | ||
| }`), | ||
| }) | ||
| verifyXDSClientBootstrapConfig(t, xdsClientPool, xdsTarget.String(), wantBootstrapConfig) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // TestBuildXDSClientOnGCEWithForceXDS validates that when running on GCE with | ||
| // the force-xds query param, we successfully build an xDS client with standard | ||
| // GCP Node ID format, locality zone, and dualstack capability. | ||
| func (s) TestBuildXDSClientOnGCEWithForceXDS(t *testing.T) { | ||
| replaceResolvers(t) | ||
| simulateRunningOnGCE(t, true) | ||
| useCleanUniverseDomain(t) | ||
|
|
||
| // Override the zone returned by the metadata server. | ||
| oldGetZone := getZone | ||
| getZone = func(time.Duration) string { return "test-zone" } | ||
| defer func() { getZone = oldGetZone }() | ||
|
|
||
| // Override testing random node ID. | ||
| origRandInd := randInt | ||
| randInt = func() int { return 777 } | ||
| defer func() { randInt = origRandInd }() | ||
|
|
||
| // Override IPv6 capability. | ||
| oldGetIPv6Capable := getIPv6Capable | ||
| getIPv6Capable = func(time.Duration) bool { return true } | ||
| defer func() { getIPv6Capable = oldGetIPv6Capable }() | ||
|
|
||
| oldXdsClientPool := xdsClientPool | ||
| xdsClientPool = xdsclient.NewPool(nil) | ||
| defer func() { xdsClientPool = oldXdsClientPool }() | ||
|
|
||
| // Target URI with force-xds query parameter. | ||
| target := resolver.Target{URL: url.URL{ | ||
| Scheme: c2pScheme, | ||
| Path: "test-path", | ||
| RawQuery: "force-xds", | ||
| }} | ||
|
|
||
| // Build the google-c2p resolver. | ||
| builder := resolver.Get(c2pScheme) | ||
| r, err := builder.Build(target, nil, resolver.BuildOptions{}) | ||
| if err != nil { | ||
| t.Fatalf("failed to build resolver: %v", err) | ||
| } | ||
| defer r.Close() | ||
|
|
||
| xdsTarget := resolver.Target{URL: url.URL{Scheme: xdsName, Host: c2pAuthority, Path: target.URL.Path}} | ||
| wantBootstrapConfig := bootstrapConfig(t, bootstrap.ConfigOptionsForTesting{ | ||
| Servers: []byte(`[{ | ||
| "server_uri": "dns:///directpath-pa.googleapis.com", | ||
| "channel_creds": [{"type": "google_default"}], | ||
| "server_features": ["ignore_resource_deletion"] | ||
| }]`), | ||
| Authorities: map[string]json.RawMessage{ | ||
| "traffic-director-c2p.xds.googleapis.com": []byte(`{ | ||
| "xds_servers": [ | ||
| { | ||
| "server_uri": "dns:///directpath-pa.googleapis.com", | ||
| "channel_creds": [{"type": "google_default"}], | ||
| "server_features": ["ignore_resource_deletion"] | ||
| } | ||
| ] | ||
| }`), | ||
| }, | ||
| Node: []byte(`{ | ||
| "id": "C2P-777", | ||
| "locality": { | ||
| "zone": "test-zone" | ||
| }, | ||
| "metadata": { | ||
| "TRAFFICDIRECTOR_DIRECTPATH_C2P_IPV6_CAPABLE": true | ||
| } | ||
| }`), | ||
| }) | ||
| verifyXDSClientBootstrapConfig(t, xdsClientPool, xdsTarget.String(), wantBootstrapConfig) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation checks for the presence of the
force-xdsquery parameter but does not validate its value. This means that a URI likegoogle-c2p:///target?force-xds=falsewould still force the xDS path, which is counter-intuitive. Consider checking the value of the parameter to ensure it is not explicitly set tofalse.