From 3d8b898a70abb8207b8e0f70f0d9e4852d947cab Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Thu, 14 May 2026 13:30:10 -0400 Subject: [PATCH 1/4] Claude: use updated iceberg-rust fork --- Cargo.lock | 121 +++++++++++++++++-- Cargo.toml | 12 +- deny.toml | 2 + src/storage-types/Cargo.toml | 3 + src/storage-types/src/connections.rs | 167 +++++++++++++++++++++++---- 5 files changed, 263 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96ec5963eef9a..d456d1edecbaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1542,6 +1542,16 @@ dependencies = [ "piper", ] +[[package]] +name = "bnum" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f781dba93de3a5ef6dc5b17c9958b208f6f3f021623b360fb605ea51ce443f10" +dependencies = [ + "serde", + "serde-big-array", +] + [[package]] name = "bon" version = "3.8.1" @@ -2940,7 +2950,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1c298552016db86f0d49e5de09818dd86c536f66095013cc415f4f85744033f" dependencies = [ - "erased-serde", + "erased-serde 0.3.26", "lazy_static", "regex", "serde", @@ -3196,6 +3206,17 @@ dependencies = [ "serde", ] +[[package]] +name = "erased-serde" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2add8a07dd6a8d93ff627029c51de145e12686fbc36ecb298ac22e74cf02dec" +dependencies = [ + "serde", + "serde_core", + "typeid", +] + [[package]] name = "errno" version = "0.3.13" @@ -3308,6 +3329,18 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "fastnum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4089ab2dfd45d8ddc92febb5ca80644389d5ebb954f40231274a3f18341762e2" +dependencies = [ + "bnum", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -4251,8 +4284,8 @@ dependencies = [ [[package]] name = "iceberg" -version = "0.7.0" -source = "git+https://github.com/MaterializeInc/iceberg-rust.git?rev=c31a98afe789#c31a98afe789b0dcfd2ab9cc23e2ecf5120aaaac" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/iceberg-rust.git?rev=cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92#cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" dependencies = [ "anyhow", "apache-avro", @@ -4275,22 +4308,19 @@ dependencies = [ "chrono", "derive_builder", "expect-test", + "fastnum", "flate2", "fnv", "futures", "itertools 0.13.0", "moka", "murmur3", - "num-bigint", "once_cell", - "opendal", "ordered-float 4.6.0", "parquet", "rand 0.8.5", - "reqsign", "reqwest", "roaring", - "rust_decimal", "serde", "serde_bytes", "serde_derive", @@ -4300,6 +4330,7 @@ dependencies = [ "strum", "tokio", "typed-builder 0.20.1", + "typetag", "url", "uuid", "zstd", @@ -4307,13 +4338,10 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" -version = "0.7.0" -source = "git+https://github.com/MaterializeInc/iceberg-rust.git?rev=c31a98afe789#c31a98afe789b0dcfd2ab9cc23e2ecf5120aaaac" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/iceberg-rust.git?rev=cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92#cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" dependencies = [ "async-trait", - "aws-credential-types", - "aws-sigv4", - "aws-types", "chrono", "http 1.4.0", "iceberg", @@ -4328,6 +4356,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "iceberg-storage-opendal" +version = "0.9.0" +source = "git+https://github.com/MaterializeInc/iceberg-rust.git?rev=cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92#cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "cfg-if", + "iceberg", + "opendal", + "reqsign", + "reqwest", + "serde", + "typetag", + "url", +] + [[package]] name = "id-arena" version = "2.2.1" @@ -4494,6 +4540,15 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "inventory" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4f0c30c76f2f4ccee3fe55a2435f691ca00c0e4bd87abe4f4a851b1d4dac39b" +dependencies = [ + "rustversion", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -8177,6 +8232,8 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-sdk-sts", + "aws-sigv4", + "aws-smithy-runtime-api", "aws-types", "base64 0.22.1", "bytes", @@ -8189,6 +8246,7 @@ dependencies = [ "http 1.4.0", "iceberg", "iceberg-catalog-rest", + "iceberg-storage-opendal", "insta", "itertools 0.14.0", "mysql_async", @@ -11066,6 +11124,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -12657,12 +12724,42 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typenum" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "typetag" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2212c8a9b9bcfca32024de14998494cf9a5dfa59ea1b829de98bac374b86bf" +dependencies = [ + "erased-serde 0.4.10", + "inventory", + "once_cell", + "serde", + "typetag-impl", +] + +[[package]] +name = "typetag-impl" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27a7a9b72ba121f6f1f6c3632b85604cac41aedb5ddc70accbebb6cac83de846" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "tz-rs" version = "0.6.14" diff --git a/Cargo.toml b/Cargo.toml index 1f71ea92e6c7b..53a8df6cee545 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -366,8 +366,9 @@ hyper-0-14 = { package = "hyper", version = "0.14", features = ["client", "tcp"] hyper-openssl = "0.10.2" hyper-util = "0.1.20" tower-service = "0.3.3" -iceberg = "0.7.0" -iceberg-catalog-rest = "0.7.0" +iceberg = "0.9.0" +iceberg-catalog-rest = "0.9.0" +iceberg-storage-opendal = { version = "0.9.0", default-features = false, features = ["opendal-s3"] } imbl = { version = "7.0.0", features = ["serde"] } include_dir = "0.7.4" indexmap = { version = "2.10.0", default-features = false, features = ["std"] } @@ -621,9 +622,10 @@ tiberius = { git = "https://github.com/MaterializeInc/tiberius", rev="64ca594cc2 async-compression = { git = "https://github.com/MaterializeInc/async-compression.git", rev = "fe7411eb6104a02a89e2c3a76ab326dd6594214d" } # Custom iceberg features for mz -# All changes should go to the `mz_changes` branch. -iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c31a98afe789" } -iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c31a98afe789" } +# All changes should go to the `kynan-mz_changes_v0.9.0` branch. +iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" } +iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" } +iceberg-storage-opendal = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" } # Custom duckdb crate to support mz needs # All changes should go to the `mz_changes` branch. diff --git a/deny.toml b/deny.toml index 3449f71f5eb6b..44e2eab541bcb 100644 --- a/deny.toml +++ b/deny.toml @@ -128,6 +128,8 @@ skip = [ { name = "fallible-iterator", version = "0.3.0" }, # arrow { name = "hashbrown", version = "0.16.1" }, + # Used by dynfmt; iceberg/typetag pulls in v0.4. + { name = "erased-serde", version = "0.3.26" }, ] [[bans.deny]] diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index e7051e44e0148..f8de6fa6657fe 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -20,6 +20,8 @@ async-trait.workspace = true aws-config.workspace = true aws-credential-types.workspace = true aws-sdk-sts.workspace = true +aws-sigv4.workspace = true +aws-smithy-runtime-api.workspace = true aws-types.workspace = true bytes.workspace = true columnation.workspace = true @@ -74,6 +76,7 @@ uuid = { workspace = true, features = ["serde", "v4"] } base64.workspace = true iceberg.workspace = true iceberg-catalog-rest.workspace = true +iceberg-storage-opendal.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 81b12cb071fa0..97d319f112685 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -13,18 +13,24 @@ use std::borrow::Cow; use std::collections::{BTreeMap, BTreeSet}; use std::net::SocketAddr; use std::sync::Arc; +use std::time::SystemTime; use anyhow::{Context, anyhow}; use async_trait::async_trait; -use aws_credential_types::provider::ProvideCredentials; +use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; +use aws_sigv4::http_request::{SignableBody, SignableRequest, SigningSettings, sign}; +use aws_sigv4::sign::v4; +// Aliased to avoid colliding with `mz_ccsr::tls::Identity`. +use aws_smithy_runtime_api::client::identity::Identity as AwsIdentity; +use http::{HeaderName, HeaderValue}; use iceberg::Catalog; use iceberg::CatalogBuilder; -use iceberg::io::{ - AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, S3_ACCESS_KEY_ID, - S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg_catalog_rest::{ - REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RestCatalogBuilder, + REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RequestAuthenticator, RestCatalogBuilder, +}; +use iceberg_storage_opendal::{ + AwsCredential, AwsCredentialLoad, CustomAwsCredentialLoader, OpenDalStorageFactory, }; use itertools::Itertools; use mz_ccsr::tls::{Certificate, Identity}; @@ -51,6 +57,7 @@ use rdkafka::ClientContext; use rdkafka::config::FromClientConfigAndContext; use rdkafka::consumer::{BaseConsumer, Consumer}; use regex::Regex; +use reqwest::Request; use serde::{Deserialize, Deserializer, Serialize}; use tokio::net; use tokio::runtime::Handle; @@ -89,11 +96,11 @@ const REST_CATALOG_PROP_CREDENTIAL: &str = "credential"; struct AwsSdkCredentialLoader { /// The underlying AWS SDK credentials provider. For assume role auth, this provider /// already handles the full chain: ambient creds -> jump role -> user role. - provider: aws_credential_types::provider::SharedCredentialsProvider, + provider: SharedCredentialsProvider, } impl AwsSdkCredentialLoader { - fn new(provider: aws_credential_types::provider::SharedCredentialsProvider) -> Self { + fn new(provider: SharedCredentialsProvider) -> Self { Self { provider } } } @@ -129,6 +136,96 @@ impl AwsCredentialLoad for AwsSdkCredentialLoader { } } +/// Signs each outgoing REST-catalog request with AWS SigV4. +/// +/// Holds a [`SharedCredentialsProvider`] (not static `Credentials`) so each +/// request signs with refreshable creds from Materialize's chain +/// (ambient -> jump role -> user role w/ external ID). +struct Sigv4Authenticator { + provider: SharedCredentialsProvider, + region: String, + /// The AWS signing name. `"s3tables"` for AWS S3 Tables REST catalog. + signing_name: String, +} + +impl std::fmt::Debug for Sigv4Authenticator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Sigv4Authenticator") + .field("region", &self.region) + .field("signing_name", &self.signing_name) + .finish_non_exhaustive() + } +} + +fn sigv4_err(e: impl Into) -> iceberg::Error { + iceberg::Error::new(iceberg::ErrorKind::DataInvalid, "AWS SigV4").with_source(e) +} + +#[async_trait] +impl RequestAuthenticator for Sigv4Authenticator { + async fn authenticate_request(&self, req: &mut Request) -> iceberg::Result<()> { + let creds = self + .provider + .provide_credentials() + .await + .map_err(sigv4_err)?; + let identity: AwsIdentity = creds.into(); + let params = v4::SigningParams::builder() + .identity(&identity) + .region(&self.region) + .name(&self.signing_name) + .time(SystemTime::now()) + .settings(SigningSettings::default()) + .build() + .map_err(sigv4_err)? + .into(); + let body: Vec = req + .body() + .and_then(|b| b.as_bytes()) + .map(|b| b.to_vec()) + .unwrap_or_default(); + let headers: Vec<(&str, &str)> = req + .headers() + .iter() + .map(|(k, v)| (k.as_str(), v.to_str().unwrap_or(""))) + .collect(); + let signable = SignableRequest::new( + req.method().as_str(), + req.url().as_str(), + headers.into_iter(), + SignableBody::Bytes(&body), + ) + .map_err(sigv4_err)?; + let (instructions, _sig) = sign(signable, ¶ms).map_err(sigv4_err)?.into_parts(); + let (new_headers, new_query) = instructions.into_parts(); + for header in new_headers { + let mut value = HeaderValue::from_str(header.value()).map_err(sigv4_err)?; + value.set_sensitive(header.sensitive()); + req.headers_mut() + .insert(HeaderName::from_static(header.name()), value); + } + if !new_query.is_empty() { + let url = req.url_mut(); + let mut pairs = url.query_pairs_mut(); + for (name, value) in new_query { + pairs.append_pair(name, &value); + } + } + Ok(()) + } + + // SigV4 is stateless: nothing to cache, invalidate, or refresh. + async fn ensure_cached(&self) -> iceberg::Result<()> { + Ok(()) + } + async fn invalidate_cache(&self) -> iceberg::Result<()> { + Ok(()) + } + async fn regenerate_cache(&self) -> iceberg::Result<()> { + Ok(()) + } +} + /// An extension trait for [`SecretsReader`] #[async_trait::async_trait] trait SecretsReaderExt { @@ -606,7 +703,7 @@ impl IcebergCatalogConnection { .unwrap_or_else(|| "us-east-1".to_string()); let mut props = vec![ - (S3_REGION.to_string(), aws_region), + (S3_REGION.to_string(), aws_region.clone()), (S3_DISABLE_EC2_METADATA.to_string(), "true".to_string()), ( REST_CATALOG_PROP_WAREHOUSE.to_string(), @@ -631,11 +728,35 @@ impl IcebergCatalogConnection { )); } - // Build the catalog with aws_config for REST API signing. - // For AssumeRole auth, we also add a FileIO extension so OpenDAL can - // use our credential chain for S3 object access. + // Sign REST catalog requests with the Materialize AWS credential chain + // via a custom `RequestAuthenticator`. For AssumeRole auth, also feed + // the chain to OpenDAL's S3 loader so data-file IO uses the same creds. + let credentials_provider = aws_config + .credentials_provider() + .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?; + + let authenticator = Arc::new(Sigv4Authenticator { + provider: credentials_provider.clone(), + region: aws_region.clone(), + signing_name: "s3tables".to_string(), + }); + + let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) { + Some(CustomAwsCredentialLoader::new(Arc::new( + AwsSdkCredentialLoader::new(credentials_provider), + ))) + } else { + None + }; + + let storage_factory = Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load, + }); + let catalog = RestCatalogBuilder::default() - .with_aws_config(aws_config.clone()) + .with_storage_factory(storage_factory) + .with_authenticator(authenticator) .load("IcebergCatalog", props.into_iter().collect()) .await .with_context(|| { @@ -646,18 +767,6 @@ impl IcebergCatalogConnection { ) })?; - let catalog = if matches!(aws_auth, AwsAuth::AssumeRole(_)) { - let credentials_provider = aws_config - .credentials_provider() - .ok_or_else(|| anyhow!("aws_config missing credentials provider"))?; - let file_io_loader = CustomAwsCredentialLoader::new(Arc::new( - AwsSdkCredentialLoader::new(credentials_provider), - )); - catalog.with_file_io_extension(file_io_loader) - } else { - catalog - }; - Ok(Arc::new(catalog)) } @@ -691,6 +800,14 @@ impl IcebergCatalogConnection { } let catalog = RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + // Polaris returns a config with: + // s3.access-key-id, s3.secret-access-key, s3.endpoint, ... + // `iceberg-rust` forwards these props to `opendal`. + // N.B. This is not confirmed to work with other catalog & storage implementations. + customized_credential_load: None, + })) .load("IcebergCatalog", props.into_iter().collect()) .await .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?; From 08ff8595b40a0fd4ac0441fce45f999c5c5382f1 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Wed, 20 May 2026 10:01:07 -0400 Subject: [PATCH 2/4] Kynan: Iceberg sink's AWS connection isn't used. --- src/sql-parser/src/ast/defs/ddl.rs | 17 ++-- src/sql-parser/src/parser.rs | 11 ++- src/sql-parser/tests/testdata/ddl | 13 +++- src/sql/src/plan/statement/ddl.rs | 26 ++++--- src/sql/src/pure.rs | 107 ++++++++++++++------------ src/storage-types/src/connections.rs | 2 + src/storage-types/src/sinks.rs | 47 +++++++---- test/iceberg/catalog.td | 5 +- test/iceberg/commit-conflict-setup.td | 8 -- test/iceberg/key-validation.td | 12 --- test/iceberg/large-upsert-batch.td | 8 -- test/iceberg/mode-append.td | 10 --- test/iceberg/nested-records.td | 11 --- 13 files changed, 137 insertions(+), 140 deletions(-) diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 527e82a153366..4beaf51fc602f 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -1542,8 +1542,11 @@ pub enum CreateSinkConnection { headers: Option, }, Iceberg { - connection: T::ItemName, - aws_connection: T::ItemName, + catalog_connection: T::ItemName, + + /// AWS creds for the storage layer. + aws_connection: Option, + key: Option, options: Vec>, }, @@ -1575,20 +1578,22 @@ impl AstDisplay for CreateSinkConnection { } } CreateSinkConnection::Iceberg { - connection, + catalog_connection, aws_connection, key, options, } => { f.write_str("ICEBERG CATALOG CONNECTION "); - f.write_node(connection); + f.write_node(catalog_connection); if !options.is_empty() { f.write_str(" ("); f.write_node(&display::comma_separated(options)); f.write_str(")"); } - f.write_str(" USING AWS CONNECTION "); - f.write_node(aws_connection); + if let Some(aws_connection) = aws_connection { + f.write_str(" USING AWS CONNECTION "); + f.write_node(aws_connection); + } if let Some(key) = key.as_ref() { f.write_str(" "); f.write_node(key); diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 419220ef1da4d..6232ebb693453 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -3856,7 +3856,7 @@ impl<'a> Parser<'a> { &mut self, ) -> Result, ParserError> { self.expect_keyword(CONNECTION)?; - let connection = self.parse_raw_name()?; + let catalog_connection = self.parse_raw_name()?; let options = if self.consume_token(&Token::LParen) { let options = self.parse_comma_separated(Parser::parse_iceberg_sink_config_option)?; @@ -3866,8 +3866,11 @@ impl<'a> Parser<'a> { vec![] }; - self.expect_keywords(&[USING, AWS, CONNECTION])?; - let aws_connection = self.parse_raw_name()?; + let aws_connection = if self.parse_keywords(&[USING, AWS, CONNECTION]) { + Some(self.parse_raw_name()?) + } else { + None + }; let key = if self.parse_keyword(KEY) { let key_columns = self.parse_parenthesized_column_list(Mandatory)?; @@ -3882,7 +3885,7 @@ impl<'a> Parser<'a> { }; Ok(CreateSinkConnection::Iceberg { - connection, + catalog_connection, aws_connection, key, options, diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 465ca01b1bb2d..4a98459b0ffd1 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -1009,19 +1009,26 @@ CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') KEY FORMAT => CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("foo")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaSinkConfigOption { name: Topic, value: Some(Value(String("topic"))) }], key: None, headers: None }, format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Json { array: false } }), envelope: None, mode: None, with_options: [] }) +parse-statement +CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT; +---- +CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT +=> +CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: None, key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] }) + parse-statement CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT; ---- CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT => -CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] }) +CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] }) parse-statement CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT; ---- CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT => -CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] }) +CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] }) parse-statement CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (BLAH = 'boo!') USING AWS CONNECTION aws_conn MODE UPSERT; @@ -1035,7 +1042,7 @@ CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = ' ---- CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND => -CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] }) +CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] }) parse-statement CREATE INDEX foo ON myschema.bar (a, b) diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 9fc5b1175fe96..af3aab6d4e517 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -3466,13 +3466,13 @@ fn plan_sink( commit_interval, )?, CreateSinkConnection::Iceberg { - connection, + catalog_connection, aws_connection, options, .. } => iceberg_sink_builder( scx, - connection, + catalog_connection, aws_connection, options, relation_key_indices, @@ -3644,7 +3644,7 @@ impl std::convert::TryFrom>> for CsrConfigOptionExtract fn iceberg_sink_builder( scx: &StatementContext, catalog_connection: ResolvedItemName, - aws_connection: ResolvedItemName, + storage_connection: Option, options: Vec>, relation_key_indices: Option>, key_desc_and_indices: Option<(RelationDesc, Vec)>, @@ -3658,10 +3658,9 @@ fn iceberg_sink_builder( // (e.g. interval -> string) don't trip the check. ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides) .map_err(|e| sql_err!("{}", e))?; + let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?; let catalog_connection_id = catalog_connection_item.id(); - let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?; - let aws_connection_id = aws_connection_item.id(); if !matches!( catalog_connection_item.connection()?, Connection::IcebergCatalog(_) @@ -3675,13 +3674,16 @@ fn iceberg_sink_builder( ); }; - if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) { + let storage_connection_item = storage_connection + .map(|c| scx.get_item_by_resolved_name(&c)) + .transpose()?; + let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id()); + if let Some(c) = &storage_connection_item + && !matches!(c.connection()?, Connection::Aws(_)) + { sql_bail!( "{} is not an AWS connection", - scx.catalog - .resolve_full_name(aws_connection_item.name()) - .to_string() - .quoted() + scx.catalog.resolve_full_name(c.name()).to_string().quoted() ); } @@ -3704,8 +3706,8 @@ fn iceberg_sink_builder( Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection { catalog_connection_id, catalog_connection: catalog_connection_id, - aws_connection_id, - aws_connection: aws_connection_id, + storage_connection_id, + storage_connection: storage_connection_id, table, namespace, relation_key_indices, diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 1a2e1fe424ae3..60a355b630261 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -544,13 +544,13 @@ async fn purify_create_sink( } } CreateSinkConnection::Iceberg { - connection, + catalog_connection, aws_connection, .. } => { let scx = StatementContext::new(None, &catalog); let connection = { - let item = scx.get_item_by_resolved_name(connection)?; + let item = scx.get_item_by_resolved_name(catalog_connection)?; // Get Iceberg connection match item.connection()? { Connection::IcebergCatalog(connection) => { @@ -563,63 +563,72 @@ async fn purify_create_sink( } }; - let aws_conn_id = aws_connection.item_id(); - - let aws_connection = { - let item = scx.get_item_by_resolved_name(aws_connection)?; - // Get AWS connection - match item.connection()? { - Connection::Aws(aws_connection) => aws_connection.clone(), - _ => sql_bail!( - "{} is not an aws connection", - scx.catalog.resolve_full_name(item.name()) - ), - } - }; + // Validate the sink's (optional) AWS connection even though we never use it. + // TODO(kynan): If we do start using the sink's creds, check again that this validation + // accurately reflects what we need. + // Consider rolling the storage creds validation into the catalog connection's "connect" fn, + // which already validates the catalog creds (currently also used for the storage layer). + if let Some(aws_connection) = aws_connection { + let aws_conn_id = aws_connection.item_id(); + let aws_connection = { + let item = scx.get_item_by_resolved_name(aws_connection)?; + // Get AWS connection + match item.connection()? { + Connection::Aws(aws_connection) => aws_connection.clone(), + _ => sql_bail!( + "{} is not an aws connection", + scx.catalog.resolve_full_name(item.name()) + ), + } + }; - // For S3 Tables connections in the Materialize Cloud product, verify the - // AWS region matches the environment's region. This check only applies when - // the enable_s3_tables_region_check dyncfg is set. - if let Some(s3tables) = connection.s3tables_catalog() { - let enable_region_check = - ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs()); - if enable_region_check { - let env_id = &catalog.config().environment_id; - if matches!(env_id.cloud_provider(), CloudProvider::Aws) { - let env_region = env_id.cloud_provider_region(); - // Later on we default to "us-east-1" if the region is not set on the S3 Tables - // connection, so we need to do the same check here. - let s3_tables_region = s3tables - .aws_connection - .connection - .region - .clone() - .unwrap_or_else(|| "us-east-1".to_string()); - if s3_tables_region != env_region { - Err(IcebergSinkPurificationError::S3TablesRegionMismatch { - s3_tables_region, - environment_region: env_region.to_string(), - })?; + // For S3 Tables connections in the Materialize Cloud product, verify the + // AWS region matches the environment's region. This check only applies when + // the enable_s3_tables_region_check dyncfg is set. + if let Some(s3tables) = connection.s3tables_catalog() { + let enable_region_check = + ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs()); + if enable_region_check { + let env_id = &catalog.config().environment_id; + if matches!(env_id.cloud_provider(), CloudProvider::Aws) { + let env_region = env_id.cloud_provider_region(); + // Later on we default to "us-east-1" if the region is not set on the S3 Tables + // connection, so we need to do the same check here. + let s3_tables_region = s3tables + .aws_connection + .connection + .region + .clone() + .unwrap_or_else(|| "us-east-1".to_string()); + if s3_tables_region != env_region { + Err(IcebergSinkPurificationError::S3TablesRegionMismatch { + s3_tables_region, + environment_region: env_region.to_string(), + })?; + } } } } + + let _sdk_config = aws_connection + .load_sdk_config( + &storage_configuration.connection_context, + aws_conn_id.clone(), + InTask::No, + mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES + .get(storage_configuration.config_set()), + ) + .await + .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?; } + // Now that we've validated the sink's storage creds (if they exist) + // we _could_ use them to build a complete Iceberg client (both catalog and storage). + // TODO(kynan): Actually use those sink-specific creds here instead of ignoring them. let _catalog = connection .connect(storage_configuration, InTask::No) .await .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?; - - let _sdk_config = aws_connection - .load_sdk_config( - &storage_configuration.connection_context, - aws_conn_id.clone(), - InTask::No, - mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES - .get(storage_configuration.config_set()), - ) - .await - .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?; } } diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 97d319f112685..31badd1be6c76 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -741,6 +741,8 @@ impl IcebergCatalogConnection { signing_name: "s3tables".to_string(), }); + // N.B. We're using the AWS credentials from the catalog connection for the storage layer + // even though the sink comes with its own (unused) AWS credentials for storage. let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) { Some(CustomAwsCredentialLoader::new(Arc::new( AwsSdkCredentialLoader::new(credentials_provider), diff --git a/src/storage-types/src/sinks.rs b/src/storage-types/src/sinks.rs index ccbaebccb97a6..ffd96c6d17fa7 100644 --- a/src/storage-types/src/sinks.rs +++ b/src/storage-types/src/sinks.rs @@ -687,8 +687,18 @@ pub fn iceberg_type_overrides( pub struct IcebergSinkConnection { pub catalog_connection_id: CatalogItemId, pub catalog_connection: C::IcebergCatalog, - pub aws_connection_id: CatalogItemId, - pub aws_connection: C::Aws, + + /// We allow users to specify a separate (from the catalog) connection + /// for the storage layer, but we currently ignore it. + /// S3 Tables uses the same AWS connection for catalog and storage. + /// BigLake/Lakehouse uses the same GCP connection for catalog and storage. + /// + /// TODO(kynan): Once we need separate storage creds, make this generic. + /// And check that the [`IcebergSinkConnection::alter_compatible`] + /// implementation still handles `storage_connection` acceptably. + pub storage_connection_id: Option, + pub storage_connection: Option, + /// A natural key of the sinked relation (view or source). pub relation_key_indices: Option>, /// The user-specified key for the sink. @@ -709,8 +719,8 @@ impl IcebergSinkConnection { let IcebergSinkConnection { catalog_connection_id: connection_id, catalog_connection, - aws_connection_id, - aws_connection, + storage_connection_id, + storage_connection, relation_key_indices, key_desc_and_indices, namespace, @@ -728,15 +738,24 @@ impl IcebergSinkConnection { .is_ok(), "catalog_connection", ), + // We don't use `storage_connection_id` and `storage_connection`, + // so allow them to be removed. ( - aws_connection_id == &other.aws_connection_id, - "aws_connection_id", + other.storage_connection_id.is_none() + || storage_connection_id == &other.storage_connection_id, + "storage_connection_id", ), ( - aws_connection - .alter_compatible(id, &other.aws_connection) - .is_ok(), - "aws_connection", + match &other.storage_connection { + None => true, // Removing a storage connection OR not adding a storage connection. + Some(after) => { + match storage_connection { + None => false, // Adding a storage connection where there wasn't one before. + Some(before) => before.alter_compatible(id, after).is_ok(), + } + } + }, + "storage_connection", ), ( relation_key_indices == &other.relation_key_indices, @@ -772,8 +791,8 @@ impl IntoInlineConnection let IcebergSinkConnection { catalog_connection_id, catalog_connection, - aws_connection_id, - aws_connection, + storage_connection_id, + storage_connection, relation_key_indices, key_desc_and_indices, namespace, @@ -784,8 +803,8 @@ impl IntoInlineConnection catalog_connection: r .resolve_connection(catalog_connection) .unwrap_iceberg_catalog(), - aws_connection_id, - aws_connection: r.resolve_connection(aws_connection).unwrap_aws(), + storage_connection_id, + storage_connection: storage_connection.map(|c| r.resolve_connection(c).unwrap_aws()), relation_key_indices, key_desc_and_indices, namespace, diff --git a/test/iceberg/catalog.td b/test/iceberg/catalog.td index 2401779ad2684..75896876c6c13 100644 --- a/test/iceberg/catalog.td +++ b/test/iceberg/catalog.td @@ -28,6 +28,8 @@ > insert into foo values (1, 'one'), (2, 'two'), (3, 'three'); +# NOTE: This sink has "USING AWS CONNECTION aws_conn", which we validate but don't use. +# It's included here just to make sure its presence doesn't break the sink. > CREATE SINK demo FROM foo INTO ICEBERG CATALOG CONNECTION polaris ( @@ -169,7 +171,6 @@ SELECT a, b FROM polaris.default_namespace.demo_table ORDER BY a NAMESPACE 'default_namespace', TABLE 'uint_table' ) - USING AWS CONNECTION aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -205,7 +206,6 @@ SELECT id, name FROM iceberg_scan('s3://test-bucket/default_namespace/uint_table NAMESPACE 'default_namespace', TABLE 'interval_table' ) - USING AWS CONNECTION aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -244,7 +244,6 @@ SELECT id, dur FROM iceberg_scan('s3://test-bucket/default_namespace/interval_ta NAMESPACE 'default_namespace', TABLE 'range_table' ) - USING AWS CONNECTION aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); diff --git a/test/iceberg/commit-conflict-setup.td b/test/iceberg/commit-conflict-setup.td index 80a07f86ee048..7c4fab6d9ae5f 100644 --- a/test/iceberg/commit-conflict-setup.td +++ b/test/iceberg/commit-conflict-setup.td @@ -13,13 +13,6 @@ > CREATE SECRET access_key_secret AS '${arg.s3-access-key}' -> CREATE CONNECTION aws_conn TO AWS ( - ACCESS KEY ID = 'tduser', - SECRET ACCESS KEY = SECRET access_key_secret, - ENDPOINT = '${arg.aws-endpoint}', - REGION = 'us-east-1' - ); - > CREATE CONNECTION polaris TO ICEBERG CATALOG ( CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', @@ -40,7 +33,6 @@ NAMESPACE 'default_namespace', TABLE 'conflict_table' ) - USING AWS CONNECTION aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '2s'); diff --git a/test/iceberg/key-validation.td b/test/iceberg/key-validation.td index 92ad6b3f97a04..6495ec85b3a9e 100644 --- a/test/iceberg/key-validation.td +++ b/test/iceberg/key-validation.td @@ -12,13 +12,6 @@ > CREATE SECRET IF NOT EXISTS access_key_secret AS '${arg.s3-access-key}' -> CREATE CONNECTION IF NOT EXISTS aws_conn TO AWS ( - ACCESS KEY ID = 'tduser', - SECRET ACCESS KEY = SECRET access_key_secret, - ENDPOINT = '${arg.aws-endpoint}', - REGION = 'us-east-1' - ); - > CREATE CONNECTION IF NOT EXISTS polaris TO ICEBERG CATALOG ( CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', @@ -36,7 +29,6 @@ NAMESPACE 'default_namespace', TABLE 'key_float_table' ) - USING AWS CONNECTION aws_conn KEY (k) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -51,7 +43,6 @@ contains:cannot be used as an Iceberg equality delete key NAMESPACE 'default_namespace', TABLE 'key_double_table' ) - USING AWS CONNECTION aws_conn KEY (k) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -66,7 +57,6 @@ contains:cannot be used as an Iceberg equality delete key NAMESPACE 'default_namespace', TABLE 'key_map_table' ) - USING AWS CONNECTION aws_conn KEY (k) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -81,7 +71,6 @@ contains:cannot be used as an Iceberg equality delete key NAMESPACE 'default_namespace', TABLE 'key_list_table' ) - USING AWS CONNECTION aws_conn KEY (k) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -97,7 +86,6 @@ contains:cannot be used as an Iceberg equality delete key NAMESPACE 'default_namespace', TABLE 'val_float_table' ) - USING AWS CONNECTION aws_conn KEY (k) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); diff --git a/test/iceberg/large-upsert-batch.td b/test/iceberg/large-upsert-batch.td index a34ba323b42e1..3d0b0c514dbaf 100644 --- a/test/iceberg/large-upsert-batch.td +++ b/test/iceberg/large-upsert-batch.td @@ -19,13 +19,6 @@ > CREATE SECRET access_key_secret AS '${arg.s3-access-key}' -> CREATE CONNECTION aws_conn TO AWS ( - ACCESS KEY ID = 'tduser', - SECRET ACCESS KEY = SECRET access_key_secret, - ENDPOINT = '${arg.aws-endpoint}', - REGION = 'us-east-1' - ); - > CREATE CONNECTION polaris TO ICEBERG CATALOG ( CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', @@ -42,7 +35,6 @@ NAMESPACE 'default_namespace', TABLE 'large_upsert_table' ) - USING AWS CONNECTION aws_conn KEY (key) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '120s'); diff --git a/test/iceberg/mode-append.td b/test/iceberg/mode-append.td index 07a248659194b..c9a068d25b43b 100644 --- a/test/iceberg/mode-append.td +++ b/test/iceberg/mode-append.td @@ -18,13 +18,6 @@ > CREATE SECRET IF NOT EXISTS append_access_key_secret AS '${arg.s3-access-key}' -> CREATE CONNECTION IF NOT EXISTS append_aws_conn TO AWS ( - ACCESS KEY ID = 'tduser', - SECRET ACCESS KEY = SECRET append_access_key_secret, - ENDPOINT = '${arg.aws-endpoint}', - REGION = 'us-east-1' - ); - > CREATE CONNECTION append_polaris TO ICEBERG CATALOG ( CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', @@ -44,7 +37,6 @@ NAMESPACE 'default_namespace', TABLE 'append_demo_table' ) - USING AWS CONNECTION append_aws_conn MODE APPEND WITH (COMMIT INTERVAL '1s'); @@ -121,7 +113,6 @@ SELECT a, b, _mz_diff FROM polaris.default_namespace.append_demo_table ORDER BY NAMESPACE 'default_namespace', TABLE 'append_clash_table' ) - USING AWS CONNECTION append_aws_conn MODE APPEND WITH (COMMIT INTERVAL '1s'); contains:conflicts with the system column that MODE APPEND adds to the Iceberg table @@ -134,7 +125,6 @@ contains:conflicts with the system column that MODE APPEND adds to the Iceberg t NAMESPACE 'default_namespace', TABLE 'append_clash_table_1' ) - USING AWS CONNECTION append_aws_conn MODE APPEND WITH (COMMIT INTERVAL '1s'); contains:conflicts with the system column that MODE APPEND adds to the Iceberg table diff --git a/test/iceberg/nested-records.td b/test/iceberg/nested-records.td index a63023a229867..774430bfdca88 100644 --- a/test/iceberg/nested-records.td +++ b/test/iceberg/nested-records.td @@ -11,13 +11,6 @@ > CREATE SECRET nested_access_key_secret AS '${arg.s3-access-key}' -> CREATE CONNECTION nested_aws_conn TO AWS ( - ACCESS KEY ID = 'tduser', - SECRET ACCESS KEY = SECRET nested_access_key_secret, - ENDPOINT = '${arg.aws-endpoint}', - REGION = 'us-east-1' - ); - > CREATE CONNECTION nested_polaris TO ICEBERG CATALOG ( CATALOG TYPE = 'REST', URL = 'http://polaris:8181/api/catalog', @@ -46,7 +39,6 @@ NAMESPACE 'default_namespace', TABLE 'nested_record_table' ) - USING AWS CONNECTION nested_aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -95,7 +87,6 @@ SELECT id, name, addr.street, addr.city, addr.zip FROM iceberg_scan('s3://test-b NAMESPACE 'default_namespace', TABLE 'deeply_nested_table' ) - USING AWS CONNECTION nested_aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -127,7 +118,6 @@ SELECT id, data.inner.x, data.inner.y, data.z FROM iceberg_scan('s3://test-bucke NAMESPACE 'default_namespace', TABLE 'nullable_record_table' ) - USING AWS CONNECTION nested_aws_conn KEY (id) NOT ENFORCED MODE UPSERT WITH (COMMIT INTERVAL '1s'); @@ -149,7 +139,6 @@ SELECT id, rec.required_field, rec.optional_field FROM iceberg_scan('s3://test-b NAMESPACE 'default_namespace', TABLE 'nested_record_append_table' ) - USING AWS CONNECTION nested_aws_conn MODE APPEND WITH (COMMIT INTERVAL '1s'); From f5d62c4ea445e6c34fc4d1b79de1f0bf2fa87407 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Thu, 7 May 2026 14:23:51 -0400 Subject: [PATCH 3/4] Claude: GCP Connection, holds Service Account Key --- Cargo.lock | 92 ++++++++++++++++-- Cargo.toml | 1 + deny.toml | 18 +++- .../src/catalog/builtin_table_updates.rs | 1 + src/adapter/src/catalog/state.rs | 1 + src/adapter/src/coord/ddl.rs | 2 + src/sql-lexer/src/keywords.txt | 2 + src/sql-parser/src/ast/defs/ddl.rs | 8 ++ src/sql-parser/src/parser.rs | 19 ++-- src/sql-parser/tests/testdata/ddl | 7 ++ src/sql/src/plan.rs | 3 + src/sql/src/plan/statement/ddl.rs | 1 + src/sql/src/plan/statement/ddl/connection.rs | 10 ++ src/storage-types/Cargo.toml | 1 + src/storage-types/src/connections.rs | 10 ++ src/storage-types/src/connections/gcp.rs | 97 +++++++++++++++++++ 16 files changed, 259 insertions(+), 14 deletions(-) create mode 100644 src/storage-types/src/connections/gcp.rs diff --git a/Cargo.lock b/Cargo.lock index d456d1edecbaf..819f4e4352d24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,6 +2162,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -3670,6 +3680,32 @@ dependencies = [ "slab", ] +[[package]] +name = "gcp_auth" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2b3d0b409a042a380111af38136310839af8ac1a0917fb6e84515ed1e4bf3ee" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "http 1.4.0", + "http-body-util", + "hyper 1.9.0", + "hyper-rustls", + "hyper-util", + "ring", + "rustls-pki-types", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tracing", + "tracing-futures", + "url", +] + [[package]] name = "generator" version = "0.7.5" @@ -4183,6 +4219,7 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -5478,10 +5515,10 @@ dependencies = [ "mz-ore", "mz-sql-parser", "open", - "openssl-probe", + "openssl-probe 0.1.6", "reqwest", "rpassword", - "security-framework", + "security-framework 2.10.0", "semver", "serde", "serde-aux", @@ -8242,6 +8279,7 @@ dependencies = [ "dec", "derivative", "differential-dataflow", + "gcp_auth", "hex", "http 1.4.0", "iceberg", @@ -8545,10 +8583,10 @@ dependencies = [ "libc", "log", "openssl", - "openssl-probe", + "openssl-probe 0.1.6", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.10.0", "security-framework-sys", "tempfile", ] @@ -8954,6 +8992,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "openssl-src" version = "300.3.1+3.3.1" @@ -10789,12 +10833,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" dependencies = [ "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe 0.2.1", + "rustls-pki-types", + "schannel", + "security-framework 3.7.0", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -10940,7 +10997,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" dependencies = [ "bitflags 1.3.2", - "core-foundation", + "core-foundation 0.9.3", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" +dependencies = [ + "bitflags 2.11.0", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -11746,7 +11816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ "bitflags 2.11.0", - "core-foundation", + "core-foundation 0.9.3", "system-configuration-sys", ] @@ -12554,6 +12624,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 53a8df6cee545..1e5679419899a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -346,6 +346,7 @@ futures = "0.3.32" futures-core = "0.3.31" futures-task = "0.3.31" futures-util = "0.3.31" +gcp_auth = "0.12.6" glob = "0.3.3" globset = "0.4.18" governor = "0.10.1" diff --git a/deny.toml b/deny.toml index 44e2eab541bcb..a36382191b6a4 100644 --- a/deny.toml +++ b/deny.toml @@ -130,6 +130,19 @@ skip = [ { name = "hashbrown", version = "0.16.1" }, # Used by dynfmt; iceberg/typetag pulls in v0.4. { name = "erased-serde", version = "0.3.26" }, + # gcp_auth → hyper-rustls → rustls-native-certs pulls newer versions + # while native-tls still pulls older versions. + { name = "core-foundation", version = "0.10.1" }, + { name = "security-framework", version = "3.7.0" }, + { name = "openssl-probe", version = "0.2.1" }, + # reqsign (via iceberg-storage-opendal / opendal) pins older deps + # than the workspace. + { name = "jsonwebtoken", version = "9.3.1" }, + { name = "quick-xml", version = "0.37.5" }, + # aws-lc-rs (via jsonwebtoken 10) and ring pull different `untrusted`. + { name = "untrusted", version = "0.7.1" }, + # Held back by lazy_static 1.4.0 (used by num-bigint-dig). + { name = "spin", version = "0.5.2" }, ] [[bans.deny]] @@ -206,9 +219,11 @@ wrappers = [ ] # We prefer the system's native TLS or OpenSSL to Rustls, since they are more -# mature and more widely used. +# mature and more widely used. `gcp_auth` only ships with rustls-based TLS, +# so allow it through. [[bans.deny]] name = "rustls" +wrappers = ["hyper-rustls", "tokio-rustls"] # once_cell is going to be added to std, and doesn't use macros # Unfortunately, its heavily used, so we have lots of exceptions. @@ -219,6 +234,7 @@ wrappers = [ "findshlibs", "launchdarkly-server-sdk", "launchdarkly-server-sdk-evaluation", + "num-bigint-dig", "prometheus", "rayon-core", "sharded-slab", diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 4c9b4e065903c..cfa477c1f1ece 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -1039,6 +1039,7 @@ impl CatalogState { updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff)); } ConnectionDetails::Csr(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::Postgres(_) | ConnectionDetails::MySql(_) | ConnectionDetails::SqlServer(_) diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index e32543b731747..22f37c4d7ad0d 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -2777,6 +2777,7 @@ impl ConnectionResolver for CatalogState { Ssh(conn) => Ssh(conn), Aws(conn) => Aws(conn), AwsPrivatelink(conn) => AwsPrivatelink(conn), + Gcp(conn) => Gcp(conn), MySql(conn) => MySql(conn.into_inline_connection(self)), SqlServer(conn) => SqlServer(conn.into_inline_connection(self)), IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)), diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index dfd1cc0dc72e1..bec4c8248b0cc 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -1135,6 +1135,7 @@ impl Coordinator { ConnectionDetails::Csr(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::IcebergCatalog(_) => {} }, CatalogItem::Table(_) => { @@ -1311,6 +1312,7 @@ impl Coordinator { ConnectionDetails::Csr(_) | ConnectionDetails::Ssh { .. } | ConnectionDetails::Aws(_) + | ConnectionDetails::Gcp(_) | ConnectionDetails::IcebergCatalog(_) => {} } } diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index b5832107b0644..873b3c833feff 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -25,6 +25,7 @@ Abort Access +Account Action Add Added @@ -200,6 +201,7 @@ Full Fullname Function Fusion +Gcp Generator Grant Greatest diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 4beaf51fc602f..4984f6eddfbfb 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -791,6 +791,7 @@ pub enum ConnectionOptionName { Scope, SecretAccessKey, SecurityProtocol, + ServiceAccountKey, ServiceName, SshTunnel, SslCertificate, @@ -834,6 +835,7 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::Scope => "SCOPE", ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL", ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY", + ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY", ConnectionOptionName::ServiceName => "SERVICE NAME", ConnectionOptionName::SshTunnel => "SSH TUNNEL", ConnectionOptionName::SslCertificate => "SSL CERTIFICATE", @@ -883,6 +885,7 @@ impl WithOptionName for ConnectionOptionName { | ConnectionOptionName::Scope | ConnectionOptionName::SecurityProtocol | ConnectionOptionName::SecretAccessKey + | ConnectionOptionName::ServiceAccountKey | ConnectionOptionName::ServiceName | ConnectionOptionName::SshTunnel | ConnectionOptionName::SslCertificate @@ -911,6 +914,7 @@ impl_display_t!(ConnectionOption); pub enum CreateConnectionType { Aws, AwsPrivatelink, + Gcp, Kafka, Csr, Postgres, @@ -928,6 +932,7 @@ impl CreateConnectionType { Self::Postgres => "postgres", Self::Aws => "aws", Self::AwsPrivatelink => "aws-privatelink", + Self::Gcp => "gcp", Self::Ssh => "ssh-tunnel", Self::MySql => "mysql", Self::SqlServer => "sql-server", @@ -954,6 +959,9 @@ impl AstDisplay for CreateConnectionType { Self::AwsPrivatelink => { f.write_str("AWS PRIVATELINK"); } + Self::Gcp => { + f.write_str("GCP"); + } Self::Ssh => { f.write_str("SSH TUNNEL"); } diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 6232ebb693453..e76d4bbe2e8be 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2456,9 +2456,9 @@ impl<'a> Parser<'a> { TO => true, _ => unreachable!(), }; - let connection_type = match self - .expect_one_of_keywords(&[AWS, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG])? - { + let connection_type = match self.expect_one_of_keywords(&[ + AWS, GCP, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG, + ])? { AWS => { if self.parse_keyword(PRIVATELINK) { CreateConnectionType::AwsPrivatelink @@ -2466,6 +2466,7 @@ impl<'a> Parser<'a> { CreateConnectionType::Aws } } + GCP => CreateConnectionType::Gcp, KAFKA => CreateConnectionType::Kafka, CONFLUENT => { self.expect_keywords(&[SCHEMA, REGISTRY])?; @@ -2873,10 +2874,14 @@ impl<'a> Parser<'a> { self.expect_keywords(&[ACCESS, KEY])?; ConnectionOptionName::SecretAccessKey } - SERVICE => { - self.expect_keyword(NAME)?; - ConnectionOptionName::ServiceName - } + SERVICE => match self.expect_one_of_keywords(&[ACCOUNT, NAME])? { + ACCOUNT => { + self.expect_keyword(KEY)?; + ConnectionOptionName::ServiceAccountKey + } + NAME => ConnectionOptionName::ServiceName, + _ => unreachable!(), + }, SESSION => { self.expect_keyword(TOKEN)?; ConnectionOptionName::SessionToken diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 4a98459b0ffd1..e5bc38f3656dd 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -561,6 +561,13 @@ CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (SERVICE NAME = 'com.amazon => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("privatelinkconn")]), connection_type: AwsPrivatelink, if_not_exists: false, values: [ConnectionOption { name: ServiceName, value: Some(Value(String("com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc"))) }, ConnectionOption { name: AvailabilityZones, value: Some(Sequence([Value(String("use1-az1")), Value(String("use1-az4"))])) }], with_options: [] }) +parse-statement +CREATE CONNECTION gcpconn TO GCP (SERVICE ACCOUNT KEY = SECRET keyfile) +---- +CREATE CONNECTION gcpconn TO GCP (SERVICE ACCOUNT KEY = SECRET keyfile) +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("gcpconn")]), connection_type: Gcp, if_not_exists: false, values: [ConnectionOption { name: ServiceAccountKey, value: Some(Secret(Name(UnresolvedItemName([Ident("keyfile")])))) }], with_options: [] }) + parse-statement CREATE CONNECTION privatelinkconn TO AWS PRIVATELINK (SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc', AVAILABILITY ZONES ('use1-az1', 'use1-az4')) WITH (VALIDATE = FALSE) ---- diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 6a885020b7486..a99d4535b3af7 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -57,6 +57,7 @@ use mz_sql_parser::ast::{ }; use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::AwsConnection; +use mz_storage_types::connections::gcp::GcpConnection; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::{ AwsPrivatelinkConnection, CsrConnection, IcebergCatalogConnection, KafkaConnection, @@ -1676,6 +1677,7 @@ pub enum ConnectionDetails { }, Aws(AwsConnection), AwsPrivatelink(AwsPrivatelinkConnection), + Gcp(GcpConnection), MySql(MySqlConnection), SqlServer(SqlServerConnectionDetails), IcebergCatalog(IcebergCatalogConnection), @@ -1698,6 +1700,7 @@ impl ConnectionDetails { ConnectionDetails::AwsPrivatelink(c) => { mz_storage_types::connections::Connection::AwsPrivatelink(c.clone()) } + ConnectionDetails::Gcp(c) => mz_storage_types::connections::Connection::Gcp(c.clone()), ConnectionDetails::MySql(c) => { mz_storage_types::connections::Connection::MySql(c.clone()) } diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index af3aab6d4e517..b226547b47908 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -7017,6 +7017,7 @@ pub fn plan_alter_connection( let connection_type = match connection { Connection::Aws(_) => CreateConnectionType::Aws, Connection::AwsPrivatelink(_) => CreateConnectionType::AwsPrivatelink, + Connection::Gcp(_) => CreateConnectionType::Gcp, Connection::Kafka(_) => CreateConnectionType::Kafka, Connection::Csr(_) => CreateConnectionType::Csr, Connection::Postgres(_) => CreateConnectionType::Postgres, diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index 81825849f7247..e55271857cc8a 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -29,6 +29,7 @@ use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::{ AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials, }; +use mz_storage_types::connections::gcp::GcpConnection; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ @@ -72,6 +73,7 @@ generate_extracted_config!( (Scope, String), (SecretAccessKey, with_options::Secret), (SecurityProtocol, String), + (ServiceAccountKey, with_options::Secret), (ServiceName, String), (SshTunnel, with_options::Object), (SslCertificate, StringOrSecret), @@ -115,6 +117,7 @@ pub(super) fn validate_options_per_connection_type( ] .as_slice(), CreateConnectionType::AwsPrivatelink => &[AvailabilityZones, Port, ServiceName], + CreateConnectionType::Gcp => &[ServiceAccountKey], CreateConnectionType::Csr => &[ AwsPrivatelink, Password, @@ -312,6 +315,13 @@ impl ConnectionOptionExtracted { } ConnectionDetails::AwsPrivatelink(connection) } + CreateConnectionType::Gcp => { + let credentials_json = self + .service_account_key + .ok_or_else(|| sql_err!("SERVICE ACCOUNT KEY option is required"))? + .into(); + ConnectionDetails::Gcp(GcpConnection { credentials_json }) + } CreateConnectionType::Kafka => { let (tls, sasl) = plan_kafka_security(scx, &self)?; let (static_brokers, matching_rules) = self.get_brokers_and_rules(scx)?; diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index f8de6fa6657fe..156a7f35ff93a 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -28,6 +28,7 @@ columnation.workspace = true dec.workspace = true derivative.workspace = true differential-dataflow.workspace = true +gcp_auth.workspace = true hex.workspace = true http.workspace = true itertools.workspace = true diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 31badd1be6c76..a32572311076b 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -80,6 +80,7 @@ use crate::dyncfgs::{ use crate::errors::{ContextCreationError, CsrConnectError}; pub mod aws; +pub mod gcp; pub mod inline; pub mod string_or_secret; @@ -355,6 +356,7 @@ pub enum Connection { Ssh(SshConnection), Aws(AwsConnection), AwsPrivatelink(AwsPrivatelinkConnection), + Gcp(gcp::GcpConnection), MySql(MySqlConnection), SqlServer(SqlServerConnectionDetails), IcebergCatalog(IcebergCatalogConnection), @@ -371,6 +373,7 @@ impl IntoInlineConnection Connection::Ssh(ssh) => Connection::Ssh(ssh), Connection::Aws(aws) => Connection::Aws(aws), Connection::AwsPrivatelink(awspl) => Connection::AwsPrivatelink(awspl), + Connection::Gcp(gcp) => Connection::Gcp(gcp), Connection::MySql(mysql) => Connection::MySql(mysql.into_inline_connection(r)), Connection::SqlServer(sql_server) => { Connection::SqlServer(sql_server.into_inline_connection(r)) @@ -392,6 +395,7 @@ impl Connection { Connection::Ssh(conn) => conn.validate_by_default(), Connection::Aws(conn) => conn.validate_by_default(), Connection::AwsPrivatelink(conn) => conn.validate_by_default(), + Connection::Gcp(conn) => conn.validate_by_default(), Connection::MySql(conn) => conn.validate_by_default(), Connection::SqlServer(conn) => conn.validate_by_default(), Connection::IcebergCatalog(conn) => conn.validate_by_default(), @@ -415,6 +419,7 @@ impl Connection { Connection::Ssh(conn) => conn.validate(id, storage_configuration).await?, Connection::Aws(conn) => conn.validate(id, storage_configuration).await?, Connection::AwsPrivatelink(conn) => conn.validate(id, storage_configuration).await?, + Connection::Gcp(conn) => conn.validate(id, storage_configuration).await?, Connection::MySql(conn) => { conn.validate(id, storage_configuration).await?; } @@ -494,6 +499,8 @@ pub enum ConnectionValidationError { SqlServer(#[from] SqlServerConnectionValidationError), #[error(transparent)] Aws(#[from] AwsConnectionValidationError), + #[error(transparent)] + Gcp(#[from] gcp::GcpConnectionValidationError), #[error("{}", .0.display_with_causes())] Other(#[from] anyhow::Error), } @@ -506,6 +513,7 @@ impl ConnectionValidationError { ConnectionValidationError::MySql(e) => e.detail(), ConnectionValidationError::SqlServer(e) => e.detail(), ConnectionValidationError::Aws(e) => e.detail(), + ConnectionValidationError::Gcp(e) => e.detail(), ConnectionValidationError::Other(_) => None, } } @@ -517,6 +525,7 @@ impl ConnectionValidationError { ConnectionValidationError::MySql(e) => e.hint(), ConnectionValidationError::SqlServer(e) => e.hint(), ConnectionValidationError::Aws(e) => e.hint(), + ConnectionValidationError::Gcp(e) => e.hint(), ConnectionValidationError::Other(_) => None, } } @@ -527,6 +536,7 @@ impl AlterCompatible for Connection { match (self, other) { (Self::Aws(s), Self::Aws(o)) => s.alter_compatible(id, o), (Self::AwsPrivatelink(s), Self::AwsPrivatelink(o)) => s.alter_compatible(id, o), + (Self::Gcp(s), Self::Gcp(o)) => s.alter_compatible(id, o), (Self::Ssh(s), Self::Ssh(o)) => s.alter_compatible(id, o), (Self::Csr(s), Self::Csr(o)) => s.alter_compatible(id, o), (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o), diff --git a/src/storage-types/src/connections/gcp.rs b/src/storage-types/src/connections/gcp.rs new file mode 100644 index 0000000000000..65a6bf0bef4fa --- /dev/null +++ b/src/storage-types/src/connections/gcp.rs @@ -0,0 +1,97 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! GCP configuration for sources and sinks. + +use gcp_auth::{CustomServiceAccount, TokenProvider}; +use mz_ore::error::ErrorExt; +use mz_repr::{CatalogItemId, GlobalId}; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; + +use crate::AlterCompatible; +use crate::configuration::StorageConfiguration; +use crate::controller::AlterError; + +/// Scope used when probing the credentials during validation. Picked because +/// every service-account key is allowed to mint tokens for it, so a successful +/// response confirms the key is well-formed and accepted by Google's token +/// endpoint without requiring any specific IAM grants on the service account. +const VALIDATION_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform"; + +/// GCP connection configuration. +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub struct GcpConnection { + /// Secret containing a GCP service-account key in JSON format, + /// as produced by `gcloud iam service-accounts keys create`. + pub credentials_json: CatalogItemId, +} + +impl AlterCompatible for GcpConnection { + fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> { + // Every element of the GCP connection is configurable. + Ok(()) + } +} + +impl GcpConnection { + /// Validates this connection by reading the service-account key out of the + /// secrets store, parsing it, and exchanging it for an OAuth2 access token + /// at Google's token endpoint. + pub(crate) async fn validate( + &self, + _id: CatalogItemId, + storage_configuration: &StorageConfiguration, + ) -> Result<(), GcpConnectionValidationError> { + let json = storage_configuration + .connection_context + .secrets_reader + .read_string(self.credentials_json) + .await + .map_err(GcpConnectionValidationError::SecretRead)?; + let service_account = CustomServiceAccount::from_json(&json) + .map_err(GcpConnectionValidationError::ParseKey)?; + service_account + .token(&[VALIDATION_SCOPE]) + .await + .map_err(GcpConnectionValidationError::FetchToken)?; + Ok(()) + } + + pub(crate) fn validate_by_default(&self) -> bool { + false + } +} + +/// An error returned by `GcpConnection::validate`. +#[derive(thiserror::Error, Debug)] +pub enum GcpConnectionValidationError { + #[error("failed to read service-account key from secret store: {}", .0.display_with_causes())] + SecretRead(#[source] anyhow::Error), + #[error("failed to parse service-account key JSON: {}", .0.display_with_causes())] + ParseKey(#[source] gcp_auth::Error), + #[error("failed to obtain access token from Google: {}", .0.display_with_causes())] + FetchToken(#[source] gcp_auth::Error), +} + +impl GcpConnectionValidationError { + pub fn detail(&self) -> Option { + None + } + + pub fn hint(&self) -> Option { + match self { + GcpConnectionValidationError::ParseKey(_) => Some( + "The secret must hold the JSON output of `gcloud iam service-accounts keys create`." + .into(), + ), + _ => None, + } + } +} From dfbced7b9555e5dc49df7c469061b1fa6dfb5552 Mon Sep 17 00:00:00 2001 From: Kynan Rilee Date: Mon, 18 May 2026 16:25:46 -0400 Subject: [PATCH 4/4] Kynan: Iceberg REST catalog can use a GCP connection --- Cargo.lock | 110 +++++++++++++- Cargo.toml | 2 +- src/sql-parser/src/ast/defs/ddl.rs | 3 + src/sql-parser/src/parser.rs | 6 + src/sql-parser/tests/testdata/ddl | 9 +- src/sql/src/plan/statement/ddl/connection.rs | 60 ++++++-- src/storage-types/src/connections.rs | 148 +++++++++++++++---- src/storage-types/src/connections/gcp.rs | 95 ++++++++++-- src/storage-types/src/connections/inline.rs | 10 ++ 9 files changed, 378 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 819f4e4352d24..2bd538430253e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.7.8" @@ -1529,6 +1540,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.6.1" @@ -1773,6 +1793,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.60" @@ -4546,6 +4575,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ + "block-padding", "generic-array", ] @@ -4773,6 +4803,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "jsonwebtoken" version = "10.3.0" @@ -5714,7 +5759,7 @@ dependencies = [ name = "mz-authenticator" version = "0.1.0" dependencies = [ - "jsonwebtoken", + "jsonwebtoken 10.3.0", "mz-adapter", "mz-adapter-types", "mz-auth", @@ -5812,7 +5857,7 @@ dependencies = [ "hyper 1.9.0", "hyper-openssl", "hyper-util", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "launchdarkly-server-sdk", "mz-alloc", "mz-alloc-default", @@ -6415,7 +6460,7 @@ dependencies = [ "insta", "ipnet", "itertools 0.14.0", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "maplit", "mime", "mz-adapter", @@ -6683,7 +6728,7 @@ dependencies = [ "clap", "derivative", "futures", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "lru", "mz-auth", "mz-ore", @@ -6704,7 +6749,7 @@ dependencies = [ name = "mz-frontegg-client" version = "0.0.0" dependencies = [ - "jsonwebtoken", + "jsonwebtoken 10.3.0", "mz-frontegg-auth", "mz-ore", "reqwest", @@ -6727,7 +6772,7 @@ dependencies = [ "chrono", "clap", "hyper 1.9.0", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "mz-frontegg-auth", "mz-ore", "openssl", @@ -6839,7 +6884,7 @@ dependencies = [ "aws-sdk-kms", "base64 0.22.1", "clap", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "mz-aws-util", "mz-ore", "pem", @@ -6976,7 +7021,7 @@ dependencies = [ "anyhow", "axum", "base64 0.22.1", - "jsonwebtoken", + "jsonwebtoken 10.3.0", "mz-ore", "openssl", "reqwest", @@ -9303,6 +9348,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "pem" version = "3.0.6" @@ -9565,6 +9620,21 @@ dependencies = [ "spki", ] +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der", + "pbkdf2", + "scrypt", + "sha2", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -9572,6 +9642,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der", + "pkcs5", + "rand_core 0.6.4", "spki", ] @@ -10518,11 +10590,13 @@ dependencies = [ "hmac", "home", "http 1.4.0", + "jsonwebtoken 9.3.1", "log", "percent-encoding", "quick-xml 0.37.5", "rand 0.8.5", "reqwest", + "rsa", "rust-ini", "serde", "serde_json", @@ -10893,6 +10967,15 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.4" @@ -10961,6 +11044,17 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "seahash" version = "4.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1e5679419899a..cdfcfff9d5898 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -369,7 +369,7 @@ hyper-util = "0.1.20" tower-service = "0.3.3" iceberg = "0.9.0" iceberg-catalog-rest = "0.9.0" -iceberg-storage-opendal = { version = "0.9.0", default-features = false, features = ["opendal-s3"] } +iceberg-storage-opendal = { version = "0.9.0", default-features = false, features = ["opendal-s3", "opendal-gcs"] } imbl = { version = "7.0.0", features = ["serde"] } include_dir = "0.7.4" indexmap = { version = "2.10.0", default-features = false, features = ["std"] } diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index 4984f6eddfbfb..fc9cac0630d31 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -777,6 +777,7 @@ pub enum ConnectionOptionName { Credential, Database, Endpoint, + GcpConnection, Host, Password, Port, @@ -817,6 +818,7 @@ impl AstDisplay for ConnectionOptionName { ConnectionOptionName::Credential => "CREDENTIAL", ConnectionOptionName::Database => "DATABASE", ConnectionOptionName::Endpoint => "ENDPOINT", + ConnectionOptionName::GcpConnection => "GCP CONNECTION", ConnectionOptionName::Host => "HOST", ConnectionOptionName::Password => "PASSWORD", ConnectionOptionName::Port => "PORT", @@ -869,6 +871,7 @@ impl WithOptionName for ConnectionOptionName { | ConnectionOptionName::Credential | ConnectionOptionName::Database | ConnectionOptionName::Endpoint + | ConnectionOptionName::GcpConnection | ConnectionOptionName::Host | ConnectionOptionName::Password | ConnectionOptionName::Port diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index e76d4bbe2e8be..24520bb39ff64 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2787,6 +2787,7 @@ impl<'a> Parser<'a> { CREDENTIAL, DATABASE, ENDPOINT, + GCP, HOST, PASSWORD, PORT, @@ -2840,6 +2841,10 @@ impl<'a> Parser<'a> { CREDENTIAL => ConnectionOptionName::Credential, DATABASE => ConnectionOptionName::Database, ENDPOINT => ConnectionOptionName::Endpoint, + GCP => { + self.expect_keyword(CONNECTION)?; + ConnectionOptionName::GcpConnection + } HOST => ConnectionOptionName::Host, PASSWORD => ConnectionOptionName::Password, PORT => ConnectionOptionName::Port, @@ -2920,6 +2925,7 @@ impl<'a> Parser<'a> { ConnectionOptionName::Brokers => Some(WithOptionValue::Sequence( self.parse_list_value(Parser::parse_kafka_broker_or_matching_rule)?, )), + ConnectionOptionName::GcpConnection => Some(self.parse_object_option_value()?), ConnectionOptionName::SshTunnel => Some(self.parse_object_option_value()?), _ => self.parse_optional_option_value()?, }; diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index e5bc38f3656dd..5b04b72e850ad 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -596,6 +596,13 @@ CREATE CONNECTION icebergcatalog TO ICEBERG CATALOG (CATALOG TYPE = 's3tablesres => CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("icebergcatalog")]), connection_type: IcebergCatalog, if_not_exists: false, values: [ConnectionOption { name: CatalogType, value: Some(Value(String("s3tablesrest"))) }, ConnectionOption { name: AwsConnection, value: Some(Item(Name(UnresolvedItemName([Ident("awsconn")])))) }, ConnectionOption { name: Warehouse, value: Some(Value(String("wh"))) }], with_options: [] }) +parse-statement +CREATE CONNECTION biglake TO ICEBERG CATALOG (CATALOG TYPE = 'rest', URL = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', GCP CONNECTION = gcpconn, WAREHOUSE = 'gs://iceberg-test') +---- +CREATE CONNECTION biglake TO ICEBERG CATALOG (CATALOG TYPE = 'rest', URL = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', GCP CONNECTION = gcpconn, WAREHOUSE = 'gs://iceberg-test') +=> +CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("biglake")]), connection_type: IcebergCatalog, if_not_exists: false, values: [ConnectionOption { name: CatalogType, value: Some(Value(String("rest"))) }, ConnectionOption { name: Url, value: Some(Value(String("https://biglake.googleapis.com/iceberg/v1/restcatalog"))) }, ConnectionOption { name: GcpConnection, value: Some(Item(Name(UnresolvedItemName([Ident("gcpconn")])))) }, ConnectionOption { name: Warehouse, value: Some(Value(String("gs://iceberg-test"))) }], with_options: [] }) + parse-statement CREATE CONNECTION pgconn FOR postgres HOST foo, PORT 1234, SSL CERTIFICATE AUTHORITY 'foo', SSH TUNNEL tun, DATABASE 'db', PASSWORD 'pw', SSL CERTIFICATE 'cert', SSL KEY 'key', SSL MODE 'mode', USER 'postgres' ---- @@ -2834,7 +2841,7 @@ CreateConnection(CreateConnectionStatement { name: UnresolvedItemName([Ident("my parse-statement CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope) ---- -error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis +error: Expected one of ACCESS or ASSUME or AVAILABILITY or AWS or BROKER or BROKERS or CATALOG or CREDENTIAL or DATABASE or ENDPOINT or GCP or HOST or PASSWORD or PORT or PUBLIC or PROGRESS or REGION or ROLE or SASL or SCOPE or SECRET or SECURITY or SERVICE or SESSION or SSH or SSL or URL or USER or USERNAME or WAREHOUSE, found left parenthesis CREATE CONNECTION my_ssh_tunnel FOR SSH TUNNEL (PUBLIC KEY 3 = nope) ^ diff --git a/src/sql/src/plan/statement/ddl/connection.rs b/src/sql/src/plan/statement/ddl/connection.rs index e55271857cc8a..8f51e2fa03f9c 100644 --- a/src/sql/src/plan/statement/ddl/connection.rs +++ b/src/sql/src/plan/statement/ddl/connection.rs @@ -29,15 +29,16 @@ use mz_ssh_util::keys::SshKeyPair; use mz_storage_types::connections::aws::{ AwsAssumeRole, AwsAuth, AwsConnection, AwsConnectionReference, AwsCredentials, }; -use mz_storage_types::connections::gcp::GcpConnection; +use mz_storage_types::connections::gcp::{GcpConnection, GcpConnectionReference}; use mz_storage_types::connections::inline::ReferencedConnection; use mz_storage_types::connections::string_or_secret::StringOrSecret; use mz_storage_types::connections::{ AwsPrivatelink, AwsPrivatelinkConnection, AwsPrivatelinkRule, CsrConnection, - CsrConnectionHttpAuth, IcebergCatalogConnection, IcebergCatalogImpl, IcebergCatalogType, - KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, MySqlConnection, - MySqlSslMode, PostgresConnection, RestIcebergCatalog, S3TablesRestIcebergCatalog, - SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, Tunnel, + CsrConnectionHttpAuth, IcebergCatalogAuth, IcebergCatalogConnection, IcebergCatalogImpl, + IcebergCatalogType, KafkaConnection, KafkaSaslConfig, KafkaTlsConfig, KafkaTopicOptions, + MySqlConnection, MySqlSslMode, PostgresConnection, RestIcebergCatalog, + S3TablesRestIcebergCatalog, SqlServerConnectionDetails, SshConnection, SshTunnel, TlsIdentity, + Tunnel, }; use crate::names::Aug; @@ -59,6 +60,7 @@ generate_extracted_config!( (Credential, StringOrSecret), (Database, String), (Endpoint, String), + (GcpConnection, with_options::Object), (Host, String), (Password, with_options::Secret), (Port, u16), @@ -189,6 +191,7 @@ pub(super) fn validate_options_per_connection_type( AwsConnection, CatalogType, Credential, + GcpConnection, Scope, Url, Warehouse, @@ -670,9 +673,15 @@ impl ConnectionOptionExtracted { let warehouse = self.warehouse.clone(); let credential = self.credential.clone(); let aws_connection = get_aws_connection_reference(scx, &self)?; + let gcp_connection = get_gcp_connection_reference(scx, &self)?; let catalog = match catalog_type { IcebergCatalogType::S3TablesRest => { + if gcp_connection.is_some() { + sql_bail!( + "invalid CONNECTION: ICEBERG s3tablesrest connections do not support GCP CONNECTION" + ); + } let Some(warehouse) = warehouse else { sql_bail!( "invalid CONNECTION: ICEBERG s3tablesrest connections must specify WAREHOUSE" @@ -690,17 +699,26 @@ impl ConnectionOptionExtracted { }) } IcebergCatalogType::Rest => { - let Some(credential) = credential else { + if aws_connection.is_some() { sql_bail!( - "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL" + "invalid CONNECTION: ICEBERG rest connections do not support AWS CONNECTION.\n\nTry s3tablesrest instead." ); + } + let auth = match (credential, gcp_connection) { + (Some(_), Some(_)) => sql_bail!( + "invalid CONNECTION: ICEBERG rest connections may set CREDENTIAL or GCP CONNECTION, not both" + ), + (Some(credential), None) => IcebergCatalogAuth::OAuth { + credential, + scope: self.scope.clone(), + }, + (None, Some(gcp_connection)) => IcebergCatalogAuth::Gcp(gcp_connection), + (None, None) => sql_bail!( + "invalid CONNECTION: ICEBERG rest connections require a CREDENTIAL or GCP CONNECTION" + ), }; - IcebergCatalogImpl::Rest(RestIcebergCatalog { - credential, - scope: self.scope.clone(), - warehouse, - }) + IcebergCatalogImpl::Rest(RestIcebergCatalog { auth, warehouse }) } }; @@ -818,6 +836,24 @@ fn get_aws_connection_reference( _ => sql_bail!("{} is not an AWS connection", item.name().item), }) } +fn get_gcp_connection_reference( + scx: &StatementContext, + conn_options: &ConnectionOptionExtracted, +) -> Result>, PlanError> { + let Some(gcp_connection_id) = conn_options.gcp_connection else { + return Ok(None); + }; + + let id = CatalogItemId::from(gcp_connection_id); + let item = scx.catalog.get_item(&id); + Ok(match item.connection()? { + Connection::Gcp(_) => Some(GcpConnectionReference { + connection_id: id, + connection: id, + }), + _ => sql_bail!("{} is not a GCP connection", item.name().item), + }) +} fn plan_kafka_security( scx: &StatementContext, diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index a32572311076b..3a53385bec59f 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -22,10 +22,14 @@ use aws_sigv4::http_request::{SignableBody, SignableRequest, SigningSettings, si use aws_sigv4::sign::v4; // Aliased to avoid colliding with `mz_ccsr::tls::Identity`. use aws_smithy_runtime_api::client::identity::Identity as AwsIdentity; +use base64::Engine; use http::{HeaderName, HeaderValue}; use iceberg::Catalog; use iceberg::CatalogBuilder; -use iceberg::io::{S3_ACCESS_KEY_ID, S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::io::{ + GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_USER_PROJECT, + S3_ACCESS_KEY_ID, S3_DISABLE_EC2_METADATA, S3_REGION, S3_SECRET_ACCESS_KEY, +}; use iceberg_catalog_rest::{ REST_CATALOG_PROP_URI, REST_CATALOG_PROP_WAREHOUSE, RequestAuthenticator, RestCatalogBuilder, }; @@ -70,6 +74,7 @@ use crate::configuration::StorageConfiguration; use crate::connections::aws::{ AwsAuth, AwsConnection, AwsConnectionReference, AwsConnectionValidationError, }; +use crate::connections::gcp::{GcpConnectionReference, GcpTokenProvider}; use crate::connections::string_or_secret::StringOrSecret; use crate::controller::AlterError; use crate::dyncfgs::{ @@ -466,6 +471,13 @@ impl Connection { } } + pub fn unwrap_gcp(self) -> ::Gcp { + match self { + Self::Gcp(conn) => conn, + o => unreachable!("{o:?} is not a GCP connection"), + } + } + pub fn unwrap_ssh(self) -> ::Ssh { match self { Self::Ssh(conn) => conn, @@ -554,12 +566,22 @@ impl AlterCompatible for Connection { } } +/// Auth mechanism for Iceberg REST catalogs. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub enum IcebergCatalogAuth { + /// Use Iceberg catalog REST API's standard OAuth flow. + OAuth { + /// client_id:client_secret + credential: StringOrSecret, + /// OAuth2 scope + scope: Option, + }, + Gcp(GcpConnectionReference), +} + #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] -pub struct RestIcebergCatalog { - /// For REST catalogs, the oauth2 credential in a `CLIENT_ID:CLIENT_SECRET` format - pub credential: StringOrSecret, - /// The oauth2 scope for REST catalogs - pub scope: Option, +pub struct RestIcebergCatalog { + pub auth: IcebergCatalogAuth, /// The warehouse for REST catalogs pub warehouse: Option, } @@ -572,6 +594,30 @@ pub struct S3TablesRestIcebergCatalog { pub warehouse: String, } +impl IntoInlineConnection + for IcebergCatalogAuth +{ + fn into_inline_connection(self, r: R) -> IcebergCatalogAuth { + match self { + IcebergCatalogAuth::Gcp(x) => IcebergCatalogAuth::Gcp(x.into_inline_connection(&r)), + IcebergCatalogAuth::OAuth { credential, scope } => { + IcebergCatalogAuth::OAuth { credential, scope } + } + } + } +} + +impl IntoInlineConnection + for RestIcebergCatalog +{ + fn into_inline_connection(self, r: R) -> RestIcebergCatalog { + RestIcebergCatalog { + auth: self.auth.into_inline_connection(&r), + warehouse: self.warehouse, + } + } +} + impl IntoInlineConnection for S3TablesRestIcebergCatalog { @@ -591,7 +637,7 @@ pub enum IcebergCatalogType { #[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub enum IcebergCatalogImpl { - Rest(RestIcebergCatalog), + Rest(RestIcebergCatalog), S3TablesRest(S3TablesRestIcebergCatalog), } @@ -600,7 +646,9 @@ impl IntoInlineConnection { fn into_inline_connection(self, r: R) -> IcebergCatalogImpl { match self { - IcebergCatalogImpl::Rest(rest) => IcebergCatalogImpl::Rest(rest), + IcebergCatalogImpl::Rest(rest) => { + IcebergCatalogImpl::Rest(rest.into_inline_connection(r)) + } IcebergCatalogImpl::S3TablesRest(s3tables) => { IcebergCatalogImpl::S3TablesRest(s3tables.into_inline_connection(r)) } @@ -797,29 +845,71 @@ impl IcebergCatalogConnection { props.insert(REST_CATALOG_PROP_WAREHOUSE.to_string(), warehouse.clone()); } - let credential = rest - .credential - .get_string( - in_task, - &storage_configuration.connection_context.secrets_reader, - ) - .await - .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?; - props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential); + // Catalog auth is configured through a combination of `props` and `.with_authenticator(...)`, + // which happen at different stages of the [`RestCatalogBuilder`] -> [`RestCatalog`] + // construction pipeline. + let (storage_factory, custom_authenticator) = match &rest.auth { + IcebergCatalogAuth::OAuth { credential, scope } => { + let credential = credential + .get_string( + in_task, + &storage_configuration.connection_context.secrets_reader, + ) + .await + .map_err(|e| anyhow!("failed to read Iceberg catalog credential: {e}"))?; + props.insert(REST_CATALOG_PROP_CREDENTIAL.to_string(), credential); - if let Some(scope) = &rest.scope { - props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone()); - } + if let Some(scope) = scope { + props.insert(REST_CATALOG_PROP_SCOPE.to_string(), scope.clone()); + } + ( + OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + // When used with MinIO, Polaris returns a config with: + // s3.access-key-id, s3.secret-access-key, s3.endpoint, ... + // `iceberg-rust` forwards these props to `opendal`. + // N.B. This is not confirmed to work with other catalog & storage implementations. + customized_credential_load: None, + }, + None, + ) + } + IcebergCatalogAuth::Gcp(gcp_connection_reference) => { + let (creds_json, service_account) = gcp_connection_reference + .connection + .read_credentials(storage_configuration) + .await + .map_err(|e| anyhow!("failed to parse GCP service account JSON: {e}"))?; - let catalog = RestCatalogBuilder::default() - .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - // Polaris returns a config with: - // s3.access-key-id, s3.secret-access-key, s3.endpoint, ... - // `iceberg-rust` forwards these props to `opendal`. - // N.B. This is not confirmed to work with other catalog & storage implementations. - customized_credential_load: None, - })) + props.insert( + GCS_CREDENTIALS_JSON.to_owned(), + base64::engine::general_purpose::STANDARD.encode(creds_json), + ); + props.insert(GCS_DISABLE_VM_METADATA.to_owned(), "true".to_owned()); + props.insert(GCS_DISABLE_CONFIG_LOAD.to_owned(), "true".to_owned()); + if let Some(project_id) = service_account.project_id() { + props.insert(GCS_USER_PROJECT.to_owned(), project_id.to_owned()); + props.insert( + "header.x-goog-user-project".to_owned(), + project_id.to_owned(), + ); + } + + ( + OpenDalStorageFactory::Gcs, + Some(iceberg_catalog_rest::BearerTokenAuthenticator::new( + Arc::new(GcpTokenProvider { service_account }), + )), + ) + } + }; + + let mut catalog = + RestCatalogBuilder::default().with_storage_factory(Arc::new(storage_factory)); + if let Some(auth) = custom_authenticator { + catalog = catalog.with_authenticator(Arc::new(auth)); + } + let catalog = catalog .load("IcebergCatalog", props.into_iter().collect()) .await .map_err(|e| anyhow!("failed to create Iceberg catalog: {e}"))?; diff --git a/src/storage-types/src/connections/gcp.rs b/src/storage-types/src/connections/gcp.rs index 65a6bf0bef4fa..4a0f7ca161acd 100644 --- a/src/storage-types/src/connections/gcp.rs +++ b/src/storage-types/src/connections/gcp.rs @@ -9,7 +9,9 @@ //! GCP configuration for sources and sinks. -use gcp_auth::{CustomServiceAccount, TokenProvider}; +use async_trait::async_trait; +use gcp_auth::{CustomServiceAccount, TokenProvider as _}; +use iceberg_catalog_rest::TokenProvider; use mz_ore::error::ErrorExt; use mz_repr::{CatalogItemId, GlobalId}; use proptest_derive::Arbitrary; @@ -17,13 +19,15 @@ use serde::{Deserialize, Serialize}; use crate::AlterCompatible; use crate::configuration::StorageConfiguration; +use crate::connections::inline::{ + ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection, + ReferencedConnection, +}; use crate::controller::AlterError; -/// Scope used when probing the credentials during validation. Picked because -/// every service-account key is allowed to mint tokens for it, so a successful -/// response confirms the key is well-formed and accepted by Google's token -/// endpoint without requiring any specific IAM grants on the service account. -const VALIDATION_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform"; +/// Every service-account key is allowed to mint tokens for this scope, +/// and it's a blanket scope that covers both BigLake (Iceberg catalog) and GCS. +const GCP_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform"; /// GCP connection configuration. #[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] @@ -41,24 +45,31 @@ impl AlterCompatible for GcpConnection { } impl GcpConnection { - /// Validates this connection by reading the service-account key out of the - /// secrets store, parsing it, and exchanging it for an OAuth2 access token - /// at Google's token endpoint. - pub(crate) async fn validate( + /// Returns (credentials JSON, parsed service account) because both forms are useful. + pub(crate) async fn read_credentials( &self, - _id: CatalogItemId, storage_configuration: &StorageConfiguration, - ) -> Result<(), GcpConnectionValidationError> { + ) -> Result<(String, CustomServiceAccount), GcpConnectionValidationError> { let json = storage_configuration .connection_context .secrets_reader .read_string(self.credentials_json) .await .map_err(GcpConnectionValidationError::SecretRead)?; - let service_account = CustomServiceAccount::from_json(&json) + let account = CustomServiceAccount::from_json(&json) .map_err(GcpConnectionValidationError::ParseKey)?; + Ok((json, account)) + } + + /// Pull the service account key and check that we can retrieve an access token. + pub(crate) async fn validate( + &self, + _id: CatalogItemId, + storage_configuration: &StorageConfiguration, + ) -> Result<(), GcpConnectionValidationError> { + let (_, service_account) = self.read_credentials(storage_configuration).await?; service_account - .token(&[VALIDATION_SCOPE]) + .token(&[GCP_SCOPE]) .await .map_err(GcpConnectionValidationError::FetchToken)?; Ok(()) @@ -95,3 +106,59 @@ impl GcpConnectionValidationError { } } } + +/// References a GCP connection. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct GcpConnectionReference { + /// ID of the GCP connection. + pub connection_id: CatalogItemId, + /// GCP connection object. + pub connection: C::Gcp, +} + +impl IntoInlineConnection + for GcpConnectionReference +{ + fn into_inline_connection(self, r: R) -> GcpConnectionReference { + let GcpConnectionReference { + connection, + connection_id, + } = self; + + GcpConnectionReference { + connection: r.resolve_connection(connection).unwrap_gcp(), + connection_id, + } + } +} + +#[derive(Debug)] +pub struct GcpTokenProvider { + pub(crate) service_account: CustomServiceAccount, +} + +#[async_trait] +impl TokenProvider for GcpTokenProvider { + async fn token(&self) -> iceberg::Result { + let token = self + .service_account + .token(&[GCP_SCOPE]) + .await + .map_err(|e| { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + format!("gcp_auth: {}", e.display_with_causes()), + ) + })?; + Ok(token.as_str().to_string()) + } + + async fn invalidate(&self) -> iceberg::Result<()> { + // gcp_auth caches+refreshes internally based on Token::has_expired. + Ok(()) + } + + async fn regenerate(&self) -> iceberg::Result<()> { + Ok(()) + } +} diff --git a/src/storage-types/src/connections/inline.rs b/src/storage-types/src/connections/inline.rs index eca7992c5680f..6c02cc7574e4c 100644 --- a/src/storage-types/src/connections/inline.rs +++ b/src/storage-types/src/connections/inline.rs @@ -77,6 +77,14 @@ pub trait ConnectionAccess: Clone + Debug + Eq + PartialEq + Serialize + 'static + Serialize + for<'a> Deserialize<'a> + AlterCompatible; + type Gcp: Clone + + Debug + + Eq + + PartialEq + + Hash + + Serialize + + for<'a> Deserialize<'a> + + AlterCompatible; type Ssh: Clone + Debug + Eq @@ -129,6 +137,7 @@ impl ConnectionAccess for ReferencedConnection { type Kafka = CatalogItemId; type Pg = CatalogItemId; type Aws = CatalogItemId; + type Gcp = CatalogItemId; type Ssh = CatalogItemId; type Csr = CatalogItemId; type MySql = CatalogItemId; @@ -144,6 +153,7 @@ impl ConnectionAccess for InlinedConnection { type Kafka = super::KafkaConnection; type Pg = super::PostgresConnection; type Aws = super::aws::AwsConnection; + type Gcp = super::gcp::GcpConnection; type Ssh = super::SshConnection; type Csr = super::CsrConnection; type MySql = super::MySqlConnection;