diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index c9b6733e7..0afe1c26f 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -962,12 +962,35 @@ private void resetForNewExecution() { noMoreResults = false; updateCount = -1; - // Per JDBC spec, re-execution does not explicitly close the previous server-side - // operation handle. The server manages operation handle lifecycle — handles are - // cleaned up when the session closes or the server evicts idle operations. - // Attempting to close handles here would corrupt Thrift HTTP transport connections - // when the server returns unexpected responses (e.g., WireMock 404 in tests). - // For direct results, the server already closed the handle. + // Close the previous server-side operation if it exists. This prevents resource + // leaks when a Statement is re-executed (e.g., PreparedStatement in a loop). + // This matches the behavior of pgJDBC, MySQL Connector/J, Trino JDBC, and + // Databricks Python SQL Connector — all close the previous operation on re-execute. + // + // Note on directResultsReceived: we check the flag value from the PREVIOUS execution + // here. The flag is reset to false below, after this close attempt. + // + // Note on latency: this close is synchronous (adds one RPC round-trip before the next + // execution). This is consistent with pgJDBC's closeForNextExecution() which is also + // synchronous. The correctness benefit (no orphaned server operations) outweighs the + // latency cost for typical usage patterns. + // + // Skip if: (1) no previous execution (statementId==null), or + // (2) server already closed the operation (direct results). + if (statementId != null && !directResultsReceived) { + try { + connection.getSession().getDatabricksClient().closeStatement(statementId); + } catch (Exception e) { + // Don't block re-execution if closing the previous operation fails. + // This covers: network errors, operation already expired/evicted on server, + // and transport-level errors (e.g., unexpected server responses). + // The new execution will create a fresh operation with a new statementId. + LOGGER.debug( + "Failed to close previous server operation {} during re-execution: {}", + statementId, + e.getMessage()); + } + } directResultsReceived = false; diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java index ff5d9e7ec..1b56abedd 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksStatementTest.java @@ -1028,8 +1028,8 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr // First execution statement.executeQuery(STATEMENT); - // Second execution — previous ResultSet is closed per JDBC spec. - // Server handle is NOT explicitly closed (server manages handle lifecycle). + // Second execution — previous ResultSet closed. No server close because + // statementId is null in this mock setup (mock doesn't call setStatementId). statement.executeQuery(STATEMENT); verify(firstResult, times(1)).close(); @@ -1037,6 +1037,147 @@ public void testReExecutionClosesPreviousResultSetWithoutServerHandleClose() thr assertEquals(secondResult, statement.getResultSet()); } + @Test + public void testReExecutionClosesServerOperation() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + // First execution + statement.executeQuery(STATEMENT); + // Simulate server setting the statementId (normally done inside executeStatement) + StatementId firstStatementId = new StatementId("first-stmt-id"); + statement.setStatementId(firstStatementId); + + // Second execution — should close the first server operation + statement.executeQuery(STATEMENT); + + verify(client, times(1)).closeStatement(eq(firstStatementId)); + verify(firstResult, times(1)).close(); + assertEquals(secondResult, statement.getResultSet()); + } + + @Test + public void testReExecutionSkipsServerCloseForDirectResults() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + // First execution with direct results (server already closed the operation) + statement.executeQuery(STATEMENT); + statement.setStatementId(new StatementId("direct-stmt-id")); + statement.markDirectResultsReceived(); + + // Second execution — should NOT close server operation (already closed by server) + statement.executeQuery(STATEMENT); + + verify(client, never()).closeStatement(any(StatementId.class)); + verify(firstResult, times(1)).close(); + } + + @Test + public void testReExecutionHandlesCloseFailureGracefully() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + StatementId firstStatementId = new StatementId("failing-stmt-id"); + + // closeStatement throws (e.g., operation already expired on server) + doThrow(new DatabricksSQLException("Operation not found", "HY000")) + .when(client) + .closeStatement(eq(firstStatementId)); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + statement.executeQuery(STATEMENT); + statement.setStatementId(firstStatementId); + + // Re-execution should succeed even though closing previous operation failed + assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); + assertEquals(secondResult, statement.getResultSet()); + } + + @Test + public void testReExecutionHandlesTransportErrorGracefully() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksConnection connection = new DatabricksConnection(connectionContext, client); + DatabricksStatement statement = new DatabricksStatement(connection); + + DatabricksResultSet firstResult = mock(DatabricksResultSet.class); + DatabricksResultSet secondResult = mock(DatabricksResultSet.class); + StatementId firstStatementId = new StatementId("transport-error-stmt-id"); + + // closeStatement throws a transport-level error (e.g., unexpected server response, + // corrupted framed transport). This is the scarier failure mode — not just "not found" + // but a low-level I/O error that could corrupt shared transport state. + doThrow(new RuntimeException("HTTP request failed by code: 500, unexpected response")) + .when(client) + .closeStatement(eq(firstStatementId)); + + when(client.executeStatement( + eq(STATEMENT), + eq(new Warehouse(WAREHOUSE_ID)), + eq(new HashMap<>()), + eq(StatementType.QUERY), + any(IDatabricksSession.class), + eq(statement), + any())) + .thenReturn(firstResult) + .thenReturn(secondResult); + + statement.executeQuery(STATEMENT); + statement.setStatementId(firstStatementId); + + // Re-execution must succeed even with transport-level close failure. + // The new execution creates a fresh server operation with a new statementId. + assertDoesNotThrow(() -> statement.executeQuery(STATEMENT)); + assertEquals(secondResult, statement.getResultSet()); + } + @Test public void testAsyncExecutionResetsStateFromPreviousSyncExecution() throws Exception { IDatabricksConnectionContext connectionContext =