Skip to content

Commit d680e12

Browse files
timsaucerclaude
andcommitted
feat: per-session Python UDF inlining toggle + sender ctx + strict refusal
Adds a per-session toggle that turns inline Python UDF encoding on or off, plus the supporting plumbing to make it usable through pickle.dumps. Codec layer: * PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining bool (default true) and a with_python_udf_inlining(enabled) builder. Each try_encode_udf{,af,wf} short-circuits to inner when the toggle is off; each try_decode_udf{,af,wf} that recognizes a DFPY* magic on a strict codec returns a clean Execution error instead of invoking cloudpickle.loads. The refusal message names the UDF and the wire family so an operator can see at a glance whether to re-encode the bytes or register the UDF on the receiver. Session layer: * PySessionContext::with_python_udf_inlining(enabled) returns a new session whose stacked logical + physical codecs both carry the toggle. The Arc<SessionState> is cloned (cheap), only the codec pair is rebuilt, so registrations and config stay attached. * SessionContext.with_python_udf_inlining(*, enabled) is the Python wrapper. enabled is keyword-only because positional booleans at the call site read as opaque. Sender-side context: * datafusion.ipc gains set_sender_ctx / get_sender_ctx / clear_sender_ctx thread-locals. Expr.__reduce__ now consults get_sender_ctx() to pick the codec for outbound pickles, which is the only path through which a strict session affects pickle.dumps (the protocol calls __reduce__ with no arguments). Without a sender context the default codec is used. Tests: * test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers both directions of the toggle plus the explicit-ctx fast path), TestWorkerCtxLifecycle (set/clear/threading), and TestSenderCtxLifecycle. * New test_pickle_multiprocessing.py + helpers exercise the full driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx installed in the worker initializer. * CI workflow gets a 30-minute timeout-minutes backstop so a hung pickle worker can't block the matrix indefinitely. User-guide docs and the runnable examples land in PR4 of this series. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d0baeb6 commit d680e12

9 files changed

Lines changed: 714 additions & 39 deletions

File tree

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ env:
2929
jobs:
3030
test-matrix:
3131
runs-on: ubuntu-latest
32+
# Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
33+
# should not block CI longer than this.
34+
timeout-minutes: 30
3235
strategy:
3336
fail-fast: false
3437
matrix:

crates/core/src/codec.rs

Lines changed: 102 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -191,16 +191,44 @@ fn strip_wire_header<'a>(buf: &'a [u8], family: &[u8], kind: &str) -> Result<Opt
191191
#[derive(Debug)]
192192
pub struct PythonLogicalCodec {
193193
inner: Arc<dyn LogicalExtensionCodec>,
194+
python_udf_inlining: bool,
194195
}
195196

196197
impl PythonLogicalCodec {
197198
pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
198-
Self { inner }
199+
Self {
200+
inner,
201+
python_udf_inlining: true,
202+
}
199203
}
200204

201205
pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
202206
&self.inner
203207
}
208+
209+
/// Whether Python-defined UDFs are encoded inline (and decoded
210+
/// from cloudpickle blobs). Defaults to `true`. Set to `false`
211+
/// when the codec sits on a session that must produce
212+
/// cross-language wire bytes, or reject `cloudpickle.loads` on
213+
/// untrusted `from_bytes` input.
214+
///
215+
/// Security scope: strict mode (`false`) narrows only the codec
216+
/// layer — it stops `Expr::from_bytes` from invoking
217+
/// `cloudpickle.loads` on the inline `DFPY*` payload. It does
218+
/// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
219+
/// `pickle.loads` on untrusted input as unsafe regardless of this
220+
/// setting. See Python's [pickle module security warning][1] for
221+
/// why `pickle.loads` is unsafe in general.
222+
///
223+
/// [1]: https://docs.python.org/3/library/pickle.html#module-pickle
224+
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
225+
self.python_udf_inlining = enabled;
226+
self
227+
}
228+
229+
pub fn python_udf_inlining(&self) -> bool {
230+
self.python_udf_inlining
231+
}
204232
}
205233

206234
impl Default for PythonLogicalCodec {
@@ -260,48 +288,76 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
260288
}
261289

