Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,35 @@ private void resetForNewExecution() {
noMoreResults = false;
updateCount = -1;

// Per JDBC spec, re-execution does not explicitly close the previous server-side
// operation handle. The server manages operation handle lifecycle — handles are
// cleaned up when the session closes or the server evicts idle operations.
// Attempting to close handles here would corrupt Thrift HTTP transport connections
// when the server returns unexpected responses (e.g., WireMock 404 in tests).
// For direct results, the server already closed the handle.
// Close the previous server-side operation if it exists. This prevents resource
// leaks when a Statement is re-executed (e.g., PreparedStatement in a loop).
// This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and
// Databricks Python SQL Connector — all close the previous operation on re-execute.
//
// Note on directResultsReceived: we check the flag value from the PREVIOUS execution
// here. The flag is reset to false below, after this close attempt.
//
// Note on latency: this close is synchronous (adds one RPC round-trip before the next
// execution). This is consistent with pgJDBC's closeForNextExecution() which is also
// synchronous. The correctness benefit (no orphaned server operations) outweighs the
// latency cost for typical usage patterns.
//
// Skip if: (1) no previous execution (statementId==null), or
// (2) server already closed the operation (direct results).
if (statementId != null && !directResultsReceived) {
try {
connection.getSession().getDatabricksClient().closeStatement(statementId);
} catch (Exception e) {
// Don't block re-execution if closing the previous operation fails.
// This covers: network errors, operation already expired/evicted on server,
// and transport-level errors (e.g., unexpected server responses).
// The new execution will create a fresh operation with a new statementId.
LOGGER.debug(
"Failed to close previous server operation {} during re-execution: {}",
statementId,
e.getMessage());
}
}

directResultsReceived = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,15 +1028,156 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr
// First execution
statement.executeQuery(STATEMENT);

// Second execution — previous ResultSet is closed per JDBC spec.
// Server handle is NOT explicitly closed (server manages handle lifecycle).
// Second execution — previous ResultSet closed. No server close because
// statementId is null in this mock setup (mock doesn't call setStatementId).
statement.executeQuery(STATEMENT);

verify(firstResult, times(1)).close();
verify(client, never()).closeStatement(any(StatementId.class));
assertEquals(secondResult, statement.getResultSet());
}

@Test
public void testReExecutionClosesServerOperation() throws Exception {
  IDatabricksConnectionContext connectionContext =
      DatabricksConnectionContext.parse(JDBC_URL, new Properties());
  DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
  DatabricksStatement statement = new DatabricksStatement(connection);

  DatabricksResultSet initialResult = mock(DatabricksResultSet.class);
  DatabricksResultSet replacementResult = mock(DatabricksResultSet.class);

  // Hand back a distinct mock ResultSet for each of the two executions.
  when(client.executeStatement(
          eq(STATEMENT),
          eq(new Warehouse(WAREHOUSE_ID)),
          eq(new HashMap<>()),
          eq(StatementType.QUERY),
          any(IDatabricksSession.class),
          eq(statement),
          any()))
      .thenReturn(initialResult)
      .thenReturn(replacementResult);

  // Run once, then simulate the server assigning an operation handle
  // (normally done inside executeStatement).
  statement.executeQuery(STATEMENT);
  StatementId previousOperationId = new StatementId("first-stmt-id");
  statement.setStatementId(previousOperationId);

  // Re-executing must close the previous server-side operation.
  statement.executeQuery(STATEMENT);

  verify(client, times(1)).closeStatement(eq(previousOperationId));
  verify(initialResult, times(1)).close();
  assertEquals(replacementResult, statement.getResultSet());
}

