2 changes: 1 addition & 1 deletion src/api.rs
@@ -199,7 +199,7 @@ impl From<EncodeError> for Error {
     fn from(value: EncodeError) -> Self {
         match value {
             EncodeError::Io(cause) => Self::Io(cause),
-            _ => Self::other(value),
+            _ => Self::Io(io::Error::other(value)),
         }
     }
 }
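Note: `io::Error::other` (std, stable since Rust 1.74) boxes an arbitrary error into an `io::Error` of kind `Other`, keeping the original value reachable as the source. A minimal stand-alone sketch of what the new arm relies on; `ParityMismatch` is an invented stand-in for the non-I/O `EncodeError` variants:

```rust
use std::{error::Error, fmt, io};

// Invented stand-in for EncodeError's non-I/O variants.
#[derive(Debug)]
struct ParityMismatch;

impl fmt::Display for ParityMismatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "parity mismatch")
    }
}

impl Error for ParityMismatch {}

fn main() {
    // io::Error::other boxes the payload and tags it ErrorKind::Other.
    let err = io::Error::other(ParityMismatch);
    assert_eq!(err.kind(), io::ErrorKind::Other);
    // The wrapped error stays reachable as a source, so callers can
    // still downcast and inspect it.
    assert!(err
        .get_ref()
        .and_then(|e| e.downcast_ref::<ParityMismatch>())
        .is_some());
}
```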
15 changes: 6 additions & 9 deletions src/api/blobs.rs
@@ -437,16 +437,13 @@ impl Blobs {
         mut reader: R,
     ) -> RequestResult<R> {
         let mut size = [0; 8];
-        reader
-            .recv_exact(&mut size)
-            .await
-            .map_err(super::Error::other)?;
+        reader.recv_exact(&mut size).await?;
         let size = u64::from_le_bytes(size);
         let Some(size) = NonZeroU64::new(size) else {
             return if hash == Hash::EMPTY {
                 Ok(reader)
             } else {
-                Err(super::Error::other("invalid size for hash").into())
+                Err(io::Error::other("invalid size for hash").into())
             };
         };
         let tree = BaoTree::new(size.get(), IROH_BLOCK_SIZE);
@@ -651,7 +648,7 @@ impl<'a> AddProgress<'a> {
                 _ => {}
             }
         }
-        Err(super::Error::other("unexpected end of stream").into())
+        Err(io::Error::other("unexpected end of stream").into())
     }

     pub async fn with_named_tag(self, name: impl AsRef<[u8]>) -> RequestResult<HashAndFormat> {
@@ -704,7 +701,7 @@ impl IntoFuture for ObserveProgress {
             let mut rx = self.inner.await?;
             match rx.recv().await? {
                 Some(bitfield) => Ok(bitfield),
-                None => Err(super::Error::other("unexpected end of stream").into()),
+                None => Err(io::Error::other("unexpected end of stream").into()),
             }
         })
     }
@@ -726,7 +723,7 @@ impl ObserveProgress {
                 return Ok(item);
             }
         }
-        Err(super::Error::other("unexpected end of stream").into())
+        Err(io::Error::other("unexpected end of stream").into())
     }

     /// Returns an infinite stream of bitfields. The first bitfield is the
@@ -805,7 +802,7 @@ impl ExportProgress {
         if let Some(size) = size {
             Ok(size)
         } else {
-            Err(super::Error::other("unexpected end of stream").into())
+            Err(io::Error::other("unexpected end of stream").into())
         }
     }
 }
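Note: several call sites here shrink from `.map_err(super::Error::other)?` to a bare `?`. That only compiles when the intermediate error type converts into the function's error type via `From`, so a conversion like the one sketched below presumably exists elsewhere in the crate (it is not part of this diff). All names in the sketch are hypothetical stand-ins:

```rust
use std::io;

// Hypothetical stand-ins for the channel's send error and the API error.
#[derive(Debug)]
struct SendError;

#[derive(Debug)]
enum ApiError {
    Io(io::Error),
}

// With a From impl in scope, `?` performs the conversion implicitly,
// which is what lets the call sites drop their explicit `.map_err(...)`.
impl From<SendError> for ApiError {
    fn from(e: SendError) -> Self {
        ApiError::Io(io::Error::other(format!("send failed: {e:?}")))
    }
}

fn send(fail: bool) -> Result<(), SendError> {
    if fail { Err(SendError) } else { Ok(()) }
}

fn handler() -> Result<(), ApiError> {
    send(false)?; // `?` applies From<SendError> for ApiError
    Ok(())
}

fn main() {
    assert!(handler().is_ok());
}
```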
28 changes: 13 additions & 15 deletions src/store/fs.rs
@@ -731,7 +731,7 @@ impl HashSpecificCommand for ExportPathMsg {
     async fn on_error(self, arg: SpawnArg<EmParams>) {
         let err = match arg {
             SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
-            SpawnArg::Dead => io::Error::other("entity is dead"),
+            SpawnArg::Dead => err_entity_dead(),
             _ => unreachable!(),
         };
         self.tx
@@ -747,7 +747,7 @@ impl HashSpecificCommand for ExportBaoMsg {
     async fn on_error(self, arg: SpawnArg<EmParams>) {
         let err = match arg {
             SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
-            SpawnArg::Dead => io::Error::other("entity is dead"),
+            SpawnArg::Dead => err_entity_dead(),
             _ => unreachable!(),
         };
         self.tx
@@ -763,7 +763,7 @@ impl HashSpecificCommand for ExportRangesMsg {
     async fn on_error(self, arg: SpawnArg<EmParams>) {
         let err = match arg {
             SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
-            SpawnArg::Dead => io::Error::other("entity is dead"),
+            SpawnArg::Dead => err_entity_dead(),
             _ => unreachable!(),
         };
         self.tx
@@ -779,7 +779,7 @@ impl HashSpecificCommand for ImportBaoMsg {
     async fn on_error(self, arg: SpawnArg<EmParams>) {
         let err = match arg {
             SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
-            SpawnArg::Dead => io::Error::other("entity is dead"),
+            SpawnArg::Dead => err_entity_dead(),
             _ => unreachable!(),
         };
         self.tx.send(Err(api::Error::from(err))).await.ok();
@@ -798,13 +798,17 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
     async fn on_error(self, arg: SpawnArg<EmParams>) {
         let err = match arg {
             SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
-            SpawnArg::Dead => io::Error::other("entity is dead"),
+            SpawnArg::Dead => err_entity_dead(),
             _ => unreachable!(),
         };
         self.1.tx.send(AddProgressItem::Error(err)).await.ok();
     }
 }

+fn err_entity_dead() -> io::Error {
+    io::Error::other("entity is dead")
+}
+
 struct RtWrapper(Option<tokio::runtime::Runtime>);

 impl From<tokio::runtime::Runtime> for RtWrapper {
@@ -849,7 +853,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: A
 async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
     let BatchMsg { tx, mut rx, .. } = cmd;
     trace!("created scope {}", id);
-    tx.send(id).await.map_err(api::Error::other)?;
+    tx.send(id).await?;
     while let Some(msg) = rx.recv().await? {
         match msg {
             BatchResponse::Drop(msg) => scope.on_drop(&msg),
@@ -1262,9 +1266,7 @@ async fn export_path_impl(
         MemOrFile::Mem(data) => data.len() as u64,
         MemOrFile::File((_, size)) => *size,
     };
-    tx.send(ExportProgressItem::Size(size))
-        .await
-        .map_err(api::Error::other)?;
+    tx.send(ExportProgressItem::Size(size)).await?;
     match data {
         MemOrFile::Mem(data) => {
             let mut target = fs::File::create(&target)?;
@@ -1320,9 +1322,7 @@ async fn export_path_impl(
             }
         },
     }
-    tx.send(ExportProgressItem::Done)
-        .await
-        .map_err(api::Error::other)?;
+    tx.send(ExportProgressItem::Done).await?;
     Ok(())
 }

@@ -1378,9 +1378,7 @@ async fn copy_with_progress<T: CopyProgress>(
         let buf: &mut [u8] = &mut buf[..remaining];
         file.read_exact_at(offset, buf)?;
         target.write_all(buf)?;
-        tx.try_send(T::from_offset(offset))
-            .await
-            .map_err(|_e| io::Error::other(""))?;
+        tx.try_send(T::from_offset(offset)).await?;
         yield_now().await;
         offset += buf.len() as u64;
     }
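Note: two small idioms recur in these hunks: `io::ErrorKind::ResourceBusy.into()` builds an `io::Error` via std's `From<ErrorKind>` impl, and the repeated "entity is dead" construction is hoisted into the new `err_entity_dead()` helper so all four `on_error` implementations stay in sync. A self-contained sketch of both:

```rust
use std::io;

// Hoisting a repeated error constructor into one helper keeps the
// message and kind consistent across call sites (as the PR does).
fn err_entity_dead() -> io::Error {
    io::Error::other("entity is dead")
}

fn classify(busy: bool) -> io::Error {
    if busy {
        // From<ErrorKind> for io::Error builds an error whose message
        // is derived from the kind.
        io::ErrorKind::ResourceBusy.into()
    } else {
        err_entity_dead()
    }
}

fn main() {
    assert_eq!(classify(true).kind(), io::ErrorKind::ResourceBusy);
    assert_eq!(classify(false).to_string(), "entity is dead");
}
```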
36 changes: 9 additions & 27 deletions src/store/fs/import.rs
@@ -196,12 +196,8 @@ async fn import_bytes_tiny_impl(
     let size = cmd.data.len() as u64;
     // send the required progress events
     // AddProgressItem::Done will be sent when finishing the import!
-    tx.send(AddProgressItem::Size(size))
-        .await
-        .map_err(|_e| io::Error::other("error"))?;
-    tx.send(AddProgressItem::CopyDone)
-        .await
-        .map_err(|_e| io::Error::other("error"))?;
+    tx.send(AddProgressItem::Size(size)).await?;
+    tx.send(AddProgressItem::CopyDone).await?;
     Ok(if raw_outboard_size(size) == 0 {
         // the thing is so small that it does not even need an outboard
         ImportEntry {
@@ -286,12 +282,8 @@ async fn import_byte_stream_impl(
 ) -> io::Result<ImportEntry> {
     let ImportByteStreamRequest { format, scope } = cmd;
     let import_source = get_import_source(stream, tx, &options).await?;
-    tx.send(AddProgressItem::Size(import_source.size()))
-        .await
-        .map_err(|_e| io::Error::other("error"))?;
-    tx.send(AddProgressItem::CopyDone)
-        .await
-        .map_err(|_e| io::Error::other("error"))?;
+    tx.send(AddProgressItem::Size(import_source.size())).await?;
+    tx.send(AddProgressItem::CopyDone).await?;
     compute_outboard(import_source, format, scope, options, tx).await
 }

@@ -344,18 +336,14 @@ async fn get_import_source(
             data.extend_from_slice(&chunk);
         }
         // todo: don't send progress for every chunk if the chunks are small?
-        tx.try_send(AddProgressItem::CopyProgress(size))
-            .await
-            .map_err(|_e| io::Error::other("error"))?;
+        tx.try_send(AddProgressItem::CopyProgress(size)).await?;
     }
     Ok(if let Some((mut file, temp_path)) = disk {
         while let Some(chunk) = stream.next().await {
             let chunk = chunk?;
             file.write_all(&chunk)?;
             size += chunk.len() as u64;
-            tx.send(AddProgressItem::CopyProgress(size))
-                .await
-                .map_err(|_e| io::Error::other("error"))?;
+            tx.send(AddProgressItem::CopyProgress(size)).await?;
         }
         ImportSource::TempFile(temp_path, file, size)
     } else {
@@ -473,14 +461,10 @@ async fn import_path_impl(
     }

     let size = path.metadata()?.len();
-    tx.send(AddProgressItem::Size(size))
-        .await
-        .map_err(|_e| io::Error::other("error"))?;
+    tx.send(AddProgressItem::Size(size)).await?;
     let import_source = if size <= options.inline.max_data_inlined {
         let data = std::fs::read(path)?;
-        tx.send(AddProgressItem::CopyDone)
-            .await
-            .map_err(|_e| io::Error::other("error"))?;
+        tx.send(AddProgressItem::CopyDone).await?;
         ImportSource::Memory(data.into())
     } else if mode == ImportMode::TryReference {
         // reference where it is. We are going to need the file handle to
@@ -500,9 +484,7 @@
         );
         // copy from path to temp_path
         let file = OpenOptions::new().read(true).open(&temp_path)?;
-        tx.send(AddProgressItem::CopyDone)
-            .await
-            .map_err(|_| io::Error::other("error"))?;
+        tx.send(AddProgressItem::CopyDone).await?;
         ImportSource::TempFile(temp_path, file, size)
     };
     compute_outboard(import_source, format, batch, options, tx).await
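Note: dropping the `.map_err(|_e| io::Error::other("error"))?` noise does not change the semantics: a failed progress send still aborts the import, because a dropped receiver means nobody is waiting for the result. The sketch below illustrates that abort-on-closed-channel behavior, using tokio's mpsc as a stand-in for the crate's own channel type; `Progress` and `copy_chunks` are invented for illustration:

```rust
use std::io;
use tokio::sync::mpsc;

// Progress events, loosely modeled on AddProgressItem.
#[derive(Debug)]
enum Progress {
    CopyProgress(u64),
    CopyDone,
}

// A dropped receiver means nobody is waiting for this import, so a
// failed send aborts the copy early instead of doing useless work.
async fn copy_chunks(chunks: &[&[u8]], tx: mpsc::Sender<Progress>) -> io::Result<u64> {
    let mut size = 0u64;
    for chunk in chunks {
        size += chunk.len() as u64;
        tx.send(Progress::CopyProgress(size))
            .await
            .map_err(|_| io::Error::other("progress receiver dropped"))?;
    }
    tx.send(Progress::CopyDone)
        .await
        .map_err(|_| io::Error::other("progress receiver dropped"))?;
    Ok(size)
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, mut rx) = mpsc::channel(8);
    let n = copy_chunks(&[b"abc".as_slice(), b"defg".as_slice()], tx).await?;
    // tx was moved into copy_chunks and dropped there, so recv drains
    // the buffered events and then returns None.
    while let Some(p) = rx.recv().await {
        println!("{p:?}");
    }
    println!("copied {n} bytes");
    Ok(())
}
```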
25 changes: 20 additions & 5 deletions src/store/fs/meta.rs
@@ -363,7 +363,7 @@ async fn handle_list_tags(msg: ListTagsMsg, tables: &impl ReadableTables) -> Act
                     res.push(crate::api::Result::Ok(info));
                 }
             }
-            Err(e) => res.push(Err(crate::api::Error::other(e))),
+            Err(e) => res.push(Err(api_error_from_storage_error(e))),
         }
     }
     tx.send(res).await.ok();
@@ -629,8 +629,12 @@ impl Actor {
             tx,
             ..
         } = cmd;
-        let res = tables.tags.insert(tag, value).map(|_| ());
-        tx.send(res.map_err(crate::api::Error::other)).await.ok();
+        let res = tables
+            .tags
+            .insert(tag, value)
+            .map_err(api_error_from_storage_error)
+            .map(|_| ());
+        tx.send(res).await.ok();
         Ok(())
     }

@@ -852,6 +856,13 @@ impl Actor {
     }
 }

+/// Convert a redb StorageError into an api::Error.
+///
+/// This can't be a From impl because that would require exposing redb::StorageError in the public API.
+fn api_error_from_storage_error(e: redb::StorageError) -> api::Error {
+    api::Error::Io(io::Error::other(e))
+}
+
 #[derive(Debug)]
 struct DbWrapper(Option<Database>);

@@ -953,8 +964,12 @@ async fn list_blobs_impl(
     _cmd: ListRequest,
     tx: &mut mpsc::Sender<api::Result<Hash>>,
 ) -> api::Result<()> {
-    for item in snapshot.blobs.iter().map_err(api::Error::other)? {
-        let (k, _) = item.map_err(api::Error::other)?;
+    for item in snapshot
+        .blobs
+        .iter()
+        .map_err(api_error_from_storage_error)?
+    {
+        let (k, _) = item.map_err(api_error_from_storage_error)?;
         let k = k.value();
         tx.send(Ok(k)).await.ok();
     }
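Note: the new doc comment states the design choice: a `From<redb::StorageError> for api::Error` impl would be visible on the public `api::Error` type and thus leak the redb dependency into the crate's public API, while a private free function keeps the conversion internal. A self-contained sketch of the trade-off with invented stand-in types:

```rust
use std::io;

// Stand-in for the crate's public error type.
#[derive(Debug)]
pub enum ApiError {
    Io(io::Error),
}

mod storage {
    // Pretend this is redb::StorageError, a type from an internal dependency.
    #[derive(Debug)]
    pub struct StorageError(pub String);

    impl std::fmt::Display for StorageError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "storage error: {}", self.0)
        }
    }
    impl std::error::Error for StorageError {}
}

// A private free function keeps the dependency type off ApiError's
// public trait surface; `impl From<storage::StorageError> for ApiError`
// would make it part of the crate's API and semver contract.
fn api_error_from_storage_error(e: storage::StorageError) -> ApiError {
    ApiError::Io(io::Error::other(e))
}

fn main() {
    let err = api_error_from_storage_error(storage::StorageError("corrupt page".into()));
    println!("{err:?}");
}
```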
5 changes: 2 additions & 3 deletions src/store/mem.rs
@@ -542,7 +542,7 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Sco
 async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
     let BatchMsg { tx, mut rx, .. } = cmd;
     trace!("created scope {}", id);
-    tx.send(id).await.map_err(api::Error::other)?;
+    tx.send(id).await?;
     while let Some(msg) = rx.recv().await? {
         match msg {
             BatchResponse::Drop(msg) => scope.on_drop(&msg),
@@ -837,8 +837,7 @@ async fn export_path_impl(
         entry.0.state.borrow().data().read_exact_at(offset, buf)?;
         file.write_all(buf)?;
         tx.try_send(ExportProgressItem::CopyProgress(offset))
-            .await
-            .map_err(|_e| io::Error::other(""))?;
+            .await?;
         yield_now().await;
     }
     Ok(())
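Note: this loop reports copy progress with `try_send` (still awaited in this codebase's channel) while completion signals elsewhere use `send`. A common rationale for a try-variant on progress updates is that they are droppable: under backpressure it is better to skip an update than to stall the copy loop. The sketch below shows that lossy pattern with tokio's synchronous `try_send` as an analogy; it is not the channel API used in this crate:

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

// Lossy progress reporting: if the channel is full, skip the update
// instead of blocking the hot copy loop. Only a closed channel is fatal.
fn report(tx: &mpsc::Sender<u64>, offset: u64) -> Result<(), &'static str> {
    match tx.try_send(offset) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Ok(()), // consumer lagging: drop this update
        Err(TrySendError::Closed(_)) => Err("progress receiver dropped"),
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);
    for offset in 0..10u64 {
        report(&tx, offset).unwrap();
    }
    drop(tx);
    // Only the updates that fit in the buffer survive.
    while let Some(offset) = rx.recv().await {
        println!("progress: {offset}");
    }
}
```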