From f101f01c75059b7cd63f4359fe19adbe5cd59a52 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Thu, 23 Apr 2026 14:11:31 +0530 Subject: [PATCH 1/2] Close server-side operation on Statement re-execution When a Statement is re-executed, the previous server-side operation is now closed before starting the new execution. This prevents resource leaks (e.g., PreparedStatement in a loop leaving orphaned operations). This matches the behavior of pgJDBC (closeForNextExecution), MySQL Connector/J (implicitlyCloseAllOpenResults), Trino JDBC (clearCurrentResults), and Databricks Python SQL Connector (_close_and_clear_active_result_set). Skips server close when: - No previous execution (statementId is null) - Server already closed the operation (direct results) Failure to close is logged at DEBUG and does not block re-execution. Tests: - testReExecutionClosesServerOperation: verifies closeStatement called - testReExecutionSkipsServerCloseForDirectResults: verifies skip - testReExecutionHandlesCloseFailureGracefully: verifies non-blocking Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksStatement.java | 24 +++- .../api/impl/DatabricksStatementTest.java | 107 +++++++++++++++++- 2 files changed, 123 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index c9b6733e7..aebfb4578 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -962,12 +962,24 @@ private void resetForNewExecution() { noMoreResults = false; updateCount = -1; - // Per JDBC spec, re-execution does not explicitly close the previous server-side - // operation handle. The server manages operation handle lifecycle — handles are - // cleaned up when the session closes or the server evicts idle operations. - // Attempting to close handles here would corrupt Thrift HTTP transport connections - // when the server returns unexpected responses (e.g., WireMock 404 in tests). - // For direct results, the server already closed the handle. + // Close the previous server-side operation if it exists. This prevents resource + // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop). + // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and + // Databricks Python SQL Connector — all close the previous operation on re-execute. + // Skip if: (1) no previous execution (statementId==null), or + // (2) server already closed the operation (direct results). + if (statementId != null && !directResultsReceived) { + try { + connection.getSession().getDatabricksClient().closeStatement(statementId); + } catch (Exception e) { + // Don't block re-execution if closing the previous operation fails + // (e.g., network error, operation already expired on server) + LOGGER.debug( + "Failed to close previous server operation {} during re-execution: {}", + statementId, + e.getMessage()); + } + } directResultsReceived = false; diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index ff5d9e7ec..726a148d8 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -1028,8 +1028,8 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr // First execution statement.executeQuery(STATEMENT); - // Second execution — previous ResultSet is closed per JDBC spec. - // Server handle is NOT explicitly closed (server manages handle lifecycle). + // Second execution — previous ResultSet closed. No server close because + // statementId is null in this mock setup (mock doesn't call setStatementId). statement.executeQuery(STATEMENT); verify(firstResult, times(1)).close(); @@ -1037,6 +1037,109 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr assertEquals(secondResult, statement.getResultSet()); } + @Test + public void testReExecutionClosesServerOperation() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + // First execution + statement.executeQuery(STATEMENT); + // Simulate server setting the statementId (normally done inside executeStatement) + StatementId firstStatementId = new StatementId("first-stmt-id"); + statement.setStatementId(firstStatementId); + + // Second execution — should close the first server operation + statement.executeQuery(STATEMENT); + + verify(client, times(1)).closeStatement(eq(firstStatementId)); + verify(firstResult, times(1)).close(); + assertEquals(secondResult, statement.getResultSet()); + } + + @Test + public void testReExecutionSkipsServerCloseForDirectResults() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + // First execution with direct results (server already closed the operation) + statement.executeQuery(STATEMENT); + statement.setStatementId(new StatementId("direct-stmt-id")); + statement.markDirectResultsReceived(); + + // Second execution — should NOT close server operation (already closed by server) + statement.executeQuery(STATEMENT); + + verify(client, never()).closeStatement(any(StatementId.class)); + verify(firstResult, times(1)).close(); + } + + @Test + public void testReExecutionHandlesCloseFailureGracefully() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + StatementId firstStatementId = new StatementId("failing-stmt-id"); + + // closeStatement throws (e.g., operation already expired on server) + doThrow(new DatabricksSQLException("Operation not found", "HY000")) + .when(client) + .closeStatement(eq(firstStatementId)); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + statement.executeQuery(STATEMENT); + statement.setStatementId(firstStatementId); + + // Re-execution should succeed even though closing previous operation failed + assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); + assertEquals(secondResult, statement.getResultSet()); + } + @Test public void testAsyncExecutionResetsStateFromPreviousSyncExecution() throws Exception { IDatabricksConnectionContext connectionContext = From daacb722cca9d69ded905efa6026517915e20043 Mon Sep 17 00:00:00 2001 From: Gopal Lal Date: Thu, 23 Apr 2026 14:22:45 +0530 Subject: [PATCH 2/2] Address review feedback: comments, transport error test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add comment explaining directResultsReceived timing: the flag checked is from the PREVIOUS execution, reset happens below - Add comment explaining synchronous close latency tradeoff: consistent with pgJDBC's closeForNextExecution, correctness over latency - Expand catch comment to cover transport-level errors - Add testReExecutionHandlesTransportErrorGracefully: verifies re-execution succeeds even when closeStatement throws a RuntimeException (transport-level error like HTTP 500), not just DatabricksSQLException Note on integration test gap (#3): the existing mock-based tests verify the close is called. Transport-level regression requires a real Thrift endpoint — tracked as a follow-up with the fake service infrastructure. Signed-off-by: Gopal Lal Co-authored-by: Isaac Signed-off-by: Gopal Lal --- .../jdbc/api/impl/DatabricksStatement.java | 15 +++++++- .../api/impl/DatabricksStatementTest.java | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index aebfb4578..0afe1c26f 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -966,14 +966,25 @@ private void resetForNewExecution() { // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop). // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and // Databricks Python SQL Connector — all close the previous operation on re-execute. + // + // Note on directResultsReceived: we check the flag value from the PREVIOUS execution + // here. The flag is reset to false below, after this close attempt. + // + // Note on latency: this close is synchronous (adds one RPC round-trip before the next + // execution). This is consistent with pgJDBC's closeForNextExecution() which is also + // synchronous. The correctness benefit (no orphaned server operations) outweighs the + // latency cost for typical usage patterns. + // // Skip if: (1) no previous execution (statementId==null), or // (2) server already closed the operation (direct results). if (statementId != null && !directResultsReceived) { try { connection.getSession().getDatabricksClient().closeStatement(statementId); } catch (Exception e) { - // Don't block re-execution if closing the previous operation fails - // (e.g., network error, operation already expired on server) + // Don't block re-execution if closing the previous operation fails. + // This covers: network errors, operation already expired/evicted on server, + // and transport-level errors (e.g., unexpected server responses). + // The new execution will create a fresh operation with a new statementId. LOGGER.debug( "Failed to close previous server operation {} during re-execution: {}", statementId, diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index 726a148d8..1b56abedd 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -1140,6 +1140,44 @@ public void testReExecutionHandlesCloseFailureGracefully() throws Exception { assertEquals(secondResult, statement.getResultSet()); } + @Test + public void testReExecutionHandlesTransportErrorGracefully() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + StatementId firstStatementId = new StatementId("transport-error-stmt-id"); + + // closeStatement throws a transport-level error (e.g., unexpected server response, + // corrupted framed transport). This is the scarier failure mode — not just "not found" + // but a low-level I/O error that could corrupt shared transport state. + doThrow(new RuntimeException("HTTP request failed by code: 500, unexpected response")) + .when(client) + .closeStatement(eq(firstStatementId)); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + statement.executeQuery(STATEMENT); + statement.setStatementId(firstStatementId); + + // Re-execution must succeed even with transport-level close failure. + // The new execution creates a fresh server operation with a new statementId. + assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); + assertEquals(secondResult, statement.getResultSet()); + } + @Test public void testAsyncExecutionResetsStateFromPreviousSyncExecution() throws Exception { IDatabricksConnectionContext connectionContext =