From ed417887ffc6602ab49e44dd0b5c631699422dcc Mon Sep 17 00:00:00 2001 From: Bernd Wiswedel Date: Tue, 24 Feb 2026 18:19:35 +0100 Subject: [PATCH] AP-25628: add checkpoint/restore (CRaC support in executor) AP-25628 (PoC: "CRaC" for faster executor startup (suspend VM after start)) --- .../python3/PythonGatewayTrackerTest.java | 37 +++++++++++++++++ .../knime/python3/PythonGatewayTracker.java | 41 ++++++++++++++++--- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java b/org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java index d7f3fb5d9..db9dcd8bb 100644 --- a/org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java +++ b/org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java @@ -48,7 +48,9 @@ */ package org.knime.python3; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -105,8 +107,18 @@ public void registerFromPandasColumnConverter(final String pythonModule, final S } private static class DummyPythonGateway implements PythonGateway { + private final IOException m_closeException; + private boolean m_isClosed = false; + DummyPythonGateway() { + this(null); + } + + DummyPythonGateway(final IOException closeException) { + m_closeException = closeException; + } + @Override public DummyEntryPoint getEntryPoint() { return null; @@ -125,6 +137,9 @@ public InputStream getStandardErrorStream() { @Override public void close() throws IOException { m_isClosed = true; + if (m_closeException != null) { + throw m_closeException; + } } public boolean isClosed() { @@ -198,4 +213,26 @@ public void testTrackerClosesOnGateClose() throws IOException { TRACKER.onPythonGatewayCreationGateClose(); assertTrue(gateway.isClosed()); } + + @SuppressWarnings("resource") + @Test + public void testTrackerClosesForCheckpoint() throws IOException { + final var gateway = new DummyPythonGateway(); + TRACKER.createTrackedGateway(gateway); + TRACKER.clearForCheckpoint(); + assertTrue(gateway.isClosed()); + } + + @SuppressWarnings("resource") + @Test + public void testCheckpointCleanupPropagatesIOException() { + final var gateway = new DummyPythonGateway(new IOException("close failed")); + TRACKER.createTrackedGateway(gateway); + + final var exception = assertThrows(IOException.class, TRACKER::clearForCheckpoint); + + assertTrue(gateway.isClosed()); + assertEquals("Aborting Python process failed.", exception.getMessage()); + assertEquals("close failed", exception.getCause().getMessage()); + } } diff --git a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java index 63d5782ae..7b86371d3 100644 --- a/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java +++ b/org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java @@ -50,11 +50,15 @@ import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; +import org.knime.core.checkpoint.PhasedInit; +import org.knime.core.checkpoint.PhasedInitSupport; import org.knime.core.node.NodeLogger; import org.knime.python3.PythonGatewayCreationGate.PythonGatewayCreationGateListener; @@ -79,6 +83,18 @@ public final class PythonGatewayTracker implements PythonGatewayCreationGateList private PythonGatewayTracker() { m_openGateways = gatewaySet(); + // Support CRaC (Coordinated Restore at Checkpoint) and close all connections prior to checkpointing + PhasedInitSupport.registerOrActivate(new PhasedInit() { + @Override + public void beforeCheckpoint() throws RuntimeException { + try { + clearForCheckpoint(); + } catch (IOException ex) { + throw new UncheckedIOException( + "Error when forcefully terminating Python processes before checkpointing", ex); + } + } + }); } /** @@ -107,15 +123,30 @@ public void onPythonGatewayCreationGateClose() { } void clear() throws IOException { + clear(LOGGER::error, + "Found running Python processes (%d). Aborting them to allow installation process. " + + "If this leads to failures in node execution, " + + "please restart those nodes once the installation has finished. Triggered from thread '%s'."); + } + + void clearForCheckpoint() throws IOException { + clear(LOGGER::info, + "Found running Python processes (%d). Aborting them prior to checkpointing. Triggered from thread '%s'."); + } + + /** + * Closes all open gateways and logs a message using the provided consumer. + * + * @param logMessageConsumer consumer for logging messages + * @param logMessage message format string with placeholders for gateway count and thread name + * @throws IOException if an error occurs while closing the gateways + */ + private void clear(final Consumer logMessageConsumer, final String logMessage) throws IOException { if (m_openGateways.isEmpty()) { return; } - LOGGER.errorWithFormat( - "Found running Python processes (%d). Aborting them to allow installation process. " - + "If this leads to failures in node execution, " - + "please restart those nodes once the installation has finished. Triggered from thread '%s'.", - m_openGateways.size(), Thread.currentThread().getName()); + logMessageConsumer.accept(String.format(logMessage, m_openGateways.size(), Thread.currentThread().getName())); var exceptions = new ArrayList(); for (var gateway : m_openGateways) {