diff --git a/Cargo.toml b/Cargo.toml index f3afd94..6dece68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.22.4", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.0-beta.3", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path @@ -19,10 +19,11 @@ numpy = "0.27.0" unsafe_cell_slice = "0.2.0" serde_json = "1.0.128" pyo3-stub-gen = "0.17.1" -opendal = { version = "0.54.0", features = ["services-http"] } +opendal = { version = "0.55.0", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } -zarrs_opendal = "0.9.0" +zarrs_opendal = "0.10.0" itertools = "0.14.0" +bytemuck = { version = "1.24.0", features = ["must_cast"] } [profile.release] lto = true diff --git a/src/chunk_item.rs b/src/chunk_item.rs index 643e362..7a79b9c 100644 --- a/src/chunk_item.rs +++ b/src/chunk_item.rs @@ -8,7 +8,7 @@ use pyo3::{ }; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use zarrs::{ - array::{ChunkRepresentation, DataType, FillValue}, + array::{ChunkShape, DataType, FillValue, NamedDataType}, array_subset::ArraySubset, metadata::v3::MetadataV3, storage::StoreKey, @@ -18,7 +18,9 @@ use crate::utils::PyErrExt; pub(crate) trait ChunksItem { fn key(&self) -> &StoreKey; - fn representation(&self) -> &ChunkRepresentation; + fn shape(&self) -> &[NonZeroU64]; + fn data_type(&self) -> &DataType; + fn fill_value(&self) -> &FillValue; } #[derive(Clone)] @@ -26,7 +28,9 @@ pub(crate) trait ChunksItem { #[pyclass] pub(crate) struct Basic { key: StoreKey, - representation: ChunkRepresentation, + shape: ChunkShape, + data_type: DataType, + fill_value: FillValue, } fn fill_value_to_bytes(dtype: &str, fill_value: 
&Bound<'_, PyAny>) -> PyResult<Vec<u8>> { @@ -62,7 +66,8 @@ impl Basic { fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> { let path: String = byte_interface.getattr("path")?.extract()?; - let chunk_shape = chunk_spec.getattr("shape")?.extract()?; + let shape: Vec<NonZeroU64> = chunk_spec.getattr("shape")?.extract()?; + let mut dtype: String = chunk_spec .getattr("dtype")? .call_method0("to_native_dtype")? @@ -73,11 +78,14 @@ impl Basic { // but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288 dtype = String::from("string"); } + let data_type = get_data_type_from_dtype(&dtype)?; let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?; - let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?; + let fill_value = FillValue::new(fill_value_to_bytes(&dtype, &fill_value)?); Ok(Self { key: StoreKey::new(path).map_py_err::<PyValueError>()?, - representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?, + shape, + data_type, + fill_value, }) } } @@ -102,8 +110,17 @@ impl WithSubset { subset: Vec<Bound<'_, PySlice>>, shape: Vec<u64>, ) -> PyResult<Self> { - let chunk_subset = - selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?; + let chunk_subset = selection_to_array_subset(&chunk_subset, &item.shape)?; + let shape: Vec<NonZeroU64> = shape + .into_iter() + .map(|dim| { + NonZeroU64::new(dim).ok_or_else(|| { + PyErr::new::<PyValueError, _>( "subset dimensions must be greater than zero".to_string(), ) }) }) + .collect::<PyResult<Vec<_>>>()?; let subset = selection_to_array_subset(&subset, &shape)?; // Check that subset and chunk_subset have the same number of elements. // This permits broadcasting of a constant input.
@@ -124,8 +141,14 @@ impl ChunksItem for Basic { fn key(&self) -> &StoreKey { &self.key } - fn representation(&self) -> &ChunkRepresentation { - &self.representation + fn shape(&self) -> &[NonZeroU64] { + &self.shape + } + fn data_type(&self) -> &DataType { + &self.data_type + } + fn fill_value(&self) -> &FillValue { + &self.fill_value } } @@ -133,30 +156,21 @@ impl ChunksItem for WithSubset { fn key(&self) -> &StoreKey { &self.item.key } - fn representation(&self) -> &ChunkRepresentation { - &self.item.representation + fn shape(&self) -> &[NonZeroU64] { + &self.item.shape + } + fn data_type(&self) -> &DataType { + &self.item.data_type + } + fn fill_value(&self) -> &FillValue { + &self.item.fill_value } } -fn get_chunk_representation( - chunk_shape: Vec<u64>, - dtype: &str, - fill_value: Vec<u8>, -) -> PyResult<ChunkRepresentation> { - // Get the chunk representation - let data_type = DataType::from_metadata( - &MetadataV3::new(dtype), - zarrs::config::global_config().data_type_aliases_v3(), - ) - .map_py_err::<PyRuntimeError>()?; - let chunk_shape = chunk_shape - .into_iter() - .map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero")) - .collect(); - let chunk_representation = - ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value)) - .map_py_err::<PyValueError>()?; - Ok(chunk_representation) +fn get_data_type_from_dtype(dtype: &str) -> PyResult<DataType> { + let data_type = + NamedDataType::try_from(&MetadataV3::new(dtype)).map_py_err::<PyRuntimeError>()?; + Ok(data_type.into()) } fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<Range<u64>> { @@ -180,7 +194,7 @@ fn slice_to_range(slice: &Bound<'_, PySlice>, length: isize) -> PyResult<Range<u64>> { fn selection_to_array_subset( selection: &[&Bound<'_, PySlice>], - shape: &[u64], + shape: &[NonZeroU64], ) -> PyResult<ArraySubset> { if selection.is_empty() { Ok(ArraySubset::new_with_shape(vec![1; shape.len()])) } else { let chunk_ranges = selection .iter() .zip(shape) - .map(|(selection, &shape)| slice_to_range(selection, isize::try_from(shape)?)) + .map(|(selection, &shape)|
slice_to_range(selection, isize::try_from(shape.get())?)) .collect::<PyResult<Vec<_>>>()?; Ok(ArraySubset::new_with_ranges(&chunk_ranges)) } diff --git a/src/concurrency.rs b/src/concurrency.rs index 364b33b..a8b6007 100644 --- a/src/concurrency.rs +++ b/src/concurrency.rs @@ -1,6 +1,6 @@ use pyo3::PyResult; use zarrs::array::{ - ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions, + RecommendedConcurrency, codec::ArrayCodecTraits, codec::CodecOptions, concurrency::calc_concurrency_outer_inner, }; @@ -25,11 +25,10 @@ where let Some(chunk_descriptions0) = self.first() else { return Ok(None); }; - let chunk_representation = chunk_descriptions0.representation(); let codec_concurrency = codec_pipeline_impl .codec_chain - .recommended_concurrency(chunk_representation) + .recommended_concurrency(chunk_descriptions0.shape(), chunk_descriptions0.data_type()) .map_codec_err()?; let min_concurrent_chunks = @@ -43,9 +42,7 @@ ); let codec_options = codec_pipeline_impl .codec_options - .into_builder() - .concurrent_target(codec_concurrent_limit) - .build(); + .with_concurrent_target(codec_concurrent_limit); Ok(Some((chunk_concurrent_limit, codec_options))) } } diff --git a/src/lib.rs b/src/lib.rs index ccf6bd8..2786f8b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,21 +18,21 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; +use zarrs::array::codec::ArrayBytesDecodeIntoTarget; use zarrs::array::codec::{ - ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, - StoragePartialDecoder, + ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, StoragePartialDecoder, }; use zarrs::array::{ - ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, ArraySize, CodecChain, FillValue, - copy_fill_value_into, update_array_bytes, + ArrayBytes, ArrayBytesFixedDisjointView, ArrayMetadata, ChunkShapeTraits,
CodecChain, + DataTypeExt, FillValue, copy_fill_value_into, update_array_bytes, }; use zarrs::array_subset::ArraySubset; use zarrs::config::global_config; -use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; -use zarrs::metadata::v3::MetadataV3; -use zarrs::metadata_ext::v2_to_v3::{ +use zarrs::convert::{ ArrayMetadataV2ToV3Error, codec_metadata_v2_to_v3, data_type_metadata_v2_to_v3, }; +use zarrs::metadata::v2::data_type_metadata_v2_to_endianness; +use zarrs::metadata::v3::MetadataV3; use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey}; mod chunk_item; @@ -71,14 +71,21 @@ impl CodecPipelineImpl { let value_decoded = if let Some(value_encoded) = value_encoded { let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case codec_chain - .decode(value_encoded.into(), item.representation(), codec_options) + .decode( + value_encoded.into(), + item.shape(), + item.data_type(), + item.fill_value(), + codec_options, + ) .map_codec_err()? } else { - let array_size = ArraySize::new( - item.representation().data_type().size(), - item.representation().num_elements(), - ); - ArrayBytes::new_fill_value(array_size, item.representation().fill_value()) + ArrayBytes::new_fill_value( + item.data_type(), + item.shape().num_elements_u64(), + item.fill_value(), + ) + .map_py_err::<PyRuntimeError>()?
}; Ok(value_decoded) } @@ -91,17 +98,20 @@ impl CodecPipelineImpl { codec_options: &CodecOptions, ) -> PyResult<()> { value_decoded - .validate( - item.representation().num_elements(), - item.representation().data_type().size(), - ) + .validate(item.shape().num_elements_u64(), item.data_type()) .map_codec_err()?; - if value_decoded.is_fill_value(item.representation().fill_value()) { + if value_decoded.is_fill_value(item.fill_value()) { self.store.erase(item.key()).map_py_err::<PyRuntimeError>() } else { let value_encoded = codec_chain - .encode(value_decoded, item.representation(), codec_options) + .encode( + value_decoded, + item.shape(), + item.data_type(), + item.fill_value(), + codec_options, + ) .map(Cow::into_owned) .map_codec_err()?; @@ -120,21 +130,23 @@ impl CodecPipelineImpl { chunk_subset: &ArraySubset, codec_options: &CodecOptions, ) -> PyResult<()> { - let array_shape = item.representation().shape_u64(); - if !chunk_subset.inbounds_shape(&array_shape) { + let array_shape = item.shape(); + if !chunk_subset.inbounds_shape(bytemuck::must_cast_slice(array_shape)) { return Err(PyErr::new::<PyValueError, _>(format!( "chunk subset ({chunk_subset}) is out of bounds for array shape ({array_shape:?})" ))); } - let data_type_size = item.representation().data_type().size(); + let data_type_size = item.data_type().size(); - if chunk_subset.start().iter().all(|&o| o == 0) && chunk_subset.shape() == array_shape { + if chunk_subset.start().iter().all(|&o| o == 0) + && chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(array_shape) + { // Fast path if the chunk subset spans the entire chunk, no read required self.store_chunk_bytes(item, codec_chain, chunk_subset_bytes, codec_options) } else { // Validate the chunk subset bytes chunk_subset_bytes - .validate(chunk_subset.num_elements(), data_type_size) + .validate(chunk_subset.num_elements(), item.data_type()) .map_codec_err()?; // Retrieve the chunk @@ -143,7 +155,7 @@ impl CodecPipelineImpl { // Update the chunk let chunk_bytes_new =
update_array_bytes( chunk_bytes_old, - &array_shape, + bytemuck::must_cast_slice(array_shape), chunk_subset, &chunk_subset_bytes, data_type_size, ) @@ -212,14 +224,9 @@ fn array_metadata_to_codec_metadata_v3( match metadata { ArrayMetadata::V3(metadata) => Ok(metadata.codecs), ArrayMetadata::V2(metadata) => { - let config = global_config(); let endianness = data_type_metadata_v2_to_endianness(&metadata.dtype) .map_err(ArrayMetadataV2ToV3Error::InvalidEndianness)?; - let data_type = data_type_metadata_v2_to_v3( - &metadata.dtype, - config.data_type_aliases_v2(), - config.data_type_aliases_v3(), - )?; + let data_type = data_type_metadata_v2_to_v3(&metadata.dtype)?; codec_metadata_v2_to_v3( metadata.order, @@ -228,8 +235,6 @@ fn array_metadata_to_codec_metadata_v3( endianness, &metadata.filters, &metadata.compressor, - config.codec_aliases_v2(), - config.codec_aliases_v3(), ) } } @@ -266,14 +271,10 @@ impl CodecPipelineImpl { let codec_chain = Arc::new(CodecChain::from_metadata(&codec_metadata).map_py_err::<PyTypeError>()?); - let mut codec_options = CodecOptionsBuilder::new(); - - codec_options = codec_options.validate_checksums(validate_checksums); - - let codec_options = codec_options.build(); + let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); - let chunk_concurrent_minimum = chunk_concurrent_minimum - .unwrap_or(zarrs::config::global_config().chunk_concurrent_minimum()); + let chunk_concurrent_minimum = + chunk_concurrent_minimum.unwrap_or(global_config().chunk_concurrent_minimum()); let chunk_concurrent_maximum = chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads()); let num_threads = num_threads.unwrap_or(rayon::current_num_threads()); @@ -330,7 +331,9 @@ impl CodecPipelineImpl { .clone() .partial_decoder( Arc::new(input_handle), - item.representation(), + item.shape(), + item.data_type(), + item.fill_value(), &codec_options, ) .map_codec_err()?; @@ -358,8 +361,7 @@ impl CodecPipelineImpl { ArrayBytesFixedDisjointView::new(
output, // TODO: why is data_type in `item`, it should be derived from `output`, no? - item.representation() - .data_type() + item.data_type() .fixed_size() .ok_or("variable length data type not supported") .map_py_err::<PyTypeError>()?, @@ -371,7 +373,7 @@ impl CodecPipelineImpl { // See zarrs::array::Array::retrieve_chunk_subset_into if chunk_subset.start().iter().all(|&o| o == 0) - && chunk_subset.shape() == item.representation().shape_u64() + && chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(item.shape()) { // See zarrs::array::Array::retrieve_chunk_into if let Some(chunk_encoded) = @@ -381,16 +383,18 @@ impl CodecPipelineImpl { let chunk_encoded: Vec<u8> = chunk_encoded.into(); self.codec_chain.decode_into( Cow::Owned(chunk_encoded), - item.representation(), - &mut output_view, + item.shape(), + item.data_type(), + item.fill_value(), + ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), &codec_options, ) } else { // The chunk is missing, write the fill value copy_fill_value_into( - item.representation().data_type(), - item.representation().fill_value(), - &mut output_view, + item.data_type(), + item.fill_value(), + ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), ) } } else { @@ -400,7 +404,7 @@ impl CodecPipelineImpl { })?; partial_decoder.partial_decode_into( &chunk_subset, - &mut output_view, + ArrayBytesDecodeIntoTarget::Fixed(&mut output_view), &codec_options, ) } @@ -452,11 +456,7 @@ impl CodecPipelineImpl { let store_chunk = |item: chunk_item::WithSubset| match &input { InputValue::Array(input) => { let chunk_subset_bytes = input - .extract_array_subset( - &item.subset, - &input_shape, - item.item.representation().data_type(), - ) + .extract_array_subset(&item.subset, &input_shape, item.item.data_type()) .map_codec_err()?; self.store_chunk_subset_bytes( &item, @@ -468,12 +468,11 @@ impl CodecPipelineImpl { } InputValue::Constant(constant_value) => { let chunk_subset_bytes = ArrayBytes::new_fill_value( - ArraySize::new( -
item.representation().data_type().size(), - item.chunk_subset.num_elements(), - ), + item.data_type(), + item.chunk_subset.num_elements(), constant_value, - ); + ) + .map_py_err::<PyRuntimeError>()?; self.store_chunk_subset_bytes( &item, diff --git a/src/utils.rs b/src/utils.rs index eda2aa0..c87acf5 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -57,5 +57,5 @@ impl PyUntypedArrayExt for Bound<'_, PyUntypedArray> { pub fn is_whole_chunk(item: &WithSubset) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) - && item.chunk_subset.shape() == item.representation().shape_u64() + && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(item.shape()) } diff --git a/tests/test_v2.py b/tests/test_v2.py index 43de1d0..c75877f 100644 --- a/tests/test_v2.py +++ b/tests/test_v2.py @@ -60,12 +60,9 @@ def test_fill_single_value(store: Store) -> None: np.testing.assert_array_equal(result, expected) -@pytest.mark.filterwarnings( - "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning" -) @pytest.mark.filterwarnings( # TODO: Fix handling of string fill values for Zarr v2 bytes data - "ignore:Array is unsupported by ZarrsCodecPipeline. incompatible fill value ..+. for data type bytes:UserWarning" + "ignore:Array is unsupported by ZarrsCodecPipeline. unsupported data type .+:UserWarning" ) @pytest.mark.parametrize( ("dtype", "expected_dtype", "fill_value", "fill_value_json"), @@ -111,10 +108,7 @@ async def test_v2_encode_decode( @pytest.mark.filterwarnings( - "ignore:Array is unsupported by ZarrsCodecPipeline. data type |U1 is not supported:UserWarning" -) -@pytest.mark.filterwarnings( - "ignore:Array is unsupported by ZarrsCodecPipeline. data type |S1 is not supported:UserWarning" + "ignore:Array is unsupported by ZarrsCodecPipeline. unsupported data type .+:UserWarning" ) @pytest.mark.parametrize( ("dtype", "value"),