diff --git a/CHANGELOG.md b/CHANGELOG.md index 754f12a5e1..9e59323507 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,13 @@ ## Unreleased +### Performance + +- (snapshots) Parallelize image hashing with rayon ([#3250](https://github.com/getsentry/sentry-cli/pull/3250)) + ### Fixes +- (snapshots) Chunk image uploads to avoid file descriptor exhaustion and 413 errors when uploading hundreds of images ([#3249](https://github.com/getsentry/sentry-cli/pull/3249)) - (sourcemaps) Skip non-base64 embedded sourcemaps during injection ([#3243](https://github.com/getsentry/sentry-cli/pull/3243)) ### New Features ✨ diff --git a/src/commands/build/snapshots.rs b/src/commands/build/snapshots.rs index cc48284323..115219a39a 100644 --- a/src/commands/build/snapshots.rs +++ b/src/commands/build/snapshots.rs @@ -9,8 +9,9 @@ use anyhow::{Context as _, Result}; use clap::{Arg, ArgMatches, Command}; use console::style; use itertools::Itertools as _; -use log::{debug, info, warn}; +use log::{debug, warn}; use objectstore_client::{ClientBuilder, ExpirationPolicy, Usecase}; +use rayon::prelude::*; use secrecy::ExposeSecret as _; use serde_json::Value; use sha2::{Digest as _, Sha256}; @@ -28,6 +29,7 @@ const EXPERIMENTAL_WARNING: &str = const IMAGE_EXTENSIONS: &[&str] = &["png", "jpg", "jpeg"]; const MAX_PIXELS_PER_IMAGE: u64 = 40_000_000; +const UPLOAD_BATCH_SIZE: usize = 100; pub fn make_command(command: Command) -> Command { command @@ -230,7 +232,7 @@ fn compute_sha256_hash(path: &Path) -> Result { let mut file = std::fs::File::open(path) .with_context(|| format!("Failed to open image for hashing: {}", path.display()))?; let mut hasher = Sha256::new(); - let mut buffer = [0u8; 8192]; + let mut buffer = [0u8; 65536]; loop { let bytes_read = file .read(&mut buffer) @@ -282,6 +284,11 @@ fn read_sidecar_metadata(image_path: &Path) -> Result> { }) } +struct PreparedImage { + path: PathBuf, + key: String, +} + fn upload_images( images: Vec, org: &str, @@ -297,30 +304,28 @@ fn upload_images( let client = ClientBuilder::new(options.objectstore.url) .token({ // TODO: replace with auth from `ObjectstoreUploadOptions` when appropriate - let auth = match authenticated_api.auth() { + match authenticated_api.auth() { Auth::Token(token) => token.raw().expose_secret().to_owned(), - }; - auth + } }) .configure_reqwest(|r| r.connect_timeout(Duration::from_secs(10))) .build()?; + let scopes = options.objectstore.scopes; + + let find_scope = |name: &str| { + scopes + .iter() + .find(|(k, _)| k == name) + .map(|(_, v)| v.clone()) + }; + let org_id = find_scope("org").context("Missing org in UploadOptions scope")?; + let project_id = find_scope("project").context("Missing project in UploadOptions scope")?; + let mut scope = Usecase::new("preprod").scope(); - let (mut org_id, mut project_id): (Option, Option) = (None, None); - for (key, value) in options.objectstore.scopes.into_iter() { - scope = scope.push(&key, value.clone()); - if key == "org" { - org_id = Some(value); - } else if key == "project" { - project_id = Some(value); - } + for (key, value) in scopes { + scope = scope.push(&key, value); } - let Some(org_id) = org_id else { - anyhow::bail!("Missing org in UploadOptions scope"); - }; - let Some(project_id) = project_id else { - anyhow::bail!("Missing project in UploadOptions scope"); - }; let session = scope.session(&client)?; @@ -329,13 +334,20 @@ fn upload_images( .build() .context("Failed to create tokio runtime")?; - let mut many_builder = session.many(); let mut manifest_entries = HashMap::new(); let mut collisions: HashMap> = HashMap::new(); - let mut kept_paths: HashMap = HashMap::new(); - for image in images { - debug!("Processing image: {}", image.path.display()); + let mut kept_paths = HashMap::new(); + let mut uploads = Vec::with_capacity(images.len()); + + let hashed_images: Vec<_> = images + .into_par_iter() + .map(|image| { + let hash = compute_sha256_hash(&image.path)?; + Ok((image, hash)) + }) + .collect::>>()?; + for (image, hash) in hashed_images { let image_file_name = image .relative_path .file_name() @@ -353,22 +365,7 @@ fn upload_images( continue; } - let hash = compute_sha256_hash(&image.path)?; - let file = runtime - .block_on(tokio::fs::File::open(&image.path)) - .with_context(|| { - format!("Failed to open image for upload: {}", image.path.display()) - })?; - let key = format!("{org_id}/{project_id}/{hash}"); - info!("Queueing {} as {key}", image.relative_path.display()); - - many_builder = many_builder.push( - session - .put_file(file) - .key(&key) - .expiration_policy(expiration), - ); let mut extra = read_sidecar_metadata(&image.path).unwrap_or_else(|err| { warn!("Error reading sidecar metadata, ignoring it instead: {err:#}"); @@ -377,6 +374,10 @@ fn upload_images( extra.insert("content_hash".to_owned(), serde_json::Value::String(hash)); kept_paths.insert(image_file_name.clone(), relative_path); + uploads.push(PreparedImage { + path: image.path, + key, + }); manifest_entries.insert( image_file_name, ImageMetadata::new(image.width, image.height, extra), @@ -384,40 +385,70 @@ fn upload_images( } if !collisions.is_empty() { - let mut details = String::new(); - for (name, excluded_paths) in &collisions { - let mut all_paths = vec![kept_paths[name].as_str()]; - all_paths.extend(excluded_paths.iter().map(|s| s.as_str())); - details.push_str(&format!("\n {name}: {}", all_paths.join(", "))); - } + let details: String = collisions + .iter() + .map(|(name, excluded)| { + let kept = &kept_paths[name]; + let all = std::iter::once(kept.as_str()) + .chain(excluded.iter().map(|s| s.as_str())) + .join(", "); + format!("\n {name}: {all}") + }) + .collect(); warn!("Some images share identical file names. Only the first occurrence of each is included:{details}"); } - let result = runtime.block_on(async { many_builder.send().error_for_failures().await }); + let total_count = uploads.len(); + let total_batches = total_count.div_ceil(UPLOAD_BATCH_SIZE); - let uploaded_count = manifest_entries.len(); + for (batch_idx, chunk) in uploads.chunks(UPLOAD_BATCH_SIZE).enumerate() { + debug!( + "Uploading batch {}/{total_batches} ({} images)", + batch_idx + 1, + chunk.len() + ); - match result { - Ok(()) => { - println!( - "{} Uploaded {} image {}", - style(">").dim(), - style(uploaded_count).yellow(), - if uploaded_count == 1 { "file" } else { "files" } + let mut many_builder = session.many(); + for prepared in chunk { + let file = runtime + .block_on(tokio::fs::File::open(&prepared.path)) + .with_context(|| { + format!( + "Failed to open image for upload: {}", + prepared.path.display() + ) + })?; + + many_builder = many_builder.push( + session + .put_file(file) + .key(&prepared.key) + .expiration_policy(expiration), ); - Ok(manifest_entries) } - Err(errors) => { + + let result = runtime.block_on(async { many_builder.send().error_for_failures().await }); + if let Err(errors) = result { + let errors: Vec<_> = errors.collect(); eprintln!("There were errors uploading images:"); - let mut error_count = 0; - for error in errors { - let error = anyhow::Error::new(error); + for error in &errors { eprintln!(" {}", style(format!("{error:#}")).red()); - error_count += 1; } - anyhow::bail!("Failed to upload {error_count} out of {uploaded_count} images") + let error_count = errors.len(); + let batch_num = batch_idx + 1; + anyhow::bail!( + "Failed to upload {error_count} images in batch {batch_num}/{total_batches}" + ); } } + + println!( + "{} Uploaded {} image {}", + style(">").dim(), + style(total_count).yellow(), + if total_count == 1 { "file" } else { "files" } + ); + Ok(manifest_entries) } #[cfg(test)]