121 changes: 83 additions & 38 deletions src/storage/examples/src/lib.rs
@@ -749,8 +749,7 @@ pub async fn cleanup_stale_buckets(
.list_buckets()
.set_parent(format!("projects/{project_id}"))
.by_item();
let mut pending = Vec::new();
let mut names = Vec::new();
let mut buckets_to_cleanup = Vec::new();
while let Some(bucket) = buckets.next().await {
let bucket = bucket?;
if bucket
@@ -759,50 +758,80 @@ pub async fn cleanup_stale_buckets(
.is_some_and(|v| v == "true")
&& bucket.create_time.is_some_and(|v| v < stale_deadline)
{
let client = client.clone();
let name = bucket.name.clone();
pending.push(tokio::spawn(
async move { cleanup_bucket(client, name).await },
));
names.push(bucket.name);
buckets_to_cleanup.push(bucket.name);
}
}

println!("cleaning up {} buckets", pending.len());
let results = futures::future::join_all(pending).await;
let errors = results.into_iter().zip(names).filter(|(r, _)| r.is_err());
for (r, name) in errors {
println!("error deleting bucket {name}: {r:?}");
println!("cleaning up {} buckets", buckets_to_cleanup.len());

// Many integration-test workers can run at once against the same project.
// A fixed sleep between `DeleteBucket` calls in *this* process does not
// coordinate with other processes, so it does not protect against shared rate limits.
//
// Instead we:
// 1) Empty buckets in parallel — listing/deleting objects and other child
// resources per bucket still uses `join_all` inside each bucket so work
// completes quickly; different buckets are emptied concurrently.
// 2) Call `DeleteBucket` strictly one-after-another, and on retryable /
// rate-limit-style errors back off with exponential delay whose **initial**
// wait is at least 2 seconds. That slows down *every* runner when the API
// pushes back, which is the practical mitigation across processes.
let bucket_names = buckets_to_cleanup;
let empty_tasks = bucket_names.iter().cloned().map(|name| {
let c = client.clone();
async move { empty_bucket_contents(&c, &name).await }
});
for result in futures::future::join_all(empty_tasks).await {
if let Err(e) = result {
tracing::error!("error emptying stale bucket before delete: {e:?}");
}
}

for name in bucket_names {
if let Err(e) = delete_bucket_with_error_backoff(client, &name).await {
println!("error deleting bucket {name}: {e:?}");
}
}

Ok(())
}
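
// Aside: a sketch, not part of this change, making the comment above concrete.
// A fixed per-process sleep cannot protect a shared, project-wide rate limit,
// because the aggregate request rate scales with the number of workers no
// matter what any single process sleeps between calls.
#[allow(dead_code)]
fn aggregate_delete_rate(workers: u32, sleep_secs: f64) -> f64 {
    // Each worker issues roughly one DeleteBucket per sleep interval, so
    // 20 workers sleeping 1s each still hit the project ~20 times per second.
    f64::from(workers) / sleep_secs
}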

pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
let credentials = CredentialsBuilder::default().build()?;
let headers = match credentials.headers(http::Extensions::new()).await? {
CacheableResource::NotModified => unreachable!("no caching requested"),
CacheableResource::New { data, .. } => data,
};
let Some(project) = headers.get("x-goog-user-project") else {
return Ok(false);
};
tracing::warn!(
r#"Skipping: {msg} does not support custom billing projects.
The default credentials (see below) are configured to use project {project:?}.
{credentials:?}"#
);
Ok(true)
pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
empty_bucket_contents(&client, &name).await?;
delete_bucket_with_error_backoff(&client, &name).await
}

pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
/// Delete one bucket, using the client's retry loop with exponential backoff
/// (initial delay >= 2s, capped). See [`cleanup_stale_buckets`].
async fn delete_bucket_with_error_backoff(
client: &StorageControl,
name: &str,
) -> anyhow::Result<()> {
let backoff = ExponentialBackoffBuilder::new()
.with_initial_delay(Duration::from_secs(2))
.with_maximum_delay(Duration::from_secs(120))
.build()?;
client
.delete_bucket()
.set_name(name)
.with_backoff_policy(backoff)
.with_idempotency(true)
.send()
.await?;
Ok(())
}
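
// Aside: a sketch, not part of this change, of the delay schedule the policy
// above produces, assuming a 2x growth factor (an assumption; the real
// multiplier is whatever `ExponentialBackoffBuilder` defaults to):
// roughly 2s, 4s, 8s, 16s, ... capped at 120s.
#[allow(dead_code)]
fn approx_backoff_delays(attempts: u32) -> Vec<Duration> {
    let initial = Duration::from_secs(2);
    let cap = Duration::from_secs(120);
    (0..attempts)
        .map(|i| std::cmp::min(initial.saturating_mul(2u32.saturating_pow(i)), cap))
        .collect()
}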

/// List and remove objects, managed folders, folders, and anywhere caches so
/// the bucket can be deleted. Per-resource deletes are still parallelized with
/// `join_all` inside this bucket.
async fn empty_bucket_contents(client: &StorageControl, name: &str) -> anyhow::Result<()> {
use google_cloud_gax::{Result as GaxResult, paginator::ItemPaginator};
use google_cloud_wkt::FieldMask;

let current = client.get_bucket().set_name(name).send().await?;
// Configure the bucket to be garbage collected. Some buckets are created by
// sample code, which does not (and should not) include setting labels to
// automatically garbage collect the bucket.
let current = client.get_bucket().set_name(&name).send().await?;
if current
.labels
.get("integration-test")
@@ -827,7 +856,7 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {

let mut objects = client
.list_objects()
.set_parent(&name)
.set_parent(name)
.set_versions(true)
.by_item();
let mut pending = Vec::new();
@@ -850,11 +879,11 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
.collect::<GaxResult<Vec<_>>>()
{
tracing::error!("Error cleaning up objects in bucket {name}: {e:?}");
};
}

if current.hierarchical_namespace.is_some_and(|h| h.enabled) {
let mut pending = Vec::new();
let mut folders = client.list_managed_folders().set_parent(&name).by_item();
let mut folders = client.list_managed_folders().set_parent(name).by_item();
while let Some(item) = folders.next().await {
let Ok(folder) = item else {
continue;
@@ -870,7 +899,7 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
}

let mut pending = Vec::new();
let mut folders = client.list_folders().set_parent(&name).by_item();
let mut folders = client.list_folders().set_parent(name).by_item();
while let Some(item) = folders.next().await {
let Ok(folder) = item else {
continue;
@@ -883,11 +912,11 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
.collect::<GaxResult<Vec<_>>>()
{
tracing::error!("Error cleaning up folders in bucket {name}: {e:?}");
};
}
}

let mut pending = Vec::new();
let mut caches = client.list_anywhere_caches().set_parent(&name).by_item();
let mut caches = client.list_anywhere_caches().set_parent(name).by_item();
while let Some(item) = caches.next().await {
let Ok(cache) = item else {
continue;
@@ -900,12 +929,28 @@ pub async fn cleanup_bucket(client: StorageControl, name: String) -> anyhow::Result<()> {
.collect::<GaxResult<Vec<_>>>()
{
tracing::error!("Error cleaning up caches in bucket {name}: {e:?}");
};
}

client.delete_bucket().set_name(&name).send().await?;
Ok(())
}

pub async fn custom_project_billing(msg: &str) -> anyhow::Result<bool> {
let credentials = CredentialsBuilder::default().build()?;
let headers = match credentials.headers(http::Extensions::new()).await? {
CacheableResource::NotModified => unreachable!("no caching requested"),
CacheableResource::New { data, .. } => data,
};
let Some(project) = headers.get("x-goog-user-project") else {
return Ok(false);
};
tracing::warn!(
r#"Skipping: {msg} does not support custom billing projects.
The default credentials (see below) are configured to use project {project:?}.
{credentials:?}"#
);
Ok(true)
}
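
// Aside: a minimal caller sketch, not part of this change; the sample name
// below is made up. It shows the intended use: bail out of a sample early
// when the default credentials pin requests to a custom billing project.
#[allow(dead_code)]
async fn run_sample_guarded() -> anyhow::Result<()> {
    if custom_project_billing("anywhere-cache sample").await? {
        return Ok(()); // skipped: a custom billing project is configured
    }
    // ... run the sample itself ...
    Ok(())
}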

fn enable_info_tracing() -> tracing::subscriber::DefaultGuard {
use tracing_subscriber::fmt::format::FmtSpan;
let subscriber = tracing_subscriber::fmt()