Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
400 changes: 14 additions & 386 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ camino = { workspace = true }
clap = { workspace = true, features = ["std", "color", "help", "usage", "error-context", "suggestions", "derive"] }
wit-deps = { workspace = true }
tokio = { workspace = true, features = ["io-std", "fs", "macros", "rt-multi-thread"] }
tokio-util = { workspace = true, features = ["compat"] }
toml = { workspace = true, features = ["display", "parse"] }
tracing-subscriber = { workspace = true, features = ["ansi", "env-filter", "fmt", "json", "std"] }

Expand All @@ -49,7 +48,7 @@ wit-bindgen = { workspace = true, features = ["default"] }
[workspace.dependencies]
anyhow = { version = "1", default-features = false }
async-compression = { version = "0.4", default-features = false }
async-tar = { version = "0.5", default-features = false }
async-tar = { version = "0.6", default-features = false }
async-trait = { version = "0.1", default-features = false }
camino = { version = "1", default-features = false }
clap = { version = "4", default-features = false }
Expand Down
8 changes: 4 additions & 4 deletions crates/wit-deps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
async-compression = { workspace = true, features = ["futures-io", "gzip"] }
async-tar = { workspace = true }
async-compression = { workspace = true, features = ["tokio", "gzip"] }
async-tar = { workspace = true, features = ["runtime-tokio"] }
async-trait = { workspace = true }
directories = { workspace = true }
futures = { workspace = true, features = ["async-await", "std"] }
hex = { workspace = true, features = ["alloc"] }
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio = { workspace = true, features = ["fs", "io-util", "sync"] }
tokio-stream = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["compat"] }
tokio-util = { workspace = true, features = ["io"] }
toml = { workspace = true, features = ["display", "parse", "preserve_order"] }
tracing = { workspace = true, features = ["attributes"] }
url = { workspace = true, features = ["serde"] }
Expand Down
10 changes: 4 additions & 6 deletions crates/wit-deps/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use std::path::{Path, PathBuf};
use anyhow::{bail, Context as _};
use async_trait::async_trait;
use directories::ProjectDirs;
use futures::{io::BufReader, AsyncBufRead, AsyncWrite};
use tokio::fs::{self, File, OpenOptions};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tokio::io::{AsyncBufRead, AsyncWrite, BufReader};
use url::{Host, Url};

/// Resource caching layer
Expand Down Expand Up @@ -130,12 +129,12 @@ impl Local {

#[async_trait]
impl Cache for Local {
type Read = BufReader<Compat<File>>;
type Write = Compat<File>;
type Read = BufReader<File>;
type Write = File;

async fn get(&self, url: &Url) -> anyhow::Result<Option<Self::Read>> {
match File::open(self.path(url)).await {
Ok(file) => Ok(Some(BufReader::new(file.compat()))),
Ok(file) => Ok(Some(BufReader::new(file))),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => bail!("failed to lookup `{url}` in cache: {e}"),
}
Expand All @@ -153,7 +152,6 @@ impl Cache for Local {
.write(true)
.open(path)
.await
.map(tokio_util::compat::TokioAsyncReadCompatExt::compat)
.context("failed to open file for writing")
}
}
Expand Down
25 changes: 15 additions & 10 deletions crates/wit-deps/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};

use futures::{AsyncRead, AsyncWrite};
use hex::FromHex;
use serde::ser::SerializeStruct;
use serde::{de, Deserialize, Serialize};
use sha2::{Digest as _, Sha256, Sha512};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

/// A resource digest
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
Expand Down Expand Up @@ -91,13 +91,18 @@ impl<T: AsyncRead + Unpin> AsyncRead for Reader<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.inner).poll_read(cx, buf).map_ok(|n| {
self.sha256.update(&buf[..n]);
self.sha512.update(&buf[..n]);
n
})
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let n = buf.filled().len();
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let buf = buf.filled();
self.sha256.update(&buf[n..]);
self.sha512.update(&buf[n..]);
Poll::Ready(Ok(()))
}
other => other,
}
}
}

Expand Down Expand Up @@ -143,8 +148,8 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for Writer<T> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}

Expand Down
22 changes: 16 additions & 6 deletions crates/wit-deps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use std::ffi::{OsStr, OsString};
use std::path::{Path, PathBuf};

use anyhow::Context;
use futures::{try_join, AsyncRead, AsyncWrite, FutureExt, Stream, TryStreamExt};
use futures::{try_join, FutureExt, Stream, TryStreamExt};
use tokio::fs;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{debug, instrument, trace};

