diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 5d95473f2..a78b8237e 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -13,6 +13,7 @@ 4. **Native geospatial type support (`GEOMETRY` and `GEOGRAPHY`) is now enabled by default.** `getObject()` now returns `IGeometry`/`IGeography` instances instead of EWKT strings. Set `EnableGeoSpatialSupport=0` to restore the previous behavior. ### Added +- Added result set heartbeat / keep-alive to prevent server-side result expiry during slow consumption. When enabled via `EnableHeartbeat=1`, the driver periodically polls `GetStatementStatus` (SEA) or `GetOperationStatus` (Thrift) to keep the operation alive while the client reads results. Configurable interval via `HeartbeatIntervalSeconds` (default 60s). Heartbeat automatically stops when results are fully consumed, ResultSet is closed, or the server returns a terminal state. Disabled by default due to cost implications (heartbeats keep the warehouse running). ### Updated - `EnableGeoSpatialSupport` no longer requires `EnableComplexDatatypeSupport=1`. Geospatial types (GEOMETRY, GEOGRAPHY) can now be enabled independently of complex type support (ARRAY, MAP, STRUCT). diff --git a/docs/design/HEARTBEAT_KEEP_ALIVE.md b/docs/design/HEARTBEAT_KEEP_ALIVE.md new file mode 100644 index 000000000..0d531c48d --- /dev/null +++ b/docs/design/HEARTBEAT_KEEP_ALIVE.md @@ -0,0 +1,535 @@ +# Design: Result Set Heartbeat / Keep-Alive for Ongoing Query Executions + +**JIRA:** PECOBLR-2321 +**Author:** Gopal Lal +**Date:** 2026-04-21 +**Status:** DRAFT + +--- + +## Problem Statement + +When a user executes a query and reads results slowly (e.g., processes each row with pauses of minutes or hours), the server-side resources backing those results can expire: + +1. **Warehouse auto-stop**: The cluster shuts down after its configured idle timeout (e.g., 10-60 minutes of no activity), destroying any in-memory state +2. 
**Operation handle expiry**: The server evicts idle operations after a timeout +3. **Inline result loss**: For inline results (data returned directly in the execute response, not uploaded to cloud storage), the data exists only on the cluster — when the cluster stops, the results are permanently lost +4. **Presigned URL expiry**: Cloud fetch URLs expire in 5-15 minutes (refreshable via re-fetch, but requires the operation to still be alive) + +The user sees errors like `INVALID_HANDLE_STATUS`, connection timeouts, or "operation not found" when they attempt to read the next row after a pause. + +## Requirements + +1. **Keep results alive**: Periodically signal the server that the client is still consuming results, preventing premature resource cleanup +2. **Minimize cost impact**: Heartbeats keep the warehouse running, which costs money. Must be opt-in or clearly documented, and stopped immediately when no longer needed +3. **Zero resource leaks**: Heartbeat tasks must stop when results are fully consumed, ResultSet is closed, Statement is closed, or Connection is closed — any leak means the customer pays for an idle cluster indefinitely +4. **Both protocols**: Work for both SEA (SQL Execution API) and Thrift execution paths +5. **Configurable**: Users can enable/disable the feature and control the interval +6. **Inline results priority**: Most critical for inline results where data is not persisted to cloud storage + +## Non-Goals + +- Extending the 1-hour result data retention limit in the SEA API (this is a server-side hard limit that cannot be extended by client polling) +- Implementing full result materialization to client-side storage +- Changing server-side timeout configurations + +--- + +## Background: What Keeps Results Alive? + +### SEA (SQL Execution API) + +- **Statement keep-alive**: Polling via `GET /api/2.1/sql/statements/{id}` keeps the statement alive. 
The API documentation states: "To guarantee that the statement is kept alive, you must poll at least once every 15 minutes." +- **Result retention**: Results are available for **1 hour** after the query succeeds. Polling does NOT extend this. This is a hard server-side limit. +- **Presigned URLs**: External link URLs expire in <= 15 minutes but can be refreshed by calling `GetStatementResultChunkN`. + +### Thrift (HiveServer2 protocol) + +- **Operation keep-alive**: The server maintains `lastAccessTime` for each operation. `FetchResults` RPCs constitute activity and reset the idle timer. `GetOperationStatus` may or may not reset it depending on server configuration. +- **Idle operation timeout**: Configured server-side (typically 5 minutes to 1 day). Operations in terminal state (FINISHED) are subject to eviction. + +### What Currently Happens + +The driver has **no background heartbeat**. Result fetching is demand-driven: +- `ResultSet.next()` triggers the next fetch (Thrift `FetchResults` or SEA chunk download) +- If the user pauses between `next()` calls longer than the server timeout, the operation expires +- For cloud fetch results, the `StreamingChunkProvider` prefetches chunks in the background, which acts as implicit heartbeat — but only while there are unfetched chunks +- For inline results, there is no background activity at all after the initial response + +--- + +## Industry Patterns + +### Cross-Driver Survey + +Research across Databricks' own driver ecosystem and open-source analytical JDBC drivers: + +| Driver | Heartbeat During Result Consumption? | API Called | Default Interval | Configurable? 
| +|--------|--------------------------------------|-----------|-----------------|---------------| +| **Databricks ADBC (C#)** | **YES** | `GetOperationStatus` (Thrift) | 60s | Yes: `adbc.databricks.fetch_heartbeat_interval` | +| **Databricks Python** | No | N/A | N/A | N/A | +| **Databricks Go** | No | N/A | N/A | N/A | +| **Databricks Node.js** | No | N/A | N/A | N/A | +| **Databricks JDBC (this driver)** | No | N/A | N/A | N/A | + +### Reference Implementation: Databricks ADBC C# Driver + +The ADBC C# driver (`adbc-drivers/databricks`) is the **only** Databricks driver with a heartbeat. Key design details: + +- **Class**: `DatabricksOperationStatusPoller` implementing `IOperationStatusPoller` +- **Protocol**: Thrift only (no SEA equivalent) +- **RPC**: `TCLIService.GetOperationStatus(TGetOperationStatusReq)` — lightweight status check +- **Default interval**: 60 seconds, configurable via `adbc.databricks.fetch_heartbeat_interval` +- **Request timeout**: 30 seconds per poll, configurable via `adbc.databricks.operation_status_request_timeout` +- **Mechanism**: `Task.Run()` with async loop + `CancellationTokenSource` for clean shutdown +- **Started**: In `DatabricksCompositeReader` constructor when `HasMoreRows` is true or unknown +- **Stopped**: On end of results (null batch), Dispose, terminal state, max failures, or cancellation +- **Error resilience**: Max 10 consecutive failures (~10 min at default interval) before self-stop; single success resets counter +- **Terminal states**: CANCELED, ERROR, CLOSED, TIMEDOUT, UNKNOWN → stop polling +- **FINISHED state**: Continues polling (operation complete but client still reading) +- **No disable toggle**: Must set large interval as workaround +- **No SEA support**: REST API path does not implement heartbeat + +**Key lessons from the reference implementation:** +1. The heartbeat is Thrift-only — the SEA API has different lifecycle semantics +2. 
Error resilience is critical — a single transient error (e.g., TLS recycling) should not permanently kill the heartbeat (fixed in their PR #372 after a customer incident) +3. The heartbeat continues even after the query is FINISHED — the goal is to keep the operation handle alive while the client reads results, not to wait for execution +4. No explicit disable parameter exists — our design should add one since heartbeats have cost implications + +### General Industry Patterns + +| Pattern | Adoption | Effectiveness | This Driver | +|---------|----------|--------------|-------------| +| **Demand-driven fetch (implicit heartbeat)** | Universal | Works only while chunks remain to fetch | Already implemented via `StreamingChunkProvider` | +| **Background heartbeat polling** | One driver (ADBC C#) | Keeps operation alive during slow consumption | Not implemented | +| **Eager cloud storage prefetch** | Universal for cloud fetch | Results survive cluster stop (data in cloud) | Already implemented | +| **Result materialization to client disk** | None found | Would solve the problem completely | Too complex, memory/disk concerns | + +--- + +## Design + +### Sequence Diagram: Normal Flow (Happy Path) + +```mermaid +sequenceDiagram + participant User + participant Statement + participant ResultSet + participant HeartbeatMgr as ResultHeartbeatManager + participant Server + + User->>Statement: executeQuery(sql) + Statement->>Server: ExecuteStatement + Server-->>Statement: Results (statementId) + Statement->>ResultSet: create(results) + Statement->>HeartbeatMgr: startHeartbeat(statementId) + HeartbeatMgr->>HeartbeatMgr: schedule task every 60s + + loop User reads slowly + User->>ResultSet: next() + ResultSet-->>User: row data + end + + Note over HeartbeatMgr,Server: Heartbeat fires during pauses + HeartbeatMgr->>Server: GetStatementStatus / GetOperationStatus + Server-->>HeartbeatMgr: FINISHED (still alive) + HeartbeatMgr->>Server: GetStatementStatus / GetOperationStatus + 
Server-->>HeartbeatMgr: FINISHED (still alive) + + User->>ResultSet: next() + ResultSet-->>User: false (end of results) + ResultSet->>HeartbeatMgr: stopHeartbeat(statementId) + + User->>ResultSet: close() + ResultSet->>Statement: handleResultSetClose() +``` + +### Sequence Diagram: Close During Slow Consumption + +```mermaid +sequenceDiagram + participant User + participant ResultSet + participant HeartbeatMgr as ResultHeartbeatManager + participant Server + + Note over HeartbeatMgr,Server: Heartbeat running every 60s + HeartbeatMgr->>Server: GetStatementStatus + Server-->>HeartbeatMgr: FINISHED + + User->>ResultSet: close() + ResultSet->>HeartbeatMgr: stopHeartbeat(statementId) + HeartbeatMgr->>HeartbeatMgr: cancel scheduled task + ResultSet->>Server: closeStatement(statementId) + Note over HeartbeatMgr: No more heartbeats — clean shutdown +``` + +### Sequence Diagram: Server-Side Expiry + +```mermaid +sequenceDiagram + participant User + participant ResultSet + participant HeartbeatMgr as ResultHeartbeatManager + participant Server + + HeartbeatMgr->>Server: GetStatementStatus + Server-->>HeartbeatMgr: CLOSED (operation expired) + HeartbeatMgr->>HeartbeatMgr: stop self (terminal state) + + User->>ResultSet: next() + ResultSet->>Server: fetch results + Server-->>ResultSet: ERROR (not found) + ResultSet-->>User: SQLException +``` + +### Sequence Diagram: Transient Errors with Recovery + +```mermaid +sequenceDiagram + participant HeartbeatMgr as ResultHeartbeatManager + participant Server + + HeartbeatMgr->>Server: GetStatementStatus + Server--xHeartbeatMgr: Connection error (failure 1/10) + Note over HeartbeatMgr: consecutiveFailures = 1 + + HeartbeatMgr->>Server: GetStatementStatus + Server--xHeartbeatMgr: Timeout (failure 2/10) + Note over HeartbeatMgr: consecutiveFailures = 2 + + HeartbeatMgr->>Server: GetStatementStatus + Server-->>HeartbeatMgr: FINISHED (success!) 
+ Note over HeartbeatMgr: consecutiveFailures = 0 (reset) + + HeartbeatMgr->>Server: GetStatementStatus + Server-->>HeartbeatMgr: FINISHED +``` + +### Heartbeat State Machine + +```mermaid +stateDiagram-v2 + [*] --> Idle: Driver initialized + + Idle --> Scheduled: startHeartbeat()\n[EnableHeartbeat=true\nAND result has more rows\nAND not direct results] + + Scheduled --> Polling: timer fires (every 60s) + + Polling --> Scheduled: Server returns\nFINISHED / RUNNING / PENDING + Polling --> Scheduled: Transient error\n[consecutiveFailures < 10] + + Polling --> Stopped: Server returns\nCLOSED / ERROR / CANCELED\n/ TIMEDOUT / UNKNOWN + Polling --> Stopped: consecutiveFailures >= 10 + + Scheduled --> Stopped: stopHeartbeat()\n[ResultSet.close() OR\nStatement.close() OR\nnext() returns false] + + Scheduled --> Stopped: shutdown()\n[Connection.close()] + + Stopped --> [*] +``` + +### Result Type Eligibility + +```mermaid +flowchart TD + A[Query executed successfully] --> B{EnableHeartbeat\n= true?} + B -->|No| Z[No heartbeat] + B -->|Yes| C{Direct results?\ndirectResultsReceived} + C -->|Yes| Z + C -->|No| D{Result type?} + D -->|SEA inline| E[START heartbeat\nvia GetStatementStatus\nHighest priority - data on cluster only] + D -->|Thrift inline| F[START heartbeat\nvia GetOperationStatus\nHighest priority - data on cluster only] + D -->|SEA cloud fetch| G[START heartbeat\nvia GetStatementStatus\nKeeps statement alive for URL refresh] + D -->|Thrift cloud fetch| H[START heartbeat\nvia GetOperationStatus\nKeeps operation alive for URL refresh] +``` + +### Component Architecture + +```mermaid +classDiagram + class DatabricksConnection { + -ResultHeartbeatManager heartbeatManager + +close() stops all heartbeats + } + + class DatabricksStatement { + +executeInternal() triggers heartbeat start + +close() triggers heartbeat stop + } + + class DatabricksResultSet { + +next() stops heartbeat when returns false + +close() stops heartbeat + } + + class ResultHeartbeatManager { + 
-ScheduledExecutorService scheduler + -Map~StatementId, ScheduledFuture~ activeHeartbeats + +startHeartbeat(StatementId, HeartbeatTask) + +stopHeartbeat(StatementId) + +shutdown() + } + + class HeartbeatTask { + <<interface>> + +run() sends heartbeat RPC + } + + class SeaHeartbeatTask { + -DatabricksSdkClient client + -StatementId statementId + +run() GET /sql/statements/id + } + + class ThriftHeartbeatTask { + -DatabricksThriftAccessor accessor + -TOperationHandle handle + +run() GetOperationStatus + } + + DatabricksConnection --> ResultHeartbeatManager + DatabricksStatement --> ResultHeartbeatManager + DatabricksResultSet --> ResultHeartbeatManager + ResultHeartbeatManager --> HeartbeatTask + HeartbeatTask <|.. SeaHeartbeatTask + HeartbeatTask <|.. ThriftHeartbeatTask +``` + +### New Component: `ResultHeartbeatManager` + +A lightweight manager that schedules periodic heartbeat RPCs for active result sets. + +```java +/** + * Schedules periodic heartbeat RPCs to keep server-side result state alive + * while the client is consuming results slowly. Uses a shared + * ScheduledExecutorService (daemon threads) to minimize resource overhead. + */ +class ResultHeartbeatManager { + + private final ScheduledExecutorService scheduler; // shared across connection + private final Map<StatementId, ScheduledFuture<?>> activeHeartbeats; + + /** Start heartbeat for a statement. Called after successful execution. */ + void startHeartbeat(StatementId id, HeartbeatTask task); + + /** Stop heartbeat for a statement. Called on ResultSet.close(), Statement.close(), + * or when all results have been fetched. */ + void stopHeartbeat(StatementId id); + + /** Stop all heartbeats. Called on Connection.close(). */ + void shutdown(); +} +``` + +### Heartbeat Task + +The heartbeat task is protocol-specific: + +**SEA**: `GetStatementStatus` — `GET /api/2.1/sql/statements/{statementId}` (lightweight status check, already implemented in `DatabricksSdkClient` line 276-280). 
The API documentation states: "To guarantee that the statement is kept alive, you must poll at least once every 15 minutes." This makes heartbeat particularly valuable for SEA. + +**Thrift**: `GetOperationStatus(operationHandle)` (lightweight status check, already implemented in `DatabricksThriftAccessor.getOperationStatus()`) + +The task: +1. Sends the status check RPC +2. If the server responds with a terminal state (CLOSED, ERROR, CANCELED, TIMEDOUT, UNKNOWN), stops the heartbeat and logs a warning +3. If the server responds with FINISHED state, **continues** polling (operation complete but client still reading) +4. If the RPC fails (connection error, timeout), increments a consecutive failure counter. After **10 consecutive failures** (~10 minutes at default interval), stops the heartbeat. A single success resets the counter. (Learned from ADBC C# PR #372 — a single transient error like TLS recycling should not permanently kill the heartbeat) +5. Does NOT throw exceptions — failures are logged, not propagated to the user's thread + +### When Heartbeat Starts and Stops + +The heartbeat only starts **after** the main thread's execution polling completes and the ResultSet is constructed. During query execution, the driver's own polling loop (200ms interval) keeps the operation alive — the heartbeat is not needed and does not run during this phase. + +``` +Execution phase (driver polls): Result consumption phase (heartbeat polls): + executeQuery() rs = getResultSet() + → client.executeStatement() heartbeat starts ──────────────┐ + → poll every 200ms ──────────┐ │ + ← SUCCEEDED │ rs.next() ... pause ... │ heartbeat 60s + ← ResultSet created ───────────┘ rs.next() ... pause ... │ heartbeat 60s + rs.close() ───────────────────┘ heartbeat stops +``` + +### Async Execution: No Heartbeat During Wait + +For `executeAsync()`, the user controls polling via `getExecutionResult()`. 
The heartbeat does **not** run between `executeAsync()` and `getExecutionResult()` because: + +1. The user opted into async precisely to control timing +2. Automatic heartbeat would keep the warehouse alive even if the user abandoned the query +3. The user has the poll interface — they are responsible for calling `getExecutionResult()` before the statement expires + +The heartbeat starts only when the ResultSet is available and the user begins consuming results: + +``` +stmt.executeAsync(sql) ← returns immediately, no heartbeat + ... user does other work ... ← no heartbeat (user's responsibility to poll) +rs = stmt.getExecutionResult() ← ResultSet created, heartbeat starts +rs.next() ... pause ... ← heartbeat keeps alive +rs.close() ← heartbeat stops +``` + +### Lifecycle + +``` +Statement.execute() / getExecutionResult() + └─▶ ResultSet created + └─▶ If heartbeat enabled AND result type is eligible: + startHeartbeat(statementId, heartbeatTask) + +ResultSet.next() returns false (all results consumed) + └─▶ stopHeartbeat(statementId) + +ResultSet.close() + └─▶ stopHeartbeat(statementId) + +Statement.close() + └─▶ stopHeartbeat(statementId) // safety net + +Connection.close() + └─▶ heartbeatManager.shutdown() // kills all heartbeats +``` + +### Which Result Types Need Heartbeat? + +| Result Type | Cloud Persisted? | Needs Heartbeat? 
| Reason | +|-------------|-----------------|-------------------|--------| +| SEA inline (JSON) | No | **No** | All data loaded into memory at construction (InlineJsonResult) | +| SEA cloud fetch (Arrow) | Yes | **Yes** | Statement must stay alive for URL refresh | +| Thrift inline (columnar) | No | **Yes** | Data fetched on-demand from server; server can evict | +| Thrift cloud fetch | Yes | **Yes** | Operation handle must stay alive for URL refresh | +| Direct results (CLOSED state) | N/A | **No** | Server already closed the operation; data fully delivered | +| Update count (DML) | N/A | **No** | No result rows; execution polling already kept it alive | + +### Note: Cloud Fetch Prefetch Interaction + +For cloud fetch result types (SEA Arrow, Thrift cloud fetch), the `StreamingChunkProvider` and `RemoteChunkProvider` already make background RPCs to download chunks and refresh presigned URLs. These background fetches act as an **implicit heartbeat** — each `FetchResults` or `GetStatementResultChunkN` RPC constitutes server activity. + +When both the prefetch threads and the explicit heartbeat are active, the heartbeat is technically redundant. However, we still enable it for cloud fetch because: +1. The prefetch threads stop once all chunks are downloaded — the heartbeat continues during the gap between "all chunks downloaded" and "user finishes reading" +2. If the prefetch is paused (e.g., sliding window full, waiting for user to consume), the heartbeat fills the gap +3. 
The heartbeat cost is minimal (one lightweight GET/status check per minute) + +### Configuration + +New connection parameters: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `EnableHeartbeat` | boolean | `false` | Enable periodic heartbeat for active result sets | +| `HeartbeatIntervalSeconds` | int | `60` | Interval between heartbeat RPCs (aligned with ADBC C# default) | +| `HeartbeatRequestTimeoutSeconds` | int | `30` | Timeout for each heartbeat RPC | + +Default is **disabled** because: +- Heartbeats keep the warehouse running, increasing cost +- Most users consume results quickly +- Cloud fetch results already survive cluster stop +- Users who know they'll read slowly can opt in + +**Design choice: explicit disable toggle.** The ADBC C# driver has no way to disable the heartbeat (must set a large interval as workaround). We add `EnableHeartbeat` as an explicit boolean for clarity — cost implications should be a conscious opt-in decision. + +### Thread Management + +Follows existing codebase patterns: + +- **Shared `ScheduledExecutorService`** per connection (like `TelemetryClient`) +- **Daemon threads** (won't prevent JVM shutdown) +- **Named thread factory**: `databricks-jdbc-heartbeat-{connectionId}` +- **Single thread**: heartbeats are lightweight; one thread handles all active result sets for a connection +- **Cleanup order**: ResultSet.close() → stopHeartbeat → (if last) scheduler stays alive for reuse → Connection.close() → scheduler.shutdown() + +### Cleanup Guarantees (Zero Leak) + +The heartbeat is stopped in **four** places to guarantee no leaks: + +1. **All results consumed**: `ResultSet.next()` returns false → stop +2. **ResultSet.close()**: explicit user close → stop +3. **Statement.close()**: closes ResultSet → triggers #2 → stop +4. 
**Connection.close()**: `heartbeatManager.shutdown()` → cancels ALL heartbeats + +Additionally: +- If the heartbeat RPC keeps failing (10 consecutive failures) or the server reports a terminal state (server gone, operation expired), the heartbeat self-stops +- Daemon threads ensure JVM can exit even if cleanup is missed +- `ScheduledFuture.cancel(false)` is used (does not interrupt an in-flight heartbeat RPC; the current run is allowed to finish, but `cancel` itself returns immediately and no further runs are scheduled) + +### Error Handling + +| Scenario | Behavior | +|----------|----------| +| Heartbeat RPC returns a terminal state (CLOSED / ERROR / CANCELED / TIMEDOUT / UNKNOWN) | Stop heartbeat, log a warning | +| Heartbeat RPC fails (network error, timeout) | Increment the consecutive-failure counter and retry at the next interval; after 10 consecutive failures, stop heartbeat and log a warning | +| Heartbeat RPC returns success | Reset the consecutive-failure counter; continue scheduling next heartbeat | +| ResultSet.close() called during heartbeat RPC | `cancel(false)` lets the in-flight RPC finish without interruption; no further heartbeats are scheduled | +| Connection.close() with active heartbeats | `scheduler.shutdownNow()` interrupts all | + +--- + +## Implementation Plan + +### Phase 1: Core Infrastructure + +1. **`ResultHeartbeatManager`**: Shared per-connection manager with start/stop/shutdown +2. **`HeartbeatTask` interface**: Protocol-specific implementations for SEA and Thrift +3. **Connection parameters**: `EnableHeartbeat`, `HeartbeatIntervalSeconds` +4. **Integration points**: Hook into `DatabricksResultSet` constructor, `close()`, and `next()` return-false path + +### Phase 2: Protocol Implementation + +5. **SEA heartbeat**: `DatabricksSdkClient.getStatementStatus(statementId)` — lightweight GET +6. **Thrift heartbeat**: `DatabricksThriftAccessor.getOperationStatus(operationHandle)` — lightweight RPC +7. **Direct results skip**: No heartbeat when `directResultsReceived` is true + +### Phase 3: Testing + +8. **Unit tests**: Verify lifecycle (start, stop on close, stop on all-consumed, stop on error) +9. **Integration tests**: Verify heartbeat keeps results alive across a simulated pause +10. 
**Leak tests**: Verify no threads leak after ResultSet/Statement/Connection close + +### Phase 4: Documentation + +11. **Connection parameter docs**: Document `EnableHeartbeat` and `HeartbeatIntervalSeconds` +12. **Cost implications**: Document that heartbeats keep the warehouse running + +--- + +## Alternatives Considered + +### 1. Eager Full Prefetch to Client Memory + +Fetch all results into client memory immediately after execution, eliminating server dependency. + +**Rejected**: Would cause OOM for large result sets. The existing sliding-window prefetch is the right balance. + +### 2. Result Materialization to Client Disk + +Write results to local disk as they arrive, serving reads from disk. + +**Rejected**: Adds significant complexity (disk management, temp file cleanup, serialization format). No industry precedent in JDBC drivers. + +### 3. Force Cloud Fetch for All Queries + +Configure the server to always upload results to cloud storage, even for small queries. + +**Rejected**: Server-side change outside driver's control. Would add latency for small queries. + +### 4. Heartbeat at HTTP Connection Level + +Use TCP keepalive or HTTP-level ping to keep the connection alive. + +**Rejected**: HTTP keepalive keeps the TCP connection alive, but does NOT keep the server-side operation/statement alive. The problem is application-layer resource expiry, not connection expiry. + +--- + +## Open Questions + +1. **Does `GetOperationStatus` reset the Thrift idle operation timer?** The ADBC C# driver uses `GetOperationStatus` for its heartbeat and it works in production (customers confirmed). Need to verify with the server team whether this is guaranteed behavior or implementation-dependent. + +2. **Should heartbeat be enabled by default for inline results?** Since inline results are the most vulnerable and the feature is explicitly designed for this case, it might make sense to auto-enable when the result type is inline. But this increases cost without user opt-in. 
The ADBC C# driver always enables it (no toggle) — we chose to make it opt-in. + +3. **SEA heartbeat included (unlike ADBC C#).** The ADBC C# driver only implements Thrift heartbeat. We include SEA heartbeat via `GetStatementStatus` because the API documentation explicitly requires polling every 15 minutes to keep the statement alive. The 1-hour result retention is a hard limit that heartbeat cannot extend, but keeping the statement alive is still valuable for URL refresh and metadata access. Our default 60s interval satisfies the 15-minute requirement with ample margin. + +4. **Should we expose a callback for heartbeat failures?** When the heartbeat detects that results have expired (terminal state from server), should we proactively set an error flag so the next `ResultSet.next()` throws a clear error instead of waiting for the fetch to fail with a cryptic server error? + +5. **Alignment with ADBC C# driver parameters.** The ADBC C# driver uses `adbc.databricks.fetch_heartbeat_interval`. Should we align our JDBC parameter names for cross-driver consistency (e.g., `FetchHeartbeatIntervalSeconds`) or use our own convention (`HeartbeatIntervalSeconds`)? diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java index 691bcc3ff..961b07ad7 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnection.java @@ -38,6 +38,7 @@ public class DatabricksConnection implements IDatabricksConnection, IDatabricksC private final Set statementSet = ConcurrentHashMap.newKeySet(); private SQLWarning warnings = null; private final IDatabricksConnectionContext connectionContext; + private final ResultHeartbeatManager heartbeatManager; /** * Creates an instance of Databricks connection for given connection context. 
@@ -49,6 +50,7 @@ public DatabricksConnection(IDatabricksConnectionContext connectionContext) this.connectionContext = connectionContext; DatabricksThreadContextHolder.setConnectionContext(connectionContext); this.session = new DatabricksSession(connectionContext); + this.heartbeatManager = createHeartbeatManager(connectionContext); } @VisibleForTesting @@ -58,10 +60,26 @@ public DatabricksConnection( this.connectionContext = connectionContext; DatabricksThreadContextHolder.setConnectionContext(connectionContext); this.session = new DatabricksSession(connectionContext, testDatabricksClient); + this.heartbeatManager = createHeartbeatManager(connectionContext); UserAgentManager.setUserAgent(connectionContext); TelemetryHelper.updateTelemetryAppName(connectionContext, null); } + private static ResultHeartbeatManager createHeartbeatManager( + IDatabricksConnectionContext connectionContext) { + // H6 fix: Use interface methods instead of instanceof check so mocks and + // alternate implementations can also enable heartbeat + if (connectionContext.isHeartbeatEnabled()) { + return new ResultHeartbeatManager(connectionContext.getHeartbeatIntervalSeconds()); + } + return null; + } + + /** Returns the heartbeat manager, or null if heartbeat is disabled. 
*/ + ResultHeartbeatManager getHeartbeatManager() { + return heartbeatManager; + } + @Override public void open() throws SQLException { this.session.open(); @@ -416,6 +434,11 @@ public void rollback() throws SQLException { @Override public void close() throws SQLException { LOGGER.debug("public void close()"); + // H5 fix: Shutdown heartbeat FIRST — prevents RPCs on closing connections and + // ensures shutdown runs even if statement.close() throws + if (heartbeatManager != null) { + heartbeatManager.shutdown(); + } for (IDatabricksStatementInternal statement : statementSet) { statement.close(false); statementSet.remove(statement); diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java index 159df78f0..188939dc1 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java @@ -943,6 +943,26 @@ public boolean isTelemetryEnabled() { return getParameter(DatabricksJdbcUrlParams.ENABLE_TELEMETRY).equals("1"); } + public boolean isHeartbeatEnabled() { + return getParameter(DatabricksJdbcUrlParams.ENABLE_HEARTBEAT).equals("1"); + } + + public int getHeartbeatIntervalSeconds() { + int interval = + Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS)); + if (interval <= 0) { + LOGGER.warn("HeartbeatIntervalSeconds must be positive, got {}. Using default 60.", interval); + return 60; + } + if (interval > 3600) { + LOGGER.warn( + "HeartbeatIntervalSeconds {} is very large (> 1 hour). 
" + + "Heartbeat may not keep the operation alive.", + interval); + } + return interval; + } + @Override public String getVolumeOperationAllowedPaths() { return getParameter( diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java index a83956528..6552903f7 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java @@ -21,6 +21,7 @@ import com.databricks.jdbc.common.Nullable; import com.databricks.jdbc.common.StatementType; import com.databricks.jdbc.common.util.WarningUtil; +import com.databricks.jdbc.dbclient.IDatabricksClient; import com.databricks.jdbc.dbclient.impl.common.StatementId; import com.databricks.jdbc.exception.DatabricksParsingException; import com.databricks.jdbc.exception.DatabricksSQLException; @@ -123,6 +124,7 @@ public DatabricksResultSet( this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); this.isClosed = false; this.wasNull = false; + startHeartbeatIfEnabled(); } @VisibleForTesting @@ -191,6 +193,7 @@ public DatabricksResultSet( this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); this.isClosed = false; this.wasNull = false; + startHeartbeatIfEnabled(); // C4 fix: Thrift result sets also need heartbeat } /* Constructing results for getUDTs, getTypeInfo, getProcedures metadata calls */ @@ -278,23 +281,222 @@ public DatabricksResultSet( @Override public boolean next() throws SQLException { checkIfClosed(); + if (executionResult == null) { + throw new DatabricksSQLException( + "Cannot iterate: no result data available. 
" + + "For async execution, call getExecutionResult() first.", + DatabricksDriverErrorCode.INVALID_STATE); + } boolean hasNext = this.executionResult.next(); if (cachedTelemetryCollector != null) { cachedTelemetryCollector.recordResultSetIteration( statementId.toSQLExecStatementId(), resultSetMetaData.getChunkCount(), hasNext); } + if (!hasNext) { + stopHeartbeat(); + } return hasNext; } @Override public void close() throws DatabricksSQLException { + stopHeartbeat(); isClosed = true; - this.executionResult.close(); + if (executionResult != null) { + executionResult.close(); + } if (parentStatement != null) { parentStatement.handleResultSetClose(this); } } + /** Starts heartbeat polling if enabled on the connection and this result set is eligible. */ + private void startHeartbeatIfEnabled() { + if (parentStatement == null || statementId == null) { + return; + } + if (!isHeartbeatEligible()) { + return; + } + + try { + // C3 fix: Use JDBC unwrap() to handle pooled connection wrappers (HikariCP, DBCP) + java.sql.Connection rawConn = parentStatement.getStatement().getConnection(); + DatabricksConnection conn; + if (rawConn instanceof DatabricksConnection) { + conn = (DatabricksConnection) rawConn; + } else if (rawConn.isWrapperFor(DatabricksConnection.class)) { + conn = rawConn.unwrap(DatabricksConnection.class); + } else { + LOGGER.debug("Cannot start heartbeat: connection is not a DatabricksConnection"); + return; + } + + ResultHeartbeatManager mgr = conn.getHeartbeatManager(); + if (mgr == null) { + return; // heartbeat not enabled + } + + // C2 fix: Capture only what the lambda needs — avoid capturing 'this' to prevent + // abandoned ResultSets from keeping the warehouse alive via heartbeat. + // Note: capturing 'client' retains a reference to the session/connection. If the + // connection is GC'd without close(), heartbeat RPCs will fail and self-stop after + // maxConsecutiveFailures (10 ticks, ~10 min at 60s interval). Acceptable tradeoff. 
+ final IDatabricksClient client = conn.getSession().getDatabricksClient(); + final StatementId capturedStatementId = this.statementId; + final int maxConsecutiveFailures = 10; + final java.util.concurrent.atomic.AtomicInteger consecutiveFailures = + new java.util.concurrent.atomic.AtomicInteger(0); + // C1 fix: Read the stopped flag from the manager on each tick instead of pre-capturing. + // Pre-capturing caused an orphan-flag bug: startHeartbeat() internally calls + // stopHeartbeat() which removes and replaces the flag, leaving the lambda with a + // permanently-true reference. Reading from the manager each tick always gets the + // current flag. + final ResultHeartbeatManager capturedMgr = mgr; + + Runnable heartbeatTask = + () -> { + // C1 fix: read current flag each tick + java.util.concurrent.atomic.AtomicBoolean stopped = + capturedMgr.getStoppedFlag(capturedStatementId); + if (stopped.get()) { + return; // client/session may be closed, skip RPC + } + try { + boolean alive = client.checkStatementAlive(capturedStatementId); + consecutiveFailures.set(0); // reset on success + if (!alive) { + LOGGER.info( + "Heartbeat detected terminal state for statement {}, stopping", + capturedStatementId); + capturedMgr.stopHeartbeat(capturedStatementId); + } + } catch (Throwable e) { + // Catch Throwable (not just Exception) because ScheduledExecutorService + // suppresses subsequent executions on uncaught Error. Without this, an + // OOM or NoClassDefFoundError would silently kill the heartbeat. 
+ if (e instanceof VirtualMachineError) { + // Don't swallow fatal JVM errors — stop heartbeat and let it propagate + capturedMgr.stopHeartbeat(capturedStatementId); + throw (VirtualMachineError) e; + } + // Re-read flag — may have been set during the RPC (connection closing) + if (capturedMgr.getStoppedFlag(capturedStatementId).get()) { + return; + } + // Unsupported client — stop immediately, don't retry 10 times + if (e instanceof java.sql.SQLFeatureNotSupportedException) { + LOGGER.debug( + "Heartbeat not supported by client for statement {}, stopping", + capturedStatementId); + capturedMgr.stopHeartbeat(capturedStatementId); + return; + } + int failures = consecutiveFailures.incrementAndGet(); + if (failures == 1) { + LOGGER.info( + "Heartbeat failed for statement {} (first failure): {}", + capturedStatementId, + e.getMessage()); + } else { + LOGGER.debug( + "Heartbeat failed for statement {} (failure {}/{}): {}", + capturedStatementId, + failures, + maxConsecutiveFailures, + e.getMessage()); + } + if (failures >= maxConsecutiveFailures) { + LOGGER.warn( + "Heartbeat stopped for statement {} after {} consecutive failures. " + + "Server-side results may expire. Last error: {}", + capturedStatementId, + failures, + e.getMessage()); + capturedMgr.stopHeartbeat(capturedStatementId); + } + } + }; + + mgr.startHeartbeat(capturedStatementId, heartbeatTask); + LOGGER.debug( + "Heartbeat started for statement {} (resultType={}, interval={}s)", + capturedStatementId, + resultSetType, + mgr.getIntervalSeconds()); + } catch (Exception e) { + LOGGER.debug("Failed to start heartbeat: {}", e.getMessage()); + } + } + + /** Stops the heartbeat for this result set's statement. Idempotent. 
*/ + private void stopHeartbeat() { + if (parentStatement == null || statementId == null) { + return; + } + try { + // Use same unwrap pattern as startHeartbeatIfEnabled() for pooled connections + java.sql.Connection rawConn = parentStatement.getStatement().getConnection(); + DatabricksConnection conn; + if (rawConn instanceof DatabricksConnection) { + conn = (DatabricksConnection) rawConn; + } else if (rawConn.isWrapperFor(DatabricksConnection.class)) { + conn = rawConn.unwrap(DatabricksConnection.class); + } else { + return; + } + ResultHeartbeatManager mgr = conn.getHeartbeatManager(); + if (mgr != null) { + mgr.stopHeartbeat(statementId); + } + } catch (Exception e) { + LOGGER.debug("Failed to stop heartbeat: {}", e.getMessage()); + } + } + + /** + * Determines whether this result set is eligible for heartbeat polling. Package-visible for + * testing. + * + *

Heartbeat is NOT needed when: + * + *

    + *
  • No execution result (nothing to fetch, also covers async PENDING/RUNNING with no data) + *
  • SEA inline (InlineJsonResult): all rows loaded in memory at construction + *
  • Update count (DML): no result rows to keep alive + *
  • Direct results (CLOSED state): server already closed, data fully delivered + *
  • Async execution (PENDING/RUNNING): user controls polling via getExecutionResult() + *
+ */ + boolean isHeartbeatEligible() { + // No execution result — nothing to fetch + if (executionResult == null) { + return false; + } + // SEA inline — all data loaded in memory at construction + if (resultSetType == ResultSetType.SEA_INLINE) { + return false; + } + // Update count — no result rows + if (statementType == StatementType.UPDATE) { + return false; + } + // Check execution state + if (executionStatus != null) { + com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState(); + // Direct results — server already closed + if (state == com.databricks.jdbc.api.ExecutionState.CLOSED) { + return false; + } + // Async execution — user controls polling + if (state == com.databricks.jdbc.api.ExecutionState.PENDING + || state == com.databricks.jdbc.api.ExecutionState.RUNNING) { + return false; + } + } + return true; + } + private static TelemetryCollector resolveTelemetryCollector( IDatabricksStatementInternal parentStatement) { try { diff --git a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java index 0afe1c26f..4a2b5307f 100644 --- a/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java +++ b/src/main/java/com/databricks/jdbc/api/impl/DatabricksStatement.java @@ -172,6 +172,13 @@ public void close(boolean removeFromSession) throws DatabricksSQLException { this.connection.closeStatement(this); } DatabricksThreadContextHolder.clearStatementInfo(); + // Safety net: stop any heartbeat for this statement + if (statementId != null) { + ResultHeartbeatManager mgr = connection.getHeartbeatManager(); + if (mgr != null) { + mgr.stopHeartbeat(statementId); + } + } shutDownExecutor(); this.updateCount = -1; this.isClosed = true; @@ -246,6 +253,15 @@ public void cancel() throws SQLException { LOGGER.debug("public void cancel()"); checkIfClosed(); + // H11 fix: Stop heartbeat on cancel — server operation is being cancelled, + // no point continuing to 
poll it + if (statementId != null) { + ResultHeartbeatManager mgr = connection.getHeartbeatManager(); + if (mgr != null) { + mgr.stopHeartbeat(statementId); + } + } + if (statementId != null && !directResultsReceived) { this.connection.getSession().getDatabricksClient().cancelStatement(statementId); DatabricksThreadContextHolder.clearStatementInfo(); @@ -672,6 +688,8 @@ public ResultSet executeAsync(String sql) throws SQLException { LOGGER.debug("ResultSet executeAsync() for statement {%s}", sql); checkIfClosed(); + // No heartbeat during async wait — the user controls polling via getExecutionResult(). + // Heartbeat starts later when the ResultSet is constructed (after getExecutionResult()). resetForNewExecution(); IDatabricksClient client = connection.getSession().getDatabricksClient(); @@ -992,6 +1010,16 @@ private void resetForNewExecution() { } } + // Stop heartbeat for the previous execution before clearing state. + // Without this, the old heartbeat (keyed by old statementId) would fail and self-terminate + // after 10 consecutive failures — wasteful and noisy in logs. + if (statementId != null) { + ResultHeartbeatManager mgr = connection.getHeartbeatManager(); + if (mgr != null) { + mgr.stopHeartbeat(statementId); + } + } + directResultsReceived = false; // Per JDBC spec, re-executing a Statement implicitly closes the current ResultSet. 
diff --git a/src/main/java/com/databricks/jdbc/api/impl/ResultHeartbeatManager.java b/src/main/java/com/databricks/jdbc/api/impl/ResultHeartbeatManager.java new file mode 100644 index 000000000..8a225e8fd --- /dev/null +++ b/src/main/java/com/databricks/jdbc/api/impl/ResultHeartbeatManager.java @@ -0,0 +1,177 @@ +package com.databricks.jdbc.api.impl; + +import com.databricks.jdbc.dbclient.impl.common.StatementId; +import com.databricks.jdbc.log.JdbcLogger; +import com.databricks.jdbc.log.JdbcLoggerFactory; +import java.util.Map; +import java.util.concurrent.*; + +/** + * Manages periodic heartbeat tasks to keep server-side result state alive while the client consumes + * results slowly. One instance per connection, shared across all statements. + * + *

Each active result set can register a heartbeat task that periodically calls + * GetStatementStatus (SEA) or GetOperationStatus (Thrift) to signal the server that the client is + * still consuming results. This prevents premature operation expiry and warehouse auto-stop. + * + *

Heartbeats are stopped when: + * + *

    + *
  • All results are consumed (ResultSet.next() returns false) + *
  • ResultSet.close() is called + *
  • Statement.close(), Statement.cancel(), or re-execution of the statement (safety net) + *
  • Connection.close() calls shutdown() + *
  • The heartbeat task itself detects a terminal state, an unsupported client, or max consecutive failures + *
+ */ +class ResultHeartbeatManager { + + private static final JdbcLogger LOGGER = + JdbcLoggerFactory.getLogger(ResultHeartbeatManager.class); + + private final ScheduledExecutorService scheduler; + private final Map> activeHeartbeats = new ConcurrentHashMap<>(); + private final Map stoppedFlags = + new ConcurrentHashMap<>(); + + /** Sentinel returned by getStoppedFlag when the flag has been removed (already stopped). */ + private static final java.util.concurrent.atomic.AtomicBoolean ALREADY_STOPPED = + new java.util.concurrent.atomic.AtomicBoolean(true); + + private final int intervalSeconds; + private volatile boolean isShutdown = false; + + // Small pool to prevent one blocked heartbeat RPC from starving others on the same connection. + // A connection with multiple active statements needs concurrent heartbeat ticks. + private static final int HEARTBEAT_THREAD_POOL_SIZE = 2; + + private static final java.util.concurrent.atomic.AtomicLong MANAGER_COUNTER = + new java.util.concurrent.atomic.AtomicLong(0); + + ResultHeartbeatManager(int intervalSeconds) { + this.intervalSeconds = intervalSeconds; + long managerId = MANAGER_COUNTER.incrementAndGet(); + this.scheduler = + Executors.newScheduledThreadPool( + HEARTBEAT_THREAD_POOL_SIZE, + r -> { + Thread t = new Thread(r, "databricks-jdbc-heartbeat-" + managerId); + t.setDaemon(true); + return t; + }); + } + + /** + * Starts a periodic heartbeat for the given statement. The task runs every {@code + * intervalSeconds} after an initial delay equal to the interval. + * + * @param statementId the statement to keep alive + * @param heartbeatTask the task that sends the heartbeat RPC. Must handle its own exceptions. 
+ */ + void startHeartbeat(StatementId statementId, Runnable heartbeatTask) { + if (isShutdown || statementId == null) { + return; + } + + // Stop any existing heartbeat for this statement (e.g., re-execution) + stopHeartbeat(statementId); + + // Create a fresh stopped flag for the new heartbeat + resetStoppedFlag(statementId); + + LOGGER.debug( + "Starting heartbeat for statement {} with interval {}s", statementId, intervalSeconds); + + ScheduledFuture future = + scheduler.scheduleWithFixedDelay( + heartbeatTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS); + activeHeartbeats.put(statementId, future); + } + + /** + * Stops the heartbeat for the given statement. Idempotent — safe to call multiple times or for + * statements that have no active heartbeat. + */ + void stopHeartbeat(StatementId statementId) { + if (statementId == null) { + return; + } + + // Set the stopped flag BEFORE canceling — prevents in-flight RPC from calling a closed client + java.util.concurrent.atomic.AtomicBoolean flag = stoppedFlags.remove(statementId); + if (flag != null) { + flag.set(true); + } + + ScheduledFuture future = activeHeartbeats.remove(statementId); + if (future != null) { + future.cancel(false); // don't interrupt if currently running + LOGGER.debug("Stopped heartbeat for statement {}", statementId); + } + } + + /** + * Returns the stopped flag for the given statement, or a sentinel ALREADY_STOPPED if the flag has + * been removed (i.e., stopHeartbeat was already called). Uses get() instead of computeIfAbsent() + * to prevent re-creating a false flag after stopHeartbeat() removed it. + */ + java.util.concurrent.atomic.AtomicBoolean getStoppedFlag(StatementId statementId) { + java.util.concurrent.atomic.AtomicBoolean flag = stoppedFlags.get(statementId); + return flag != null ? flag : ALREADY_STOPPED; + } + + /** + * Creates or resets the stopped flag for a statement. 
Called only from startHeartbeat() when + * setting up a new heartbeat — not from the heartbeat task itself. + */ + private void resetStoppedFlag(StatementId statementId) { + stoppedFlags.put(statementId, new java.util.concurrent.atomic.AtomicBoolean(false)); + } + + /** Stops all heartbeats and shuts down the scheduler. Called on Connection.close(). */ + void shutdown() { + isShutdown = true; + + // Set all stopped flags FIRST — prevents in-flight RPCs from calling closed clients + for (java.util.concurrent.atomic.AtomicBoolean flag : stoppedFlags.values()) { + flag.set(true); + } + stoppedFlags.clear(); + + for (Map.Entry> entry : activeHeartbeats.entrySet()) { + entry.getValue().cancel(false); + LOGGER.debug("Stopped heartbeat for statement {} during shutdown", entry.getKey()); + } + activeHeartbeats.clear(); + + // Wait up to 10s for heartbeat ticks that are already running. A tick that has not yet + // reached its RPC sees the stopped state set above and returns early; a tick already + // blocked in an RPC is not affected by the flag and may outlive this wait. + // If tasks don't finish in time, shutdownNow() sends interrupts. + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.debug("Heartbeat tasks did not terminate in 10s, forcing shutdown"); + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + + LOGGER.debug("Heartbeat manager shut down"); + } + + /** Returns the number of active heartbeats. Visible for testing. 
*/ + int getActiveHeartbeatCount() { + return activeHeartbeats.size(); + } + + boolean isShutdown() { + return isShutdown; + } + + int getIntervalSeconds() { + return intervalSeconds; + } +} diff --git a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java index 83867e267..8ca5ae0cd 100644 --- a/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java +++ b/src/main/java/com/databricks/jdbc/api/internal/IDatabricksConnectionContext.java @@ -406,6 +406,16 @@ public interface IDatabricksConnectionContext { /** Returns the timeout in seconds for metadata polling operations. 0 means no timeout. */ int getMetadataOperationTimeout(); + /** Returns whether heartbeat/keep-alive polling is enabled. */ + default boolean isHeartbeatEnabled() { + return false; + } + + /** Returns the heartbeat polling interval in seconds. */ + default int getHeartbeatIntervalSeconds() { + return 60; + } + /** Returns whether batched INSERT optimization is enabled */ boolean isBatchedInsertsEnabled(); diff --git a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java index b0c886459..568481cc8 100644 --- a/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java +++ b/src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java @@ -233,7 +233,15 @@ public enum DatabricksJdbcUrlParams { NON_ROWCOUNT_QUERY_PREFIXES( "NonRowcountQueryPrefixes", "Comma-separated list of query prefixes (like INSERT,UPDATE,DELETE) that should return result sets instead of row counts", - ""); + ""), + ENABLE_HEARTBEAT( + "EnableHeartbeat", + "Enable periodic heartbeat polling to keep server-side results alive during slow consumption", + "0"), + HEARTBEAT_INTERVAL_SECONDS( + "HeartbeatIntervalSeconds", + "Interval in seconds between heartbeat RPCs to keep results alive", + 
"60"); private final String paramName; private final String defaultValue; diff --git a/src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java b/src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java index 1c990777d..718478c17 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/IDatabricksClient.java @@ -106,6 +106,20 @@ DatabricksResultSet executeStatementAsync( @DatabricksMetricsTimed void cancelStatement(StatementId statementId) throws DatabricksSQLException; + /** + * Checks the status of a statement without fetching results. Used for heartbeat polling to keep + * server-side operation state alive during slow result consumption. + * + * @param statementId statement to check status for + * @return true if the statement is still in a non-terminal state (alive), false if terminal + */ + default boolean checkStatementAlive(StatementId statementId) throws SQLException { + // H7 fix: Throw instead of returning false. Returning false is treated as "terminal state" + // by the heartbeat task, causing misleading "terminal state" logs. Throwing makes the + // heartbeat task count it as a failure, which is more accurate for unsupported clients. 
+ throw new java.sql.SQLFeatureNotSupportedException("Heartbeat not supported by this client"); + } + /** * Fetches result for underlying statement-Id * diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/common/StatementId.java b/src/main/java/com/databricks/jdbc/dbclient/impl/common/StatementId.java index 1579cab94..d8c6010a5 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/common/StatementId.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/common/StatementId.java @@ -95,4 +95,9 @@ public boolean equals(Object otherStatement) { return Objects.equals(this.guid, ((StatementId) otherStatement).guid) && Objects.equals(this.secret, ((StatementId) otherStatement).secret); } + + @Override + public int hashCode() { + return Objects.hash(clientType, guid, secret); + } } diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java index ada68f693..1af31de5c 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClient.java @@ -37,6 +37,7 @@ import com.databricks.jdbc.model.core.ExternalLink; import com.databricks.jdbc.model.core.ResultData; import com.databricks.jdbc.model.core.ResultManifest; +import com.databricks.jdbc.model.core.StatementStatus; import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode; import com.databricks.sdk.WorkspaceClient; import com.databricks.sdk.core.ApiClient; @@ -414,6 +415,30 @@ public DatabricksResultSet executeStatementAsync( parentStatement); } + @Override + public boolean checkStatementAlive(StatementId typedStatementId) throws SQLException { + String statementId = typedStatementId.toSQLExecStatementId(); + // Use lightweight /status endpoint (~100 bytes) instead of full GetStatement (~21KB) + String statusPath = String.format(STATEMENT_STATUS_PATH_WITH_ID, 
statementId); + try { + Request req = new Request(Request.GET, statusPath, (String) null); + req.withHeaders(getHeaders("getStatementStatus")); + StatementStatus status = apiClient.execute(req, StatementStatus.class); + StatementState state = status.getState(); + // Terminal states mean the operation is no longer alive + return state != StatementState.CANCELED + && state != StatementState.CLOSED + && state != StatementState.FAILED; + } catch (Exception e) { + // H10 fix: Catch all exceptions (DatabricksError, DatabricksException, IOException, + // RuntimeException) — not just IOException. A 401 token expiry should count as a + // transient failure, not bypass error handling. + LOGGER.debug("Heartbeat check failed for statement {}: {}", statementId, e.getMessage()); + throw new DatabricksSQLException( + "Heartbeat status check failed", e, DatabricksDriverErrorCode.SDK_CLIENT_ERROR); + } + } + @Override public DatabricksResultSet getStatementResult( StatementId typedStatementId, diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/PathConstants.java b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/PathConstants.java index 45713d3c8..817558508 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/PathConstants.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/sqlexec/PathConstants.java @@ -8,6 +8,7 @@ public class PathConstants { public static final String STATEMENT_PATH = BASE_PATH + "statements/"; public static final String STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "%s"; public static final String CANCEL_STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "%s/cancel"; + public static final String STATEMENT_STATUS_PATH_WITH_ID = STATEMENT_PATH + "%s/status"; public static final String RESULT_CHUNK_PATH = STATEMENT_PATH_WITH_ID + "/result/chunks/%s"; public static final String TELEMETRY_PATH = "/telemetry-ext"; public static final String TELEMETRY_PATH_UNAUTHENTICATED = "/telemetry-unauth"; diff --git 
a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java index a8fc8623e..cc3b07767 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftAccessor.java @@ -1030,7 +1030,8 @@ private TimeoutHandler getTimeoutHandler( internalErrorCode); } - private TGetOperationStatusResp getOperationStatus( + // Package-visible for heartbeat access from DatabricksThriftServiceClient + TGetOperationStatusResp getOperationStatus( TGetOperationStatusReq statusReq, StatementId statementId) throws TException { long operationStatusStartTime = System.nanoTime(); TGetOperationStatusResp operationStatus = getThriftClient().GetOperationStatus(statusReq); diff --git a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java index 0057c7faa..b58c52474 100644 --- a/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java +++ b/src/main/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClient.java @@ -42,6 +42,7 @@ import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; +import org.apache.thrift.TException; public class DatabricksThriftServiceClient implements IDatabricksClient, IDatabricksMetadataClient { @@ -269,6 +270,35 @@ private TExecuteStatementReq getRequest( return request; } + @Override + public boolean checkStatementAlive(StatementId statementId) throws DatabricksSQLException { + LOGGER.debug("Heartbeat check for statement {} using Thrift client", statementId); + DatabricksThreadContextHolder.setStatementId(statementId); + try { + TGetOperationStatusReq statusReq = + new TGetOperationStatusReq() + 
.setOperationHandle(getOperationHandle(statementId)) + .setGetProgressUpdate(false); + TGetOperationStatusResp resp = thriftAccessor.getOperationStatus(statusReq, statementId); + TOperationState state = resp.getOperationState(); + if (state == null) { + LOGGER.warn( + "Heartbeat for statement {} received null operation state, assuming alive", + statementId); + return true; // assume alive — server returned response but no state + } + // Terminal states mean the operation is no longer alive + return state != TOperationState.CANCELED_STATE + && state != TOperationState.CLOSED_STATE + && state != TOperationState.ERROR_STATE + && state != TOperationState.TIMEDOUT_STATE; + } catch (TException e) { + LOGGER.debug("Heartbeat check failed for statement {}: {}", statementId, e.getMessage()); + throw new DatabricksSQLException( + "Heartbeat status check failed", e, DatabricksDriverErrorCode.INVALID_STATE); + } + } + @Override public void closeStatement(StatementId statementId) throws DatabricksSQLException { LOGGER.debug( diff --git a/src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextTest.java b/src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextTest.java index fe1aaa4e9..79bcc5695 100644 --- a/src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextTest.java +++ b/src/test/java/com/databricks/jdbc/api/impl/DatabricksConnectionContextTest.java @@ -1783,4 +1783,81 @@ public void testTreatCatalogAsPattern1_withExplicitSEA_treatCatalogStillTrue() assertEquals(DatabricksClientType.SEA, ctx.getClientType()); assertTrue(ctx.treatMetadataCatalogNameAsPattern()); } + + // ========================================================================= + // Heartbeat configuration + // ========================================================================= + + @Test + public void testHeartbeatDisabledByDefault() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + 
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, properties); + assertFalse(ctx.isHeartbeatEnabled()); + } + + @Test + public void testHeartbeatEnabled() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";EnableHeartbeat=1", properties); + assertTrue(ctx.isHeartbeatEnabled()); + } + + @Test + public void testHeartbeatIntervalDefault() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, properties); + assertEquals(60, ctx.getHeartbeatIntervalSeconds()); + } + + @Test + public void testHeartbeatIntervalCustom() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";HeartbeatIntervalSeconds=30", properties); + assertEquals(30, ctx.getHeartbeatIntervalSeconds()); + } + + @Test + public void testHeartbeatIntervalZeroDefaultsTo60() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";HeartbeatIntervalSeconds=0", properties); + assertEquals(60, ctx.getHeartbeatIntervalSeconds()); + } + + @Test + public void testHeartbeatIntervalNegativeDefaultsTo60() throws DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";HeartbeatIntervalSeconds=-5", properties); + assertEquals(60, ctx.getHeartbeatIntervalSeconds()); + } + + @Test + public void testHeartbeatIntervalLargeValueAcceptedWithWarning() throws DatabricksSQLException { + // Values > 3600 are accepted but log a warning + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";HeartbeatIntervalSeconds=7200", properties); + assertEquals(7200, ctx.getHeartbeatIntervalSeconds()); + } + + @Test + public void testHeartbeatExplicitlyDisabled() throws 
DatabricksSQLException { + IDatabricksConnectionContext ctx = + DatabricksConnectionContext.parse( + TestConstants.VALID_URL_1 + ";EnableHeartbeat=0", properties); + assertFalse(ctx.isHeartbeatEnabled()); + } + + @Test + public void testHeartbeatInterfaceDefaultDisabled() { + // IDatabricksConnectionContext default methods + IDatabricksConnectionContext defaultCtx = + org.mockito.Mockito.mock( + IDatabricksConnectionContext.class, org.mockito.Mockito.CALLS_REAL_METHODS); + assertFalse(defaultCtx.isHeartbeatEnabled()); + assertEquals(60, defaultCtx.getHeartbeatIntervalSeconds()); + } } diff --git a/src/test/java/com/databricks/jdbc/api/impl/ResultHeartbeatManagerTest.java b/src/test/java/com/databricks/jdbc/api/impl/ResultHeartbeatManagerTest.java new file mode 100644 index 000000000..3a17952f2 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/ResultHeartbeatManagerTest.java @@ -0,0 +1,375 @@ +package com.databricks.jdbc.api.impl; + +import static org.junit.jupiter.api.Assertions.*; + +import com.databricks.jdbc.dbclient.impl.common.StatementId; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class ResultHeartbeatManagerTest { + + private ResultHeartbeatManager manager; + + @AfterEach + void tearDown() { + if (manager != null && !manager.isShutdown()) { + manager.shutdown(); + } + } + + // ========================================================================= + // Core lifecycle + // ========================================================================= + + @Test + void testStartAndStopHeartbeat() throws Exception { + manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("test-1"); + CountDownLatch firstExecution = new CountDownLatch(1); + + manager.startHeartbeat(id, firstExecution::countDown); + 
assertEquals(1, manager.getActiveHeartbeatCount()); + + assertTrue(firstExecution.await(5, TimeUnit.SECONDS), "Heartbeat should execute within 5s"); + + manager.stopHeartbeat(id); + assertEquals(0, manager.getActiveHeartbeatCount()); + } + + @Test + void testStopIsIdempotent() { + manager = new ResultHeartbeatManager(60); + StatementId id = new StatementId("test-2"); + + manager.startHeartbeat(id, () -> {}); + manager.stopHeartbeat(id); + assertDoesNotThrow(() -> manager.stopHeartbeat(id)); + assertDoesNotThrow(() -> manager.stopHeartbeat(new StatementId("nonexistent"))); + assertDoesNotThrow(() -> manager.stopHeartbeat(null)); + } + + @Test + void testShutdownCancelsAll() { + manager = new ResultHeartbeatManager(60); + manager.startHeartbeat(new StatementId("a"), () -> {}); + manager.startHeartbeat(new StatementId("b"), () -> {}); + manager.startHeartbeat(new StatementId("c"), () -> {}); + assertEquals(3, manager.getActiveHeartbeatCount()); + + manager.shutdown(); + assertEquals(0, manager.getActiveHeartbeatCount()); + assertTrue(manager.isShutdown()); + } + + @Test + void testStartAfterShutdownIsNoOp() { + manager = new ResultHeartbeatManager(60); + manager.shutdown(); + + manager.startHeartbeat(new StatementId("late"), () -> {}); + assertEquals(0, manager.getActiveHeartbeatCount()); + } + + @Test + void testNullStatementIdHandled() { + manager = new ResultHeartbeatManager(60); + assertDoesNotThrow(() -> manager.startHeartbeat(null, () -> {})); + assertDoesNotThrow(() -> manager.stopHeartbeat(null)); + assertEquals(0, manager.getActiveHeartbeatCount()); + } + + // ========================================================================= + // Re-execution replaces heartbeat + // ========================================================================= + + @Test + void testReExecutionReplacesHeartbeat() throws Exception { + manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("reuse"); + CountDownLatch firstRan = new CountDownLatch(1); + 
CountDownLatch secondRan = new CountDownLatch(1); + AtomicBoolean firstStillRunning = new AtomicBoolean(true); + + manager.startHeartbeat( + id, + () -> { + firstRan.countDown(); + firstStillRunning.set(true); + }); + assertTrue(firstRan.await(5, TimeUnit.SECONDS)); + + // Replace with new task + manager.startHeartbeat( + id, + () -> { + firstStillRunning.set(false); + secondRan.countDown(); + }); + assertEquals(1, manager.getActiveHeartbeatCount()); + + assertTrue(secondRan.await(5, TimeUnit.SECONDS)); + assertFalse(firstStillRunning.get(), "New heartbeat should have run, not old"); + } + + // ========================================================================= + // Heartbeat executes at interval (deterministic with latch) + // ========================================================================= + + @Test + void testHeartbeatExecutesMultipleTimes() throws Exception { + manager = new ResultHeartbeatManager(1); + CountDownLatch latch = new CountDownLatch(3); + + manager.startHeartbeat(new StatementId("interval"), latch::countDown); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should execute 3 times within 10s"); + } + + // ========================================================================= + // Stopped flag — prevents RPC after stop + // ========================================================================= + + @Test + void testStoppedFlagSetOnStop() { + manager = new ResultHeartbeatManager(60); + StatementId id = new StatementId("flag-test"); + + manager.startHeartbeat(id, () -> {}); + // Get flag AFTER start (start creates a fresh flag) + AtomicBoolean flag = manager.getStoppedFlag(id); + assertFalse(flag.get()); + + manager.stopHeartbeat(id); + assertTrue(flag.get(), "Stopped flag should be set after stopHeartbeat"); + } + + @Test + void testStoppedFlagSetOnShutdown() { + manager = new ResultHeartbeatManager(60); + StatementId id = new StatementId("shutdown-flag"); + + manager.startHeartbeat(id, () -> {}); + // Get flag AFTER start (start 
creates a fresh flag) + AtomicBoolean flag = manager.getStoppedFlag(id); + assertFalse(flag.get()); + + manager.shutdown(); + // Note: shutdown clears the map, so the flag instance may be orphaned. + // But it should have been set to true before removal. + assertTrue(flag.get(), "Stopped flag should be set after shutdown"); + } + + @Test + void testStopRacingWithScheduledTick() throws Exception { + manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("race"); + AtomicInteger rpcCount = new AtomicInteger(0); + CountDownLatch firstTick = new CountDownLatch(1); + + manager.startHeartbeat(id, () -> {}); + + // Replace with a task that checks the stopped flag before "RPC" + manager.startHeartbeat( + id, + () -> { + if (!manager.getStoppedFlag(id).get()) { + rpcCount.incrementAndGet(); + } + firstTick.countDown(); + }); + // Get flag AFTER the replacement start so it belongs to the task being stopped + AtomicBoolean stoppedFlag = manager.getStoppedFlag(id); + + assertTrue(firstTick.await(5, TimeUnit.SECONDS)); + int countBeforeStop = rpcCount.get(); + assertTrue(countBeforeStop >= 1); + + // Stop — flag is set atomically before cancel + manager.stopHeartbeat(id); + assertTrue(stoppedFlag.get()); + + // Any tick that fires after stop will see the flag and skip the RPC + // We can't guarantee no tick fires, but the flag prevents the RPC + } + + // ========================================================================= + // Shutdown timeout — blocked task + // ========================================================================= + + @Test + void testShutdownWithBlockedTask() throws Exception { + manager = new ResultHeartbeatManager(1); + CountDownLatch taskStarted = new CountDownLatch(1); + CountDownLatch taskCanContinue = new CountDownLatch(1); + + manager.startHeartbeat( + new StatementId("blocked"), + () -> { + taskStarted.countDown(); + try { + // Simulate a task blocked longer than awaitTermination(5s) + taskCanContinue.await(30, TimeUnit.SECONDS); + } catch 
(InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + assertTrue(taskStarted.await(5, TimeUnit.SECONDS), "Task should start"); + + // Shutdown should not hang — it awaits 5s then calls shutdownNow() + long start = System.currentTimeMillis(); + manager.shutdown(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(manager.isShutdown()); + // Shutdown should complete within ~6s (5s await + overhead), not 30s + assertTrue(elapsed < 15_000, "Shutdown should not hang, took " + elapsed + "ms"); + + taskCanContinue.countDown(); // cleanup + } + + // ========================================================================= + // Invalid interval validation + // ========================================================================= + + @Test + void testZeroIntervalDefaultsToSafe() { + // Zero interval would cause ScheduledExecutorService to throw. + // The validation is in DatabricksConnectionContext.getHeartbeatIntervalSeconds(). + // ResultHeartbeatManager itself receives a validated value. + // Test that the manager handles 1-second interval (minimum valid). 
+ manager = new ResultHeartbeatManager(1); + CountDownLatch ran = new CountDownLatch(1); + manager.startHeartbeat(new StatementId("min-interval"), ran::countDown); + assertTrue(assertDoesNotThrow(() -> ran.await(5, TimeUnit.SECONDS))); + } + + // ========================================================================= + // Concurrent start for same statementId — orphaned future must not happen + // ========================================================================= + + @Test + void testConcurrentStartForSameStatementId_replacesCleanly() throws Exception { + manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("concurrent"); + AtomicInteger firstCount = new AtomicInteger(0); + AtomicInteger secondCount = new AtomicInteger(0); + CountDownLatch secondRan = new CountDownLatch(1); + + // Start first heartbeat + manager.startHeartbeat(id, firstCount::incrementAndGet); + // Immediately replace with second — first should be stopped + manager.startHeartbeat( + id, + () -> { + secondCount.incrementAndGet(); + secondRan.countDown(); + }); + + assertEquals(1, manager.getActiveHeartbeatCount(), "Only one heartbeat should be active"); + assertTrue(secondRan.await(5, TimeUnit.SECONDS), "Second heartbeat should fire"); + + // Wait a bit more, then verify second is the one running + Thread.sleep(2000); + assertTrue(secondCount.get() >= 1, "Second task should have run"); + // First task may have run once before replacement, but should not keep running + int firstSnapshot = firstCount.get(); + Thread.sleep(1500); + assertEquals(firstSnapshot, firstCount.get(), "First task should not run after replacement"); + } + + // ========================================================================= + // getStoppedFlag race with stop — flag must not be recreated after stop + // ========================================================================= + + @Test + void testGetStoppedFlagAfterStop_returnsSentinel() { + manager = new ResultHeartbeatManager(60); + StatementId id 
= new StatementId("sentinel-test"); + + manager.startHeartbeat(id, () -> {}); + // Flag should be false while active + assertFalse(manager.getStoppedFlag(id).get()); + + manager.stopHeartbeat(id); + // After stop, getStoppedFlag should return a true sentinel, NOT recreate a false flag + AtomicBoolean flag = manager.getStoppedFlag(id); + assertTrue(flag.get(), "Flag after stop should be true (sentinel)"); + + // Calling getStoppedFlag again should still return true — no new false flag created + assertTrue(manager.getStoppedFlag(id).get(), "Repeated call should still be true"); + } + + @Test + void testStoppedFlagRaceWithScheduledTick_noLeakedRpc() throws Exception { + manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("race-sentinel"); + AtomicInteger rpcCount = new AtomicInteger(0); + CountDownLatch firstTick = new CountDownLatch(1); + + // Task checks stopped flag from manager each tick (like production code) + manager.startHeartbeat( + id, + () -> { + AtomicBoolean stopped = manager.getStoppedFlag(id); + if (!stopped.get()) { + rpcCount.incrementAndGet(); + } + firstTick.countDown(); + }); + + assertTrue(firstTick.await(5, TimeUnit.SECONDS), "First tick should fire"); + int countBeforeStop = rpcCount.get(); + assertTrue(countBeforeStop >= 1, "At least one RPC should have fired"); + + // Stop — after this, getStoppedFlag should return sentinel true + manager.stopHeartbeat(id); + assertTrue(manager.getStoppedFlag(id).get(), "Flag should be true after stop"); + + // Wait and verify no more RPCs fire + Thread.sleep(2000); + assertEquals( + countBeforeStop, + rpcCount.get(), + "No RPCs should fire after stop (sentinel prevents recreation)"); + } + + // ========================================================================= + // Heartbeat continues past cancel to close — verify no leaked RPCs + // ========================================================================= + + @Test + void testStopThenShutdown_noLeakedRpcs() throws Exception { + 
manager = new ResultHeartbeatManager(1); + StatementId id = new StatementId("cancel-to-close"); + AtomicInteger rpcCount = new AtomicInteger(0); + CountDownLatch firstTick = new CountDownLatch(1); + + manager.startHeartbeat( + id, + () -> { + if (!manager.getStoppedFlag(id).get()) { + rpcCount.incrementAndGet(); + } + firstTick.countDown(); + }); + + assertTrue(firstTick.await(5, TimeUnit.SECONDS)); + + // Simulate Statement.cancel() stopping heartbeat + manager.stopHeartbeat(id); + int countAfterCancel = rpcCount.get(); + + // Simulate Connection.close() shutting down manager + manager.shutdown(); + + // Wait and verify no more RPCs + Thread.sleep(1500); + assertEquals(countAfterCancel, rpcCount.get(), "No RPCs should fire after stop + shutdown"); + assertTrue(manager.isShutdown()); + } +} diff --git a/src/test/java/com/databricks/jdbc/api/impl/ResultSetHeartbeatEligibilityTest.java b/src/test/java/com/databricks/jdbc/api/impl/ResultSetHeartbeatEligibilityTest.java new file mode 100644 index 000000000..799c88c10 --- /dev/null +++ b/src/test/java/com/databricks/jdbc/api/impl/ResultSetHeartbeatEligibilityTest.java @@ -0,0 +1,160 @@ +package com.databricks.jdbc.api.impl; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.databricks.jdbc.common.StatementType; +import com.databricks.jdbc.dbclient.impl.common.StatementId; +import com.databricks.jdbc.model.core.StatementStatus; +import com.databricks.sdk.service.sql.StatementState; +import org.junit.jupiter.api.Test; + +/** + * Tests for heartbeat eligibility logic in DatabricksResultSet. Verifies that heartbeat is started + * only for result sets that need it and skipped for cases where all data is already client-side or + * the user controls polling. 
+ */ +public class ResultSetHeartbeatEligibilityTest { + + private DatabricksResultSet createResultSet( + StatementState state, + StatementType statementType, + DatabricksResultSet.ResultSetType resultSetType, + boolean hasExecutionResult) { + DatabricksResultSet rs = mock(DatabricksResultSet.class, CALLS_REAL_METHODS); + + // Set fields via reflection since they're private + try { + setField(rs, "executionStatus", new ExecutionStatus(new StatementStatus().setState(state))); + setField(rs, "statementType", statementType); + setField(rs, "resultSetType", resultSetType); + setField(rs, "executionResult", hasExecutionResult ? mock(IExecutionResult.class) : null); + setField(rs, "statementId", new StatementId("test-stmt")); + } catch (Exception e) { + throw new RuntimeException(e); + } + return rs; + } + + private void setField(Object obj, String fieldName, Object value) throws Exception { + java.lang.reflect.Field field = DatabricksResultSet.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(obj, value); + } + + // === Eligible cases === + + @Test + void testSeaCloudFetchIsEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.SEA_ARROW_ENABLED, + true); + assertTrue(rs.isHeartbeatEligible(), "SEA cloud fetch should be eligible"); + } + + @Test + void testThriftInlineIsEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.THRIFT_INLINE, + true); + assertTrue(rs.isHeartbeatEligible(), "Thrift inline should be eligible"); + } + + @Test + void testThriftArrowIsEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.THRIFT_ARROW_ENABLED, + true); + assertTrue(rs.isHeartbeatEligible(), "Thrift arrow should be eligible"); + } + + @Test + void testMetadataQueryIsEligible() { + 
DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.METADATA, + DatabricksResultSet.ResultSetType.THRIFT_INLINE, + true); + assertTrue(rs.isHeartbeatEligible(), "Metadata queries can have large results"); + } + + // === Ineligible cases === + + @Test + void testSeaInlineNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.SEA_INLINE, + true); + assertFalse(rs.isHeartbeatEligible(), "SEA inline has all data in memory"); + } + + @Test + void testDirectResultsNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.CLOSED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.SEA_ARROW_ENABLED, + true); + assertFalse(rs.isHeartbeatEligible(), "Direct results — server already closed"); + } + + @Test + void testUpdateCountNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.UPDATE, + DatabricksResultSet.ResultSetType.UNASSIGNED, + true); + assertFalse(rs.isHeartbeatEligible(), "Update count has no result rows"); + } + + @Test + void testNullExecutionResultNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.SUCCEEDED, + StatementType.QUERY, + DatabricksResultSet.ResultSetType.UNASSIGNED, + false); + assertFalse(rs.isHeartbeatEligible(), "No execution result — nothing to fetch"); + } + + @Test + void testAsyncPendingNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.PENDING, + StatementType.SQL, + DatabricksResultSet.ResultSetType.UNASSIGNED, + true); + assertFalse( + rs.isHeartbeatEligible(), "Async PENDING — user controls polling via getExecutionResult"); + } + + @Test + void testAsyncRunningNotEligible() { + DatabricksResultSet rs = + createResultSet( + StatementState.RUNNING, + StatementType.SQL, + DatabricksResultSet.ResultSetType.UNASSIGNED, + true); + assertFalse( + rs.isHeartbeatEligible(), "Async 
RUNNING — user controls polling via getExecutionResult"); + } +} diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java index fcb9b8ab3..71457c35e 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/sqlexec/DatabricksSdkClientTest.java @@ -1311,4 +1311,95 @@ public void testGetResultChunksData_DatabricksError_throwsSQLException() throws assertTrue(exception.getMessage().contains("Results have expired")); assertNotNull(exception.getCause()); } + + // ========================================================================= + // checkStatementAlive + // ========================================================================= + + @Test + public void testCheckStatementAlive_succeededState_returnsTrue() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + StatementStatus status = new StatementStatus().setState(StatementState.SUCCEEDED); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))).thenReturn(status); + + assertTrue(databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + } + + @Test + public void testCheckStatementAlive_runningState_returnsTrue() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + StatementStatus status = new StatementStatus().setState(StatementState.RUNNING); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))).thenReturn(status); + + 
assertTrue(databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + } + + @Test + public void testCheckStatementAlive_canceledState_returnsFalse() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + StatementStatus status = new StatementStatus().setState(StatementState.CANCELED); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))).thenReturn(status); + + assertFalse(databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + } + + @Test + public void testCheckStatementAlive_closedState_returnsFalse() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + StatementStatus status = new StatementStatus().setState(StatementState.CLOSED); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))).thenReturn(status); + + assertFalse(databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + } + + @Test + public void testCheckStatementAlive_failedState_returnsFalse() throws Exception { + IDatabricksConnectionContext connectionContext = + DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + StatementStatus status = new StatementStatus().setState(StatementState.FAILED); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))).thenReturn(status); + + assertFalse(databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + } + + @Test + public void testCheckStatementAlive_exceptionWrapped() throws Exception { + IDatabricksConnectionContext connectionContext = + 
DatabricksConnectionContext.parse(JDBC_URL, new Properties()); + DatabricksSdkClient databricksSdkClient = + new DatabricksSdkClient(connectionContext, statementExecutionService, apiClient); + + when(apiClient.execute(any(Request.class), eq(StatementStatus.class))) + .thenThrow(new RuntimeException("Network error")); + + DatabricksSQLException exception = + assertThrows( + DatabricksSQLException.class, + () -> databricksSdkClient.checkStatementAlive(STATEMENT_ID)); + assertTrue(exception.getMessage().contains("Heartbeat status check failed")); + } } diff --git a/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java b/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java index 583438d5b..371e886ca 100644 --- a/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java +++ b/src/test/java/com/databricks/jdbc/dbclient/impl/thrift/DatabricksThriftServiceClientTest.java @@ -1176,4 +1176,104 @@ void testListSchemasDoesNotEscapeCatalogWhenPatternEnabled() throws SQLException verify(thriftAccessor).getThriftResponse(captor.capture()); assertEquals("my_catalog", captor.getValue().getCatalogName()); } + + // ========================================================================= + // checkStatementAlive — heartbeat support + // ========================================================================= + + @Test + public void testCheckStatementAlive_finishedState_returnsTrue() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + resp.setOperationState(TOperationState.FINISHED_STATE); + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + assertTrue(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void 
testCheckStatementAlive_runningState_returnsTrue() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + resp.setOperationState(TOperationState.RUNNING_STATE); + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + assertTrue(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void testCheckStatementAlive_canceledState_returnsFalse() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + resp.setOperationState(TOperationState.CANCELED_STATE); + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + assertFalse(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void testCheckStatementAlive_closedState_returnsFalse() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + resp.setOperationState(TOperationState.CLOSED_STATE); + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + assertFalse(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void testCheckStatementAlive_errorState_returnsFalse() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + resp.setOperationState(TOperationState.ERROR_STATE); + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + 
assertFalse(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void testCheckStatementAlive_nullState_assumesAlive() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + // operationState not set — null + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenReturn(resp); + + assertTrue(client.checkStatementAlive(TEST_STMT_ID)); + } + + @Test + public void testCheckStatementAlive_thriftException_wraps() throws Exception { + DatabricksThriftServiceClient client = + new DatabricksThriftServiceClient(thriftAccessor, connectionContext); + + when(thriftAccessor.getOperationStatus( + any(TGetOperationStatusReq.class), any(StatementId.class))) + .thenThrow(new org.apache.thrift.TException("Connection refused")); + + assertThrows(DatabricksSQLException.class, () -> client.checkStatementAlive(TEST_STMT_ID)); + } } diff --git a/src/test/java/com/databricks/jdbc/integration/e2e/HeartbeatIntegrationTest.java b/src/test/java/com/databricks/jdbc/integration/e2e/HeartbeatIntegrationTest.java new file mode 100644 index 000000000..f68d1468e --- /dev/null +++ b/src/test/java/com/databricks/jdbc/integration/e2e/HeartbeatIntegrationTest.java @@ -0,0 +1,123 @@ +package com.databricks.jdbc.integration.e2e; + +import static org.junit.jupiter.api.Assertions.*; + +import java.sql.*; +import java.util.Properties; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +/** + * Integration test for the heartbeat / keep-alive feature. Connects to a real Databricks warehouse + * and verifies that heartbeat polling keeps results alive during slow consumption. + * + *

Run with: mvn -pl jdbc-core test -Dtest="HeartbeatIntegrationTest" -DDATABRICKS_HOST=... + * -DDATABRICKS_TOKEN=... -DDATABRICKS_HTTP_PATH=... + * + *

Or set environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH + */ +@Tag("e2e") +public class HeartbeatIntegrationTest { + + private static String getEnvOrProp(String name) { + String value = System.getProperty(name); + if (value == null) { + value = System.getenv(name); + } + return value; + } + + private Connection createConnection(int heartbeatIntervalSeconds) throws Exception { + String host = getEnvOrProp("DATABRICKS_HOST"); + String token = getEnvOrProp("DATABRICKS_TOKEN"); + String httpPath = getEnvOrProp("DATABRICKS_HTTP_PATH"); + + if (host == null || token == null || httpPath == null) { + throw new IllegalStateException( + "Set DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH"); + } + + String url = + String.format( + "jdbc:databricks://%s:443/default;transportMode=http;ssl=1;AuthMech=3;" + + "httpPath=%s;EnableHeartbeat=1;HeartbeatIntervalSeconds=%d;" + + "LogLevel=6;LogPath=/tmp", + host, httpPath, heartbeatIntervalSeconds); + + Properties props = new Properties(); + props.put("UID", "token"); + props.put("PWD", token); + + Class.forName("com.databricks.client.jdbc.Driver"); + return DriverManager.getConnection(url, props); + } + + /** + * Verifies heartbeat keeps results alive during a slow consumer scenario. Executes a query, reads + * first row, pauses for 3x the heartbeat interval (to allow heartbeats to fire), then reads + * remaining rows successfully. 
+   */
+  @Test
+  void testHeartbeatKeepsResultsAliveDuringSlowConsumption() throws Exception {
+    int heartbeatInterval = 5; // 5 seconds for fast testing
+    try (Connection conn = createConnection(heartbeatInterval)) {
+      System.out.println("Connected with EnableHeartbeat=1, interval=" + heartbeatInterval + "s");
+
+      // Execute a query that returns 100 rows. Statement and ResultSet are both managed
+      // resources, so the ResultSet is closed even when an assertion or the sleep below throws.
+      try (Statement stmt = conn.createStatement();
+          ResultSet rs = stmt.executeQuery("SELECT explode(sequence(1, 100)) AS id")) {
+
+        // Read first row
+        assertTrue(rs.next(), "Should have first row");
+        int firstId = rs.getInt("id");
+        System.out.println("Read first row: id=" + firstId);
+
+        // Pause for three heartbeat intervals — heartbeat should fire during this time
+        int pauseSeconds = heartbeatInterval * 3;
+        System.out.println(
+            "Pausing for "
+                + pauseSeconds
+                + "s (heartbeat should fire "
+                + (pauseSeconds / heartbeatInterval)
+                + " times)...");
+        Thread.sleep(pauseSeconds * 1000L);
+
+        // Read remaining rows — should succeed if heartbeat kept results alive
+        int rowCount = 1; // the first row was already consumed above
+        while (rs.next()) {
+          rowCount++;
+        }
+        System.out.println("Read " + rowCount + " total rows after pause");
+        assertEquals(100, rowCount, "Should read all 100 rows");
+      }
+
+      System.out.println("Test passed — heartbeat kept results alive during slow consumption");
+    }
+  }
+
+  /**
+   * Verifies heartbeat stops when ResultSet is closed. After close, no more heartbeat RPCs should
+   * fire.
+   */
+  @Test
+  void testHeartbeatStopsOnResultSetClose() throws Exception {
+    final int intervalSeconds = 5;
+    try (Connection connection = createConnection(intervalSeconds);
+        Statement statement = connection.createStatement()) {
+      ResultSet resultSet = statement.executeQuery("SELECT 1 AS val");
+      assertTrue(resultSet.next());
+      assertEquals(1, resultSet.getInt("val"));
+
+      // Closing the ResultSet is the event under test, so it is closed explicitly mid-test.
+      resultSet.close();
+
+      // Stay connected for two intervals; the test passes as long as nothing throws meanwhile.
+      long settleMillis = intervalSeconds * 2 * 1000L;
+      System.out.println("ResultSet closed, waiting to verify no heartbeat errors...");
+      Thread.sleep(settleMillis);
+      System.out.println("No errors — heartbeat stopped cleanly");
+    }
+  }
+}