Iceberg sink: make the S3 store credentials optional.

What this patch does:
  * `IcebergSinkConfig::store_access_key_id` and `store_secret_access_key`
    become `Option<String>`.
  * `get_props_s3` inserts `s3.access-key-id` / `s3.secret-access-key` only
    when both values are present, inserts neither when both are absent, and
    returns `Error::InvalidConfigValue` when exactly one of them is set.
    Unit tests cover all four combinations.
  * `IcebergSink::open` logs the first three characters of each credential
    when both are set, logs a "default credential provider chain" message
    when neither is set, and returns the same `InvalidConfigValue` error for
    a partial pair.
  * Adds `IcebergEnvAuthFixture`, which wraps `IcebergPreCreatedFixture`,
    removes `ENV_SINK_STORE_ACCESS_KEY` / `ENV_SINK_STORE_SECRET` from the
    connector runtime envs and inserts `AWS_ACCESS_KEY_ID` /
    `AWS_SECRET_ACCESS_KEY` instead, plus an integration test and the two
    TOML configs it uses.

NOTE(review): the type argument in `ContainerAsync<GenericImage>` (a context
line in container.rs) and the error type in `Result<Self, TestBinaryError>`
(`IcebergEnvAuthFixture::setup`) are assumed, not taken from the patch text;
confirm both against the repository before applying.

diff --git a/core/connectors/sinks/iceberg_sink/src/lib.rs b/core/connectors/sinks/iceberg_sink/src/lib.rs
index 4d3954086b..33c0917283 100644
--- a/core/connectors/sinks/iceberg_sink/src/lib.rs
+++ b/core/connectors/sinks/iceberg_sink/src/lib.rs
@@ -60,8 +60,8 @@ pub struct IcebergSinkConfig {
     pub dynamic_routing: bool,
     pub dynamic_route_field: String,
     pub store_url: String,
-    pub store_access_key_id: String,
-    pub store_secret_access_key: String,
+    pub store_access_key_id: Option<String>,
+    pub store_secret_access_key: Option<String>,
     pub store_region: String,
     pub store_class: IcebergSinkStoreClass,
 }
