From 15411130901dc4806a44d2eee43fbd696596a96b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 6 Feb 2026 15:04:24 -0500 Subject: [PATCH] Update to timely 0.26 --- Cargo.toml | 2 +- differential-dataflow/src/capture.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 658c5e146..c4c406505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ resolver = "2" [workspace.dependencies] differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.19.0" } -timely = { version = "0.25", default-features = false } +timely = { version = "0.26", default-features = false } columnar = { version = "0.11", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } diff --git a/differential-dataflow/src/capture.rs b/differential-dataflow/src/capture.rs index dbcc653b3..23d9ffd9e 100644 --- a/differential-dataflow/src/capture.rs +++ b/differential-dataflow/src/capture.rs @@ -417,7 +417,7 @@ pub mod source { // Deduplicate newly received updates, sending new updates and timestamp counts. let mut changes = changes_out.activate(); let mut counts = counts_out.activate(); - while let Some((capability, updates)) = input.next() { + input.for_each(|capability, updates| { let mut changes_session = changes.session(&capability); let mut counts_session = counts.session(&capability); for (data, time, diff) in updates.iter() { @@ -433,7 +433,7 @@ pub mod source { if !change_batch.is_empty() { counts_session.give_iterator(change_batch.drain()); } - } + }); } }); @@ -466,15 +466,15 @@ pub mod source { let mut capability: Option> = None; // Drain all relevant update counts in to the mutable antichain tracking its frontier. - while let Some((cap, counts)) = counts.next() { + counts.for_each(|cap, counts| { updates_frontier.update_iter(counts.iter().cloned()); capability = Some(cap.retain()); - } + }); // Drain all progress statements into the queue out of which we will work. - while let Some((cap, progress)) = input.next() { + input.for_each(|cap, progress| { progress_queue.extend(progress.iter().map(|x| (x.1).clone())); capability = Some(cap.retain()); - } + }); // Extract and act on actionable progress messages. // A progress message is actionable if `self.progress_frontier` is beyond the message's lower bound. @@ -532,14 +532,14 @@ pub mod source { move |_frontiers| { let mut antichain = shared_frontier2.borrow_mut(); let mut must_activate = false; - while let Some((_cap, frontier_changes)) = input.next() { + input.for_each(|_cap, frontier_changes| { for (_self, input_changes) in frontier_changes.iter() { // Apply the updates, and observe if the lower bound has changed. if antichain.update_iter(input_changes.unstable_internal_updates().iter().cloned()).next().is_some() { must_activate = true; } } - } + }); // If the lower bound has changed, we must activate MESSAGES. if must_activate { activator2.activate(); } }