Skip to content

Commit 8130ab2

Browse files
timsaucerclaude
andcommitted
feat: sender-side session context for outbound pickles
`SessionContext.with_python_udf_inlining(False)` previously had no effect on `pickle.dumps(expr)` because `__reduce__` called `to_bytes()` without a context. Add a thread-local sender slot (`datafusion.ipc.set_sender_ctx`) that `__reduce__` consults, so the inlining toggle flows through pickle. Symmetric to the existing worker slot. Also switch the worker-side decode fallback from a freshly constructed `SessionContext` to `SessionContext.global_ctx()` so expressions arriving with no explicit/worker context can still resolve names registered on the global singleton. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ad0f956 commit 8130ab2

5 files changed

Lines changed: 307 additions & 15 deletions

File tree

docs/source/user-guide/io/distributing_work.rst

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,85 @@ What travels with the expression
116116
:py:class:`SessionContext`. Without that registration, evaluation
117117
raises an error.
118118

119+
Session contexts at a glance
120+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
121+
122+
There is only one type — :py:class:`SessionContext`. It can occupy
123+
up to four *slots* in a running program:
124+
125+
.. list-table::
126+
:header-rows: 1
127+
:widths: 12 18 40 30
128+
129+
* - Slot
130+
- Lifetime
131+
- Purpose
132+
- Set how
133+
* - User-held
134+
- Local variable / attribute
135+
- Build and run queries
136+
- ``ctx = SessionContext(...)``
137+
* - Global
138+
- Process singleton (lazy-init)
139+
- Backs module-level
140+
:py:func:`~datafusion.io.read_parquet`,
141+
:py:func:`~datafusion.io.read_csv`,
142+
:py:func:`~datafusion.io.read_json`,
143+
:py:func:`~datafusion.io.read_avro`; final fallback for
144+
:py:meth:`Expr.from_bytes`
145+
- Implicit; access via
146+
:py:meth:`SessionContext.global_ctx`
147+
* - Sender
148+
- Thread-local on the driver
149+
- Codec settings for outbound :py:func:`pickle.dumps` /
150+
:py:meth:`Expr.to_bytes` without ``ctx``
151+
- :py:func:`~datafusion.ipc.set_sender_ctx`
152+
* - Worker
153+
- Thread-local on the worker
154+
- Function registry for inbound :py:func:`pickle.loads` /
155+
:py:meth:`Expr.from_bytes` without ``ctx``
156+
- :py:func:`~datafusion.ipc.set_worker_ctx`
157+
158+
The same :py:class:`SessionContext` object may occupy more than one
159+
slot simultaneously — installing it into a slot is a reference, not
160+
a copy.
161+
162+
**Non-distributed user.** One user-held context. The global slot is
163+
invisible unless you call top-level ``read_*`` helpers. Sender and
164+
worker slots are unused.
165+
166+
**Distributed user.** Two questions to answer:
167+
168+
1. *Driver side — what wire format do I want?* The default (Python UDF
169+
inlining on) is self-contained; you do not need a sender context.
170+
To opt into the strict format,
171+
:py:func:`~datafusion.ipc.set_sender_ctx`
172+
with a session built via
173+
:py:meth:`SessionContext.with_python_udf_inlining(False)
174+
<datafusion.SessionContext.with_python_udf_inlining>`.
175+
176+
2. *Worker side — what registrations does decode need?* For built-ins
177+
and inline Python UDFs, nothing. For FFI-capsule UDFs (or
178+
strict-mode round-trips that travel by name), call
179+
:py:func:`~datafusion.ipc.set_worker_ctx` once per worker with a
180+
context that has the relevant registrations.
181+
182+
Resolution order on the worker side is *explicit argument →
183+
worker context → global context.* Explicit ``ctx=`` on
184+
:py:meth:`Expr.from_bytes` always wins; the sender slot is ignored
185+
on decode and the worker slot is ignored on encode.
186+
187+
Sharp edges:
188+
189+
* Sender and worker slots are **thread-local**. Background threads
190+
on either side see ``None`` until they install their own.
191+
* The global slot persists across ``fork`` workers (copy-on-write
192+
memory inherit) but not across ``spawn`` / ``forkserver`` workers
193+
(fresh process — register or install a worker context on
194+
start-up).
195+
* The inlining toggle is per-context state, not a global switch.
196+
Two contexts with different toggles can coexist in one process.
197+
119198
Registering shared UDFs on workers
120199
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
121200

