Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
12aeef6
Run plot comms synchronously on the R thread
lionel- Mar 28, 2026
03926f0
Fix `dev.hold()` issue introduced by pre-renderings
lionel- Mar 10, 2026
92f3ed2
Move device context to Console
lionel- Mar 10, 2026
a679edd
Set Prerender settings synchronously from the UI handler
lionel- Mar 10, 2026
959bcb0
Fix message interpolation
lionel- Mar 10, 2026
8a7eecf
Update stale comment
lionel- Mar 28, 2026
6308311
Address code review
lionel- Mar 28, 2026
7573327
Pass `Console` reference to handlers directly
lionel- Mar 29, 2026
fa4df0b
Add comment about the need for `Cell`
lionel- Mar 29, 2026
38bd585
Remove unneeded warmup
lionel- Mar 29, 2026
2194a31
Always record plot even if withheld
lionel- Mar 29, 2026
0c42552
Ignore UI comm busy events by default in tests
lionel- Mar 29, 2026
ce50616
Fix synchronisation between comm opening and plot updates
lionel- Mar 30, 2026
9d61505
Fix semantic conflicts from rebase
lionel- Apr 3, 2026
06b0cc8
Fix workspace inheritance
lionel- Apr 3, 2026
b9c7b26
Pass Console refs explicitly to device context methods
lionel- Apr 16, 2026
116a6f3
Store comms in a RefCell
lionel- Apr 16, 2026
a5db467
Remove `Console::get()` usage in `console_comm.rs`
lionel- Apr 16, 2026
0114eef
Make `Console::comm_` methods take non-mut `&self`
lionel- Apr 17, 2026
446eba5
Revert "Make `Console::comm_` methods take non-mut `&self`"
lionel- Apr 17, 2026
b120fc6
Revert "Remove `Console::get()` usage in `console_comm.rs`"
lionel- Apr 17, 2026
531f4e8
Don't pass whole `Console` to handlers after all
lionel- Apr 17, 2026
44baf44
Pass `session_mode` at construction time
lionel- Apr 17, 2026
e347603
Remove dependency on Console for fallback execution context
lionel- Apr 17, 2026
48aec70
Take `&self` in `comm_open_frontend()`
lionel- Apr 17, 2026
aca7204
Take `&self` in `comm_handle_msg()`
lionel- Apr 17, 2026
0a657f7
Use safer `with_` pattern
lionel- Apr 17, 2026
3b1675c
Drain events during `CommMsg` and `CommClose`
lionel- Apr 17, 2026
26a5c70
Remove `CommEvent::Barrier`
lionel- Apr 17, 2026
0ef8462
Drain events during `CommOpen` too
lionel- Apr 17, 2026
2436553
Reuse event drainer in execute request handler
lionel- Apr 17, 2026
5bc0250
Extract `handle_comm_notification()`
lionel- Apr 17, 2026
9a0725f
Fix semantic conflicts after rebase
lionel- Apr 17, 2026
a237e55
Fix UI comm reentrancy when sending events from R
lionel- Apr 17, 2026
5e88c9c
Don't panic on borrow errors in release builds
lionel- Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions crates/amalthea/src/comm/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
*
*/

use crossbeam::channel::Sender;
use serde_json::Value;

use crate::comm::comm_channel::CommMsg;
use crate::socket::comm::CommSocket;