262290
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
263-
if try_encode_python_scalar_udf(node, buf)? {
291+
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
264292
return Ok(());
265293
}
266294
self.inner.try_encode_udf(node, buf)
267295
}
268296

269297
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
270-
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
271-
return Ok(udf);
298+
if self.python_udf_inlining {
299+
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
300+
return Ok(udf);
301+
}
302+
} else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
303+
return Err(refuse_inline_payload("scalar UDF", name));
272304
}
273305
self.inner.try_decode_udf(name, buf)
274306
}
275307

276308
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
277-
if try_encode_python_agg_udf(node, buf)? {
309+
if self.python_udf_inlining && try_encode_python_agg_udf(node, buf)? {
278310
return Ok(());
279311
}
280312
self.inner.try_encode_udaf(node, buf)
281313
}
282314

283315
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
284-
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
285-
return Ok(udaf);
316+
if self.python_udf_inlining {
317+
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
318+
return Ok(udaf);
319+
}
320+
} else if buf.starts_with(PY_AGG_UDF_FAMILY) {
321+
return Err(refuse_inline_payload("aggregate UDF", name));
286322
}
287323
self.inner.try_decode_udaf(name, buf)
288324
}
289325

290326
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
291-
if try_encode_python_window_udf(node, buf)? {
327+
if self.python_udf_inlining && try_encode_python_window_udf(node, buf)? {
292328
return Ok(());
293329
}
294330
self.inner.try_encode_udwf(node, buf)
295331
}
296332

297333
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
298-
if let Some(udwf) = try_decode_python_window_udf(buf)? {
299-
return Ok(udwf);
334+
if self.python_udf_inlining {
335+
if let Some(udwf) = try_decode_python_window_udf(buf)? {
336+
return Ok(udwf);
337+
}
338+
} else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
339+
return Err(refuse_inline_payload("window UDF", name));
300340
}
301341
self.inner.try_decode_udwf(name, buf)
302342
}
303343
}
304344

345+
/// Build the error returned by a strict codec when it receives an
346+
/// inline Python-UDF payload it has been told not to deserialize.
347+
fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
348+
// `Execution`, not `Plan`: this is a wire-format decode refusal at
349+
// codec time, not a planner-stage failure. Downstream error
350+
// classification keys off the variant — surfacing this as a planner
351+
// error would mis-route it into "fix your SQL" buckets.
352+
datafusion::error::DataFusionError::Execution(format!(
353+
"Refusing to deserialize inline Python {kind} '{name}': Python UDF \
354+
inlining is disabled on this session. Ask the sender to re-encode \
355+
with inlining disabled (so the UDF travels by name), or register \
356+
'{name}' on this receiver's session and enable inlining on both \
357+
sides — receivers cannot re-encode bytes they did not produce."
358+
))
359+
}
360+
305361
/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
306362
/// on the same `SessionContext`. Carries the Python-aware encoding
307363
/// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
@@ -317,16 +373,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
317373
#[derive(Debug)]
318374
pub struct PythonPhysicalCodec {
319375
inner: Arc<dyn PhysicalExtensionCodec>,
376+
python_udf_inlining: bool,
320377
}
321378

322379
impl PythonPhysicalCodec {
323380
pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
324-
Self { inner }
381+
Self {
382+
inner,
383+
python_udf_inlining: true,
384+
}
325385
}
326386

327387
pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
328388
&self.inner
329389
}
390+
391+
/// See [`PythonLogicalCodec::with_python_udf_inlining`].
392+
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
393+
self.python_udf_inlining = enabled;
394+
self
395+
}
396+
397+
pub fn python_udf_inlining(&self) -> bool {
398+
self.python_udf_inlining
399+
}
330400
}
331401

332402
impl Default for PythonPhysicalCodec {
@@ -350,15 +420,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
350420
}
351421

352422
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
353-
if try_encode_python_scalar_udf(node, buf)? {
423+
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
354424
return Ok(());
355425
}
356426
self.inner.try_encode_udf(node, buf)
357427
}
358428

