Skip to content

Commit dd8bd9d

Browse files
AP-25628: add checkpoint/restore (CRaC support in executor)
AP-25628 (PoC: "CRaC" for faster executor startup (suspend VM after start))
1 parent d45bb38 commit dd8bd9d

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

org.knime.python3.tests/src/test/java/org/knime/python3/PythonGatewayTrackerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
*/
4949
package org.knime.python3;
5050

51+
import static org.junit.Assert.assertEquals;
5152
import static org.junit.Assert.assertFalse;
53+
import static org.junit.Assert.assertThrows;
5254
import static org.junit.Assert.assertTrue;
5355

5456
import java.io.IOException;
@@ -105,8 +107,18 @@ public void registerFromPandasColumnConverter(final String pythonModule, final S
105107
}
106108

107109
private static class DummyPythonGateway implements PythonGateway<DummyEntryPoint> {
110+
private final IOException m_closeException;
111+
108112
private boolean m_isClosed = false;
109113

114+
DummyPythonGateway() {
115+
this(null);
116+
}
117+
118+
DummyPythonGateway(final IOException closeException) {
119+
m_closeException = closeException;
120+
}
121+
110122
@Override
111123
public DummyEntryPoint getEntryPoint() {
112124
return null;
@@ -125,6 +137,9 @@ public InputStream getStandardErrorStream() {
125137
@Override
126138
public void close() throws IOException {
127139
m_isClosed = true;
140+
if (m_closeException != null) {
141+
throw m_closeException;
142+
}
128143
}
129144

130145
public boolean isClosed() {
@@ -198,4 +213,26 @@ public void testTrackerClosesOnGateClose() throws IOException {
198213
TRACKER.onPythonGatewayCreationGateClose();
199214
assertTrue(gateway.isClosed());
200215
}
216+
217+
@SuppressWarnings("resource")
218+
@Test
219+
public void testTrackerClosesForCheckpoint() throws IOException {
220+
final var gateway = new DummyPythonGateway();
221+
TRACKER.createTrackedGateway(gateway);
222+
TRACKER.clearForCheckpoint();
223+
assertTrue(gateway.isClosed());
224+
}
225+
226+
@SuppressWarnings("resource")
227+
@Test
228+
public void testCheckpointCleanupPropagatesIOException() {
229+
final var gateway = new DummyPythonGateway(new IOException("close failed"));
230+
TRACKER.createTrackedGateway(gateway);
231+
232+
final var exception = assertThrows(IOException.class, TRACKER::clearForCheckpoint);
233+
234+
assertTrue(gateway.isClosed());
235+
assertEquals("Aborting Python process failed.", exception.getMessage());
236+
assertEquals("close failed", exception.getCause().getMessage());
237+
}
201238
}

org.knime.python3/src/main/java/org/knime/python3/PythonGatewayTracker.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,15 @@
5050

5151
import java.io.IOException;
5252
import java.io.InputStream;
53+
import java.io.UncheckedIOException;
5354
import java.util.ArrayList;
5455
import java.util.Collections;
5556
import java.util.Map;
5657
import java.util.Set;
58+
import java.util.function.Consumer;
5759

60+
import org.knime.core.checkpoint.PhasedInit;
61+
import org.knime.core.checkpoint.PhasedInitSupport;
5862
import org.knime.core.node.NodeLogger;
5963
import org.knime.python3.PythonGatewayCreationGate.PythonGatewayCreationGateListener;
6064

@@ -79,6 +83,18 @@ public final class PythonGatewayTracker implements PythonGatewayCreationGateList
7983

8084
private PythonGatewayTracker() {
8185
m_openGateways = gatewaySet();
86+
// Support CRaC (Coordinated Restore at Checkpoint) and close all connections prior to checkpointing
87+
PhasedInitSupport.registerOrActivate(new PhasedInit<RuntimeException>() {
88+
@Override
89+
public void beforeCheckpoint() throws RuntimeException {
90+
try {
91+
clearForCheckpoint();
92+
} catch (IOException ex) {
93+
throw new UncheckedIOException(
94+
"Error when forcefully terminating Python processes before checkpointing", ex);
95+
}
96+
}
97+
});
8298
}
8399

84100
/**
@@ -107,15 +123,30 @@ public void onPythonGatewayCreationGateClose() {
107123
}
108124

109125
void clear() throws IOException {
126+
clear(LOGGER::error,
127+
"Found running Python processes (%d). Aborting them to allow installation process. "
128+
+ "If this leads to failures in node execution, "
129+
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.");
130+
}
131+
132+
void clearForCheckpoint() throws IOException {
133+
clear(LOGGER::info,
134+
"Found running Python processes (%d). Aborting them prior to checkpointing. Triggered from thread '%s'.");
135+
}
136+
137+
/**
138+
* Closes all open gateways and logs a message using the provided consumer.
139+
*
140+
* @param logMessageConsumer consumer for logging messages
141+
* @param logMessage message format string with placeholders for gateway count and thread name
142+
* @throws IOException if an error occurs while closing the gateways
143+
*/
144+
private void clear(final Consumer<String> logMessageConsumer, final String logMessage) throws IOException {
110145
if (m_openGateways.isEmpty()) {
111146
return;
112147
}
113148

114-
LOGGER.errorWithFormat(
115-
"Found running Python processes (%d). Aborting them to allow installation process. "
116-
+ "If this leads to failures in node execution, "
117-
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.",
118-
m_openGateways.size(), Thread.currentThread().getName());
149+
logMessageConsumer.accept(String.format(logMessage, m_openGateways.size(), Thread.currentThread().getName()));
119150

120151
var exceptions = new ArrayList<Exception>();
121152
for (var gateway : m_openGateways) {

0 commit comments

Comments
 (0)