diff --git a/core/connectors/sinks/iceberg_sink/src/props.rs b/core/connectors/sinks/iceberg_sink/src/props.rs
index aa01d6d006..b2107de155 100644
--- a/core/connectors/sinks/iceberg_sink/src/props.rs
+++ b/core/connectors/sinks/iceberg_sink/src/props.rs
@@ -30,14 +30,91 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result<HashMap<String, String>,
 fn get_props_s3(config: &IcebergSinkConfig) -> Result<HashMap<String, String>, Error> {
     let mut props: HashMap<String, String> = HashMap::new();
     props.insert("s3.region".to_string(), config.store_region.clone());
-    props.insert(
-        "s3.access-key-id".to_string(),
-        config.store_access_key_id.clone(),
-    );
-    props.insert(
-        "s3.secret-access-key".to_string(),
-        config.store_secret_access_key.clone(),
-    );
     props.insert("s3.endpoint".to_string(), config.store_url.clone());
+    match (&config.store_access_key_id, &config.store_secret_access_key) {
+        (Some(access_key_id), Some(secret_access_key)) => {
+            props.insert("s3.access-key-id".to_string(), access_key_id.clone());
+            props.insert(
+                "s3.secret-access-key".to_string(),
+                secret_access_key.clone(),
+            );
+        }
+        (None, None) => {}
+        _ => {
+            return Err(Error::InvalidConfigValue("Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned()));
+        }
+    }
     Ok(props)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes};
+
+    fn base_config() -> IcebergSinkConfig {
+        IcebergSinkConfig {
+            tables: vec![],
+            catalog_type: IcebergSinkTypes::REST,
+            warehouse: "warehouse".to_string(),
+            uri: "http://localhost:8181".to_string(),
+            dynamic_routing: false,
+            dynamic_route_field: "".to_string(),
+            store_url: "http://localhost:9000".to_string(),
+            store_access_key_id: None,
+            store_secret_access_key: None,
+            store_region: "us-east-1".to_string(),
+            store_class: IcebergSinkStoreClass::S3,
+        }
+    }
+
+    #[test]
+    fn test_get_props_s3_no_credentials() {
+        let config = base_config();
+        let props = get_props_s3(&config).expect("Should succeed without credentials");
+        assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+        assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+        assert!(!props.contains_key("s3.access-key-id"));
+        assert!(!props.contains_key("s3.secret-access-key"));
+    }
+
+    #[test]
+    fn test_get_props_s3_full_credentials() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: Some("admin".to_string()),
+            store_secret_access_key: Some("password".to_string()),
+            ..base_config()
+        };
+        let props = get_props_s3(&config).expect("Should succeed with full credentials");
+        assert_eq!(props.get("s3.region").unwrap(), "us-east-1");
+        assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000");
+        assert_eq!(props.get("s3.access-key-id").unwrap(), "admin");
+        assert_eq!(props.get("s3.secret-access-key").unwrap(), "password");
+    }
+
+    #[test]
+    fn test_get_props_s3_partial_access_key() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: Some("admin".to_string()),
+            store_secret_access_key: None,
+            ..base_config()
+        };
+        assert!(
+            get_props_s3(&config).is_err(),
+            "Partial credentials (only access_key_id) should be rejected"
+        );
+    }
+
+    #[test]
+    fn test_get_props_s3_partial_secret_key() {
+        let config = IcebergSinkConfig {
+            store_access_key_id: None,
+            store_secret_access_key: Some("password".to_string()),
+            ..base_config()
+        };
+        assert!(
+            get_props_s3(&config).is_err(),
+            "Partial credentials (only secret_access_key) should be rejected"
+        );
+    }
+}
diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs b/core/connectors/sinks/iceberg_sink/src/sink.rs
index 1616e68aa7..2cc1282261 100644
--- a/core/connectors/sinks/iceberg_sink/src/sink.rs
+++ b/core/connectors/sinks/iceberg_sink/src/sink.rs
@@ -30,22 +30,31 @@ use tracing::{debug, error, info};
 #[async_trait]
 impl Sink for IcebergSink {
     async fn open(&mut self) -> Result<(), Error> {
-        let redacted_store_key = self
-            .config
-            .store_access_key_id
-            .chars()
-            .take(3)
-            .collect::<String>();
-        let redacted_store_secret = self
-            .config
-            .store_secret_access_key
-            .chars()
-            .take(3)
-            .collect::<String>();
-        info!(
-            "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
-            self.id, self.config.uri
-        );
+        match (
+            &self.config.store_access_key_id,
+            &self.config.store_secret_access_key,
+        ) {
+            (Some(store_access_key_id), Some(store_secret_access_key)) => {
+                let redacted_store_key = store_access_key_id.chars().take(3).collect::<String>();
+                let redacted_store_secret =
+                    store_secret_access_key.chars().take(3).collect::<String>();
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***",
+                    self.id, self.config.uri
+                );
+            }
+            (None, None) => {
+                info!(
+                    "Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain",
+                    self.id, self.config.uri
+                );
+            }
+            _ => {
+                return Err(Error::InvalidConfigValue(
+                    "Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned(),
+                ));
+            }
+        }
 
         info!(
             "Configuring Iceberg catalog with the following config:\n-region: {}\n-url: {}\n-store class: {}\n-catalog type: {}\n",
diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs b/core/integration/tests/connectors/fixtures/iceberg/container.rs
index e0fb272150..4c0f4088b4 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/container.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs
@@ -52,6 +52,9 @@ pub const ENV_SINK_STORE_SECRET: &str =
 pub const ENV_SINK_STORE_REGION: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION";
 pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH";
 
+pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
+pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
+
 pub struct MinioContainer {
     #[allow(dead_code)]
     container: ContainerAsync<GenericImage>,
@@ -532,3 +535,43 @@ impl TestFixture for IcebergPreCreatedFixture {
         self.inner.connectors_runtime_envs()
     }
 }
+
+pub struct IcebergEnvAuthFixture {
+    inner: IcebergPreCreatedFixture,
+}
+
+impl IcebergOps for IcebergEnvAuthFixture {
+    fn catalog_url(&self) -> &str {
+        self.inner.catalog_url()
+    }
+
+    fn http_client(&self) -> &HttpClient {
+        self.inner.http_client()
+    }
+}
+
+#[async_trait]
+impl TestFixture for IcebergEnvAuthFixture {
+    async fn setup() -> Result<Self, TestBinaryError> {
+        let inner = IcebergPreCreatedFixture::setup().await?;
+        Ok(Self { inner })
+    }
+
+    fn connectors_runtime_envs(&self) -> HashMap<String, String> {
+        let mut envs = self.inner.connectors_runtime_envs();
+        // Remove the explicit credentials injected by the underlying fixture.
+        // This forces the Iceberg Sink to use the default credential provider chain instead of explicit config.
+        envs.remove(ENV_SINK_STORE_ACCESS_KEY);
+        envs.remove(ENV_SINK_STORE_SECRET);
+        // Inject standard AWS env vars to test the default credential provider chain.
+        envs.insert(
+            ENV_AWS_ACCESS_KEY_ID.to_string(),
+            MINIO_ACCESS_KEY.to_string(),
+        );
+        envs.insert(
+            ENV_AWS_SECRET_ACCESS_KEY.to_string(),
+            MINIO_SECRET_KEY.to_string(),
+        );
+        envs
+    }
+}
diff --git a/core/integration/tests/connectors/fixtures/iceberg/mod.rs b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
index 30c047f357..4724d841d1 100644
--- a/core/integration/tests/connectors/fixtures/iceberg/mod.rs
+++ b/core/integration/tests/connectors/fixtures/iceberg/mod.rs
@@ -19,4 +19,6 @@
 
 mod container;
 
-pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
+pub use container::{
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
+};
diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs
index 8e0009bb3b..2aebf97fd8 100644
--- a/core/integration/tests/connectors/fixtures/mod.rs
+++ b/core/integration/tests/connectors/fixtures/mod.rs
@@ -31,7 +31,9 @@ pub use http::{
     HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, HttpSinkMultiTopicFixture,
     HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture,
 };
-pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture};
+pub use iceberg::{
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
+};
 pub use influxdb::{
     InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, InfluxDbSinkNoMetadataFixture,
     InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture,
diff --git a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
new file mode 100644
index 0000000000..5c6ffc6b7e
--- /dev/null
+++ b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+type = "sink"
+key = "iceberg"
+enabled = true
+version = 0
+name = "Iceberg sink"
+path = "../../target/debug/libiggy_connector_iceberg_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "iceberg_sink_connector"
+
+# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'.
+# It is used exclusively by integration tests to test the default credential provider chain fallback behavior.
+[plugin_config]
+tables = ["test.messages"]
+catalog_type = "rest"
+warehouse = "warehouse"
+uri = "http://localhost:8181"
+dynamic_routing = false
+dynamic_route_field = ""
+store_url = "http://localhost:9000"
+store_region = "us-east-1"
+store_class = "s3"
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 7772141162..b40688fdcd 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -19,7 +19,7 @@
 
 use crate::connectors::create_test_messages;
 use crate::connectors::fixtures::{
-    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture,
+    DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture,
 };
 use bytes::Bytes;
 use iggy::prelude::{IggyMessage, Partitioning};
@@ -207,3 +207,55 @@ async fn iceberg_sink_handles_bulk_messages(
     assert_eq!(sinks.len(), 1);
     assert!(sinks[0].last_error.is_none());
 }
+
+#[iggy_harness(
+    server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")),
+    seed = seeds::connector_stream
+)]
+async fn iceberg_sink_uses_default_credential_chain(
+    harness: &TestHarness,
+    fixture: IcebergEnvAuthFixture,
+) {
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to get root client");
+    let stream_id: iggy_common::Identifier =
+        seeds::names::STREAM.try_into().expect("Invalid stream id");
+    let topic_id: iggy_common::Identifier =
+        seeds::names::TOPIC.try_into().expect("Invalid topic id");
+    let test_messages = crate::connectors::create_test_messages(5);
+    let mut messages: Vec<IggyMessage> = test_messages
+        .iter()
+        .enumerate()
+        .map(|(i, msg)| {
+            let payload = serde_json::to_vec(msg).expect("Failed to serialize message");
+            IggyMessage::builder()
+                .id((i + 1) as u128)
+                .payload(bytes::Bytes::from(payload))
+                .build()
+                .expect("Failed to build message")
+        })
+        .collect();
+    client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut messages,
+        )
+        .await
+        .expect("Failed to send fake messages");
+    let snapshot_count = fixture
+        .wait_for_snapshots(
+            DEFAULT_NAMESPACE,
+            DEFAULT_TABLE,
+            1,
+            SNAPSHOT_POLL_ATTEMPTS,
+            SNAPSHOT_POLL_INTERVAL_MS,
+        )
+        .await
+        .expect("Data should be written to Iceberg table");
+    assert!(snapshot_count >= 1);
+    drop(fixture);
+}
diff --git a/core/integration/tests/connectors/iceberg/sink_default_credentials.toml b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
new file mode 100644
index 0000000000..c9ff51ebf7
--- /dev/null
+++ b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/iceberg/default_credentials_config"