359429
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
360-
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
361-
return Ok(udf);
430+
if self.python_udf_inlining {
431+
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
432+
return Ok(udf);
433+
}
434+
} else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
435+
return Err(refuse_inline_payload("scalar UDF", name));
362436
}
363437
self.inner.try_decode_udf(name, buf)
364438
}
@@ -376,29 +450,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
376450
}
377451

378452
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
379-
if try_encode_python_agg_udf(node, buf)? {
453+
if self.python_udf_inlining && try_encode_python_agg_udf(node, buf)? {
380454
return Ok(());
381455
}
382456
self.inner.try_encode_udaf(node, buf)
383457
}
384458

385459
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
386-
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
387-
return Ok(udaf);
460+
if self.python_udf_inlining {
461+
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
462+
return Ok(udaf);
463+
}
464+
} else if buf.starts_with(PY_AGG_UDF_FAMILY) {
465+
return Err(refuse_inline_payload("aggregate UDF", name));
388466
}
389467
self.inner.try_decode_udaf(name, buf)
390468
}
391469

392470
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
393-
if try_encode_python_window_udf(node, buf)? {
471+
if self.python_udf_inlining && try_encode_python_window_udf(node, buf)? {
394472
return Ok(());
395473
}
396474
self.inner.try_encode_udwf(node, buf)
397475
}
398476

399477
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
400-
if let Some(udwf) = try_decode_python_window_udf(buf)? {
401-
return Ok(udwf);
478+
if self.python_udf_inlining {
479+
if let Some(udwf) = try_decode_python_window_udf(buf)? {
480+
return Ok(udwf);
481+
}
482+
} else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
483+
return Err(refuse_inline_payload("window UDF", name));
402484
}
403485
self.inner.try_decode_udwf(name, buf)
404486
}

crates/core/src/context.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,6 +1407,22 @@ impl PySessionContext {
14071407
physical_codec,
14081408
})
14091409
}
1410+
1411+
pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
1412+
let logical_codec = Arc::new(
1413+
PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
1414+
.with_python_udf_inlining(enabled),
1415+
);
1416+
let physical_codec = Arc::new(
1417+
PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
1418+
.with_python_udf_inlining(enabled),
1419+
);
1420+
Self {
1421+
ctx: Arc::clone(&self.ctx),
1422+
logical_codec,
1423+
physical_codec,
1424+
}
1425+
}
14101426
}
14111427

