-
-
Notifications
You must be signed in to change notification settings - Fork 242
fix(snapshots): Chunk image uploads to avoid fd exhaustion and 413 errors #3249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
00db720
3c123e0
31c6346
835e271
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,9 @@ use anyhow::{Context as _, Result}; | |
| use clap::{Arg, ArgMatches, Command}; | ||
| use console::style; | ||
| use itertools::Itertools as _; | ||
| use log::{debug, info, warn}; | ||
| use log::{debug, warn}; | ||
| use objectstore_client::{ClientBuilder, ExpirationPolicy, Usecase}; | ||
| use rayon::prelude::*; | ||
| use secrecy::ExposeSecret as _; | ||
| use serde_json::Value; | ||
| use sha2::{Digest as _, Sha256}; | ||
|
|
@@ -28,6 +29,7 @@ const EXPERIMENTAL_WARNING: &str = | |
|
|
||
| const IMAGE_EXTENSIONS: &[&str] = &["png", "jpg", "jpeg"]; | ||
| const MAX_PIXELS_PER_IMAGE: u64 = 40_000_000; | ||
| const UPLOAD_BATCH_SIZE: usize = 100; | ||
|
|
||
| pub fn make_command(command: Command) -> Command { | ||
| command | ||
|
|
@@ -230,7 +232,7 @@ fn compute_sha256_hash(path: &Path) -> Result<String> { | |
| let mut file = std::fs::File::open(path) | ||
| .with_context(|| format!("Failed to open image for hashing: {}", path.display()))?; | ||
| let mut hasher = Sha256::new(); | ||
| let mut buffer = [0u8; 8192]; | ||
| let mut buffer = [0u8; 65536]; | ||
| loop { | ||
| let bytes_read = file | ||
| .read(&mut buffer) | ||
|
|
@@ -282,6 +284,11 @@ fn read_sidecar_metadata(image_path: &Path) -> Result<HashMap<String, Value>> { | |
| }) | ||
| } | ||
|
|
||
| struct PreparedImage { | ||
| path: PathBuf, | ||
| key: String, | ||
| } | ||
|
|
||
| fn upload_images( | ||
| images: Vec<ImageInfo>, | ||
| org: &str, | ||
|
|
@@ -297,30 +304,28 @@ fn upload_images( | |
| let client = ClientBuilder::new(options.objectstore.url) | ||
| .token({ | ||
| // TODO: replace with auth from `ObjectstoreUploadOptions` when appropriate | ||
| let auth = match authenticated_api.auth() { | ||
| match authenticated_api.auth() { | ||
| Auth::Token(token) => token.raw().expose_secret().to_owned(), | ||
| }; | ||
| auth | ||
| } | ||
| }) | ||
| .configure_reqwest(|r| r.connect_timeout(Duration::from_secs(10))) | ||
| .build()?; | ||
|
|
||
| let scopes = options.objectstore.scopes; | ||
|
|
||
| let find_scope = |name: &str| { | ||
| scopes | ||
| .iter() | ||
| .find(|(k, _)| k == name) | ||
| .map(|(_, v)| v.clone()) | ||
| }; | ||
| let org_id = find_scope("org").context("Missing org in UploadOptions scope")?; | ||
| let project_id = find_scope("project").context("Missing project in UploadOptions scope")?; | ||
|
|
||
| let mut scope = Usecase::new("preprod").scope(); | ||
| let (mut org_id, mut project_id): (Option<String>, Option<String>) = (None, None); | ||
| for (key, value) in options.objectstore.scopes.into_iter() { | ||
| scope = scope.push(&key, value.clone()); | ||
| if key == "org" { | ||
| org_id = Some(value); | ||
| } else if key == "project" { | ||
| project_id = Some(value); | ||
| } | ||
| for (key, value) in scopes { | ||
| scope = scope.push(&key, value); | ||
| } | ||
| let Some(org_id) = org_id else { | ||
| anyhow::bail!("Missing org in UploadOptions scope"); | ||
| }; | ||
| let Some(project_id) = project_id else { | ||
| anyhow::bail!("Missing project in UploadOptions scope"); | ||
| }; | ||
|
|
||
| let session = scope.session(&client)?; | ||
|
|
||
|
|
@@ -329,13 +334,20 @@ fn upload_images( | |
| .build() | ||
| .context("Failed to create tokio runtime")?; | ||
|
|
||
| let mut many_builder = session.many(); | ||
| let mut manifest_entries = HashMap::new(); | ||
| let mut collisions: HashMap<String, Vec<String>> = HashMap::new(); | ||
| let mut kept_paths: HashMap<String, String> = HashMap::new(); | ||
| for image in images { | ||
| debug!("Processing image: {}", image.path.display()); | ||
| let mut kept_paths = HashMap::new(); | ||
| let mut uploads = Vec::with_capacity(images.len()); | ||
|
|
||
| let hashed_images: Vec<_> = images | ||
| .into_par_iter() | ||
| .map(|image| { | ||
| let hash = compute_sha256_hash(&image.path)?; | ||
| Ok((image, hash)) | ||
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| for (image, hash) in hashed_images { | ||
| let image_file_name = image | ||
| .relative_path | ||
| .file_name() | ||
|
|
@@ -353,22 +365,7 @@ fn upload_images( | |
| continue; | ||
| } | ||
|
|
||
| let hash = compute_sha256_hash(&image.path)?; | ||
| let file = runtime | ||
| .block_on(tokio::fs::File::open(&image.path)) | ||
| .with_context(|| { | ||
| format!("Failed to open image for upload: {}", image.path.display()) | ||
| })?; | ||
|
|
||
| let key = format!("{org_id}/{project_id}/{hash}"); | ||
| info!("Queueing {} as {key}", image.relative_path.display()); | ||
|
|
||
| many_builder = many_builder.push( | ||
| session | ||
| .put_file(file) | ||
| .key(&key) | ||
| .expiration_policy(expiration), | ||
| ); | ||
|
|
||
| let mut extra = read_sidecar_metadata(&image.path).unwrap_or_else(|err| { | ||
| warn!("Error reading sidecar metadata, ignoring it instead: {err:#}"); | ||
|
|
@@ -377,47 +374,81 @@ fn upload_images( | |
| extra.insert("content_hash".to_owned(), serde_json::Value::String(hash)); | ||
|
|
||
| kept_paths.insert(image_file_name.clone(), relative_path); | ||
| uploads.push(PreparedImage { | ||
| path: image.path, | ||
| key, | ||
| }); | ||
| manifest_entries.insert( | ||
| image_file_name, | ||
| ImageMetadata::new(image.width, image.height, extra), | ||
| ); | ||
| } | ||
|
|
||
| if !collisions.is_empty() { | ||
| let mut details = String::new(); | ||
| for (name, excluded_paths) in &collisions { | ||
| let mut all_paths = vec![kept_paths[name].as_str()]; | ||
| all_paths.extend(excluded_paths.iter().map(|s| s.as_str())); | ||
| details.push_str(&format!("\n {name}: {}", all_paths.join(", "))); | ||
| } | ||
| let details: String = collisions | ||
| .iter() | ||
| .map(|(name, excluded)| { | ||
| let kept = &kept_paths[name]; | ||
| let all = std::iter::once(kept.as_str()) | ||
| .chain(excluded.iter().map(|s| s.as_str())) | ||
| .join(", "); | ||
| format!("\n {name}: {all}") | ||
| }) | ||
| .collect(); | ||
| warn!("Some images share identical file names. Only the first occurrence of each is included:{details}"); | ||
| } | ||
|
|
||
| let result = runtime.block_on(async { many_builder.send().error_for_failures().await }); | ||
| let total_count = uploads.len(); | ||
| let total_batches = total_count.div_ceil(UPLOAD_BATCH_SIZE); | ||
|
|
||
| let uploaded_count = manifest_entries.len(); | ||
| for (batch_idx, chunk) in uploads.chunks(UPLOAD_BATCH_SIZE).enumerate() { | ||
| debug!( | ||
| "Uploading batch {}/{total_batches} ({} images)", | ||
| batch_idx + 1, | ||
| chunk.len() | ||
| ); | ||
|
|
||
| match result { | ||
| Ok(()) => { | ||
| println!( | ||
| "{} Uploaded {} image {}", | ||
| style(">").dim(), | ||
| style(uploaded_count).yellow(), | ||
| if uploaded_count == 1 { "file" } else { "files" } | ||
| let mut many_builder = session.many(); | ||
| for prepared in chunk { | ||
| let file = runtime | ||
| .block_on(tokio::fs::File::open(&prepared.path)) | ||
| .with_context(|| { | ||
| format!( | ||
| "Failed to open image for upload: {}", | ||
| prepared.path.display() | ||
| ) | ||
| })?; | ||
|
|
||
| many_builder = many_builder.push( | ||
| session | ||
| .put_file(file) | ||
| .key(&prepared.key) | ||
| .expiration_policy(expiration), | ||
| ); | ||
| Ok(manifest_entries) | ||
| } | ||
| Err(errors) => { | ||
|
|
||
| let result = runtime.block_on(async { many_builder.send().error_for_failures().await }); | ||
| if let Err(errors) = result { | ||
| let errors: Vec<_> = errors.collect(); | ||
| eprintln!("There were errors uploading images:"); | ||
| let mut error_count = 0; | ||
| for error in errors { | ||
| let error = anyhow::Error::new(error); | ||
| for error in &errors { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Upload error messages lose cause chain detail (Low Severity): The old code wrapped each upload error in `anyhow::Error::new` […] |
||
| eprintln!(" {}", style(format!("{error:#}")).red()); | ||
| error_count += 1; | ||
| } | ||
| anyhow::bail!("Failed to upload {error_count} out of {uploaded_count} images") | ||
| let error_count = errors.len(); | ||
| let batch_num = batch_idx + 1; | ||
| anyhow::bail!( | ||
| "Failed to upload {error_count} images in batch {batch_num}/{total_batches}" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| println!( | ||
| "{} Uploaded {} image {}", | ||
| style(">").dim(), | ||
| style(total_count).yellow(), | ||
| if total_count == 1 { "file" } else { "files" } | ||
| ); | ||
| Ok(manifest_entries) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open to your thoughts, but based on some testing, this seems to be a good number.