Commit 5c99915

timsaucer and claude committed
feat(pickle): stamp Python (major, minor) in UDF wire header
cloudpickle bytecode is not portable across Python minor versions: a payload produced on 3.11 fails to load on 3.12 with an opaque marshal/unpickle error. Embed the sender's (major, minor) in the DFPYUDF wire header and reject mismatches at decode time with an actionable error that names both versions, instead of letting the failure surface from inside cloudpickle.loads.

Header layout becomes:

    DFPYUDF (7) | version (1) | py_major (1) | py_minor (1) | cloudpickle

Extend the Security warnings on Expr.to_bytes / from_bytes / __reduce__ with a Portability section covering the cross-version constraint and cloudpickle's by-value/by-reference behavior (the callable inlines bytecode and closure cells, but imported names travel by reference and must be importable on the receiver).

Add a matching Serialization model note to the datafusion.ipc module docstring.

New tests:
- codec::wire_header_tests: py-major/minor mismatch, truncated py-version bytes, round-trip with py-version
- test_pickle_expr::test_cross_version_error_message: patches the py_minor byte inside an emitted payload and asserts the error message identifies the version mismatch

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
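
The header layout described in the commit message can be sketched in a few lines of Python. This is an illustration only, not the project's code: the real framing lives in the Rust `write_wire_header` / `strip_wire_header` functions in this diff. The names `MAGIC`, `write_header`, and `strip_header` are hypothetical, `b"DFPYUDF"` is taken from the commit message, and the sketch collapses the three separate truncation errors of the Rust version into one.

```python
import sys

MAGIC = b"DFPYUDF"  # 7-byte family magic, as named in the commit message
WIRE_VERSION = 1    # mirrors WIRE_VERSION_CURRENT shown in the diff


def write_header(py_version=None):
    """Return the 10-byte header: magic | version | py_major | py_minor."""
    major, minor = py_version or sys.version_info[:2]
    return MAGIC + bytes([WIRE_VERSION, major, minor])


def strip_header(buf, expected_py=None):
    """Return the payload after the header, or None if the magic is absent."""
    expected = tuple(expected_py or sys.version_info[:2])
    if not buf.startswith(MAGIC):
        return None  # not our family: the caller would delegate elsewhere
    if len(buf) < len(MAGIC) + 3:
        raise ValueError("truncated header")
    version, major, minor = buf[len(MAGIC):len(MAGIC) + 3]
    if version != WIRE_VERSION:
        raise ValueError(f"unsupported wire-format version v{version}")
    if (major, minor) != expected:
        raise ValueError(
            f"payload was serialized on Python {major}.{minor} but this "
            f"process is running Python {expected[0]}.{expected[1]}"
        )
    return buf[len(MAGIC) + 3:]
```

Passing the expected version as a parameter, as the Rust change also does with `expected_py`, keeps the check testable without depending on the interpreter that runs the test.
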
1 parent df34e0a commit 5c99915

4 files changed

Lines changed: 237 additions & 34 deletions

crates/core/src/codec.rs

Lines changed: 131 additions & 31 deletions
@@ -49,12 +49,16 @@
 //! the same payload framing for that reason.
 //!
 //! Payloads emitted by these codecs are framed as
-//! `<family_magic: 7 bytes> <version: u8> <cloudpickle blob>`. The
-//! family magic identifies the UDF flavor; the version byte lets the
-//! decoder reject too-new or too-old payloads with a clean error
+//! `<family_magic: 7 bytes> <version: u8> <py_major: u8> <py_minor: u8> <cloudpickle blob>`.
+//! The family magic identifies the UDF flavor; the version byte lets
+//! the decoder reject too-new or too-old payloads with a clean error
 //! instead of falling into an opaque `cloudpickle` tuple-unpack
-//! failure when the tuple shape changes. Dispatch precedence on
-//! decode: **family match + supported version → `inner` codec →
+//! failure when the tuple shape changes; the Python `(major, minor)`
+//! bytes catch the cloudpickle-cross-minor-version case and raise an
+//! actionable error instead of an opaque `marshal` failure on load
+//! (cloudpickle payloads are not portable across Python minor
+//! versions). Dispatch precedence on decode: **family match +
+//! supported version + matching Python version → `inner` codec →
 //! caller's `FunctionRegistry` fallback.**
 //!
 //! ## Wire-format family registry
@@ -105,13 +109,17 @@ use crate::udf::PythonFunctionScalarUDF;
 
 // Wire-format framing for inlined Python UDF payloads.
 //
-// Layout: `<family_magic: 7 bytes> <version: u8> <cloudpickle blob>`.
+// Layout: `<family_magic: 7 bytes> <version: u8> <py_major: u8> <py_minor: u8> <cloudpickle blob>`.
 // The family magic identifies the UDF flavor; the version byte lets
 // the decoder reject too-new or too-old payloads with a clean error
 // instead of falling into an opaque `cloudpickle` tuple-unpack failure
-// when the tuple shape changes. Bump [`WIRE_VERSION_CURRENT`] whenever
-// the tuple shape changes; raise [`WIRE_VERSION_MIN_SUPPORTED`] when
-// dropping support for an older shape.
+// when the tuple shape changes; the Python `(major, minor)` bytes
+// catch the cloudpickle-cross-minor-version case (cloudpickle is not
+// portable across Python minor versions) and raise an actionable
+// error instead of an opaque `marshal` failure on load. Bump
+// [`WIRE_VERSION_CURRENT`] whenever the tuple shape changes; raise
+// [`WIRE_VERSION_MIN_SUPPORTED`] when dropping support for an older
+// shape.
 
 /// Family prefix for an inlined Python scalar UDF
 /// (cloudpickled tuple of name, callable, input schema, return field,
@@ -126,25 +134,35 @@ pub(crate) const WIRE_VERSION_CURRENT: u8 = 1;
 pub(crate) const WIRE_VERSION_MIN_SUPPORTED: u8 = 1;
 
 /// Tag `buf` with the framing header for `family` at the current
-/// wire-format version. Append-only — the caller writes the
-/// cloudpickle payload after.
-fn write_wire_header(buf: &mut Vec<u8>, family: &[u8]) {
+/// wire-format version, stamping `py_version` as `(major, minor)`
+/// bytes. Append-only — the caller writes the cloudpickle payload
+/// after.
+fn write_wire_header(buf: &mut Vec<u8>, family: &[u8], py_version: (u8, u8)) {
     buf.extend_from_slice(family);
     buf.push(WIRE_VERSION_CURRENT);
+    buf.push(py_version.0);
+    buf.push(py_version.1);
 }
 
 /// Inspect the framing on `buf`.
 ///
 /// * `Ok(None)` — `buf` does not carry `family`. The caller should
 /// delegate to its `inner` codec.
 /// * `Ok(Some(payload))` — `buf` carries `family` at a version this
-/// build accepts; `payload` is the cloudpickle blob.
-/// * `Err(_)` — `buf` carries `family` but at a version outside
-/// `WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT`. The error
-/// names the version and the supported range so an operator can
-/// diagnose sender/receiver version drift instead of seeing an
-/// opaque cloudpickle tuple-unpack failure.
-fn strip_wire_header<'a>(buf: &'a [u8], family: &[u8], kind: &str) -> Result<Option<&'a [u8]>> {
+/// build accepts and a Python `(major, minor)` matching
+/// `expected_py`; `payload` is the cloudpickle blob.
+/// * `Err(_)` — `buf` carries `family` but the wire-format version
+/// is outside `WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT`,
+/// or the stamped Python `(major, minor)` does not match
+/// `expected_py`. The error names the offending values so an
+/// operator can diagnose sender/receiver drift instead of seeing
+/// an opaque cloudpickle tuple-unpack or `marshal` failure.
+fn strip_wire_header<'a>(
+    buf: &'a [u8],
+    family: &[u8],
+    kind: &str,
+    expected_py: (u8, u8),
+) -> Result<Option<&'a [u8]>> {
     if !buf.starts_with(family) {
         return Ok(None);
     }
@@ -161,7 +179,28 @@ fn strip_wire_header<'a>(buf: &'a [u8], family: &[u8], kind: &str) -> Result<Opt
             Align datafusion-python versions on sender and receiver."
         )));
     }
-    Ok(Some(&buf[version_idx + 1..]))
+    let py_major_idx = version_idx + 1;
+    let Some(&encoded_major) = buf.get(py_major_idx) else {
+        return Err(datafusion::error::DataFusionError::Execution(format!(
+            "Truncated inline Python {kind} payload: missing Python major version byte"
+        )));
+    };
+    let py_minor_idx = version_idx + 2;
+    let Some(&encoded_minor) = buf.get(py_minor_idx) else {
+        return Err(datafusion::error::DataFusionError::Execution(format!(
+            "Truncated inline Python {kind} payload: missing Python minor version byte"
+        )));
+    };
+    let (current_major, current_minor) = expected_py;
+    if encoded_major != current_major || encoded_minor != current_minor {
+        return Err(datafusion::error::DataFusionError::Execution(format!(
+            "Inline Python {kind} payload was serialized on Python \
+             {encoded_major}.{encoded_minor} but this process is running Python \
+             {current_major}.{current_minor}. cloudpickle payloads are not portable \
+             across Python minor versions. Align Python versions on sender and receiver."
+        )));
+    }
+    Ok(Some(&buf[py_minor_idx + 1..]))
 }
 
 /// `LogicalExtensionCodec` parked on every `SessionContext`. Holds
@@ -386,9 +425,11 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
     };
 
     Python::attach(|py| -> Result<bool> {
+        let py_version = current_python_version(py)
+            .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
         let bytes = encode_python_scalar_udf(py, py_udf)
             .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
-        write_wire_header(buf, PY_SCALAR_UDF_FAMILY);
+        write_wire_header(buf, PY_SCALAR_UDF_FAMILY, py_version);
         buf.extend_from_slice(&bytes);
         Ok(true)
     })
@@ -399,11 +440,13 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
 /// the caller to delegate to its `inner` codec (and eventually the
 /// `FunctionRegistry`).
 pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
-    let Some(payload) = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? else {
-        return Ok(None);
-    };
-
     Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
+        let py_version = current_python_version(py)
+            .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
+        let Some(payload) = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", py_version)?
+        else {
+            return Ok(None);
+        };
         let udf = decode_python_scalar_udf(py, payload)
             .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
         Ok(Some(Arc::new(ScalarUDF::new_from_impl(udf))))
@@ -567,6 +610,20 @@ fn volatility_wire_str(v: Volatility) -> &'static str {
     }
 }
 
+/// Read the interpreter's `sys.version_info` as `(major, minor)`.
+///
+/// Used by encoder/decoder to stamp and verify the Python version a
+/// cloudpickle payload was produced on. cloudpickle is not portable
+/// across Python minor versions; the wire header carries these bytes
+/// so a mismatch surfaces an actionable error instead of an opaque
+/// `marshal` failure at `cloudpickle.loads` time.
+fn current_python_version(py: Python<'_>) -> PyResult<(u8, u8)> {
+    let version_info = py.import("sys")?.getattr("version_info")?;
+    let major: u8 = version_info.getattr("major")?.extract()?;
+    let minor: u8 = version_info.getattr("minor")?.extract()?;
+    Ok((major, minor))
+}
+
 /// Cached handle to the `cloudpickle` module.
 ///
 /// The encode/decode helpers above would otherwise re-resolve the
@@ -589,28 +646,32 @@ fn cloudpickle<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
 mod wire_header_tests {
     use super::*;
 
+    const TEST_PY: (u8, u8) = (3, 12);
+
     #[test]
     fn strip_returns_none_when_family_absent() {
         let buf = b"OTHER_PAYLOAD";
         assert!(matches!(
-            strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF"),
+            strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY),
             Ok(None)
         ));
     }
 
     #[test]
     fn strip_errors_on_truncated_version_byte() {
         let buf = PY_SCALAR_UDF_FAMILY;
-        let err = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").unwrap_err();
+        let err = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY).unwrap_err();
         assert!(format!("{err}").contains("missing wire-format version byte"));
     }
 
     #[test]
     fn strip_errors_on_too_new_version() {
         let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
         buf.push(WIRE_VERSION_CURRENT.saturating_add(1));
+        buf.push(TEST_PY.0);
+        buf.push(TEST_PY.1);
         buf.extend_from_slice(b"payload");
-        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").unwrap_err();
+        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY).unwrap_err();
         let msg = format!("{err}");
         assert!(msg.contains("wire-format version v"));
         assert!(msg.contains("supports"));
@@ -624,17 +685,56 @@ mod wire_header_tests {
         }
         let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
         buf.push(WIRE_VERSION_MIN_SUPPORTED - 1);
+        buf.push(TEST_PY.0);
+        buf.push(TEST_PY.1);
+        buf.extend_from_slice(b"payload");
+        assert!(strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY).is_err());
+    }
+
+    #[test]
+    fn strip_errors_on_truncated_py_major() {
+        let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
+        buf.push(WIRE_VERSION_CURRENT);
+        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY).unwrap_err();
+        assert!(format!("{err}").contains("missing Python major version byte"));
+    }
+
+    #[test]
+    fn strip_errors_on_truncated_py_minor() {
+        let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
+        buf.push(WIRE_VERSION_CURRENT);
+        buf.push(TEST_PY.0);
+        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY).unwrap_err();
+        assert!(format!("{err}").contains("missing Python minor version byte"));
+    }
+
+    #[test]
+    fn strip_errors_on_py_minor_mismatch() {
+        let mut buf = Vec::new();
+        write_wire_header(&mut buf, PY_SCALAR_UDF_FAMILY, (3, 11));
+        buf.extend_from_slice(b"payload");
+        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", (3, 12)).unwrap_err();
+        let msg = format!("{err}");
+        assert!(msg.contains("Python 3.11"));
+        assert!(msg.contains("Python 3.12"));
+        assert!(msg.contains("not portable across Python minor versions"));
+    }
+
+    #[test]
+    fn strip_errors_on_py_major_mismatch() {
+        let mut buf = Vec::new();
+        write_wire_header(&mut buf, PY_SCALAR_UDF_FAMILY, (3, 12));
         buf.extend_from_slice(b"payload");
-        assert!(strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").is_err());
+        assert!(strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", (4, 0)).is_err());
     }
 
     #[test]
     fn write_then_strip_round_trips_payload() {
         let mut buf = Vec::new();
-        write_wire_header(&mut buf, PY_SCALAR_UDF_FAMILY);
+        write_wire_header(&mut buf, PY_SCALAR_UDF_FAMILY, TEST_PY);
         buf.extend_from_slice(b"scalar-payload");
 
-        let payload = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")
+        let payload = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", TEST_PY)
             .unwrap()
             .unwrap();
         assert_eq!(payload, b"scalar-payload");
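
The Rust tests in `wire_header_tests` exercise the header in isolation. The commit message also names a Python-side test that patches the `py_minor` byte inside an emitted payload. The test file itself is not part of this excerpt, so the following is only a guess at what such a patching helper might look like. `patch_py_minor` is a hypothetical name, `b"DFPYUDF"` comes from the commit message, and searching for the magic (rather than assuming offset 0) rests on the assumption that the header is nested inside a larger serialized expression.

```python
MAGIC = b"DFPYUDF"  # family magic from the commit message (assumed here)


def patch_py_minor(payload: bytes, new_minor: int) -> bytes:
    """Return a copy of `payload` with the stamped py_minor byte replaced.

    Header layout after the magic: version (1) | py_major (1) | py_minor (1).
    """
    start = payload.find(MAGIC)
    if start < 0:
        raise ValueError("payload does not carry the DFPYUDF header")
    minor_idx = start + len(MAGIC) + 2  # skip the version and py_major bytes
    if minor_idx >= len(payload):
        raise ValueError("payload is truncated before the py_minor byte")
    patched = bytearray(payload)
    patched[minor_idx] = new_minor
    return bytes(patched)
```

A test built on this would decode the patched bytes and assert that the resulting error message names both Python versions.
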

python/datafusion/expr.py

Lines changed: 64 additions & 3 deletions
@@ -454,13 +454,59 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes:
         UDFs imported via the FFI capsule protocol travel by name only
         and must be registered on the worker.
 
-        .. warning::
+        .. warning:: Security
             Bytes returned here may embed a cloudpickled Python
             callable (when the expression carries a Python scalar UDF).
             Reconstructing them via :meth:`from_bytes` or
             :func:`pickle.loads` executes arbitrary Python on the
             receiver. Only accept payloads from trusted sources.
 
+        .. warning:: Portability
+            cloudpickle serializes Python bytecode, which is **not
+            stable across Python minor versions**. A payload produced
+            on Python 3.11 will fail to load on Python 3.12. The
+            wire format stamps the sender's ``(major, minor)``;
+            :meth:`from_bytes` raises a :class:`ValueError` naming
+            both versions on mismatch.
+
+            cloudpickle captures the UDF callable **by value** —
+            bytecode and closure cells inlined — but names the
+            callable resolves via ``import`` are captured **by
+            reference** (module path only) and must be importable on
+            the receiver.
+
+            **Self-contained — works anywhere:**
+
+            .. code-block:: python
+
+                # Lambda: bytecode captured inline
+                udf(lambda x: x * 2, [pa.int64()], pa.int64(),
+                    volatility="immutable")
+
+                # Locally-defined function: bytecode captured inline
+                def double(x):
+                    return x * 2
+                udf(double, [pa.int64()], pa.int64(), volatility="immutable")
+
+                # Closure over a local variable: value captured inline
+                factor = 3
+                udf(lambda x: x * factor, [pa.int64()], pa.int64(),
+                    volatility="immutable")
+
+            **Requires matching environment on receiver:**
+
+            .. code-block:: python
+
+                # Top-level import: `foo` must be installed on receiver
+                from foo import double
+                udf(double, [pa.int64()], pa.int64(), volatility="immutable")
+
+                # Bound method of an imported class: same caveat
+                from mylib import Transformer
+                t = Transformer()
+                udf(t.transform, [pa.int64()], pa.int64(),
+                    volatility="immutable")
+
         Examples:
             >>> from datafusion import col, lit
             >>> blob = (col("a") + lit(1)).to_bytes()
@@ -483,12 +529,21 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:
         (sufficient for built-ins and Python scalar UDFs, plus any UDFs
         registered on the global context).
 
-        .. warning::
+        .. warning:: Security
             Decoding may invoke ``cloudpickle.loads`` on bytes embedded
             in the payload, which executes arbitrary Python code. Treat
             ``buf`` as code, not data — only decode bytes you produced
             yourself or received from a trusted sender.
 
+        .. warning:: Portability
+            cloudpickle payloads are **not portable across Python
+            minor versions**. The wire format stamps the sender's
+            ``(major, minor)``; if it does not match the current
+            interpreter, this method raises :class:`ValueError`
+            naming both versions. Modules the UDF imports must also
+            be importable on the receiver — see :meth:`to_bytes` for
+            by-value vs. by-reference details.
+
         Examples:
             >>> from datafusion import Expr, col, lit
             >>> blob = (col("a") + lit(1)).to_bytes()
@@ -512,12 +567,18 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]:
         back to the global :class:`SessionContext` if none has been
         installed on the worker.
 
-        .. warning::
+        .. warning:: Security
             :func:`pickle.loads` on the returned tuple executes
             arbitrary Python on the receiver, including any
             cloudpickled UDF callable embedded in the payload. Only
             unpickle expressions from trusted sources.
 
+        .. warning:: Portability
+            Sender and receiver must run the same Python
+            ``(major, minor)`` version; cloudpickle bytecode is not
+            portable across minor versions. See :meth:`to_bytes` for
+            details on what travels by value vs. by reference.
+
         Examples:
             >>> import pickle
             >>> from datafusion import col, lit
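
The by-value versus by-reference distinction in the Portability warnings can be illustrated with the standard library alone. The sketch below is an analogy, not cloudpickle's implementation: plain `pickle` stands in for "by reference" (it records only a module path and a name), and `marshal` on a code object stands in for "by value" (the bytecode itself is serialized). The `marshal` format is documented as specific to the Python version, which is the same kind of constraint the warnings describe.

```python
import marshal
import math
import pickle
import types


def double(x):
    return x * 2


# By reference: the pickle holds the names "math" and "sqrt", not the
# function body, so the receiver must be able to import `math.sqrt`.
by_ref = pickle.dumps(math.sqrt)
print(b"math" in by_ref, b"sqrt" in by_ref)  # True True

# By value: the code object is serialized and rebuilt into a new function
# without importing anything on the receiving side.
code_bytes = marshal.dumps(double.__code__)
rebuilt = types.FunctionType(marshal.loads(code_bytes), {}, "double")
print(rebuilt(21))  # 42
```

`code_bytes` is only expected to load on a matching interpreter version, while `by_ref` only loads where the named module is importable: the two failure modes the docstrings separate.
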

python/datafusion/ipc.py

Lines changed: 15 additions & 0 deletions
@@ -37,6 +37,21 @@ def init_worker():
 
 Built-in functions and Python scalar UDFs travel inside the shipped
 expression itself and do not need pre-registration on the worker.
+
+.. note:: Serialization model
+
+    Expressions containing Python scalar UDFs are serialized using
+    :mod:`cloudpickle`. The callable itself travels **by value**
+    (bytecode and closure cells inlined), but any names the callable
+    resolves via ``import`` are captured **by reference** and must be
+    importable on the receiving worker.
+
+    The serialized payload is stamped with the sender's Python
+    ``(major, minor)`` version. Loading on a different minor version
+    raises :class:`ValueError` with an actionable message — cloudpickle
+    payloads are not portable across Python minor versions. See
+    :meth:`datafusion.Expr.to_bytes` for examples of what travels by
+    value vs. by reference.
 """
 
 from __future__ import annotations
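
Because a mismatch only surfaces when a payload is decoded, a deployment may prefer to compare interpreter versions once, when a worker starts. The sketch below shows one way that could look. It is not part of `datafusion.ipc`; `python_tag` and `assert_same_python` are hypothetical helpers, and how the sender's tag reaches the worker (an initializer argument, an environment variable) is left open.

```python
import sys


def python_tag():
    """Return this interpreter's (major, minor), the pair the header stamps."""
    return tuple(sys.version_info[:2])


def assert_same_python(sender_tag):
    """Hypothetical worker-side preflight: fail early on a version mismatch."""
    local = python_tag()
    if tuple(sender_tag) != local:
        raise ValueError(
            f"sender runs Python {sender_tag[0]}.{sender_tag[1]} but this "
            f"worker runs Python {local[0]}.{local[1]}"
        )
```

The driver would compute `python_tag()` and hand it to each worker, which calls `assert_same_python` before accepting any serialized expressions.
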