14121428
impl PySessionContext {

python/datafusion/context.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,3 +1769,46 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
17691769
new = SessionContext.__new__(SessionContext)
17701770
new.ctx = new_internal
17711771
return new
1772+
1773+
def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
1774+
"""Toggle inline encoding of Python-defined UDFs on this session.
1775+
1776+
``enabled`` is keyword-only:
1777+
``with_python_udf_inlining(enabled=False)`` reads at the call
1778+
site as the inverse of
1779+
``with_python_udf_inlining(enabled=True)``, where a positional
1780+
``True`` / ``False`` would not.
1781+
1782+
When ``True`` (the default), Python scalar, aggregate, and window
1783+
UDFs travel inside the serialized expression and are
1784+
reconstructed on the receiver without pre-registration.
1785+
1786+
Set ``False`` to:
1787+
1788+
* Produce serialized bytes that round-trip through a non-Python
1789+
decoder (cross-language portability). UDFs are stored by name
1790+
only; the receiver must have matching registrations.
1791+
* Refuse to reconstruct Python UDFs from
1792+
:meth:`Expr.from_bytes` input that may come from an untrusted
1793+
source — ``cloudpickle.loads`` will not be invoked.
1794+
1795+
The toggle applies directly to :meth:`Expr.to_bytes` /
1796+
:meth:`Expr.from_bytes` calls that pass this session as their
1797+
``ctx`` argument. To make the toggle apply through
1798+
:func:`pickle.dumps` (which calls :meth:`Expr.to_bytes` with no
1799+
context), install this session as the driver's sender context
1800+
via :func:`datafusion.ipc.set_sender_ctx` — and install it as
1801+
the worker's context via
1802+
:func:`datafusion.ipc.set_worker_ctx` for the corresponding
1803+
:func:`pickle.loads`.
1804+
1805+
For the full security model, see
1806+
:doc:`/user-guide/io/distributing_work` (Security section). In
1807+
short: this toggle narrows only the :meth:`Expr.from_bytes`
1808+
surface; :func:`pickle.loads` on untrusted bytes remains
1809+
unsafe regardless of the toggle.
1810+
"""
1811+
new_internal = self.ctx.with_python_udf_inlining(enabled)
1812+
new = SessionContext.__new__(SessionContext)
1813+
new.ctx = new_internal
1814+
return new

python/datafusion/expr.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -440,13 +440,16 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes:
440440
worker process for distributed evaluation.
441441
442442
When ``ctx`` is supplied, encoding routes through that session's
443-
installed :class:`LogicalExtensionCodec`. When ``ctx`` is
444-
``None``, the default codec is used.
443+
installed :class:`LogicalExtensionCodec` (so settings like
444+
:meth:`SessionContext.with_python_udf_inlining` take effect).
445+
When ``ctx`` is ``None``, the default codec is used (Python UDF
446+
inlining on, no user-installed extension codec).
445447
446448
Built-in functions and Python UDFs (scalar, aggregate, window)
447449
travel inside the returned bytes; the worker does not need to
448450
pre-register them. UDFs imported via the FFI capsule protocol
449-
travel by name only and must be registered on the worker.
451+
travel by name only and must be registered on the worker. See
452+
:doc:`/user-guide/io/distributing_work`.
450453
"""
451454
ctx_arg = ctx.ctx if ctx is not None else None
452455
return self.expr.to_bytes(ctx_arg)
@@ -457,12 +460,13 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:
457460
458461
Accepts output of :meth:`to_bytes` or :func:`pickle.dumps`.
459462
``ctx`` is the :class:`SessionContext` used to resolve any
460-
function references that travel by name (e.g. FFI UDFs). When
461-
``ctx`` is ``None`` the worker context installed via
462-
:func:`datafusion.ipc.set_worker_ctx` is consulted; if no worker
463-
context is installed, the global :class:`SessionContext` is used
464-
(sufficient for built-ins and Python UDFs, plus any UDFs
465-
registered on the global context).
463+
function references that travel by name — aggregate UDFs, window
464+
UDFs, FFI UDFs. When ``ctx`` is ``None`` the worker context
465+
installed via :func:`datafusion.ipc.set_worker_ctx` is consulted;
466+
if no worker context is installed, the global
467+
:class:`SessionContext` is used (sufficient for built-ins and
468+
Python scalar UDFs, plus any UDFs registered on the global
469+
context).
466470
"""
467471
from datafusion.ipc import _resolve_ctx
468472

@@ -474,15 +478,21 @@ def __reduce__(self) -> tuple:
474478
475479
Lets expressions be shipped to worker processes via
476480
:func:`pickle.dumps` / :func:`pickle.loads`. Built-in functions
477-
and Python UDFs (scalar, aggregate, window) travel inside the
478-
pickle bytes; only FFI-capsule UDFs require pre-registration on
479-
the worker. The worker's :class:`SessionContext` for resolving
480-
those references is looked up via
481-
:func:`datafusion.ipc.set_worker_ctx`, falling back to the
482-
global :class:`SessionContext` if none has been installed on
483-
the worker.
481+
and Python UDFs travel inside the pickle bytes; only FFI-capsule
482+
UDFs require pre-registration on the worker. The worker's
483+
:class:`SessionContext` for resolving those references is
484+
looked up via :func:`datafusion.ipc.set_worker_ctx`, falling
485+
back to the global :class:`SessionContext` if none has been
486+
installed on the worker.
487+
488+
The encoding side honors a driver-side sender context installed
489+
via :func:`datafusion.ipc.set_sender_ctx` — that is how
490+
:meth:`SessionContext.with_python_udf_inlining` propagates
491+
through ``pickle.dumps``.
484492
"""
485-
return (Expr._reconstruct, (self.to_bytes(),))
493+
from datafusion.ipc import get_sender_ctx
494+
495+
return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),))
486496

487497
@classmethod
488498
def _reconstruct(cls, proto_bytes: bytes) -> Expr:

0 commit comments

Comments
 (0)