Fix non-deterministic iteration in SessionStateBuilder #21262

Open
shehab-ali wants to merge 1 commit into apache:main from shehab-ali:shehab-ali/fix-indeterministic-udf

@shehab-ali shehab-ali commented Mar 30, 2026

Which issue does this PR close?

Problem: SessionStateBuilder::new_from_existing() previously used HashMap::into_values().collect_vec() for scalar, aggregate, and window functions. Since the HashMap stores entries for both the canonical name and each alias (all pointing to the same Arc<UDF>), into_values() produced duplicate entries in arbitrary order. When build() re-registered them, the last-writer-wins behavior on shared alias keys was nondeterministic.

Fix: Introduced dedup_function_registry_by_canonical_name(), which:

- Iterates HashMap keys in sorted order (deterministic)
- Keeps only one Arc<UDF> per unique canonical name (no duplicates)

This ensures build() re-registers each function exactly once, making the round trip through new_from_existing() and build() deterministic and alias-preserving.

Say you register a UDF named "postgres_to_char" with alias "to_char". The SessionState HashMap stores:
"postgres_to_char" → Arc<UDF(postgres_to_char)>
"to_char"          → Arc<UDF(postgres_to_char)>   // same arc, alias entry

Before the fix — into_values().collect_vec() produces:

// Two copies of the same UDF, in arbitrary HashMap iteration order
[Arc<UDF(postgres_to_char)>, Arc<UDF(postgres_to_char)>]
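
A minimal, std-only sketch of the duplication described above. `Udf`, `register`, and `collect_values` are hypothetical stand-ins for illustration, not DataFusion's actual types or API. The sketch assumes only that registration inserts one map entry for the canonical name and one per alias.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for a UDF: a canonical name plus aliases.
#[derive(Debug)]
struct Udf {
    name: String,
    aliases: Vec<String>,
}

impl Udf {
    fn new(name: &str, aliases: &[&str]) -> Arc<Self> {
        Arc::new(Udf {
            name: name.to_string(),
            aliases: aliases.iter().map(|a| a.to_string()).collect(),
        })
    }
}

/// Assumed registration behavior: one entry per alias plus the canonical
/// name, all sharing the same Arc. Existing keys are overwritten.
fn register(map: &mut HashMap<String, Arc<Udf>>, udf: Arc<Udf>) {
    for alias in &udf.aliases {
        map.insert(alias.clone(), Arc::clone(&udf));
    }
    map.insert(udf.name.clone(), udf);
}

/// Collects the map's values without deduplication, in arbitrary
/// HashMap iteration order (the pre-fix behavior as described).
fn collect_values(map: HashMap<String, Arc<Udf>>) -> Vec<Arc<Udf>> {
    map.into_values().collect()
}
```

Registering `Udf::new("postgres_to_char", &["to_char"])` and then calling `collect_values` returns two elements that are the same allocation, which can be checked with `Arc::ptr_eq`.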

build() then calls register_udf() for each entry. Each register_udf call re-inserts both the canonical name and all aliases:

// First registration (say the "to_char" entry came first):
"postgres_to_char" → Arc_1
"to_char"          → Arc_1

// Second registration (the "postgres_to_char" entry):
"postgres_to_char" → Arc_2   // overwrites Arc_1
"to_char"          → Arc_2   // overwrites Arc_1

If `with_updated_config()` returns `Some` (which creates a new Arc each call), 
Arc_1 and Arc_2 are different objects for the same logical function. 
Which one ends up stored depends on HashMap iteration order — which is random per run. 
Scale this to many UDFs with aliases and the rebuilt state becomes non-deterministic.

After the fix — filtering to key == canonical_name produces:

// Only the canonical entry, no duplicates
[Arc<UDF(postgres_to_char)>]
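
The body of the PR's helper is not shown in this description, so the following is only a sketch of how sorted key iteration and the canonical-name filter could be combined. The function name `dedup_sorted_by_canonical_name` is hypothetical; the signature mirrors the one quoted in the review thread.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Returns one Arc per logical function, ordered by canonical name.
/// An entry is kept only when its map key equals the function's
/// canonical name, so alias entries are skipped.
fn dedup_sorted_by_canonical_name<T>(
    map: &HashMap<String, Arc<T>>,
    canonical_name: impl Fn(&T) -> &str,
) -> Vec<Arc<T>> {
    // Sorting the keys removes the dependence on HashMap iteration order.
    let mut keys: Vec<&String> = map.keys().collect();
    keys.sort();

    keys.into_iter()
        .filter_map(|key| {
            let udf = &map[key];
            // Keep the entry only if this key is the canonical name.
            (key.as_str() == canonical_name(udf.as_ref())).then(|| Arc::clone(udf))
        })
        .collect()
}
```

Because the closure extracts the name, the same helper would work for any function type that exposes a canonical name.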

Bug Example

Problem: When SessionState is rebuilt via SessionStateBuilder::new_from_existing(), a user's UDF override can be silently reverted, causing the wrong function to execute at query time.

Concrete example: a user registers postgres_to_char with alias "to_char" to override the built-in to_char. Queries using to_char(...) correctly call the custom implementation. But when any code path rebuilds the session state, HashMap::into_values() emits both the custom UDF and the built-in (which survived under its "date_format" alias key). build() re-registers both, and last writer wins. If the built-in happens to register last, "to_char" now points back to the built-in, and subsequent queries silently call the wrong function. This happens nondeterministically depending on HashMap iteration order.

What changed: the filter key.as_str() == udf.name() only keeps entries where the HashMap key is the UDF's canonical name. This drops the built-in because its canonical key ("to_char") was overwritten by the custom UDF; it only exists under "date_format", which doesn't match its canonical name "to_char". So the built-in is never passed to build(), and the custom UDF's override is preserved.
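
The override scenario can be modeled with a plain HashMap to show why registration order matters. Everything here (`Udf`, `Registry`, `register`, `rebuild`, `original_state`, `canonical_only`) is a hypothetical model of the behavior described above, not DataFusion code, and the `source` field exists only to tell the two functions apart.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical UDF model; `source` labels which implementation it is.
struct Udf {
    name: &'static str,
    aliases: &'static [&'static str],
    source: &'static str,
}

type Registry = HashMap<&'static str, Arc<Udf>>;

/// Assumed registration: canonical name and every alias, last writer wins.
fn register(map: &mut Registry, udf: &Arc<Udf>) {
    map.insert(udf.name, Arc::clone(udf));
    for &alias in udf.aliases {
        map.insert(alias, Arc::clone(udf));
    }
}

/// Re-registers the given functions in the given order.
fn rebuild(udfs: &[Arc<Udf>]) -> Registry {
    let mut map = Registry::new();
    for udf in udfs {
        register(&mut map, udf);
    }
    map
}

/// Built-in to_char (alias date_format), then a custom override.
fn original_state() -> Registry {
    let builtin = Arc::new(Udf { name: "to_char", aliases: &["date_format"], source: "built-in" });
    let custom = Arc::new(Udf { name: "postgres_to_char", aliases: &["to_char"], source: "custom" });
    let mut map = Registry::new();
    register(&mut map, &builtin);
    register(&mut map, &custom); // takes over the "to_char" key
    map
}

/// Keeps only entries whose key is the function's canonical name.
fn canonical_only(map: &Registry) -> Vec<Arc<Udf>> {
    map.iter()
        .filter(|(key, udf)| **key == udf.name)
        .map(|(_, udf)| Arc::clone(udf))
        .collect()
}
```

In this model, `rebuild(&[custom, builtin])` maps "to_char" back to the built-in, while `rebuild(&[builtin, custom])` keeps the override. `canonical_only(&original_state())` returns only the custom function, so the rebuilt "to_char" entry no longer depends on order.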

Are these changes tested?

Added unit test

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Mar 30, 2026
@shehab-ali shehab-ali force-pushed the shehab-ali/fix-indeterministic-udf branch from a9333c6 to 97e37c6 Compare March 30, 2026 20:12
Comment thread datafusion/core/src/execution/session_state.rs
/// one [`Arc`] per logical function.
fn dedup_function_registry_by_canonical_name<T>(
map: &HashMap<String, Arc<T>>,
canonical_name: impl Fn(&T) -> &str,
Contributor

Do we need canonical_name as its own function if all the uses are the same function?

Author

Those functions have different types, though: WindowUDF, ScalarUDF, and AggregateUDF.

Contributor

ah I see now, my bad.

@shehab-ali shehab-ali force-pushed the shehab-ali/fix-indeterministic-udf branch from 97e37c6 to 936c4bd Compare March 30, 2026 20:50
@shehab-ali shehab-ali marked this pull request as ready for review March 31, 2026 14:13
Contributor

@ahmed-mez ahmed-mez left a comment

Thank you for fixing this! I hope the maintainers review this soon.

/// matches the canonical name. The session stores one hash map entry per alias
/// plus the canonical name; filtering to canonical-name entries yields exactly
/// one [`Arc`] per logical function.
fn dedup_function_registry_by_canonical_name<T>(
Contributor

Minor: I think the function could consume the map instead of borrowing it, to avoid some Arc clones:

  fn dedup_function_registry_by_canonical_name<T>(
      map: HashMap<String, Arc<T>>,
      canonical_name: impl Fn(&T) -> &str,
  ) -> Vec<Arc<T>> {
      map.into_iter()
          .filter(|(key, udf)| key.as_str() == canonical_name(udf.as_ref()))
          .map(|(_, udf)| udf)  // no Arc::clone needed
          .collect()
  }

Contributor

Alternatively, it might be more robust to dedup by identity:

  fn dedup_by_identity<T>(map: HashMap<String, Arc<T>>) -> Vec<Arc<T>> {
      let mut seen = HashSet::new();
      map.into_values()
          .filter(|arc| seen.insert(Arc::as_ptr(arc)))
          .collect()
  }
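
A small std-only comparison of the two suggestions on the override scenario from the PR description. `Udf` and `overridden_registry` are hypothetical; the map contents are assumed from the description (the built-in surviving only under "date_format"). In this model, identity dedup removes duplicate Arcs but still returns both distinct functions in arbitrary order, whereas the canonical-name filter returns only the custom one.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Hypothetical UDF model with only a canonical name.
struct Udf {
    name: &'static str,
}

/// Keeps the first occurrence of each distinct allocation.
fn dedup_by_identity<T>(map: HashMap<String, Arc<T>>) -> Vec<Arc<T>> {
    let mut seen = HashSet::new();
    map.into_values()
        .filter(|arc| seen.insert(Arc::as_ptr(arc)))
        .collect()
}

/// Keeps only entries stored under their canonical name.
fn dedup_by_canonical_name(map: HashMap<String, Arc<Udf>>) -> Vec<Arc<Udf>> {
    map.into_iter()
        .filter(|(key, udf)| key.as_str() == udf.name)
        .map(|(_, udf)| udf)
        .collect()
}

/// Assumed state after the custom function has overridden "to_char".
fn overridden_registry() -> HashMap<String, Arc<Udf>> {
    let builtin = Arc::new(Udf { name: "to_char" });
    let custom = Arc::new(Udf { name: "postgres_to_char" });
    HashMap::from([
        ("date_format".to_string(), builtin),
        ("to_char".to_string(), Arc::clone(&custom)),
        ("postgres_to_char".to_string(), custom),
    ])
}
```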

@Jefffrey
Contributor

Thanks for the description of the bug that's being encountered here. However, I'm having trouble understanding the implications. Is it a potential performance issue, if we have a massive number of UDFs? There's mention of it not being deterministic but what knock-on effects would this have? I tried out the test introduced here but it passes on main, so I want to understand what exactly this is fixing.

@shehab-ali
Author

shehab-ali commented Apr 17, 2026

The bug shows up when SessionState is rebuilt via SessionStateBuilder::new_from_existing(): a user's UDF override can be silently reverted, causing the wrong function to execute at query time.

Example: a user registers postgres_to_char with alias "to_char" to override the built-in to_char. Queries using to_char() correctly call the custom implementation. But when any code path rebuilds the session state, HashMap::into_values() emits both the custom UDF and the built-in (which survived under its "date_format" alias key). build() re-registers both, and last writer wins. If the built-in happens to register last, "to_char" now points back to the built-in, and subsequent queries silently call the wrong function. This happens nondeterministically depending on HashMap iteration order.

With these changes, the filter key.as_str() == udf.name() only keeps entries where the HashMap key is the UDF's canonical name. This drops the built-in because its canonical key ("to_char") was overwritten by the custom UDF; it only exists under "date_format", which doesn't match its canonical name "to_char". So the built-in is never passed to build(), and the custom UDF's override is preserved.

@Jefffrey
Contributor

Thanks for the example. I was able to reproduce it by modifying the test included here to register two UDFs as described. It would be good to enhance the test introduced here so that it clearly reproduces the error (even if it happens randomly).

> With these changes, the filter key.as_str() == udf.name() only keeps entries where the HashMap key is the UDF's canonical name. This drops the built-in because its canonical key ("to_char") was overwritten by the custom UDF; it only exists under "date_format", which doesn't match its canonical name "to_char". So the built-in is never passed to build(), and the custom UDF's override is preserved.

Is this dropping of date_format intentional? It doesn't seem ideal since the roundtripped session state will now be different from the original.
