Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
pyo3 = { version = "0.27.1", features = ["abi3-py311"] }
zarrs = { version = "0.22.4", features = ["async", "zlib", "pcodec", "bz2"] }
zarrs = { version = "0.23.0-beta.3", features = ["async", "zlib", "pcodec", "bz2"] }
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
# fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path
Expand All @@ -19,10 +19,11 @@ numpy = "0.27.0"
unsafe_cell_slice = "0.2.0"
serde_json = "1.0.128"
pyo3-stub-gen = "0.17.1"
opendal = { version = "0.54.0", features = ["services-http"] }
opendal = { version = "0.55.0", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.9.0"
zarrs_opendal = "0.10.0"
itertools = "0.14.0"
bytemuck = { version = "1.24.0", features = ["must_cast"] }

[profile.release]
lto = true
80 changes: 47 additions & 33 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pyo3::{
};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use zarrs::{
array::{ChunkRepresentation, DataType, FillValue},
array::{ChunkShape, DataType, FillValue, NamedDataType},
array_subset::ArraySubset,
metadata::v3::MetadataV3,
storage::StoreKey,
Expand All @@ -18,15 +18,19 @@ use crate::utils::PyErrExt;

pub(crate) trait ChunksItem {
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;
fn shape(&self) -> &[NonZeroU64];
fn data_type(&self) -> &DataType;
fn fill_value(&self) -> &FillValue;
}

#[derive(Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct Basic {
key: StoreKey,
representation: ChunkRepresentation,
shape: ChunkShape,
data_type: DataType,
fill_value: FillValue,
}

fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
Expand Down Expand Up @@ -62,7 +66,8 @@ impl Basic {
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
let path: String = byte_interface.getattr("path")?.extract()?;

let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
let shape: Vec<NonZeroU64> = chunk_spec.getattr("shape")?.extract()?;

let mut dtype: String = chunk_spec
.getattr("dtype")?
.call_method0("to_native_dtype")?
Expand All @@ -73,11 +78,14 @@ impl Basic {
// but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288
dtype = String::from("string");
}
let data_type = get_data_type_from_dtype(&dtype)?;
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
let fill_value = FillValue::new(fill_value_to_bytes(&dtype, &fill_value)?);
Ok(Self {
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
shape,
data_type,
fill_value,
})
}
}
Expand All @@ -102,8 +110,17 @@ impl WithSubset {
subset: Vec<Bound<'_, PySlice>>,
shape: Vec<u64>,
) -> PyResult<Self> {
let chunk_subset =
selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?;
let chunk_subset = selection_to_array_subset(&chunk_subset, &item.shape)?;
let shape: Vec<NonZeroU64> = shape
.into_iter()
.map(|dim| {
NonZeroU64::new(dim).ok_or_else(|| {
PyErr::new::<PyValueError, _>(
"subset dimensions must be greater than zero".to_string(),
)
})
})
.collect::<PyResult<Vec<NonZeroU64>>>()?;
let subset = selection_to_array_subset(&subset, &shape)?;
// Check that subset and chunk_subset have the same number of elements.
// This permits broadcasting of a constant input.
Expand All @@ -124,39 +141,36 @@ impl ChunksItem for Basic {
fn key(&self) -> &StoreKey {
&self.key
}
fn representation(&self) -> &ChunkRepresentation {
&self.representation
fn shape(&self) -> &[NonZeroU64] {
&self.shape
}
fn data_type(&self) -> &DataType {
&self.data_type
}
fn fill_value(&self) -> &FillValue {
&self.fill_value
}
}

impl ChunksItem for WithSubset {
fn key(&self) -> &StoreKey {
&self.item.key
}
fn representation(&self) -> &ChunkRepresentation {
&self.item.representation
fn shape(&self) -> &[NonZeroU64] {
&self.item.shape
}
fn data_type(&self) -> &DataType {
&self.item.data_type
}
fn fill_value(&self) -> &FillValue {
&self.item.fill_value
}
}

fn get_chunk_representation(
chunk_shape: Vec<u64>,
dtype: &str,
fill_value: Vec<u8>,
) -> PyResult<ChunkRepresentation> {
// Get the chunk representation
let data_type = DataType::from_metadata(
&MetadataV3::new(dtype),
zarrs::config::global_config().data_type_aliases_v3(),
)
.map_py_err::<PyRuntimeError>()?;
let chunk_shape = chunk_shape
.into_iter()
.map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero"))
.collect();
let chunk_representation =
ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value))
.map_py_err::<PyValueError>()?;
Ok(chunk_representation)
fn get_data_type_from_dtype(dtype: &str) -> PyResult<DataType> {
let data_type =
NamedDataType::try_from(&MetadataV3::new(dtype)).map_py_err::<PyRuntimeError>()?;
Ok(data_type.into())
}

fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<std::ops::Range<u64>> {
Expand All @@ -180,15 +194,15 @@ fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<std::op

fn selection_to_array_subset(
selection: &[Bound<'_, PySlice>],
shape: &[u64],
shape: &[NonZeroU64],
) -> PyResult<ArraySubset> {
if selection.is_empty() {
Ok(ArraySubset::new_with_shape(vec![1; shape.len()]))
} else {
let chunk_ranges = selection
.iter()
.zip(shape)
.map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape)?))
.map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape.get())?))
.collect::<PyResult<Vec<_>>>()?;
Ok(ArraySubset::new_with_ranges(&chunk_ranges))
}
Expand Down
9 changes: 3 additions & 6 deletions src/concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use pyo3::PyResult;
use zarrs::array::{
ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions,
RecommendedConcurrency, codec::ArrayCodecTraits, codec::CodecOptions,
concurrency::calc_concurrency_outer_inner,
};

Expand All @@ -25,11 +25,10 @@ where
let Some(chunk_descriptions0) = self.first() else {
return Ok(None);
};
let chunk_representation = chunk_descriptions0.representation();

let codec_concurrency = codec_pipeline_impl
.codec_chain
.recommended_concurrency(chunk_representation)
.recommended_concurrency(chunk_descriptions0.shape(), chunk_descriptions0.data_type())
.map_codec_err()?;

let min_concurrent_chunks =
Expand All @@ -43,9 +42,7 @@ where
);
let codec_options = codec_pipeline_impl
.codec_options
.into_builder()
.concurrent_target(codec_concurrent_limit)
.build();
.with_concurrent_target(codec_concurrent_limit);
Ok(Some((chunk_concurrent_limit, codec_options)))
}
}
Loading