From dc4054500d775a32de58e5886fc4ee15d4305d2b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Dec 2025 13:23:53 +0200 Subject: [PATCH 1/4] io::Error::other purge - don't send a special error if sending to an expected pipe fails irpc will already produce an io error BrokenPipe, which is more informative than just "error" - don't use other("...") for unsupported ops --- src/store/fs.rs | 4 +--- src/store/fs/import.rs | 36 +++++++++--------------------------- src/store/mem.rs | 3 +-- src/store/readonly_mem.rs | 31 ++++++++++++++++--------------- 4 files changed, 27 insertions(+), 47 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index ffc9a07ee..ff8e9d963 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1378,9 +1378,7 @@ async fn copy_with_progress( let buf: &mut [u8] = &mut buf[..remaining]; file.read_exact_at(offset, buf)?; target.write_all(buf)?; - tx.try_send(T::from_offset(offset)) - .await - .map_err(|_e| io::Error::other(""))?; + tx.try_send(T::from_offset(offset)).await?; yield_now().await; offset += buf.len() as u64; } diff --git a/src/store/fs/import.rs b/src/store/fs/import.rs index f5c8fc1aa..d6e250b89 100644 --- a/src/store/fs/import.rs +++ b/src/store/fs/import.rs @@ -196,12 +196,8 @@ async fn import_bytes_tiny_impl( let size = cmd.data.len() as u64; // send the required progress events // AddProgressItem::Done will be sent when finishing the import! - tx.send(AddProgressItem::Size(size)) - .await - .map_err(|_e| io::Error::other("error"))?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(size)).await?; + tx.send(AddProgressItem::CopyDone).await?; Ok(if raw_outboard_size(size) == 0 { // the thing is so small that it does not even need an outboard ImportEntry { @@ -286,12 +282,8 @@ async fn import_byte_stream_impl( ) -> io::Result { let ImportByteStreamRequest { format, scope } = cmd; let import_source = get_import_source(stream, tx, &options).await?; - tx.send(AddProgressItem::Size(import_source.size())) - .await - .map_err(|_e| io::Error::other("error"))?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(import_source.size())).await?; + tx.send(AddProgressItem::CopyDone).await?; compute_outboard(import_source, format, scope, options, tx).await } @@ -344,18 +336,14 @@ async fn get_import_source( data.extend_from_slice(&chunk); } // todo: don't send progress for every chunk if the chunks are small? - tx.try_send(AddProgressItem::CopyProgress(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.try_send(AddProgressItem::CopyProgress(size)).await?; } Ok(if let Some((mut file, temp_path)) = disk { while let Some(chunk) = stream.next().await { let chunk = chunk?; file.write_all(&chunk)?; size += chunk.len() as u64; - tx.send(AddProgressItem::CopyProgress(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyProgress(size)).await?; } ImportSource::TempFile(temp_path, file, size) } else { @@ -473,14 +461,10 @@ async fn import_path_impl( } let size = path.metadata()?.len(); - tx.send(AddProgressItem::Size(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(size)).await?; let import_source = if size <= options.inline.max_data_inlined { let data = std::fs::read(path)?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyDone).await?; ImportSource::Memory(data.into()) } else if mode == ImportMode::TryReference { // reference where it is. We are going to need the file handle to @@ -500,9 +484,7 @@ async fn import_path_impl( ); // copy from path to temp_path let file = OpenOptions::new().read(true).open(&temp_path)?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyDone).await?; ImportSource::TempFile(temp_path, file, size) }; compute_outboard(import_source, format, batch, options, tx).await diff --git a/src/store/mem.rs b/src/store/mem.rs index 1d1583ac3..69fff6f30 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -837,8 +837,7 @@ async fn export_path_impl( entry.0.state.borrow().data().read_exact_at(offset, buf)?; file.write_all(buf)?; tx.try_send(ExportProgressItem::CopyProgress(offset)) - .await - .map_err(|_e| io::Error::other(""))?; + .await?; yield_now().await; } Ok(()) diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index 7fc734827..4134dafcd 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -96,11 +96,9 @@ impl Actor { async fn handle_command(&mut self, cmd: Command) -> Option> { match cmd { Command::ImportBao(ImportBaoMsg { tx, .. }) => { - tx.send(Err(api::Error::from(io::Error::other( - "import not supported", - )))) - .await - .ok(); + tx.send(Err(unsupported("import not supported").into())) + .await + .ok(); } Command::WaitIdle(WaitIdleMsg { tx, .. }) => { if self.tasks.is_empty() { @@ -112,17 +110,17 @@ impl Actor { } } Command::ImportBytes(ImportBytesMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } Command::ImportPath(ImportPathMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } @@ -163,7 +161,7 @@ impl Actor { } Command::CreateTag(cmd) => { cmd.tx - .send(Err(io::Error::other("create tag not supported").into())) + .send(Err(unsupported("create tag not supported").into())) .await .ok(); } @@ -172,19 +170,19 @@ impl Actor { } Command::RenameTag(cmd) => { cmd.tx - .send(Err(io::Error::other("rename tag not supported").into())) + .send(Err(unsupported("rename tag not supported").into())) .await .ok(); } Command::DeleteTags(cmd) => { cmd.tx - .send(Err(io::Error::other("delete tags not supported").into())) + .send(Err(unsupported("delete tags not supported").into())) .await .ok(); } Command::DeleteBlobs(cmd) => { cmd.tx - .send(Err(io::Error::other("delete blobs not supported").into())) + .send(Err(unsupported("delete blobs not supported").into())) .await .ok(); } @@ -213,7 +211,7 @@ impl Actor { } Command::SetTag(cmd) => { cmd.tx - .send(Err(io::Error::other("set tag not supported").into())) + .send(Err(unsupported("set tag not supported").into())) .await .ok(); } @@ -264,6 +262,10 @@ impl Actor { } } +fn unsupported(text: &str) -> io::Error { + io::Error::new(io::ErrorKind::Unsupported, text) +} + async fn export_bao( hash: Hash, entry: Option, @@ -413,8 +415,7 @@ async fn export_path_impl( data.as_ref().read_exact_at(offset, buf)?; file.write_all(buf)?; tx.try_send(ExportProgressItem::CopyProgress(offset)) - .await - .map_err(|_e| io::Error::other("error"))?; + .await?; yield_now().await; } Ok(()) From f1bbd6ce897647a0e35f2dfb13937871b020c1a5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Dec 2025 13:32:09 +0200 Subject: [PATCH 2/4] DRY entity is dead error --- src/store/fs.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index ff8e9d963..2b7d2f7c7 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -731,7 +731,7 @@ impl HashSpecificCommand for ExportPathMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -747,7 +747,7 @@ impl HashSpecificCommand for ExportBaoMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -763,7 +763,7 @@ impl HashSpecificCommand for ExportRangesMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -779,7 +779,7 @@ impl HashSpecificCommand for ImportBaoMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx.send(Err(api::Error::from(err))).await.ok(); @@ -798,13 +798,17 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.1.tx.send(AddProgressItem::Error(err)).await.ok(); } } +fn err_entity_dead() -> io::Error { + io::Error::other("entity is dead") +} + struct RtWrapper(Option); impl From for RtWrapper { From 2a72daffdac951cc0c4494ba2b09760c18d9c924 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Dec 2025 14:28:29 +0200 Subject: [PATCH 3/4] Replace api::Error::other with dedicated errors. --- src/api.rs | 2 +- src/api/blobs.rs | 12 ++++++------ src/store/fs.rs | 10 +++------- src/store/fs/meta.rs | 25 ++++++++++++++++++++----- src/store/mem.rs | 2 +- 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/api.rs b/src/api.rs index 7c1d96854..45792c851 100644 --- a/src/api.rs +++ b/src/api.rs @@ -199,7 +199,7 @@ impl From for Error { fn from(value: EncodeError) -> Self { match value { EncodeError::Io(cause) => Self::Io(cause), - _ => Self::other(value), + _ => Self::Io(io::Error::other(value)), } } } diff --git a/src/api/blobs.rs b/src/api/blobs.rs index a79395a74..12ca20b08 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -440,13 +440,13 @@ impl Blobs { reader .recv_exact(&mut size) .await - .map_err(super::Error::other)?; + .map_err(io::Error::from)?; let size = u64::from_le_bytes(size); let Some(size) = NonZeroU64::new(size) else { return if hash == Hash::EMPTY { Ok(reader) } else { - Err(super::Error::other("invalid size for hash").into()) + Err(io::Error::other("invalid size for hash").into()) }; }; let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE); @@ -651,7 +651,7 @@ impl<'a> AddProgress<'a> { _ => {} } } - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult { @@ -704,7 +704,7 @@ impl IntoFuture for ObserveProgress { let mut rx = self.inner.await?; match rx.recv().await? { Some(bitfield) => Ok(bitfield), - None => Err(super::Error::other("unexpected end of stream").into()), + None => Err(io::Error::other("unexpected end of stream").into()), } }) } @@ -726,7 +726,7 @@ impl ObserveProgress { return Ok(item); } } - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } /// Returns an infinite stream of bitfields. The first bitfield is the @@ -805,7 +805,7 @@ impl ExportProgress { if let Some(size) = size { Ok(size) } else { - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } } } diff --git a/src/store/fs.rs b/src/store/fs.rs index 2b7d2f7c7..3ea50f64d 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -853,7 +853,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc, ctx: A async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) -> api::Result<()> { let BatchMsg { tx, mut rx, .. } = cmd; trace!("created scope {}", id); - tx.send(id).await.map_err(api::Error::other)?; + tx.send(id).await?; while let Some(msg) = rx.recv().await? { match msg { BatchResponse::Drop(msg) => scope.on_drop(&msg), @@ -1266,9 +1266,7 @@ async fn export_path_impl( MemOrFile::Mem(data) => data.len() as u64, MemOrFile::File((_, size)) => *size, }; - tx.send(ExportProgressItem::Size(size)) - .await - .map_err(api::Error::other)?; + tx.send(ExportProgressItem::Size(size)).await?; match data { MemOrFile::Mem(data) => { let mut target = fs::File::create(&target)?; @@ -1324,9 +1322,7 @@ async fn export_path_impl( } }, } - tx.send(ExportProgressItem::Done) - .await - .map_err(api::Error::other)?; + tx.send(ExportProgressItem::Done).await?; Ok(()) } diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index 6d17fc13d..74620eff5 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -363,7 +363,7 @@ async fn handle_list_tags(msg: ListTagsMsg, tables: &impl ReadableTables) -> Act res.push(crate::api::Result::Ok(info)); } } - Err(e) => res.push(Err(crate::api::Error::other(e))), + Err(e) => res.push(Err(api_error_from_storage_error(e))), } } tx.send(res).await.ok(); @@ -629,8 +629,12 @@ impl Actor { tx, .. } = cmd; - let res = tables.tags.insert(tag, value).map(|_| ()); - tx.send(res.map_err(crate::api::Error::other)).await.ok(); + let res = tables + .tags + .insert(tag, value) + .map_err(api_error_from_storage_error) + .map(|_| ()); + tx.send(res).await.ok(); Ok(()) } @@ -852,6 +856,13 @@ impl Actor { } } +/// Convert a redb StorageError into an api::Error +/// +/// This can't be a From instance because that would require exposing redb::StorageError in the public API. +fn api_error_from_storage_error(e: redb::StorageError) -> api::Error { + api::Error::Io(io::Error::other(e)) +} + #[derive(Debug)] struct DbWrapper(Option); @@ -953,8 +964,12 @@ async fn list_blobs_impl( _cmd: ListRequest, tx: &mut mpsc::Sender>, ) -> api::Result<()> { - for item in snapshot.blobs.iter().map_err(api::Error::other)? { - let (k, _) = item.map_err(api::Error::other)?; + for item in snapshot + .blobs + .iter() + .map_err(api_error_from_storage_error)? + { + let (k, _) = item.map_err(api_error_from_storage_error)?; let k = k.value(); tx.send(Ok(k)).await.ok(); } diff --git a/src/store/mem.rs b/src/store/mem.rs index 69fff6f30..fd47ecc8b 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -542,7 +542,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc) -> Sco async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) -> api::Result<()> { let BatchMsg { tx, mut rx, .. } = cmd; trace!("created scope {}", id); - tx.send(id).await.map_err(api::Error::other)?; + tx.send(id).await?; while let Some(msg) = rx.recv().await? { match msg { BatchResponse::Drop(msg) => scope.on_drop(&msg), From e637c0e0b343e7f5a3bf84a8392906b25ec4c8e5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 4 Dec 2025 16:09:07 +0200 Subject: [PATCH 4/4] clippy --- src/api/blobs.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 12ca20b08..9e939836f 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -437,10 +437,7 @@ impl Blobs { mut reader: R, ) -> RequestResult { let mut size = [0; 8]; - reader - .recv_exact(&mut size) - .await - .map_err(io::Error::from)?; + reader.recv_exact(&mut size).await?; let size = u64::from_le_bytes(size); let Some(size) = NonZeroU64::new(size) else { return if hash == Hash::EMPTY {