@@ -143,9 +222,10 @@ as the *worker context*:
143222
144223
Inside a worker, expressions arriving from the driver resolve their
145224
by-name references against the installed worker context. If no worker
146-
context is installed, a fresh empty :py:class:`SessionContext` is
147-
used — fine for expressions that only reference built-ins and Python
148-
UDFs, but FFI-capsule-backed registrations will fail to resolve.
225+
context is installed, the global :py:class:`SessionContext` is used —
226+
fine for expressions that only reference built-ins and Python UDFs,
227+
but FFI-capsule-backed registrations must be installed on the global
228+
context to resolve.
149229

150230
Python 3.14 default change
151231
~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -198,6 +278,27 @@ Mismatched configurations raise a descriptive error: an inline blob
198278
fed to a strict receiver fails fast rather than silently dropping
199279
into ``cloudpickle.loads``.
200280

281+
To make the toggle apply through :py:func:`pickle.dumps` (which
282+
calls :py:meth:`Expr.to_bytes` with no context), install the strict
283+
session as the driver's *sender context*:
284+
285+
.. code-block:: python
286+
287+
from datafusion import SessionContext
288+
from datafusion.ipc import set_sender_ctx
289+
290+
set_sender_ctx(SessionContext().with_python_udf_inlining(False))
291+
# Every subsequent pickle.dumps(expr) on this thread encodes
292+
# without inlining the Python callable.
293+
294+
Pair with a matching strict worker context
295+
(:py:func:`~datafusion.ipc.set_worker_ctx`) so the ``pickle.loads``
296+
side also refuses inline payloads. Explicit
297+
:py:meth:`Expr.to_bytes(ctx) <Expr.to_bytes>` and
298+
:py:meth:`Expr.from_bytes(blob, ctx=ctx) <Expr.from_bytes>` calls
299+
honor the supplied ``ctx`` directly and ignore the sender / worker
300+
contexts.
301+
201302
Note that :py:func:`pickle.loads` itself remains unsafe on untrusted
202303
input regardless of this setting — an attacker producing the outer
203304
pickle envelope can execute arbitrary code before the codec ever

python/datafusion/context.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1786,6 +1786,16 @@ def with_python_udf_inlining(self, enabled: bool) -> SessionContext:
17861786
:meth:`Expr.from_bytes` input that may come from an untrusted
17871787
source — ``cloudpickle.loads`` will not be invoked.
17881788
1789+
The toggle applies directly to :meth:`Expr.to_bytes` /
1790+
:meth:`Expr.from_bytes` calls that pass this session as their
1791+
``ctx`` argument. To make the toggle apply through
1792+
:func:`pickle.dumps` (which calls :meth:`Expr.to_bytes` with no
1793+
context), install this session as the driver's sender context
1794+
via :func:`datafusion.ipc.set_sender_ctx` — and install it as
1795+
the worker's context via
1796+
:func:`datafusion.ipc.set_worker_ctx` for the corresponding
1797+
:func:`pickle.loads`.
1798+
17891799
``pickle.loads`` on untrusted bytes remains unsafe regardless of
17901800
this setting (see the `pickle module security warning
17911801
<https://docs.python.org/3/library/pickle.html#module-pickle>`_

python/datafusion/expr.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,10 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:
457457
function references that travel by name — aggregate UDFs, window
458458
UDFs, FFI UDFs. When ``ctx`` is ``None`` the worker context
459459
installed via :func:`datafusion.ipc.set_worker_ctx` is consulted;
460-
if no worker context is installed, a fresh
460+
if no worker context is installed, the global
461461
:class:`SessionContext` is used (sufficient for built-ins and
462-
Python scalar UDFs).
462+
Python scalar UDFs, plus any UDFs registered on the global
463+
context).
463464
"""
464465
from datafusion.ipc import _resolve_ctx
465466

@@ -475,10 +476,17 @@ def __reduce__(self) -> tuple:
475476
UDFs require pre-registration on the worker. The worker's
476477
:class:`SessionContext` for resolving those references is
477478
looked up via :func:`datafusion.ipc.set_worker_ctx`, falling
478-
back to a fresh empty :class:`SessionContext` if none has been
479+
back to the global :class:`SessionContext` if none has been
479480
installed on the worker.
481+
482+
The encoding side honors a driver-side sender context installed
483+
via :func:`datafusion.ipc.set_sender_ctx` — that is how
484+
:meth:`SessionContext.with_python_udf_inlining` propagates
485+
through ``pickle.dumps``.
480486
"""
481-
return (Expr._reconstruct, (self.to_bytes(),))
487+
from datafusion.ipc import get_sender_ctx
488+
489+
return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),))
482490

483491
@classmethod
484492
def _reconstruct(cls, proto_bytes: bytes) -> Expr:

python/datafusion/ipc.py

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
"""Worker-side setup for distributing DataFusion expressions.
18+
"""Driver- and worker-side setup for distributing DataFusion expressions.
1919
2020
When a :class:`Expr` is shipped to a worker process (e.g. through
2121
:func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the
@@ -38,6 +38,24 @@
3838
inside the shipped expression itself and do not need pre-registration
3939
on the worker.
4040
41+
On the driver side, call :func:`set_sender_ctx` to control how
42+
:func:`pickle.dumps` encodes expressions — for example, to apply
43+
:meth:`SessionContext.with_python_udf_inlining` to every pickled
44+
expression on this thread:
45+
46+
>>> # doctest: +SKIP
47+
>>> from datafusion import SessionContext
48+
>>> from datafusion.ipc import set_sender_ctx
49+
>>>
50+
>>> driver_ctx = SessionContext().with_python_udf_inlining(False)
51+
>>> set_sender_ctx(driver_ctx)
52+
>>> pickle.dumps(expr) # encoded with inlining disabled
53+
54+
Without a sender context the default codec is used (Python UDF
55+
inlining on). The sender context only affects pickle / ``to_bytes``
56+
encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied
57+
``ctx``.
58+
4159
See :doc:`/user-guide/io/distributing_work` for the full pattern.
4260
"""
4361

@@ -51,8 +69,11 @@
5169

5270

5371
__all__ = [
72+
"clear_sender_ctx",
5473
"clear_worker_ctx",
74+
"get_sender_ctx",
5575
"get_worker_ctx",
76+
"set_sender_ctx",
5677
"set_worker_ctx",
5778
]
5879

@@ -74,10 +95,10 @@ def set_worker_ctx(ctx: SessionContext) -> None:
7495
def clear_worker_ctx() -> None:
7596
"""Remove this worker's installed :class:`SessionContext`.
7697
77-
After clearing, expressions reconstructed in this worker fall back to a
78-
fresh empty :class:`SessionContext` — adequate for built-ins and Python
98+
After clearing, expressions reconstructed in this worker fall back to
99+
the global :class:`SessionContext` — adequate for built-ins and Python
79100
UDFs (scalar, aggregate, window), but anything imported via the FFI
80-
capsule protocol will fail to resolve.
101+
capsule protocol must be registered on the global context to resolve.
81102
"""
82103
if hasattr(_local, "ctx"):
83104
del _local.ctx
@@ -88,12 +109,48 @@ def get_worker_ctx() -> SessionContext | None:
88109
return getattr(_local, "ctx", None)
89110

90111

112+
def set_sender_ctx(ctx: SessionContext) -> None:
113+
"""Install this driver's :class:`SessionContext` for outbound pickles.
114+
115+
Controls how :func:`pickle.dumps` encodes :class:`Expr` instances on
116+
this thread. The most useful application is propagating a session
117+
configured with
118+
:meth:`SessionContext.with_python_udf_inlining` so the toggle takes
119+
effect through pickle (which otherwise calls
120+
:meth:`Expr.to_bytes` with no context and uses the default codec).
121+
122+
Idempotent: overwrites any previous value. Stored in a thread-local
123+
slot, so worker threads on the driver may install their own contexts.
124+
Does not affect :meth:`Expr.to_bytes` calls that pass an explicit
125+
``ctx`` — those continue to use the supplied context.
126+
"""
127+
_local.sender_ctx = ctx
128+
129+
130+
def clear_sender_ctx() -> None:
131+
"""Remove this driver's installed sender :class:`SessionContext`.
132+
133+
After clearing, pickled expressions fall back to the default codec
134+
(Python UDF inlining on).
135+
"""
136+
if hasattr(_local, "sender_ctx"):
137+
del _local.sender_ctx
138+
139+
140+
def get_sender_ctx() -> SessionContext | None:
141+
"""Return this driver's installed sender :class:`SessionContext`, or ``None``."""
142+
return getattr(_local, "sender_ctx", None)
143+
144+
91145
def _resolve_ctx(
92146
explicit_ctx: SessionContext | None = None,
93147
) -> SessionContext:
94148
"""Resolve a context for Expr reconstruction.
95149
96-
Priority: explicit argument > worker context > fresh context.
150+
Priority: explicit argument > worker context > global context.
151+
Falling back to the global :class:`SessionContext` (instead of a
152+
freshly constructed one) preserves any registrations the user has
153+
installed on it.
97154
"""
98155
if explicit_ctx is not None:
99156
return explicit_ctx
@@ -102,4 +159,4 @@ def _resolve_ctx(
102159
return worker
103160
from datafusion.context import SessionContext # noqa: PLC0415
104161

105-
return SessionContext()
162+
return SessionContext.global_ctx()

0 commit comments

Comments
 (0)