From fc026d8c0081772882e78088ae333997395da7f7 Mon Sep 17 00:00:00 2001
From: abhinavgautam01
Date: Thu, 7 May 2026 16:44:36 +0530
Subject: [PATCH 1/4] fix(storage): rate limit bucket deletion in cleanup

Serialize bucket deletion to respect the GCP API rate limit (~1 request
per 2 seconds).
---
 src/storage/examples/src/lib.rs | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)

diff --git a/src/storage/examples/src/lib.rs b/src/storage/examples/src/lib.rs
index 092cc97fef..7ef059df38 100644
--- a/src/storage/examples/src/lib.rs
+++ b/src/storage/examples/src/lib.rs
@@ -749,8 +749,7 @@ pub async fn cleanup_stale_buckets(
         .list_buckets()
         .set_parent(format!("projects/{project_id}"))
         .by_item();
-    let mut pending = Vec::new();
-    let mut names = Vec::new();
+    let mut buckets_to_cleanup = Vec::new();
     while let Some(bucket) = buckets.next().await {
         let bucket = bucket?;
         if bucket
             .labels
             .get("integration-test")
             .is_some_and(|v| v == "true")
             && bucket.create_time.is_some_and(|v| v < stale_deadline)
         {
-            let client = client.clone();
-            let name = bucket.name.clone();
-            pending.push(tokio::spawn(
-                async move { cleanup_bucket(client, name).await },
-            ));
-            names.push(bucket.name);
+            buckets_to_cleanup.push(bucket.name);
         }
     }
-    println!("cleaning up {} buckets", pending.len());
-    let results = futures::future::join_all(pending).await;
-    let errors = results.into_iter().zip(names).filter(|(r, _)| r.is_err());
-    for (r, name) in errors {
-        println!("error deleting bucket {name}: {r:?}");
+    println!("cleaning up {} buckets", buckets_to_cleanup.len());
+    // Serialize bucket deletion to respect GCP rate limit (~1 deletion every 2 seconds)
+    for (idx, name) in buckets_to_cleanup.iter().enumerate() {
+        if let Err(e) = cleanup_bucket(client.clone(), name.clone()).await {
+            println!("error deleting bucket {name}: {e:?}");
+        }
+        // Add delay between deletions, but not after the last one
+        if idx < buckets_to_cleanup.len() - 1 {
+            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+        }
     }
 
     Ok(())

From 3c1cc715735187910e2978e69fc7975858186d75 Mon Sep 17 00:00:00 2001
From: abhinavgautam01
Date: Sat, 9 May 2026 22:09:35 +0530
Subject: [PATCH 2/4] fix(storage-samples): parallel empty buckets, serialize
 deletes with backoff

Replace fixed inter-delete sleeps with exponential backoff on retryable
DeleteBucket errors (initial delay >= 2s). Empty stale buckets in
parallel, then delete buckets sequentially. Add comments explaining
multi-worker rate limits.
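
For context, the retry shape this change moves to, as a minimal
self-contained sketch (`DeleteError` and `try_delete` are illustrative
placeholders, not code from this patch):

    use std::time::Duration;

    enum DeleteError {
        Retryable,
        Permanent,
    }

    async fn try_delete(_bucket: &str) -> Result<(), DeleteError> {
        // Placeholder for the real StorageControl::delete_bucket call.
        Ok(())
    }

    async fn delete_serialized(names: &[String]) {
        for name in names {
            // First wait is at least 2s; double it on each retry, capped at 120s.
            let mut wait = Duration::from_secs(2);
            let cap = Duration::from_secs(120);
            loop {
                match try_delete(name).await {
                    Ok(()) => break,
                    Err(DeleteError::Retryable) => {
                        tokio::time::sleep(wait).await;
                        wait = std::cmp::min(wait * 2, cap);
                    }
                    Err(DeleteError::Permanent) => break,
                }
            }
        }
    }

Deletes stay strictly sequential; only the sleep between attempts grows,
so a pushed-back API slows every runner down.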
---
 src/storage/examples/src/lib.rs | 144 ++++++++++++++++++++++++--------
 1 file changed, 108 insertions(+), 36 deletions(-)

diff --git a/src/storage/examples/src/lib.rs b/src/storage/examples/src/lib.rs
index 7ef059df38..6c42795c35 100644
--- a/src/storage/examples/src/lib.rs
+++ b/src/storage/examples/src/lib.rs
@@ -23,7 +23,10 @@ use google_cloud_gax::options::RequestOptionsBuilder;
 use google_cloud_gax::paginator::ItemPaginator as _;
 use google_cloud_gax::throttle_result::ThrottleResult;
 use google_cloud_gax::{
-    exponential_backoff::ExponentialBackoffBuilder, retry_policy::RetryPolicyExt,
+    backoff_policy::BackoffPolicy,
+    error::rpc::Code,
+    exponential_backoff::ExponentialBackoffBuilder,
+    retry_policy::RetryPolicyExt,
     retry_state::RetryState,
 };
 use google_cloud_storage::client::{Storage, StorageControl};
@@ -763,45 +766,93 @@ pub async fn cleanup_stale_buckets(
     }
     println!("cleaning up {} buckets", buckets_to_cleanup.len());
-    // Serialize bucket deletion to respect GCP rate limit (~1 deletion every 2 seconds)
-    for (idx, name) in buckets_to_cleanup.iter().enumerate() {
-        if let Err(e) = cleanup_bucket(client.clone(), name.clone()).await {
-            println!("error deleting bucket {name}: {e:?}");
+
+    // Many integration-test workers can run at once against the same project.
+    // A fixed sleep between `DeleteBucket` calls in *this* process does not
+    // coordinate with other processes, so it does not prevent shared rate limits.
+    //
+    // Instead we:
+    // 1) Empty buckets in parallel — listing/deleting objects and other child
+    //    resources per bucket still uses `join_all` inside each bucket so work
+    //    completes quickly; different buckets are emptied concurrently.
+    // 2) Call `DeleteBucket` strictly one-after-another, and on retryable /
+    //    rate-limit-style errors back off with exponential delay whose **initial**
+    //    wait is at least 2 seconds. That slows down *every* runner when the API
+    //    pushes back, which is the practical mitigation across processes.
+    let bucket_names = buckets_to_cleanup;
+    let empty_tasks = bucket_names.iter().cloned().map(|name| {
+        let c = client.clone();
+        async move { empty_bucket_contents(&c, &name).await }
+    });
+    for result in futures::future::join_all(empty_tasks).await {
+        if let Err(e) = result {
+            tracing::error!("error emptying stale bucket before delete: {e:?}");
         }
-        // Add delay between deletions, but not after the last one
-        if idx < buckets_to_cleanup.len() - 1 {
-            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+    }
+
+    for name in bucket_names {
+        if let Err(e) = delete_bucket_with_error_backoff(client, &name).await {
+            println!("error deleting bucket {name}: {e:?}");
        }
     }
 
     Ok(())
 }
 
-pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
-    let credentials = CredentialsBuilder::default().build()?;
-    let headers = match credentials.headers(http::Extensions::new()).await? {
-        CacheableResource::NotModified => unreachable!("no caching requested"),
-        CacheableResource::New { data, .. } => data,
-    };
-    let Some(project) = headers.get("x-goog-user-project") else {
-        return Ok(false);
-    };
-    tracing::warn!(
-        r#"Skipping: {msg} does not support custom billing projects.
-The default credentials (see below) are configured to use project {project:?}.
-
-{credentials:?}"#
-    );
-    Ok(true)
+fn delete_bucket_error_should_backoff(e: &google_cloud_gax::error::Error) -> bool {
+    if e.is_transient_and_before_rpc() {
+        return true;
+    }
+    matches!(
+        e.status().map(|s| s.code),
+        Some(
+            Code::ResourceExhausted
+                | Code::Unavailable
+                | Code::DeadlineExceeded
+                | Code::Aborted
+                | Code::Internal
+        )
+    )
 }
 
-pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
+/// Delete one bucket, sleeping with exponential backoff (initial delay ≥ 2s,
+/// capped) between attempts on retryable errors. See [`cleanup_stale_buckets`].
+async fn delete_bucket_with_error_backoff(
+    client: &StorageControl,
+    name: &str,
+) -> anyhow::Result<()> {
+    let backoff = ExponentialBackoffBuilder::new()
+        .with_initial_delay(Duration::from_secs(2))
+        .with_maximum_delay(Duration::from_secs(120))
+        .build()?;
+    let mut state = RetryState::new(true);
+    let deadline = std::time::Instant::now() + Duration::from_secs(900);
+    loop {
+        match client.delete_bucket().set_name(name).send().await {
+            Ok(()) => return Ok(()),
+            Err(e) => {
+                if std::time::Instant::now() > deadline {
+                    return Err(e.into());
+                }
+                if !delete_bucket_error_should_backoff(&e) {
+                    return Err(e.into());
+                }
+                let wait = backoff.on_failure(&state);
+                tokio::time::sleep(wait).await;
+                state.attempt_count = state.attempt_count.saturating_add(1);
+            }
+        }
+    }
+}
+
+/// List and remove objects, managed folders, folders, and anywhere caches so
+/// the bucket can be deleted. Per-resource deletes are still parallelized with
+/// `join_all` inside this bucket.
+async fn empty_bucket_contents(client: &StorageControl, name: &str) -> anyhow::Result<()> {
     use google_cloud_gax::{Result as GaxResult, paginator::ItemPaginator};
     use google_cloud_wkt::FieldMask;
 
-    // Configure the bucket to be garbage collected. Some buckets are created by
-    // sample code, which does not (and should not) include setting labels to
-    // automatically garbage collect the bucket.
-    let current = client.get_bucket().set_name(&name).send().await?;
+    let current = client.get_bucket().set_name(name).send().await?;
     if current
         .labels
         .get("integration-test")
@@ -826,7 +877,7 @@
     let mut objects = client
         .list_objects()
-        .set_parent(&name)
+        .set_parent(name)
         .set_versions(true)
         .by_item();
     let mut pending = Vec::new();
@@ -849,11 +900,11 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
         .collect::<GaxResult<Vec<_>>>()
     {
         tracing::error!("Error cleaning up objects in bucket {name}: {e:?}");
-    };
+    }
 
     if current.hierarchical_namespace.is_some_and(|h| h.enabled) {
         let mut pending = Vec::new();
-        let mut folders = client.list_managed_folders().set_parent(&name).by_item();
+        let mut folders = client.list_managed_folders().set_parent(name).by_item();
         while let Some(item) = folders.next().await {
             let Ok(folder) = item else {
                 continue;
             };
@@ -869,7 +920,7 @@
         }
 
         let mut pending = Vec::new();
-        let mut folders = client.list_folders().set_parent(&name).by_item();
+        let mut folders = client.list_folders().set_parent(name).by_item();
         while let Some(item) = folders.next().await {
             let Ok(folder) = item else {
                 continue;
             };
@@ -882,11 +933,11 @@
             .collect::<GaxResult<Vec<_>>>()
         {
             tracing::error!("Error cleaning up folders in bucket {name}: {e:?}");
-        };
+        }
     }
 
     let mut pending = Vec::new();
-    let mut caches = client.list_anywhere_caches().set_parent(&name).by_item();
+    let mut caches = client.list_anywhere_caches().set_parent(name).by_item();
     while let Some(item) = caches.next().await {
         let Ok(cache) = item else {
             continue;
         };
@@ -899,12 +950,33 @@
         .collect::<GaxResult<Vec<_>>>()
     {
         tracing::error!("Error cleaning up caches in bucket {name}: {e:?}");
-    };
+    }
 
-    client.delete_bucket().set_name(&name).send().await?;
     Ok(())
 }
 
+pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
+    let credentials = CredentialsBuilder::default().build()?;
+    let headers = match credentials.headers(http::Extensions::new()).await? {
+        CacheableResource::NotModified => unreachable!("no caching requested"),
+        CacheableResource::New { data, .. } => data,
+    };
+    let Some(project) = headers.get("x-goog-user-project") else {
+        return Ok(false);
+    };
+    tracing::warn!(
+        r#"Skipping: {msg} does not support custom billing projects.
+The default credentials (see below) are configured to use project {project:?}.
+
+{credentials:?}"#
+    );
+    Ok(true)
+}
+
+pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
+    empty_bucket_contents(&client, &name).await?;
+    delete_bucket_with_error_backoff(&client, &name).await
+}
+
 fn enable_info_tracing() -> tracing::subscriber::DefaultGuard {
     use tracing_subscriber::fmt::format::FmtSpan;
     let subscriber = tracing_subscriber::fmt()

From 56afcd0f6593634a930e931a13044dbc5bf1ad6d Mon Sep 17 00:00:00 2001
From: abhinavgautam01
Date: Mon, 11 May 2026 18:57:36 +0530
Subject: [PATCH 3/4] chore(storage-samples): rustfmt gax imports

---
 src/storage/examples/src/lib.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/storage/examples/src/lib.rs b/src/storage/examples/src/lib.rs
index 6c42795c35..22eedffbc0 100644
--- a/src/storage/examples/src/lib.rs
+++ b/src/storage/examples/src/lib.rs
@@ -23,10 +23,8 @@ use google_cloud_gax::options::RequestOptionsBuilder;
 use google_cloud_gax::paginator::ItemPaginator as _;
 use google_cloud_gax::throttle_result::ThrottleResult;
 use google_cloud_gax::{
-    backoff_policy::BackoffPolicy,
-    error::rpc::Code,
-    exponential_backoff::ExponentialBackoffBuilder,
-    retry_policy::RetryPolicyExt,
+    backoff_policy::BackoffPolicy, error::rpc::Code,
+    exponential_backoff::ExponentialBackoffBuilder, retry_policy::RetryPolicyExt,
     retry_state::RetryState,
 };
 use google_cloud_storage::client::{Storage, StorageControl};

From 15dadf5b2c128c4686bcfecd3fc86832a13e02d1 Mon Sep 17 00:00:00 2001
From: abhinavgautam01
Date: Mon, 11 May 2026 19:56:39 +0530
Subject: [PATCH 4/4] refactor(storage-samples): use client retry for delete
 bucket cleanup

- Replace manual delete retry loop with with_backoff_policy + idempotency
- Restore GC label comment in empty_bucket_contents
- Group cleanup_bucket with other cleanup helpers; ASCII doc comment
---
 src/storage/examples/src/lib.rs | 56 ++++++++++-----------------
 1 file changed, 16 insertions(+), 40 deletions(-)

diff --git a/src/storage/examples/src/lib.rs b/src/storage/examples/src/lib.rs
index 22eedffbc0..823cdd8d8d 100644
--- a/src/storage/examples/src/lib.rs
+++ b/src/storage/examples/src/lib.rs
@@ -23,7 +23,6 @@ use google_cloud_gax::options::RequestOptionsBuilder;
 use google_cloud_gax::paginator::ItemPaginator as _;
 use google_cloud_gax::throttle_result::ThrottleResult;
 use google_cloud_gax::{
-    backoff_policy::BackoffPolicy, error::rpc::Code,
     exponential_backoff::ExponentialBackoffBuilder, retry_policy::RetryPolicyExt,
     retry_state::RetryState,
 };
 use google_cloud_storage::client::{Storage, StorageControl};
@@ -797,24 +796,13 @@ pub async fn cleanup_stale_buckets(
     Ok(())
 }
 
-fn delete_bucket_error_should_backoff(e: &google_cloud_gax::error::Error) -> bool {
-    if e.is_transient_and_before_rpc() {
-        return true;
-    }
-    matches!(
-        e.status().map(|s| s.code),
-        Some(
-            Code::ResourceExhausted
-                | Code::Unavailable
-                | Code::DeadlineExceeded
-                | Code::Aborted
-                | Code::Internal
-        )
-    )
+pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
+    empty_bucket_contents(&client, &name).await?;
+    delete_bucket_with_error_backoff(&client, &name).await
 }
 
-/// Delete one bucket, sleeping with exponential backoff (initial delay ≥ 2s,
-/// capped) between attempts on retryable errors. See [`cleanup_stale_buckets`].
+/// Delete one bucket, using the client's retry loop with exponential backoff
+/// (initial delay >= 2s, capped). See [`cleanup_stale_buckets`].
 async fn delete_bucket_with_error_backoff(
     client: &StorageControl,
     name: &str,
 ) -> anyhow::Result<()> {
@@ -823,24 +811,14 @@ async fn delete_bucket_with_error_backoff(
         .with_initial_delay(Duration::from_secs(2))
         .with_maximum_delay(Duration::from_secs(120))
         .build()?;
-    let mut state = RetryState::new(true);
-    let deadline = std::time::Instant::now() + Duration::from_secs(900);
-    loop {
-        match client.delete_bucket().set_name(name).send().await {
-            Ok(()) => return Ok(()),
-            Err(e) => {
-                if std::time::Instant::now() > deadline {
-                    return Err(e.into());
-                }
-                if !delete_bucket_error_should_backoff(&e) {
-                    return Err(e.into());
-                }
-                let wait = backoff.on_failure(&state);
-                tokio::time::sleep(wait).await;
-                state.attempt_count = state.attempt_count.saturating_add(1);
-            }
-        }
-    }
+    client
+        .delete_bucket()
+        .set_name(name)
+        .with_backoff_policy(backoff)
+        .with_idempotency(true)
+        .send()
+        .await?;
+    Ok(())
 }
 
 /// List and remove objects, managed folders, folders, and anywhere caches so
@@ -851,6 +829,9 @@ async fn empty_bucket_contents(client: &StorageControl, name: &str) -> anyhow::R
     use google_cloud_wkt::FieldMask;
 
     let current = client.get_bucket().set_name(name).send().await?;
+    // Configure the bucket to be garbage collected. Some buckets are created by
+    // sample code, which does not (and should not) include setting labels to
+    // automatically garbage collect the bucket.
     if current
         .labels
         .get("integration-test")
@@ -970,11 +951,6 @@ The default credentials (see below) are configured to use project {project:?}.
     Ok(true)
 }
 
-pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
-    empty_bucket_contents(&client, &name).await?;
-    delete_bucket_with_error_backoff(&client, &name).await
-}
-
 fn enable_info_tracing() -> tracing::subscriber::DefaultGuard {
     use tracing_subscriber::fmt::format::FmtSpan;
     let subscriber = tracing_subscriber::fmt()
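
For reviewers, the end state of the delete path after this series,
collected into one standalone sketch (imports and builder calls are
copied from the hunks above, not re-verified against the crate docs):

    use std::time::Duration;

    use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
    use google_cloud_gax::options::RequestOptionsBuilder;
    use google_cloud_storage::client::StorageControl;

    async fn delete_with_client_retry(
        client: &StorageControl,
        name: &str,
    ) -> anyhow::Result<()> {
        // First retry waits >= 2s, growing exponentially up to 120s.
        let backoff = ExponentialBackoffBuilder::new()
            .with_initial_delay(Duration::from_secs(2))
            .with_maximum_delay(Duration::from_secs(120))
            .build()?;
        client
            .delete_bucket()
            .set_name(name)
            // The client's retry loop now owns the sleeping between attempts.
            .with_backoff_policy(backoff)
            // DeleteBucket is treated as safe to retry in this cleanup path.
            .with_idempotency(true)
            .send()
            .await?;
        Ok(())
    }

The manual loop, deadline, and error classification all disappear; retry
eligibility and sleeping move into the client, which is the point of
PATCH 4/4.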