Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions encodings/alp/src/alp/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod between;
mod compare;
mod filter;
mod nan_count;

use vortex_array::compute::{ScalarAtFn, SliceFn, TakeFn, scalar_at, slice, take};
use vortex_array::compute::{NaNCountFn, ScalarAtFn, SliceFn, TakeFn, scalar_at, slice, take};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::vtable::ComputeVTable;
use vortex_array::{Array, ArrayRef};
Expand All @@ -12,6 +12,9 @@ use vortex_scalar::Scalar;
use crate::{ALPArray, ALPEncoding, ALPFloat, match_each_alp_float_ptype};

impl ComputeVTable for ALPEncoding {
fn nan_count_fn(&self) -> Option<&dyn NaNCountFn<&dyn Array>> {
Some(self)
}
fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
Some(self)
}
Expand Down
15 changes: 15 additions & 0 deletions encodings/alp/src/alp/compute/nan_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use vortex_array::compute::{NaNCountFn, nan_count};
use vortex_error::VortexResult;

use crate::{ALPArray, ALPEncoding};

impl NaNCountFn<&ALPArray> for ALPEncoding {
fn nan_count(&self, array: &ALPArray) -> VortexResult<Option<usize>> {
// NANs can only be in patches
if let Some(patches) = array.patches() {
nan_count(patches.values())
} else {
Ok(Some(0))
}
}
}
7 changes: 6 additions & 1 deletion vortex-array/src/arrays/primitive/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Array;
use crate::arrays::PrimitiveEncoding;
use crate::compute::{
FillNullFn, IsConstantFn, IsSortedFn, MinMaxFn, ScalarAtFn, SearchSortedFn,
FillNullFn, IsConstantFn, IsSortedFn, MinMaxFn, NaNCountFn, ScalarAtFn, SearchSortedFn,
SearchSortedUsizeFn, SliceFn, TakeFn, ToArrowFn, UncompressedSizeFn,
};
use crate::vtable::ComputeVTable;
Expand All @@ -14,6 +14,7 @@ mod is_constant;
mod is_sorted;
mod mask;
mod min_max;
mod nan_count;
mod scalar_at;
mod search_sorted;
mod slice;
Expand Down Expand Up @@ -41,6 +42,10 @@ impl ComputeVTable for PrimitiveEncoding {
Some(self)
}

fn nan_count_fn(&self) -> Option<&dyn NaNCountFn<&dyn Array>> {
Some(self)
}

fn scalar_at_fn(&self) -> Option<&dyn ScalarAtFn<&dyn Array>> {
Some(self)
}
Expand Down
56 changes: 56 additions & 0 deletions vortex-array/src/arrays/primitive/compute/nan_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use vortex_dtype::{NativePType, match_each_float_ptype};
use vortex_error::VortexResult;
use vortex_mask::Mask;

use crate::Array;
use crate::arrays::{PrimitiveArray, PrimitiveEncoding};
use crate::compute::NaNCountFn;
use crate::variants::PrimitiveArrayTrait;

impl NaNCountFn<&PrimitiveArray> for PrimitiveEncoding {
fn nan_count(&self, array: &PrimitiveArray) -> VortexResult<Option<usize>> {
Ok(Some(match_each_float_ptype!(array.ptype(), |$F| {
compute_nan_count_with_validity(array.as_slice::<$F>(), array.validity_mask()?)
})))
}
}

#[inline]
fn compute_nan_count_with_validity<T: NativePType>(values: &[T], validity: Mask) -> usize {
match validity {
Mask::AllTrue(_) => values.iter().filter(|v| v.is_nan()).count(),
Mask::AllFalse(_) => 0,
Mask::Values(v) => values
.iter()
.zip(v.boolean_buffer().iter())
.filter_map(|(v, m)| m.then_some(v))
.filter(|v| v.is_nan())
.count(),
}
}

#[cfg(test)]
mod tests {
use vortex_buffer::buffer;

use crate::arrays::PrimitiveArray;
use crate::compute::nan_count;
use crate::validity::Validity;

#[test]
fn primitive_nan_count() {
let p = PrimitiveArray::new(
buffer![
-f32::NAN,
f32::NAN,
0.1,
1.1,
-0.0,
f32::INFINITY,
f32::NEG_INFINITY
],
Validity::NonNullable,
);
assert_eq!(nan_count(&p).unwrap(), Some(2));
}
}
3 changes: 2 additions & 1 deletion vortex-array/src/compute/conformance/binary_numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_err};
use vortex_scalar::{NumericOperator, PrimitiveScalar, Scalar};

use crate::arrays::ConstantArray;
use crate::compute::{numeric, scalar_at};
use crate::compute::numeric::numeric;
use crate::compute::scalar_at;
use crate::{Array, ArrayRef, ToCanonical};

