From eed6e22edf431a3482d4c51fba79037a86f5d36a Mon Sep 17 00:00:00 2001 From: omnitrix Date: Thu, 19 Mar 2026 01:30:14 +0000 Subject: [PATCH 1/3] fix(connectors): support default credential provider chain for iceberg sink --- core/connectors/sinks/iceberg_sink/src/lib.rs | 4 +- .../sinks/iceberg_sink/src/props.rs | 51 ++++++++++++++++--- .../connectors/sinks/iceberg_sink/src/sink.rs | 32 ++++++------ .../connectors/fixtures/iceberg/container.rs | 39 ++++++++++++++ .../tests/connectors/fixtures/iceberg/mod.rs | 4 +- .../tests/connectors/fixtures/mod.rs | 5 ++ .../default_credentials_config/config.toml | 45 ++++++++++++++++ .../tests/connectors/iceberg/iceberg_sink.rs | 49 +++++++++++++++++- .../iceberg/sink_default_credentials.toml | 20 ++++++++ 9 files changed, 221 insertions(+), 28 deletions(-) create mode 100644 core/integration/tests/connectors/iceberg/default_credentials_config/config.toml create mode 100644 core/integration/tests/connectors/iceberg/sink_default_credentials.toml diff --git a/core/connectors/sinks/iceberg_sink/src/lib.rs b/core/connectors/sinks/iceberg_sink/src/lib.rs index 4d3954086b..33c0917283 100644 --- a/core/connectors/sinks/iceberg_sink/src/lib.rs +++ b/core/connectors/sinks/iceberg_sink/src/lib.rs @@ -60,8 +60,8 @@ pub struct IcebergSinkConfig { pub dynamic_routing: bool, pub dynamic_route_field: String, pub store_url: String, - pub store_access_key_id: String, - pub store_secret_access_key: String, + pub store_access_key_id: Option, + pub store_secret_access_key: Option, pub store_region: String, pub store_class: IcebergSinkStoreClass, } diff --git a/core/connectors/sinks/iceberg_sink/src/props.rs b/core/connectors/sinks/iceberg_sink/src/props.rs index aa01d6d006..b8292198c9 100644 --- a/core/connectors/sinks/iceberg_sink/src/props.rs +++ b/core/connectors/sinks/iceberg_sink/src/props.rs @@ -30,14 +30,49 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result, fn get_props_s3(config: &IcebergSinkConfig) -> Result, Error> { let mut props: HashMap = HashMap::new(); props.insert("s3.region".to_string(), config.store_region.clone()); - props.insert( - "s3.access-key-id".to_string(), - config.store_access_key_id.clone(), - ); - props.insert( - "s3.secret-access-key".to_string(), - config.store_secret_access_key.clone(), - ); + if let Some(access_key_id) = &config.store_access_key_id { + props.insert("s3.access-key-id".to_string(), access_key_id.clone()); + } + if let Some(secret_access_key) = &config.store_secret_access_key { + props.insert( + "s3.secret-access-key".to_string(), + secret_access_key.clone(), + ); + } props.insert("s3.endpoint".to_string(), config.store_url.clone()); Ok(props) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes}; + + #[test] + fn test_get_props_s3() { + let mut config = IcebergSinkConfig { + tables: vec![], + catalog_type: IcebergSinkTypes::REST, + warehouse: "warehouse".to_string(), + uri: "http://localhost:8181".to_string(), + dynamic_routing: false, + dynamic_route_field: "".to_string(), + store_url: "http://localhost:9000".to_string(), + store_access_key_id: None, + store_secret_access_key: None, + store_region: "us-east-1".to_string(), + store_class: IcebergSinkStoreClass::S3, + }; + + let props_none = get_props_s3(&config).expect("Should return S3 properties"); + assert!(!props_none.contains_key("s3.access-key-id")); + assert!(!props_none.contains_key("s3.secret-access-key")); + + config.store_access_key_id = Some("admin".to_string()); + config.store_secret_access_key = Some("password".to_string()); + + let props_some = get_props_s3(&config).expect("Should return S3 properties"); + assert_eq!(props_some.get("s3.access-key-id").unwrap(), "admin"); + assert_eq!(props_some.get("s3.secret-access-key").unwrap(), "password"); + } +} diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs b/core/connectors/sinks/iceberg_sink/src/sink.rs index 1616e68aa7..6cd90e84f4 100644 --- a/core/connectors/sinks/iceberg_sink/src/sink.rs +++ b/core/connectors/sinks/iceberg_sink/src/sink.rs @@ -30,22 +30,22 @@ use tracing::{debug, error, info}; #[async_trait] impl Sink for IcebergSink { async fn open(&mut self) -> Result<(), Error> { - let redacted_store_key = self - .config - .store_access_key_id - .chars() - .take(3) - .collect::(); - let redacted_store_secret = self - .config - .store_secret_access_key - .chars() - .take(3) - .collect::(); - info!( - "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***", - self.id, self.config.uri - ); + if let (Some(store_access_key_id), Some(store_secret_access_key)) = ( + &self.config.store_access_key_id, + &self.config.store_secret_access_key, + ) { + let redacted_store_key = store_access_key_id.chars().take(3).collect::(); + let redacted_store_secret = store_secret_access_key.chars().take(3).collect::(); + info!( + "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***", + self.id, self.config.uri + ); + } else { + info!( + "Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain", + self.id, self.config.uri + ); + } info!( "Configuring Iceberg catalog with the following config:\n-region: {}\n-url: {}\n-store class: {}\n-catalog type: {}\n", diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs b/core/integration/tests/connectors/fixtures/iceberg/container.rs index fd1c55fd52..bf519d34cf 100644 --- a/core/integration/tests/connectors/fixtures/iceberg/container.rs +++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs @@ -51,6 +51,9 @@ pub const ENV_SINK_STORE_SECRET: &str = pub const ENV_SINK_STORE_REGION: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PLUGIN_CONFIG_STORE_REGION"; pub const ENV_SINK_PATH: &str = "IGGY_CONNECTORS_SINK_ICEBERG_PATH"; +pub const ENV_AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID"; +pub const ENV_AWS_SECRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY"; + pub struct MinioContainer { #[allow(dead_code)] container: ContainerAsync, @@ -527,3 +530,39 @@ impl TestFixture for IcebergPreCreatedFixture { self.inner.connectors_runtime_envs() } } + +pub struct IcebergEnvAuthFixture { + inner: IcebergPreCreatedFixture, +} + +impl IcebergOps for IcebergEnvAuthFixture { + fn catalog_url(&self) -> &str { + self.inner.catalog_url() + } + + fn http_client(&self) -> &HttpClient { + self.inner.http_client() + } +} + +#[async_trait] +impl TestFixture for IcebergEnvAuthFixture { + async fn setup() -> Result { + let inner = IcebergPreCreatedFixture::setup().await?; + // Set credentials before test server initialization. + unsafe { + std::env::set_var(ENV_AWS_ACCESS_KEY_ID, MINIO_ACCESS_KEY); + std::env::set_var(ENV_AWS_SECRET_ACCESS_KEY, MINIO_SECRET_KEY); + } + Ok(Self { inner }) + } + + fn connectors_runtime_envs(&self) -> HashMap { + let mut envs = self.inner.connectors_runtime_envs(); + // Remove the explicit credentials injected by the underlying fixture. + // This forces the Iceberg Sink to use the default credential provider chain instead of explicit config. + envs.remove(ENV_SINK_STORE_ACCESS_KEY); + envs.remove(ENV_SINK_STORE_SECRET); + envs + } +} diff --git a/core/integration/tests/connectors/fixtures/iceberg/mod.rs b/core/integration/tests/connectors/fixtures/iceberg/mod.rs index 30c047f357..4724d841d1 100644 --- a/core/integration/tests/connectors/fixtures/iceberg/mod.rs +++ b/core/integration/tests/connectors/fixtures/iceberg/mod.rs @@ -19,4 +19,6 @@ mod container; -pub use container::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; +pub use container::{ + DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture, +}; diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index 8e0009bb3b..caceb12687 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -27,6 +27,7 @@ mod quickwit; mod wiremock; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; +<<<<<<< HEAD pub use http::{ HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, HttpSinkMultiTopicFixture, HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture, @@ -36,6 +37,10 @@ pub use influxdb::{ InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, +======= +pub use iceberg::{ + DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture, +>>>>>>> b3247862 (fix(connectors): support default credential provider chain for iceberg sink) }; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, diff --git a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml new file mode 100644 index 0000000000..9451da0e4d --- /dev/null +++ b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +type = "sink" +key = "iceberg" +enabled = true +version = 0 +name = "Iceberg sink" +path = "../../target/release/libiggy_connector_iceberg_sink" +verbose = true + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "iceberg_sink_connector" + +# Notice: This configuration deliberately omits 'store_access_key_id' and 'store_secret_access_key'. +# It is used exclusively by integration tests to test the default credential provider chain fallback behavior. +[plugin_config] +tables = ["test.messages"] +catalog_type = "rest" +warehouse = "warehouse" +uri = "http://localhost:8181" +dynamic_routing = false +dynamic_route_field = "" +store_url = "http://localhost:9000" +store_region = "us-east-1" +store_class = "s3" diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs index 7772141162..295fcdced6 100644 --- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs +++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs @@ -19,7 +19,7 @@ use crate::connectors::create_test_messages; use crate::connectors::fixtures::{ - DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture, + DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture, }; use bytes::Bytes; use iggy::prelude::{IggyMessage, Partitioning}; @@ -207,3 +207,50 @@ async fn iceberg_sink_handles_bulk_messages( assert_eq!(sinks.len(), 1); assert!(sinks[0].last_error.is_none()); } + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/iceberg/sink_default_credentials.toml")), + seed = seeds::connector_stream +)] +async fn iceberg_sink_uses_default_credential_chain( + harness: &TestHarness, + fixture: IcebergEnvAuthFixture, +) { + let client = harness.root_client().await.unwrap(); + let stream_id: iggy_common::Identifier = seeds::names::STREAM.try_into().unwrap(); + let topic_id: iggy_common::Identifier = seeds::names::TOPIC.try_into().unwrap(); + let test_messages = crate::connectors::create_test_messages(5); + let mut messages: Vec = test_messages + .iter() + .enumerate() + .map(|(i, msg)| { + let payload = serde_json::to_vec(msg).unwrap(); + IggyMessage::builder() + .id((i + 1) as u128) + .payload(bytes::Bytes::from(payload)) + .build() + .unwrap() + }) + .collect(); + client + .send_messages( + &stream_id, + &topic_id, + &Partitioning::partition_id(0), + &mut messages, + ) + .await + .expect("Failed to send fake messages"); + let snapshot_count = fixture + .wait_for_snapshots( + DEFAULT_NAMESPACE, + DEFAULT_TABLE, + 1, + SNAPSHOT_POLL_ATTEMPTS, + SNAPSHOT_POLL_INTERVAL_MS, + ) + .await + .expect("Data should be written to Iceberg table"); + assert!(snapshot_count >= 1); + drop(fixture); +} diff --git a/core/integration/tests/connectors/iceberg/sink_default_credentials.toml b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml new file mode 100644 index 0000000000..c9ff51ebf7 --- /dev/null +++ b/core/integration/tests/connectors/iceberg/sink_default_credentials.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "tests/connectors/iceberg/default_credentials_config" From 26d8afce69715093720265258f95c437405f94b4 Mon Sep 17 00:00:00 2001 From: omnitrix Date: Sat, 4 Apr 2026 00:50:51 +0000 Subject: [PATCH 2/3] refactor(connectors): improve error handling for partial iceberg credentials --- .../connectors/sinks/iceberg_sink/src/sink.rs | 34 ++++++++++++------- .../tests/connectors/fixtures/mod.rs | 9 ++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs b/core/connectors/sinks/iceberg_sink/src/sink.rs index 6cd90e84f4..d02e93831d 100644 --- a/core/connectors/sinks/iceberg_sink/src/sink.rs +++ b/core/connectors/sinks/iceberg_sink/src/sink.rs @@ -30,21 +30,31 @@ use tracing::{debug, error, info}; #[async_trait] impl Sink for IcebergSink { async fn open(&mut self) -> Result<(), Error> { - if let (Some(store_access_key_id), Some(store_secret_access_key)) = ( + match ( &self.config.store_access_key_id, &self.config.store_secret_access_key, ) { - let redacted_store_key = store_access_key_id.chars().take(3).collect::(); - let redacted_store_secret = store_secret_access_key.chars().take(3).collect::(); - info!( - "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***", - self.id, self.config.uri - ); - } else { - info!( - "Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain", - self.id, self.config.uri - ); + (Some(store_access_key_id), Some(store_secret_access_key)) => { + let redacted_store_key = store_access_key_id.chars().take(3).collect::(); + let redacted_store_secret = + store_secret_access_key.chars().take(3).collect::(); + info!( + "Opened Iceberg sink connector with ID: {} for URL: {}, store access key ID: {redacted_store_key}*** store secret: {redacted_store_secret}***", + self.id, self.config.uri + ); + } + (None, None) => { + info!( + "Opened Iceberg sink connector with ID: {} for URL: {}. No explicit credentials provided, falling back to default credential provider chain", + self.id, self.config.uri + ); + } + _ => { + error!( + "Partially configured Iceberg credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both." + ); + return Err(Error::InvalidConfig); + } } info!( diff --git a/core/integration/tests/connectors/fixtures/mod.rs b/core/integration/tests/connectors/fixtures/mod.rs index caceb12687..2aebf97fd8 100644 --- a/core/integration/tests/connectors/fixtures/mod.rs +++ b/core/integration/tests/connectors/fixtures/mod.rs @@ -27,20 +27,17 @@ mod quickwit; mod wiremock; pub use elasticsearch::{ElasticsearchSinkFixture, ElasticsearchSourcePreCreatedFixture}; -<<<<<<< HEAD pub use http::{ HttpSinkIndividualFixture, HttpSinkJsonArrayFixture, HttpSinkMultiTopicFixture, HttpSinkNdjsonFixture, HttpSinkNoMetadataFixture, HttpSinkRawFixture, }; -pub use iceberg::{DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergOps, IcebergPreCreatedFixture}; +pub use iceberg::{ + DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture, +}; pub use influxdb::{ InfluxDbSinkBase64Fixture, InfluxDbSinkFixture, InfluxDbSinkNoMetadataFixture, InfluxDbSinkNsPrecisionFixture, InfluxDbSinkTextFixture, InfluxDbSourceFixture, InfluxDbSourceRawFixture, InfluxDbSourceTextFixture, -======= -pub use iceberg::{ - DEFAULT_NAMESPACE, DEFAULT_TABLE, IcebergEnvAuthFixture, IcebergOps, IcebergPreCreatedFixture, ->>>>>>> b3247862 (fix(connectors): support default credential provider chain for iceberg sink) }; pub use mongodb::{ MongoDbOps, MongoDbSinkAutoCreateFixture, MongoDbSinkBatchFixture, MongoDbSinkFailpointFixture, From 514350d4ccdab579a5b68d9c4ecf722f8da44c01 Mon Sep 17 00:00:00 2001 From: omnitrix Date: Tue, 7 Apr 2026 15:20:39 +0000 Subject: [PATCH 3/3] refactor(connectors): robust iceberg credential handling and safer test harness --- .../sinks/iceberg_sink/src/props.rs | 84 ++++++++++++++----- .../connectors/sinks/iceberg_sink/src/sink.rs | 7 +- .../connectors/fixtures/iceberg/container.rs | 14 ++-- .../default_credentials_config/config.toml | 2 +- .../tests/connectors/iceberg/iceberg_sink.rs | 15 ++-- 5 files changed, 86 insertions(+), 36 deletions(-) diff --git a/core/connectors/sinks/iceberg_sink/src/props.rs b/core/connectors/sinks/iceberg_sink/src/props.rs index b8292198c9..b2107de155 100644 --- a/core/connectors/sinks/iceberg_sink/src/props.rs +++ b/core/connectors/sinks/iceberg_sink/src/props.rs @@ -30,16 +30,20 @@ pub fn init_props(config: &IcebergSinkConfig) -> Result, fn get_props_s3(config: &IcebergSinkConfig) -> Result, Error> { let mut props: HashMap = HashMap::new(); props.insert("s3.region".to_string(), config.store_region.clone()); - if let Some(access_key_id) = &config.store_access_key_id { - props.insert("s3.access-key-id".to_string(), access_key_id.clone()); - } - if let Some(secret_access_key) = &config.store_secret_access_key { - props.insert( - "s3.secret-access-key".to_string(), - secret_access_key.clone(), - ); - } props.insert("s3.endpoint".to_string(), config.store_url.clone()); + match (&config.store_access_key_id, &config.store_secret_access_key) { + (Some(access_key_id), Some(secret_access_key)) => { + props.insert("s3.access-key-id".to_string(), access_key_id.clone()); + props.insert( + "s3.secret-access-key".to_string(), + secret_access_key.clone(), + ); + } + (None, None) => {} + _ => { + return Err(Error::InvalidConfigValue("Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned())); + } + } Ok(props) } @@ -48,9 +52,8 @@ mod tests { use super::*; use crate::{IcebergSinkConfig, IcebergSinkStoreClass, IcebergSinkTypes}; - #[test] - fn test_get_props_s3() { - let mut config = IcebergSinkConfig { + fn base_config() -> IcebergSinkConfig { + IcebergSinkConfig { tables: vec![], catalog_type: IcebergSinkTypes::REST, warehouse: "warehouse".to_string(), @@ -62,17 +65,56 @@ mod tests { store_secret_access_key: None, store_region: "us-east-1".to_string(), store_class: IcebergSinkStoreClass::S3, - }; + } + } - let props_none = get_props_s3(&config).expect("Should return S3 properties"); - assert!(!props_none.contains_key("s3.access-key-id")); - assert!(!props_none.contains_key("s3.secret-access-key")); + #[test] + fn test_get_props_s3_no_credentials() { + let config = base_config(); + let props = get_props_s3(&config).expect("Should succeed without credentials"); + assert_eq!(props.get("s3.region").unwrap(), "us-east-1"); + assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000"); + assert!(!props.contains_key("s3.access-key-id")); + assert!(!props.contains_key("s3.secret-access-key")); + } + + #[test] + fn test_get_props_s3_full_credentials() { + let config = IcebergSinkConfig { + store_access_key_id: Some("admin".to_string()), + store_secret_access_key: Some("password".to_string()), + ..base_config() + }; + let props = get_props_s3(&config).expect("Should succeed with full credentials"); + assert_eq!(props.get("s3.region").unwrap(), "us-east-1"); + assert_eq!(props.get("s3.endpoint").unwrap(), "http://localhost:9000"); + assert_eq!(props.get("s3.access-key-id").unwrap(), "admin"); + assert_eq!(props.get("s3.secret-access-key").unwrap(), "password"); + } - config.store_access_key_id = Some("admin".to_string()); - config.store_secret_access_key = Some("password".to_string()); + #[test] + fn test_get_props_s3_partial_access_key() { + let config = IcebergSinkConfig { + store_access_key_id: Some("admin".to_string()), + store_secret_access_key: None, + ..base_config() + }; + assert!( + get_props_s3(&config).is_err(), + "Partial credentials (only access_key_id) should be rejected" + ); + } - let props_some = get_props_s3(&config).expect("Should return S3 properties"); - assert_eq!(props_some.get("s3.access-key-id").unwrap(), "admin"); - assert_eq!(props_some.get("s3.secret-access-key").unwrap(), "password"); + #[test] + fn test_get_props_s3_partial_secret_key() { + let config = IcebergSinkConfig { + store_access_key_id: None, + store_secret_access_key: Some("password".to_string()), + ..base_config() + }; + assert!( + get_props_s3(&config).is_err(), + "Partial credentials (only secret_access_key) should be rejected" + ); } } diff --git a/core/connectors/sinks/iceberg_sink/src/sink.rs b/core/connectors/sinks/iceberg_sink/src/sink.rs index d02e93831d..2cc1282261 100644 --- a/core/connectors/sinks/iceberg_sink/src/sink.rs +++ b/core/connectors/sinks/iceberg_sink/src/sink.rs @@ -50,10 +50,9 @@ impl Sink for IcebergSink { ); } _ => { - error!( - "Partially configured Iceberg credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both." - ); - return Err(Error::InvalidConfig); + return Err(Error::InvalidConfigValue( + "Partially configured credentials. You must provide both store_access_key_id and store_secret_access_key, or omit both.".to_owned(), + )); } } diff --git a/core/integration/tests/connectors/fixtures/iceberg/container.rs b/core/integration/tests/connectors/fixtures/iceberg/container.rs index 3f9adc410b..4c0f4088b4 100644 --- a/core/integration/tests/connectors/fixtures/iceberg/container.rs +++ b/core/integration/tests/connectors/fixtures/iceberg/container.rs @@ -554,11 +554,6 @@ impl IcebergOps for IcebergEnvAuthFixture { impl TestFixture for IcebergEnvAuthFixture { async fn setup() -> Result { let inner = IcebergPreCreatedFixture::setup().await?; - // Set credentials before test server initialization. - unsafe { - std::env::set_var(ENV_AWS_ACCESS_KEY_ID, MINIO_ACCESS_KEY); - std::env::set_var(ENV_AWS_SECRET_ACCESS_KEY, MINIO_SECRET_KEY); - } Ok(Self { inner }) } @@ -568,6 +563,15 @@ impl TestFixture for IcebergEnvAuthFixture { // This forces the Iceberg Sink to use the default credential provider chain instead of explicit config. envs.remove(ENV_SINK_STORE_ACCESS_KEY); envs.remove(ENV_SINK_STORE_SECRET); + // Inject standard AWS env vars to test the default credential provider chain. + envs.insert( + ENV_AWS_ACCESS_KEY_ID.to_string(), + MINIO_ACCESS_KEY.to_string(), + ); + envs.insert( + ENV_AWS_SECRET_ACCESS_KEY.to_string(), + MINIO_SECRET_KEY.to_string(), + ); envs } } diff --git a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml index 9451da0e4d..5c6ffc6b7e 100644 --- a/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml +++ b/core/integration/tests/connectors/iceberg/default_credentials_config/config.toml @@ -20,7 +20,7 @@ key = "iceberg" enabled = true version = 0 name = "Iceberg sink" -path = "../../target/release/libiggy_connector_iceberg_sink" +path = "../../target/debug/libiggy_connector_iceberg_sink" verbose = true [[streams]] diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs index 295fcdced6..b40688fdcd 100644 --- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs +++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs @@ -216,20 +216,25 @@ async fn iceberg_sink_uses_default_credential_chain( harness: &TestHarness, fixture: IcebergEnvAuthFixture, ) { - let client = harness.root_client().await.unwrap(); - let stream_id: iggy_common::Identifier = seeds::names::STREAM.try_into().unwrap(); - let topic_id: iggy_common::Identifier = seeds::names::TOPIC.try_into().unwrap(); + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + let stream_id: iggy_common::Identifier = + seeds::names::STREAM.try_into().expect("Invalid stream id"); + let topic_id: iggy_common::Identifier = + seeds::names::TOPIC.try_into().expect("Invalid topic id"); let test_messages = crate::connectors::create_test_messages(5); let mut messages: Vec = test_messages .iter() .enumerate() .map(|(i, msg)| { - let payload = serde_json::to_vec(msg).unwrap(); + let payload = serde_json::to_vec(msg).expect("Failed to serialize message"); IggyMessage::builder() .id((i + 1) as u128) .payload(bytes::Bytes::from(payload)) .build() - .unwrap() + .expect("Failed to build message") }) .collect(); client