Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion differential-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ mimalloc = "0.1.48"

[dependencies]
columnar = { workspace = true }
columnation = "0.1.0"
columnation = "0.1.1"
fnv="1.0.2"
paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
smallvec = "1.15.1"
timely = {workspace = true}

[features]
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ where
let mut output = output.activate();
input.for_each(|cap, data| {
let mut new_time = cap.time().clone();
let mut vec = std::mem::take(&mut new_time.inner).into_vec();
let mut vec = std::mem::take(&mut new_time.inner).into_inner();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time);
for (_data, time, _diff) in data.iter_mut() {
let mut vec = std::mem::take(&mut time.inner).into_vec();
let mut vec = std::mem::take(&mut time.inner).into_inner();
vec.truncate(level - 1);
time.inner = PointStamp::new(vec);
}
Expand Down
29 changes: 15 additions & 14 deletions differential-dataflow/src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use columnar::Columnar;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

/// A sequence of timestamps, partially ordered by the product order.
///
Expand All @@ -22,7 +23,7 @@ use serde::{Deserialize, Serialize};
#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
pub struct PointStamp<T> {
/// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
vector: Vec<T>,
vector: SmallVec<[T; 1]>,
}

impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
Expand Down Expand Up @@ -61,18 +62,18 @@ impl<T: Timestamp> PointStamp<T> {
/// Create a new sequence.
///
/// This method will modify `vector` to ensure it does not end with `T::minimum()`.
pub fn new(mut vector: Vec<T>) -> Self {
pub fn new(mut vector: SmallVec<[T; 1]>) -> Self {
while vector.last() == Some(&T::minimum()) {
vector.pop();
}
PointStamp { vector }
}
/// Returns the wrapped vector.
/// Returns the wrapped small vector.
///
/// This method is the support way to mutate the contents of `self`, by extracting
/// the vector and then re-introducing it with `PointStamp::new` to re-establish
/// the invariant that the vector not end with `T::minimum`.
pub fn into_vec(self) -> Vec<T> {
pub fn into_inner(self) -> SmallVec<[T; 1]> {
self.vector
}
}
Expand Down Expand Up @@ -102,7 +103,7 @@ impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
use timely::progress::timestamp::Refines;
impl<T: Timestamp> Refines<()> for PointStamp<T> {
fn to_inner(_outer: ()) -> Self {
Self { vector: Vec::new() }
Self { vector: Default::default() }
}
fn to_outer(self) -> () {
()
Expand Down Expand Up @@ -158,7 +159,7 @@ impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary>
vector.push(action.results_in(&T::minimum())?);
}

Some(PointStamp::new(vector))
Some(PointStamp::new(vector.into()))
}
fn followed_by(&self, other: &Self) -> Option<Self> {
// The output `retain` will be the minimum of the two inputs.
Expand Down Expand Up @@ -215,7 +216,7 @@ impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
use timely::progress::Timestamp;
impl<T: Timestamp> Timestamp for PointStamp<T> {
fn minimum() -> Self {
Self::new(Vec::new())
Self::new(Default::default())
}
type Summary = PointStampSummary<T::Summary>;
}
Expand All @@ -227,7 +228,7 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
fn join(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
let mut vector = Vec::with_capacity(max_len);
let mut vector = SmallVec::with_capacity(max_len);
// For coordinates in both inputs, apply `join` to the pair.
for index in 0..min_len {
vector.push(self.vector[index].join(&other.vector[index]));
Expand All @@ -243,7 +244,7 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
}
fn meet(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
let mut vector = Vec::with_capacity(min_len);
let mut vector = SmallVec::with_capacity(min_len);
// For coordinates in both inputs, apply `meet` to the pair.
for index in 0..min_len {
vector.push(self.vector[index].meet(&other.vector[index]));
Expand All @@ -255,24 +256,24 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {

mod columnation {
use columnation::{Columnation, Region};

use smallvec::SmallVec;
use crate::dynamic::pointstamp::PointStamp;

impl<T: Columnation> Columnation for PointStamp<T> {
impl<T: Columnation+Clone> Columnation for PointStamp<T> {
type InnerRegion = PointStampStack<T::InnerRegion>;
}

/// Stack for PointStamp. Part of Columnation implementation.
pub struct PointStampStack<R: Region<Item: Columnation>>(<Vec<R::Item> as Columnation>::InnerRegion);
pub struct PointStampStack<R: Region<Item: Columnation+Clone>>(<SmallVec<[R::Item; 1]> as Columnation>::InnerRegion);

impl<R: Region<Item: Columnation>> Default for PointStampStack<R> {
impl<R: Region<Item: Columnation+Clone>> Default for PointStampStack<R> {
#[inline]
fn default() -> Self {
Self(Default::default())
}
}

impl<R: Region<Item: Columnation>> Region for PointStampStack<R> {
impl<R: Region<Item: Columnation+Clone>> Region for PointStampStack<R> {
type Item = PointStamp<R::Item>;

#[inline]
Expand Down