fn to_vec_of_scalar(array: &dyn Array) -> Vec<Scalar> {
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use itertools::Itertools;
pub use like::{LikeFn, LikeOptions, like};
pub use mask::*;
pub use min_max::{MinMaxFn, MinMaxResult, min_max};
pub use nan_count::*;
pub use numeric::*;
pub use optimize::*;
pub use scalar_at::{ScalarAtFn, scalar_at};
Expand Down Expand Up @@ -58,6 +59,7 @@ mod is_sorted;
mod like;
mod mask;
mod min_max;
mod nan_count;
mod numeric;
mod optimize;
mod scalar_at;
Expand Down
65 changes: 65 additions & 0 deletions vortex-array/src/compute/nan_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
use vortex_scalar::ScalarValue;

use crate::stats::{Precision, Stat};
use crate::{Array, Encoding};

/// Computes the min and max of an array, returning the (min, max) values
/// If the array is empty or has only nulls, the result is `None`.
pub trait NaNCountFn<A> {
fn nan_count(&self, array: A) -> VortexResult<Option<usize>>;
}

impl<E: Encoding> NaNCountFn<&dyn Array> for E
where
E: for<'a> NaNCountFn<&'a E::Array>,
{
fn nan_count(&self, array: &dyn Array) -> VortexResult<Option<usize>> {
let array_ref = array
.as_any()
.downcast_ref::<E::Array>()
.vortex_expect("Failed to downcast array");
NaNCountFn::nan_count(self, array_ref)
}
}

/// Computes the nunmber of NaN values in the array
/// This will update the stats set of this array (as a side effect).
pub fn nan_count(array: &dyn Array) -> VortexResult<Option<usize>> {
if array.is_empty() || array.valid_count()? == 0 {
return Ok(Some(0));
}

let nan_count = array
.statistics()
.get_as::<usize>(Stat::NaNCount)
.and_then(Precision::as_exact);

if let Some(nan_count) = nan_count {
return Ok(Some(nan_count));
}

// Only float arrays can have NaNs
let nan_count = if !array.dtype().is_float() {
Some(0)
} else if let Some(fn_) = array.vtable().nan_count_fn() {
fn_.nan_count(array)?
} else {
let canonical = array.to_canonical()?;
if let Some(fn_) = canonical.as_ref().vtable().nan_count_fn() {
fn_.nan_count(canonical.as_ref())?
} else {
vortex_bail!(NotImplemented: "nan_count", array.encoding());
}
};

if let Some(nan_count) = nan_count {
// Update the stats set with the computed min/max
array.statistics().set(
Stat::NaNCount,
Precision::Exact(ScalarValue::from(nan_count)),
);
}

Ok(nan_count)
}
4 changes: 3 additions & 1 deletion vortex-array/src/stats/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use super::{
};
use crate::Array;
use crate::compute::{
MinMaxResult, is_constant, is_sorted, is_strict_sorted, min_max, sum, uncompressed_size,
MinMaxResult, is_constant, is_sorted, is_strict_sorted, min_max, nan_count, sum,
uncompressed_size,
};

/// A shared [`StatsSet`] stored in an array. Can be shared by copies of the array and can also be mutated in place.
Expand Down Expand Up @@ -135,6 +136,7 @@ impl StatsSetRef<'_> {
Stat::IsSorted => Some(is_sorted(self.dyn_array_ref)?.into()),
Stat::IsStrictSorted => Some(is_strict_sorted(self.dyn_array_ref)?.into()),
Stat::UncompressedSizeInBytes => Some(uncompressed_size(self.dyn_array_ref)?.into()),
Stat::NaNCount => Some(nan_count(self.dyn_array_ref)?.into()),
})
}

Expand Down
11 changes: 11 additions & 0 deletions vortex-array/src/stats/flatbuffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ impl WriteFlatBuffer for StatsSet {
uncompressed_size_in_bytes: self
.get_as::<u64>(Stat::UncompressedSizeInBytes)
.and_then(Precision::as_exact),
nan_count: self
.get_as::<u64>(Stat::NaNCount)
.and_then(Precision::as_exact),
};

crate::flatbuffers::ArrayStats::create(fbb, stat_args)
Expand Down Expand Up @@ -112,6 +115,14 @@ impl ReadFlatBuffer for StatsSet {
stats_set.set(Stat::Sum, Precision::Exact(ScalarValue::try_from(sum)?));
}
}
Stat::NaNCount => {
if let Some(nan_count) = fb.nan_count() {
stats_set.set(
Stat::NaNCount,
Precision::Exact(ScalarValue::from(nan_count)),
);
}
}
}
}

Expand Down
55 changes: 37 additions & 18 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,20 @@ use vortex_error::VortexExpect;

/// Statistics that are used for pruning files (i.e., we want to ensure they are computed when compressing/writing).
/// Sum is included for boolean arrays.
pub const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::Sum, Stat::NullCount];
pub const PRUNING_STATS: &[Stat] = &[
Stat::Min,
Stat::Max,
Stat::Sum,
Stat::NullCount,
Stat::NaNCount,
];

