diff --git a/src/api.rs b/src/api.rs index 7c1d9685..45792c85 100644 --- a/src/api.rs +++ b/src/api.rs @@ -199,7 +199,7 @@ impl From for Error { fn from(value: EncodeError) -> Self { match value { EncodeError::Io(cause) => Self::Io(cause), - _ => Self::other(value), + _ => Self::Io(io::Error::other(value)), } } } diff --git a/src/api/blobs.rs b/src/api/blobs.rs index a79395a7..9e939836 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -437,16 +437,13 @@ impl Blobs { mut reader: R, ) -> RequestResult { let mut size = [0; 8]; - reader - .recv_exact(&mut size) - .await - .map_err(super::Error::other)?; + reader.recv_exact(&mut size).await?; let size = u64::from_le_bytes(size); let Some(size) = NonZeroU64::new(size) else { return if hash == Hash::EMPTY { Ok(reader) } else { - Err(super::Error::other("invalid size for hash").into()) + Err(io::Error::other("invalid size for hash").into()) }; }; let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE); @@ -651,7 +648,7 @@ impl<'a> AddProgress<'a> { _ => {} } } - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult { @@ -704,7 +701,7 @@ impl IntoFuture for ObserveProgress { let mut rx = self.inner.await?; match rx.recv().await? { Some(bitfield) => Ok(bitfield), - None => Err(super::Error::other("unexpected end of stream").into()), + None => Err(io::Error::other("unexpected end of stream").into()), } }) } @@ -726,7 +723,7 @@ impl ObserveProgress { return Ok(item); } } - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } /// Returns an infinite stream of bitfields. The first bitfield is the @@ -805,7 +802,7 @@ impl ExportProgress { if let Some(size) = size { Ok(size) } else { - Err(super::Error::other("unexpected end of stream").into()) + Err(io::Error::other("unexpected end of stream").into()) } } } diff --git a/src/store/fs.rs b/src/store/fs.rs index ffc9a07e..3ea50f64 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -731,7 +731,7 @@ impl HashSpecificCommand for ExportPathMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -747,7 +747,7 @@ impl HashSpecificCommand for ExportBaoMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -763,7 +763,7 @@ impl HashSpecificCommand for ExportRangesMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx @@ -779,7 +779,7 @@ impl HashSpecificCommand for ImportBaoMsg { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.tx.send(Err(api::Error::from(err))).await.ok(); @@ -798,13 +798,17 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) { async fn on_error(self, arg: SpawnArg) { let err = match arg { SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), - SpawnArg::Dead => io::Error::other("entity is dead"), + SpawnArg::Dead => err_entity_dead(), _ => unreachable!(), }; self.1.tx.send(AddProgressItem::Error(err)).await.ok(); } } +fn err_entity_dead() -> io::Error { + io::Error::other("entity is dead") +} + struct RtWrapper(Option); impl From for RtWrapper { @@ -849,7 +853,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc, ctx: A async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) -> api::Result<()> { let BatchMsg { tx, mut rx, .. } = cmd; trace!("created scope {}", id); - tx.send(id).await.map_err(api::Error::other)?; + tx.send(id).await?; while let Some(msg) = rx.recv().await? { match msg { BatchResponse::Drop(msg) => scope.on_drop(&msg), @@ -1262,9 +1266,7 @@ async fn export_path_impl( MemOrFile::Mem(data) => data.len() as u64, MemOrFile::File((_, size)) => *size, }; - tx.send(ExportProgressItem::Size(size)) - .await - .map_err(api::Error::other)?; + tx.send(ExportProgressItem::Size(size)).await?; match data { MemOrFile::Mem(data) => { let mut target = fs::File::create(&target)?; @@ -1320,9 +1322,7 @@ async fn export_path_impl( } }, } - tx.send(ExportProgressItem::Done) - .await - .map_err(api::Error::other)?; + tx.send(ExportProgressItem::Done).await?; Ok(()) } @@ -1378,9 +1378,7 @@ async fn copy_with_progress( let buf: &mut [u8] = &mut buf[..remaining]; file.read_exact_at(offset, buf)?; target.write_all(buf)?; - tx.try_send(T::from_offset(offset)) - .await - .map_err(|_e| io::Error::other(""))?; + tx.try_send(T::from_offset(offset)).await?; yield_now().await; offset += buf.len() as u64; } diff --git a/src/store/fs/import.rs b/src/store/fs/import.rs index f5c8fc1a..d6e250b8 100644 --- a/src/store/fs/import.rs +++ b/src/store/fs/import.rs @@ -196,12 +196,8 @@ async fn import_bytes_tiny_impl( let size = cmd.data.len() as u64; // send the required progress events // AddProgressItem::Done will be sent when finishing the import! - tx.send(AddProgressItem::Size(size)) - .await - .map_err(|_e| io::Error::other("error"))?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(size)).await?; + tx.send(AddProgressItem::CopyDone).await?; Ok(if raw_outboard_size(size) == 0 { // the thing is so small that it does not even need an outboard ImportEntry { @@ -286,12 +282,8 @@ async fn import_byte_stream_impl( ) -> io::Result { let ImportByteStreamRequest { format, scope } = cmd; let import_source = get_import_source(stream, tx, &options).await?; - tx.send(AddProgressItem::Size(import_source.size())) - .await - .map_err(|_e| io::Error::other("error"))?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(import_source.size())).await?; + tx.send(AddProgressItem::CopyDone).await?; compute_outboard(import_source, format, scope, options, tx).await } @@ -344,18 +336,14 @@ async fn get_import_source( data.extend_from_slice(&chunk); } // todo: don't send progress for every chunk if the chunks are small? - tx.try_send(AddProgressItem::CopyProgress(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.try_send(AddProgressItem::CopyProgress(size)).await?; } Ok(if let Some((mut file, temp_path)) = disk { while let Some(chunk) = stream.next().await { let chunk = chunk?; file.write_all(&chunk)?; size += chunk.len() as u64; - tx.send(AddProgressItem::CopyProgress(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyProgress(size)).await?; } ImportSource::TempFile(temp_path, file, size) } else { @@ -473,14 +461,10 @@ async fn import_path_impl( } let size = path.metadata()?.len(); - tx.send(AddProgressItem::Size(size)) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::Size(size)).await?; let import_source = if size <= options.inline.max_data_inlined { let data = std::fs::read(path)?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_e| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyDone).await?; ImportSource::Memory(data.into()) } else if mode == ImportMode::TryReference { // reference where it is. We are going to need the file handle to @@ -500,9 +484,7 @@ async fn import_path_impl( ); // copy from path to temp_path let file = OpenOptions::new().read(true).open(&temp_path)?; - tx.send(AddProgressItem::CopyDone) - .await - .map_err(|_| io::Error::other("error"))?; + tx.send(AddProgressItem::CopyDone).await?; ImportSource::TempFile(temp_path, file, size) }; compute_outboard(import_source, format, batch, options, tx).await diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index 6d17fc13..74620eff 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -363,7 +363,7 @@ async fn handle_list_tags(msg: ListTagsMsg, tables: &impl ReadableTables) -> Act res.push(crate::api::Result::Ok(info)); } } - Err(e) => res.push(Err(crate::api::Error::other(e))), + Err(e) => res.push(Err(api_error_from_storage_error(e))), } } tx.send(res).await.ok(); @@ -629,8 +629,12 @@ impl Actor { tx, .. } = cmd; - let res = tables.tags.insert(tag, value).map(|_| ()); - tx.send(res.map_err(crate::api::Error::other)).await.ok(); + let res = tables + .tags + .insert(tag, value) + .map_err(api_error_from_storage_error) + .map(|_| ()); + tx.send(res).await.ok(); Ok(()) } @@ -852,6 +856,13 @@ impl Actor { } } +/// Convert a redb StorageError into an api::Error +/// +/// This can't be a From instance because that would require exposing redb::StorageError in the public API. +fn api_error_from_storage_error(e: redb::StorageError) -> api::Error { + api::Error::Io(io::Error::other(e)) +} + #[derive(Debug)] struct DbWrapper(Option); @@ -953,8 +964,12 @@ async fn list_blobs_impl( _cmd: ListRequest, tx: &mut mpsc::Sender>, ) -> api::Result<()> { - for item in snapshot.blobs.iter().map_err(api::Error::other)? { - let (k, _) = item.map_err(api::Error::other)?; + for item in snapshot + .blobs + .iter() + .map_err(api_error_from_storage_error)? + { + let (k, _) = item.map_err(api_error_from_storage_error)?; let k = k.value(); tx.send(Ok(k)).await.ok(); } diff --git a/src/store/mem.rs b/src/store/mem.rs index 1d1583ac..fd47ecc8 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -542,7 +542,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc) -> Sco async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) -> api::Result<()> { let BatchMsg { tx, mut rx, .. } = cmd; trace!("created scope {}", id); - tx.send(id).await.map_err(api::Error::other)?; + tx.send(id).await?; while let Some(msg) = rx.recv().await? { match msg { BatchResponse::Drop(msg) => scope.on_drop(&msg), @@ -837,8 +837,7 @@ async fn export_path_impl( entry.0.state.borrow().data().read_exact_at(offset, buf)?; file.write_all(buf)?; tx.try_send(ExportProgressItem::CopyProgress(offset)) - .await - .map_err(|_e| io::Error::other(""))?; + .await?; yield_now().await; } Ok(()) diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index 7fc73482..4134dafc 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -96,11 +96,9 @@ impl Actor { async fn handle_command(&mut self, cmd: Command) -> Option> { match cmd { Command::ImportBao(ImportBaoMsg { tx, .. }) => { - tx.send(Err(api::Error::from(io::Error::other( - "import not supported", - )))) - .await - .ok(); + tx.send(Err(unsupported("import not supported").into())) + .await + .ok(); } Command::WaitIdle(WaitIdleMsg { tx, .. }) => { if self.tasks.is_empty() { @@ -112,17 +110,17 @@ impl Actor { } } Command::ImportBytes(ImportBytesMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } Command::ImportByteStream(ImportByteStreamMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } Command::ImportPath(ImportPathMsg { tx, .. }) => { - tx.send(io::Error::other("import not supported").into()) + tx.send(unsupported("import not supported").into()) .await .ok(); } @@ -163,7 +161,7 @@ impl Actor { } Command::CreateTag(cmd) => { cmd.tx - .send(Err(io::Error::other("create tag not supported").into())) + .send(Err(unsupported("create tag not supported").into())) .await .ok(); } @@ -172,19 +170,19 @@ impl Actor { } Command::RenameTag(cmd) => { cmd.tx - .send(Err(io::Error::other("rename tag not supported").into())) + .send(Err(unsupported("rename tag not supported").into())) .await .ok(); } Command::DeleteTags(cmd) => { cmd.tx - .send(Err(io::Error::other("delete tags not supported").into())) + .send(Err(unsupported("delete tags not supported").into())) .await .ok(); } Command::DeleteBlobs(cmd) => { cmd.tx - .send(Err(io::Error::other("delete blobs not supported").into())) + .send(Err(unsupported("delete blobs not supported").into())) .await .ok(); } @@ -213,7 +211,7 @@ impl Actor { } Command::SetTag(cmd) => { cmd.tx - .send(Err(io::Error::other("set tag not supported").into())) + .send(Err(unsupported("set tag not supported").into())) .await .ok(); } @@ -264,6 +262,10 @@ impl Actor { } } +fn unsupported(text: &str) -> io::Error { + io::Error::new(io::ErrorKind::Unsupported, text) +} + async fn export_bao( hash: Hash, entry: Option, @@ -413,8 +415,7 @@ async fn export_path_impl( data.as_ref().read_exact_at(offset, buf)?; file.write_all(buf)?; tx.try_send(ExportProgressItem::CopyProgress(offset)) - .await - .map_err(|_e| io::Error::other("error"))?; + .await?; yield_now().await; } Ok(())