/// Comm events sent to the frontend via Shell.
pub enum CommEvent {
/// A new Comm was opened
Opened(CommSocket, Value),
/// A new Comm was opened. The optional `Sender` is a synchronisation
/// barrier: if provided, Shell signals it after processing the open
/// (sending `comm_open` on IOPub). The caller blocks on the paired
/// receiver to guarantee that the `comm_open` message has been sent
/// before any subsequent messages.
Opened(CommSocket, Value, Option<Sender<()>>),

/// A message was received on a Comm; the first value is the comm ID, and the
/// second value is the message.
Expand Down
68 changes: 37 additions & 31 deletions crates/amalthea/src/language/shell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

use async_trait::async_trait;
use crossbeam::channel::Receiver;

use crate::comm::comm_channel::Comm;
use crate::comm::comm_channel::CommMsg;
Expand All @@ -25,10 +26,10 @@ use crate::wire::kernel_info_request::KernelInfoRequest;
use crate::wire::originator::Originator;

/// Result of a `handle_comm_msg` or `handle_comm_close` call on the
/// `ShellHandler`. `Handled` means the kernel processed the message
/// synchronously (blocking Shell until done). `NotHandled` means amalthea
/// should fall back to the historical `incoming_tx` path. This fallback is
/// temporary until all comms are migrated to the blocking path.
/// `ShellHandler`. `Handled` means the kernel dispatched the message
/// (possibly asynchronously via a completion receiver). `NotHandled` means
/// amalthea should fall back to the historical `incoming_tx` path. This
/// fallback is temporary until all comms are migrated to the new path.
pub enum CommHandled {
Handled,
NotHandled,
Expand All @@ -53,17 +54,17 @@ pub trait ShellHandler: Send {
req: &IsCompleteRequest,
) -> crate::Result<IsCompleteReply>;

/// Handles a request to execute code.
///
/// The `originator` is an opaque byte array identifying the peer that sent
/// the request; it is needed to perform an input request during execution.
/// Kicks off execution of the given request and returns a channel that
/// will receive the reply once execution completes. Shell select-loops
/// on this receiver together with `comm_event_rx` so it can process
/// comm events (e.g. barrier handshakes) while execution is in progress.
///
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute
async fn handle_execute_request(
fn start_execute_request(
&mut self,
originator: Originator,
req: &ExecuteRequest,
) -> crate::Result<ExecuteReply>;
) -> Receiver<crate::Result<ExecuteReply>>;

/// Handles a request to provide completions for the given code fragment.
///
Expand All @@ -80,26 +81,30 @@ pub trait ShellHandler: Send {
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#history
async fn handle_history_request(&self, req: &HistoryRequest) -> crate::Result<HistoryReply>;

/// Handles a request to open a comm.
///
/// https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm
/// Handle a request to open a comm.
///
/// Returns true if the handler handled the request (and opened the comm), false if it did not.
/// Returns `(true, Some(receiver))` if the comm was opened and the handler
/// was dispatched asynchronously. Shell will select-loop on the receiver
/// and `comm_event_rx` to drain comm events while the handler runs.
///
/// * `target` - The target name of the comm, such as `positron.variables`
/// * `comm` - The comm channel to use to communicate with the frontend
/// * `data` - The `data` payload from the `comm_open` message
async fn handle_comm_open(
&self,
/// Returns `(true, None)` if the comm was opened synchronously.
/// Returns `(false, None)` if the comm was not handled.
fn handle_comm_open(
&mut self,
target: Comm,
comm: CommSocket,
data: serde_json::Value,
) -> crate::Result<bool>;
) -> crate::Result<(bool, Option<Receiver<()>>)>;

/// Handle an incoming comm message (RPC or data). Return
/// `CommHandled::Handled` if the message was processed, or
/// `CommHandled::NotHandled` to fall back to the existing
/// `incoming_tx` path.
/// Handle an incoming comm message (RPC or data).
///
/// Returns `(CommHandled::Handled, Some(receiver))` if the message was
/// dispatched to the R thread. Shell will select-loop on the receiver
/// and `comm_event_rx` to drain comm events (e.g. barriers from
/// `comm_open_backend`) while the handler runs.
///
/// Returns `(CommHandled::NotHandled, None)` to fall back to the
/// existing `incoming_tx` path.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name (e.g. `"positron.dataExplorer"`)
Expand All @@ -112,21 +117,22 @@ pub trait ShellHandler: Send {
_comm_name: &str,
_msg: CommMsg,
_originator: Originator,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
) -> crate::Result<(CommHandled, Option<Receiver<()>>)> {
Ok((CommHandled::NotHandled, None))
}

/// Handle a comm close. Return `CommHandled::Handled` if the close
/// was processed, or `CommHandled::NotHandled` to fall back to the
/// existing `incoming_tx` path.
/// Handle a comm close.
///
/// Same pattern as `handle_comm_msg`: returns a completion receiver
/// so Shell can drain comm events while the handler runs.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name
fn handle_comm_close(
&mut self,
_comm_id: &str,
_comm_name: &str,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
) -> crate::Result<(CommHandled, Option<Receiver<()>>)> {
Ok((CommHandled::NotHandled, None))
}
}
Loading
Loading