/// Stats to keep when serializing arrays to layouts
pub const STATS_TO_WRITE: &[Stat] = &[
Stat::Min,
Stat::Max,
Stat::NullCount,
Stat::NaNCount,
Stat::Sum,
Stat::IsConstant,
Stat::IsSorted,
Expand Down Expand Up @@ -75,6 +82,8 @@ pub enum Stat {
NullCount = 6,
/// The uncompressed size of the array in bytes
UncompressedSizeInBytes = 7,
/// The number of NaN values in the array
NaNCount = 8,
}

/// These structs allow the extraction of the bound from the `Precision` value.
Expand All @@ -87,6 +96,7 @@ pub struct IsSorted;
pub struct IsStrictSorted;
pub struct NullCount;
pub struct UncompressedSizeInBytes;
pub struct NaNCount;

impl StatType<bool> for IsConstant {
type Bound = Precision<bool>;
Expand Down Expand Up @@ -136,19 +146,26 @@ impl<T: PartialOrd + Clone + Debug> StatType<T> for Sum {
const STAT: Stat = Stat::Sum;
}

impl<T: PartialOrd + Clone> StatType<T> for NaNCount {
type Bound = UpperBound<T>;

const STAT: Stat = Stat::NaNCount;
}

impl Stat {
/// Whether the statistic is commutative (i.e., whether merging can be done independently of ordering)
/// e.g., min/max are commutative, but is_sorted is not
pub fn is_commutative(&self) -> bool {
// NOTE: we prefer this syntax to force a compile error if we add a new stat
match self {
Stat::IsConstant
| Stat::Max
| Stat::Min
| Stat::NullCount
| Stat::Sum
| Stat::UncompressedSizeInBytes => true,
Stat::IsSorted | Stat::IsStrictSorted => false,
Self::IsConstant
| Self::Max
| Self::Min
| Self::NullCount
| Self::Sum
| Self::NaNCount
| Self::UncompressedSizeInBytes => true,
Self::IsSorted | Self::IsStrictSorted => false,
}
}

Expand All @@ -159,14 +176,15 @@ impl Stat {

pub fn dtype(&self, data_type: &DType) -> Option<DType> {
Some(match self {
Stat::IsConstant => DType::Bool(NonNullable),
Stat::IsSorted => DType::Bool(NonNullable),
Stat::IsStrictSorted => DType::Bool(NonNullable),
Stat::Max => data_type.clone(),
Stat::Min => data_type.clone(),
Stat::NullCount => DType::Primitive(PType::U64, NonNullable),
Stat::UncompressedSizeInBytes => DType::Primitive(PType::U64, NonNullable),
Stat::Sum => {
Self::IsConstant => DType::Bool(NonNullable),
Self::IsSorted => DType::Bool(NonNullable),
Self::IsStrictSorted => DType::Bool(NonNullable),
Self::Max => data_type.clone(),
Self::Min => data_type.clone(),
Self::NullCount => DType::Primitive(PType::U64, NonNullable),
Self::UncompressedSizeInBytes => DType::Primitive(PType::U64, NonNullable),
Self::NaNCount => DType::Primitive(PType::U64, NonNullable),
Self::Sum => {
// Any array that cannot be summed has a sum DType of null.
// Any array that can be summed, but overflows, has a sum _value_ of null.
// Therefore, we make integer sum stats nullable.
Expand Down Expand Up @@ -207,13 +225,14 @@ impl Stat {
Self::Min => "min",
Self::NullCount => "null_count",
Self::UncompressedSizeInBytes => "uncompressed_size_in_bytes",
Stat::Sum => "sum",
Self::Sum => "sum",
Self::NaNCount => "nan_count",
}
}
}

pub fn as_stat_bitset_bytes(stats: &[Stat]) -> Vec<u8> {
let max_stat = u8::from(last::<Stat>().vortex_expect("last stat")) as usize;
let max_stat = u8::from(last::<Stat>().vortex_expect("last stat")) as usize + 1;
// TODO(ngates): use vortex-buffer::BitBuffer
let mut stat_bitset = BooleanBufferBuilder::new_from_buffer(
MutableBuffer::from_len_zeroed(max_stat.div_ceil(8)),
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/stats/stat_bound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl<T: PartialOrd + Clone> StatBound<T> for Precision<T> {
value
}

fn into_value(self) -> Precision<T> {
self
}

fn union(&self, other: &Self) -> Option<Self> {
self.clone()
.zip(other.clone())
Expand Down Expand Up @@ -84,8 +88,4 @@ impl<T: PartialOrd + Clone> StatBound<T> for Precision<T> {
_ => None,
}
}

fn into_value(self) -> Precision<T> {
self
}
}
Loading
Loading