diff --git a/src/storage/examples/src/lib.rs b/src/storage/examples/src/lib.rs
index 092cc97fef..823cdd8d8d 100644
--- a/src/storage/examples/src/lib.rs
+++ b/src/storage/examples/src/lib.rs
@@ -749,8 +749,7 @@ pub async fn cleanup_stale_buckets(
         .list_buckets()
         .set_parent(format!("projects/{project_id}"))
         .by_item();
-    let mut pending = Vec::new();
-    let mut names = Vec::new();
+    let mut buckets_to_cleanup = Vec::new();
     while let Some(bucket) = buckets.next().await {
         let bucket = bucket?;
         if bucket
@@ -759,50 +758,80 @@ pub async fn cleanup_stale_buckets(
             .is_some_and(|v| v == "true")
             && bucket.create_time.is_some_and(|v| v < stale_deadline)
         {
-            let client = client.clone();
-            let name = bucket.name.clone();
-            pending.push(tokio::spawn(
-                async move { cleanup_bucket(client, name).await },
-            ));
-            names.push(bucket.name);
+            buckets_to_cleanup.push(bucket.name);
         }
     }
-    println!("cleaning up {} buckets", pending.len());
-    let results = futures::future::join_all(pending).await;
-    let errors = results.into_iter().zip(names).filter(|(r, _)| r.is_err());
-    for (r, name) in errors {
-        println!("error deleting bucket {name}: {r:?}");
+    println!("cleaning up {} buckets", buckets_to_cleanup.len());
+
+    // Many integration-test workers can run at once against the same project.
+    // A fixed sleep between `DeleteBucket` calls in *this* process does not
+    // coordinate with other processes, so it does not prevent shared rate limits.
+    //
+    // Instead we:
+    // 1) Empty buckets in parallel — listing/deleting objects and other child
+    //    resources per bucket still uses `join_all` inside each bucket so work
+    //    completes quickly; different buckets are emptied concurrently.
+    // 2) Call `DeleteBucket` strictly one-after-another, and on retryable /
+    //    rate-limit-style errors back off with exponential delay whose **initial**
+    //    wait is at least 2 seconds. That slows down *every* runner when the API
+    //    pushes back, which is the practical mitigation across processes.
+    let bucket_names = buckets_to_cleanup;
+    let empty_tasks = bucket_names.iter().cloned().map(|name| {
+        let c = client.clone();
+        async move { empty_bucket_contents(&c, &name).await }
+    });
+    for result in futures::future::join_all(empty_tasks).await {
+        if let Err(e) = result {
+            tracing::error!("error emptying stale bucket before delete: {e:?}");
+        }
+    }
+
+    for name in bucket_names {
+        if let Err(e) = delete_bucket_with_error_backoff(client, &name).await {
+            println!("error deleting bucket {name}: {e:?}");
+        }
     }
     Ok(())
 }
 
-pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
-    let credentials = CredentialsBuilder::default().build()?;
-    let headers = match credentials.headers(http::Extensions::new()).await? {
-        CacheableResource::NotModified => unreachable!("no caching requested"),
-        CacheableResource::New { data, .. } => data,
-    };
-    let Some(project) = headers.get("x-goog-user-project") else {
-        return Ok(false);
-    };
-    tracing::warn!(
-        r#"Skipping: {msg} does not support custom billing projects.
-The default credentials (see below) are configured to use project {project:?}.
-{credentials:?}"#
-    );
-    Ok(true)
+pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
+    empty_bucket_contents(&client, &name).await?;
+    delete_bucket_with_error_backoff(&client, &name).await
 }
 
-pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
+/// Delete one bucket, using the client's retry loop with exponential backoff
+/// (initial delay >= 2s, capped). See [`cleanup_stale_buckets`].
+async fn delete_bucket_with_error_backoff(
+    client: &StorageControl,
+    name: &str,
+) -> anyhow::Result<()> {
+    let backoff = ExponentialBackoffBuilder::new()
+        .with_initial_delay(Duration::from_secs(2))
+        .with_maximum_delay(Duration::from_secs(120))
+        .build()?;
+    client
+        .delete_bucket()
+        .set_name(name)
+        .with_backoff_policy(backoff)
+        .with_idempotency(true)
+        .send()
+        .await?;
+    Ok(())
+}
+
+/// List and remove objects, managed folders, folders, and anywhere caches so
+/// the bucket can be deleted. Per-resource deletes are still parallelized with
+/// `join_all` inside this bucket.
+async fn empty_bucket_contents(client: &StorageControl, name: &str) -> anyhow::Result<()> {
     use google_cloud_gax::{Result as GaxResult, paginator::ItemPaginator};
     use google_cloud_wkt::FieldMask;
 
+    let current = client.get_bucket().set_name(name).send().await?;
     // Configure the bucket to be garbage collected. Some buckets are created by
     // sample code, which does not (and should not) include setting labels to
     // automatically garbage collect the bucket.
-    let current = client.get_bucket().set_name(&name).send().await?;
     if current
         .labels
         .get("integration-test")
@@ -827,7 +856,7 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
 
     let mut objects = client
         .list_objects()
-        .set_parent(&name)
+        .set_parent(name)
         .set_versions(true)
         .by_item();
     let mut pending = Vec::new();
@@ -850,11 +879,11 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
         .collect::<GaxResult<Vec<_>>>()
     {
         tracing::error!("Error cleaning up objects in bucket {name}: {e:?}");
-    };
+    }
     if current.hierarchical_namespace.is_some_and(|h| h.enabled) {
         let mut pending = Vec::new();
-        let mut folders = client.list_managed_folders().set_parent(&name).by_item();
+        let mut folders = client.list_managed_folders().set_parent(name).by_item();
         while let Some(item) = folders.next().await {
             let Ok(folder) = item else {
                 continue;
             };
@@ -870,7 +899,7 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
         }
         let mut pending = Vec::new();
-        let mut folders = client.list_folders().set_parent(&name).by_item();
+        let mut folders = client.list_folders().set_parent(name).by_item();
         while let Some(item) = folders.next().await {
             let Ok(folder) = item else {
                 continue;
             };
@@ -883,11 +912,11 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
             .collect::<GaxResult<Vec<_>>>()
         {
             tracing::error!("Error cleaning up folders in bucket {name}: {e:?}");
-        };
+        }
     }
     let mut pending = Vec::new();
-    let mut caches = client.list_anywhere_caches().set_parent(&name).by_item();
+    let mut caches = client.list_anywhere_caches().set_parent(name).by_item();
     while let Some(item) = caches.next().await {
         let Ok(cache) = item else {
             continue;
         };
@@ -900,12 +929,28 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Res
         .collect::<GaxResult<Vec<_>>>()
     {
         tracing::error!("Error cleaning up caches in bucket {name}: {e:?}");
-    };
+    }
 
-    client.delete_bucket().set_name(&name).send().await?;
     Ok(())
 }
 
+pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
+    let credentials = CredentialsBuilder::default().build()?;
+    let headers = match credentials.headers(http::Extensions::new()).await? {
+        CacheableResource::NotModified => unreachable!("no caching requested"),
+        CacheableResource::New { data, .. } => data,
+    };
+    let Some(project) = headers.get("x-goog-user-project") else {
+        return Ok(false);
+    };
+    tracing::warn!(
+        r#"Skipping: {msg} does not support custom billing projects.
+The default credentials (see below) are configured to use project {project:?}.
+{credentials:?}"#
+    );
+    Ok(true)
+}
+
 fn enable_info_tracing() -> tracing::subscriber::DefaultGuard {
     use tracing_subscriber::fmt::format::FmtSpan;
     let subscriber = tracing_subscriber::fmt()
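
The sketch below is illustrative only and is not part of the change above: it spells out, with plain `tokio` sleeps, the retry schedule that `delete_bucket_with_error_backoff` asks the client for (first wait of 2 seconds, doubling, capped at 120 seconds). The helper name `retry_with_backoff` and the attempt limit are hypothetical, not part of the library.

use std::future::Future;
use std::time::Duration;

// Illustrative only; not part of the diff. `retry_with_backoff` is a
// hypothetical stand-in for the retry policy the client applies when
// `with_backoff_policy(backoff)` is set on the request.
async fn retry_with_backoff<F, Fut, T, E>(mut op: F, max_attempts: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_secs(2); // initial wait is at least 2 seconds
    let cap = Duration::from_secs(120); // waits never grow past 2 minutes
    let mut attempt = 0;
    loop {
        attempt += 1;
        match op().await {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // Every runner that keeps hitting errors waits at least `delay`
                // before its next attempt, then doubles the wait up to the cap.
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(cap);
            }
        }
    }
}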