diff --git a/differential-dataflow/Cargo.toml b/differential-dataflow/Cargo.toml index 90f285421..cadc98369 100644 --- a/differential-dataflow/Cargo.toml +++ b/differential-dataflow/Cargo.toml @@ -24,10 +24,11 @@ mimalloc = "0.1.48" [dependencies] columnar = { workspace = true } -columnation = "0.1.0" +columnation = "0.1.1" fnv="1.0.2" paste = "1.0" serde = { version = "1.0", features = ["derive"] } +smallvec = "1.15.1" timely = {workspace = true} [features] diff --git a/differential-dataflow/src/dynamic/mod.rs b/differential-dataflow/src/dynamic/mod.rs index f77c8c5e4..27217b778 100644 --- a/differential-dataflow/src/dynamic/mod.rs +++ b/differential-dataflow/src/dynamic/mod.rs @@ -50,12 +50,12 @@ where let mut output = output.activate(); input.for_each(|cap, data| { let mut new_time = cap.time().clone(); - let mut vec = std::mem::take(&mut new_time.inner).into_vec(); + let mut vec = std::mem::take(&mut new_time.inner).into_inner(); vec.truncate(level - 1); new_time.inner = PointStamp::new(vec); let new_cap = cap.delayed(&new_time); for (_data, time, _diff) in data.iter_mut() { - let mut vec = std::mem::take(&mut time.inner).into_vec(); + let mut vec = std::mem::take(&mut time.inner).into_inner(); vec.truncate(level - 1); time.inner = PointStamp::new(vec); } diff --git a/differential-dataflow/src/dynamic/pointstamp.rs b/differential-dataflow/src/dynamic/pointstamp.rs index abc2c574d..095acc2b7 100644 --- a/differential-dataflow/src/dynamic/pointstamp.rs +++ b/differential-dataflow/src/dynamic/pointstamp.rs @@ -13,6 +13,7 @@ use columnar::Columnar; use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; /// A sequence of timestamps, partially ordered by the product order. /// @@ -22,7 +23,7 @@ use serde::{Deserialize, Serialize}; #[columnar(derive(Eq, PartialEq, Ord, PartialOrd))] pub struct PointStamp { /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes. - vector: Vec, + vector: SmallVec<[T; 1]>, } impl PartialEq<[T]> for PointStamp { @@ -61,18 +62,18 @@ impl PointStamp { /// Create a new sequence. /// /// This method will modify `vector` to ensure it does not end with `T::minimum()`. - pub fn new(mut vector: Vec) -> Self { + pub fn new(mut vector: SmallVec<[T; 1]>) -> Self { while vector.last() == Some(&T::minimum()) { vector.pop(); } PointStamp { vector } } - /// Returns the wrapped vector. + /// Returns the wrapped small vector. /// /// This method is the support way to mutate the contents of `self`, by extracting /// the vector and then re-introducing it with `PointStamp::new` to re-establish /// the invariant that the vector not end with `T::minimum`. - pub fn into_vec(self) -> Vec { + pub fn into_inner(self) -> SmallVec<[T; 1]> { self.vector } } @@ -102,7 +103,7 @@ impl PartialOrder for PointStamp { use timely::progress::timestamp::Refines; impl Refines<()> for PointStamp { fn to_inner(_outer: ()) -> Self { - Self { vector: Vec::new() } + Self { vector: Default::default() } } fn to_outer(self) -> () { () @@ -158,7 +159,7 @@ impl PathSummary> for PointStampSummary vector.push(action.results_in(&T::minimum())?); } - Some(PointStamp::new(vector)) + Some(PointStamp::new(vector.into())) } fn followed_by(&self, other: &Self) -> Option { // The output `retain` will be the minimum of the two inputs. @@ -215,7 +216,7 @@ impl PartialOrder for PointStampSummary { use timely::progress::Timestamp; impl Timestamp for PointStamp { fn minimum() -> Self { - Self::new(Vec::new()) + Self::new(Default::default()) } type Summary = PointStampSummary; } @@ -227,7 +228,7 @@ impl Lattice for PointStamp { fn join(&self, other: &Self) -> Self { let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); let max_len = ::std::cmp::max(self.vector.len(), other.vector.len()); - let mut vector = Vec::with_capacity(max_len); + let mut vector = SmallVec::with_capacity(max_len); // For coordinates in both inputs, apply `join` to the pair. for index in 0..min_len { vector.push(self.vector[index].join(&other.vector[index])); @@ -243,7 +244,7 @@ impl Lattice for PointStamp { } fn meet(&self, other: &Self) -> Self { let min_len = ::std::cmp::min(self.vector.len(), other.vector.len()); - let mut vector = Vec::with_capacity(min_len); + let mut vector = SmallVec::with_capacity(min_len); // For coordinates in both inputs, apply `meet` to the pair. for index in 0..min_len { vector.push(self.vector[index].meet(&other.vector[index])); @@ -255,24 +256,24 @@ impl Lattice for PointStamp { mod columnation { use columnation::{Columnation, Region}; - + use smallvec::SmallVec; use crate::dynamic::pointstamp::PointStamp; - impl Columnation for PointStamp { + impl Columnation for PointStamp { type InnerRegion = PointStampStack; } /// Stack for PointStamp. Part of Columnation implementation. - pub struct PointStampStack>( as Columnation>::InnerRegion); + pub struct PointStampStack>( as Columnation>::InnerRegion); - impl> Default for PointStampStack { + impl> Default for PointStampStack { #[inline] fn default() -> Self { Self(Default::default()) } } - impl> Region for PointStampStack { + impl> Region for PointStampStack { type Item = PointStamp; #[inline]