Expand Down Expand Up @@ -197,7 +198,7 @@ pub async fn untar(
use std::io::{Error, Result};

async fn unpack(e: &mut async_tar::Entry<impl Unpin + AsyncRead>, dst: &Path) -> Result<()> {
e.unpack(dst).await.map_err(|e| {
e.unpack(dst).await.map_err(|e: Error| {
Error::new(
e.kind(),
format!("failed to unpack `{}`: {e}", dst.display()),
Expand All @@ -219,7 +220,7 @@ pub async fn untar(
let Ok(path) = path.strip_prefix(prefix) else {
return Ok(untared);
};
let mut path = path.into_iter();
let mut path = path.iter();
match array::from_fn::<_, 6, _>(|_| path.next().and_then(OsStr::to_str)) {
[Some(name), None, ..]
| [Some("wit"), Some(name), None, ..]
Expand Down Expand Up @@ -268,10 +269,19 @@ where
let path = path.as_ref();
let mut tar = async_tar::Builder::new(dst);
tar.mode(async_tar::HeaderMode::Deterministic);
for name in read_wits(path).await?.try_collect::<BTreeSet<_>>().await? {
tar.append_path_with_name(path.join(&name), Path::new("wit").join(name))
.await?;
let res = async {
for name in read_wits(path).await?.try_collect::<BTreeSet<_>>().await? {
tar.append_path_with_name(path.join(&name), Path::new("wit").join(name))
.await?;
}
std::io::Result::Ok(())
}
.await;
if res.is_err() {
// Finalize the builder to avoid a panic on drop.
let _ = tar.finish().await;
}
res?;
tar.into_inner().await
}

Expand Down
2 changes: 1 addition & 1 deletion crates/wit-deps/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};

use anyhow::Context;
use futures::io::sink;
use serde::{Deserialize, Serialize};
use tokio::io::sink;
use url::Url;

/// Source of this dependency
Expand Down
60 changes: 30 additions & 30 deletions crates/wit-deps/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ use std::sync::Arc;

use anyhow::ensure;
use anyhow::{bail, Context as _};
use async_compression::futures::bufread::GzipDecoder;
use futures::io::BufReader;
use futures::lock::Mutex;
use futures::{stream, AsyncWriteExt, StreamExt, TryStreamExt};
use async_compression::tokio::bufread::GzipDecoder;
use futures::{stream, StreamExt, TryStreamExt};
use hex::FromHex;
use serde::{de, Deserialize};
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace, warn};
use url::Url;

Expand Down Expand Up @@ -400,34 +400,34 @@ impl Entry {
.error_for_status()
.context("GET request failed")
.map_err(std::io::Error::other)?;
let tar_gz = res
.bytes_stream()
.map_err(std::io::Error::other)
.then(|chunk| async {
let chunk = chunk?;
let mut cache = cache.lock().await;
let cache_res = if let Some(w) = cache.as_mut().map(|w| async {
if let Err(e) = w.write(&chunk).await {
error!("failed to write chunk to cache: {e}");
if let Err(e) = w.close().await {
error!("failed to close cache writer: {e}");
let tar_gz =
res.bytes_stream()
.map_err(std::io::Error::other)
.then(|chunk| async {
let chunk = chunk?;
let mut cache = cache.lock().await;
let cache_res = if let Some(w) = cache.as_mut().map(|w| async {
if let Err(e) = w.write(&chunk).await {
error!("failed to write chunk to cache: {e}");
if let Err(e) = w.shutdown().await {
error!("failed to close cache writer: {e}");
}
return Err(e);
}
return Err(e);
Ok(())
}) {
Some(w.await)
} else {
None
}
Ok(())
}) {
Some(w.await)
} else {
None
}
.transpose();
if cache_res.is_err() {
// Drop the cache writer if a failure occurs
cache.take();
}
Ok(chunk)
})
.into_async_read();
.transpose();
if cache_res.is_err() {
// Drop the cache writer if a failure occurs
cache.take();
}
Ok::<_, std::io::Error>(chunk)
});
let tar_gz = tokio_util::io::StreamReader::new(tar_gz);
let mut hashed = DigestReader::from(Box::pin(tar_gz));
let deps = untar(
GzipDecoder::new(BufReader::new(&mut hashed)),
Expand Down
5 changes: 2 additions & 3 deletions src/bin/wit-deps/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use anyhow::Context;
use clap::{Parser, Subcommand};
use tokio::fs::File;
use tokio::io;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use tracing_subscriber::prelude::*;
use wit_deps::Identifier;

Expand Down Expand Up @@ -101,9 +100,9 @@ async fn main() -> anyhow::Result<ExitCode> {
let output = File::create(&output).await.with_context(|| {
format!("failed to create output path `{}`", output.display())
})?;
wit_deps::tar(package, output.compat_write()).await?;
wit_deps::tar(package, output).await?;
} else {
wit_deps::tar(package, io::stdout().compat_write()).await?;
wit_deps::tar(package, io::stdout()).await?;
}
Ok(ExitCode::SUCCESS)
}
Expand Down
Loading