@Test
public void testReExecutionSkipsServerCloseForDirectResults() throws Exception {
  IDatabricksConnectionContext connectionContext =
      DatabricksConnectionContext.parse(JDBC_URL, new Properties());
  DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
  DatabricksStatement statement = new DatabricksStatement(connection);

  DatabricksResultSet initialResult = mock(DatabricksResultSet.class);
  DatabricksResultSet replacementResult = mock(DatabricksResultSet.class);

  when(client.executeStatement(
          eq(STATEMENT),
          eq(new Warehouse(WAREHOUSE_ID)),
          eq(new HashMap<>()),
          eq(StatementType.QUERY),
          any(IDatabricksSession.class),
          eq(statement),
          any()))
      .thenReturn(initialResult)
      .thenReturn(replacementResult);

  // First execution delivered direct results, meaning the server has
  // already torn down the operation on its side.
  statement.executeQuery(STATEMENT);
  statement.setStatementId(new StatementId("direct-stmt-id"));
  statement.markDirectResultsReceived();

  // On re-execution the driver must not issue a redundant close RPC.
  statement.executeQuery(STATEMENT);

  verify(client, never()).closeStatement(any(StatementId.class));
  verify(initialResult, times(1)).close();
}

@Test
public void testReExecutionHandlesCloseFailureGracefully() throws Exception {
  IDatabricksConnectionContext connectionContext =
      DatabricksConnectionContext.parse(JDBC_URL, new Properties());
  DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
  DatabricksStatement statement = new DatabricksStatement(connection);

  DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
  DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
  StatementId firstStatementId = new StatementId("failing-stmt-id");

  // closeStatement throws (e.g., operation already expired on server)
  doThrow(new DatabricksSQLException("Operation not found", "HY000"))
      .when(client)
      .closeStatement(eq(firstStatementId));

  when(client.executeStatement(
          eq(STATEMENT),
          eq(new Warehouse(WAREHOUSE_ID)),
          eq(new HashMap<>()),
          eq(StatementType.QUERY),
          any(IDatabricksSession.class),
          eq(statement),
          any()))
      .thenReturn(firstResult)
      .thenReturn(secondResult);

  statement.executeQuery(STATEMENT);
  statement.setStatementId(firstStatementId);

  // Re-execution should succeed even though closing previous operation failed
  assertDoesNotThrow(() -> statement.executeQuery(STATEMENT));

  // Guard against a vacuous pass: the close must actually have been attempted
  // (and its failure swallowed) — otherwise this test would still pass if the
  // close-on-re-execute path were removed entirely.
  verify(client, times(1)).closeStatement(eq(firstStatementId));
  assertEquals(secondResult, statement.getResultSet());
}

@Test
public void testReExecutionHandlesTransportErrorGracefully() throws Exception {
  IDatabricksConnectionContext connectionContext =
      DatabricksConnectionContext.parse(JDBC_URL, new Properties());
  DatabricksConnection connection = new DatabricksConnection(connectionContext, client);
  DatabricksStatement statement = new DatabricksStatement(connection);

  DatabricksResultSet firstResult = mock(DatabricksResultSet.class);
  DatabricksResultSet secondResult = mock(DatabricksResultSet.class);
  StatementId firstStatementId = new StatementId("transport-error-stmt-id");

  // closeStatement throws a transport-level error (e.g., unexpected server response,
  // corrupted framed transport). This is the scarier failure mode — not just "not found"
  // but a low-level I/O error that could corrupt shared transport state.
  doThrow(new RuntimeException("HTTP request failed by code: 500, unexpected response"))
      .when(client)
      .closeStatement(eq(firstStatementId));

  when(client.executeStatement(
          eq(STATEMENT),
          eq(new Warehouse(WAREHOUSE_ID)),
          eq(new HashMap<>()),
          eq(StatementType.QUERY),
          any(IDatabricksSession.class),
          eq(statement),
          any()))
      .thenReturn(firstResult)
      .thenReturn(secondResult);

  statement.executeQuery(STATEMENT);
  statement.setStatementId(firstStatementId);

  // Re-execution must succeed even with transport-level close failure.
  // The new execution creates a fresh server operation with a new statementId.
  assertDoesNotThrow(() -> statement.executeQuery(STATEMENT));

  // Guard against a vacuous pass: the close must actually have been attempted
  // and the RuntimeException swallowed — otherwise this test would still pass
  // if the close-on-re-execute path were removed entirely.
  verify(client, times(1)).closeStatement(eq(firstStatementId));
  assertEquals(secondResult, statement.getResultSet());
}

@Test
public void testAsyncExecutionResetsStateFromPreviousSyncExecution() throws Exception {
IDatabricksConnectionContext connectionContext =
Expand Down
Loading