feat: vendor-pluggable S3 credentials for native scans#4309
feat: vendor-pluggable S3 credentials for native scans#4309mbutrovich wants to merge 37 commits into
Conversation
# Conflicts: # docs/source/contributor-guide/index.md
b549155 to
0cd8a36
Compare
… activation (fs.s3a.comet.credential.provider.class for Parquet, s3.comet.credential.provider.class for Iceberg), so the bridge is opt-in per Spark config rather than implicit on classpath presence.
|
edit: |
|
CC @snmvaughan |
karuppayya
left a comment
There was a problem hiding this comment.
Left some comments. Will do another pass later today
| InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey); | ||
| Map<String, String> props = | ||
| catalogProperties == null ? Collections.emptyMap() : catalogProperties; | ||
| INSTANCES.computeIfAbsent( |
There was a problem hiding this comment.
A vendor whose initialize throws gets re-attempted on every get_credential call from object_store. Should we cache error per key and backoff. May be a followup
|
Thanks for the feedback @karuppayya! I think I addressed everything but:
I will update my internal credential provider to align with these SPI changes and test again. |
|
Updated my internal implementation to match the latest SPI changes, and things are working well! |
|
I guess the JNI call to That said, the current shape is a reasonable starting point . We can keeping refining in subsqeuent PRs. |
I think there are still opportunities to figure out how to get better parallelism and hide I/O latency in Comet's execution model, but yeah right now it's fairly restricted. I think at least for the OpenDAL/Iceberg case we have a knob you can tune to fire off more tasks for data loading, which I think would introduce parallelism on this path. |
| ) { | ||
| Ok(b) => Some(b), | ||
| Err(e) => { | ||
| log::warn!( |
There was a problem hiding this comment.
It's probably a better idea to fail here and let the user fix the error. Falling thru to the default provider chain will either fail or worse, succeed and lead to curious results.
There was a problem hiding this comment.
Good call. The user explicitly named a provider, so silently falling back hides a real misconfiguration and can resolve to the wrong identity. Changed to propagate the error out of create_store.
| private static final class InstanceKey { | ||
| final String providerClassName; | ||
| final String dispatchKey; | ||
| final Map<String, String> catalogProperties; |
There was a problem hiding this comment.
This could lead to KEY_TO_HANDLE getting quite large if there are many (JVM) sessions or if some catalog implementation refreshes some catalog property per table. We could limit the KEY_TO_HANDLE size and evict older keys to keep this limited.
There was a problem hiding this comment.
Fair point on the catalog-refresh case, the design doesn't forbid it.
A few options I considered and where they break down:
- Driver-side
SparkListener/ session-close hook:KEY_TO_HANDLElives on executors, so a driver-side hook doesn't reach it.onApplicationEndonly fires at app shutdown, which the existing JVM shutdown hook already covers. - Per-session clearing on the executor: Spark has no "session ended on executor" event because executors are session-agnostic. In Spark Connect / Thrift Server one JVM serves many sessions concurrently.
InstanceKeyis(providerClassName, dispatchKey, catalogProperties)with no session identity, and two sessions configured with the same triple collapse to one entry viacomputeIfAbsent, so clearing on session X close would invalidate session Y's live bridge. - Plain LRU: the handle is held by native
CometS3CredentialBridgeinstances by value and reused across scans, with no Drop callback into the JVM, so eviction can invalidate a live bridge mid-job.
The path that's both bounded and safe under parallel sessions is reference counting: a JNI callback from CometS3CredentialBridge::Drop decrements, entry evicts at zero. That's a real change rather than a one-line cap.
Do you have a catalog in mind that churns catalogProperties per table? Otherwise I would prefer to land this as-is and open a followup for the refcounted lifecycle once we have a concrete trigger.
|
|
||
| /// Per-scan credential provider that delegates to the JVM SPI via JNI. `handle` is the JVM-side | ||
| /// identity for the `(provider_class, dispatch_key, catalog_properties)` triple returned by | ||
| /// `ensureInitialized`. `bucket_jstr` / `path_jstr` are interned once at construction to avoid |
There was a problem hiding this comment.
Couldn't we have per path credentials?
There was a problem hiding this comment.
The JVM SPI takes (bucket, path), but the upstream traits don't carry a per-request path: object_store::CredentialProvider::get_credential(&self) is arg-less (object_store 0.13.2, client/mod.rs:907) and reqsign_core::ProvideCredential::provide_credential(&self, ctx: &Context) takes only the signing Context (reqsign-core 3.0.0, api.rs:51). So today the effective identity is per-bucket on the Parquet path and per-table-location on the Iceberg path, even though the Java side nominally exposes path.
Added a doc note on CometS3CredentialBridge so this isn't a surprise. True per-path would mean wrapping each object_store operation, which isn't a hook either trait offers.
| // Extract vended credentials from FileIO (REST catalog credential vending). | ||
| // FileIO properties take precedence over Hadoop-derived properties because | ||
| // they contain per-table credentials vended by the REST catalog. | ||
| // Forward the full FileIO property bag (including credentials.uri, OAuth tokens, |
There was a problem hiding this comment.
Properties like OAuth tokens, bearer tokens, etc. should not really be here as this will get baked into a protobuf that is sent unencrypted over the wire to executors. Also, if tokens have an expiry then they need to be refreshed or the credentials provider will fail.
There was a problem hiding this comment.
On the wire: this rides the same channel that already carries Hadoop delegation tokens, S3A vended credentials, and Iceberg REST credentials from driver to executors via SparkSession / Hadoop conf, so the property bag here isn't a new exposure relative to that baseline. Deployments that need wire encryption already have spark.network.crypto.enabled.
On expiry: the properties forwarded in the proto are the catalog bootstrap identity (REST URI, OAuth client config), not the live credential. getCredentialsForPath is called per request and is the refresh contract, which is why the SPI is shaped this way rather than serializing a one-shot credential into the plan.
Were you flagging a specific provider where the bootstrap bag itself carries a short-lived bearer token?
Which issue does this PR close?
Closes #4332.
Rationale for this change
Comet's native scan paths (
object_storefor raw Parquet,opendalviaiceberg-rustfor Iceberg) bypass Spark's Hadoop S3A credential infrastructure. Vendors with per-path STS, REST-vended creds, or other custom mechanisms cannot reach Comet through any existing SPI.AWSCredentialsProvider.getCredentials()is parameterless, Hadoop S3A custom signers never return credentials outside the signing pipeline, and Spark'sCloudCredentialsProvideryields one JWT per service name with no path argument.This PR adds a narrow, S3-specific SPI plus JNI plumbing to call it from native code. Activation is config-driven and modeled on
parquet.crypto.factory.class(PME KMS, #2447). The user names one vendor class in a Spark or Hadoop config and the vendor dispatches across backends inside it.Design rationale (keying, lifecycle, returns-or-throws, no Comet-side cache, property-bag handling, error-fidelity caveats) lives in the contributor guide page
s3-credential-provider-design.md. Operator setup and vendor contract live in the user guide pages3-credential-providers.md.What is in this PR
org.apache.comet.cloud.s3(in thesparkmodule, since refactor: Move most ofcomet-commonmodule intocomet-spark#4325 collapsedcommonto a minimal bootstrap):CometS3CredentialProvider(AutoCloseable,default initialize(Map)),CometS3Credentials,CometS3AccessMode,CometS3CredentialContext, andCometS3CredentialDispatcherkeyed by(FQCN, dispatchKey, catalogProperties)withensureInitialized(...)returning alonghandle, hot-pathgetCredentialsForPath(handle, ...), and a JVM shutdown hook that closes every cached provider.org.apache.comet.util.ClassLoaders.loadClassprefers the thread context ClassLoader. Both the dispatcher andIcebergReflection.loadClassdelegate to it.CometS3CredentialBridge(undernative/core/src/cloud/s3/) implementingobject_store::CredentialProviderandreqsign_core::ProvideCredential, plus a JNI handle innative/jni-bridge.fs.s3a.comet.credential.provider.class(with per-bucket override) for Parquet, ands3.comet.credential.provider.classon the Spark catalog property for Iceberg.dispatchKeyis the bucket on the Parquet path and the V2 catalog name on the Iceberg path.catalog_properties. The storage-prefix filter (s3.,gcs.,adls.,client.) moves native-side toiceberg_scan.rs::load_file_io.IcebergScanExecgets a manual redactingDebugso plan dumps do not leak the property bag.iceberg-rustpin bumped to83b4595(forreqsign-core3.0 andCustomAwsCredentialLoader).testcontainersbumped to 1.21.4 anddocker-javato 3.7.1 for modern Docker daemons.IcebergRESTVendedS3Provider(test scope, Spark 4.x build only) wrapping Iceberg'sVendedCredentialsProvider. Test scope keepsiceberg-awsand AWS SDK v2 off Comet's runtime classpath.How are these changes tested?
CometS3CredentialDispatcherTest: handle round-trip,ensureInitializedidempotence, distinctdispatchKeyandcatalogPropertiesisolation,closeAllswallows provider exceptions, missing-class / wrong-interface / no-arg-ctor / empty-FQCN failure modes, get-without-init guard.IcebergRESTVendedS3ProviderTest(Spark 4.x).CometS3CredentialBridgeSuite(Minio): Parquet on S3, Iceberg on S3, REST plus SPI integration with a sentinel non-storage-prefix key reachinginitialize(Map), multi-catalog isolation across two catalogs sharing one FQCN. Added todev/ci/check-suites.pyignore list (manual, like other Docker-dependent S3 suites).