From b10bbf9ede6bd9a904643617863b37a05b776842 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Thu, 7 May 2026 20:45:43 -0500 Subject: [PATCH 1/4] Added logic to externalize commit retries in rest-catalog. Added metrics for commit retries in rest-catalog. Added logic in insert to handle retries by fetching newer metadata. Added logic to add Commit Lock using etcd distributed locks --- examples/grafana/METRICS.md | 4 + examples/scratch/.ice-rest-catalog.yaml | 21 +- ice-rest-catalog/README.md | 13 ++ .../com/altinity/ice/rest/catalog/Main.java | 35 ++- .../internal/config/CommitLockConfig.java | 42 ++++ .../internal/config/CommitRetryConfig.java | 56 +++++ .../rest/catalog/internal/config/Config.java | 10 + .../catalog/internal/etcd/CommitLock.java | 199 ++++++++++++++++ .../etcd/CommitLockTimeoutException.java | 21 ++ .../catalog/internal/etcd/EtcdCatalog.java | 5 + .../internal/metrics/CatalogMetrics.java | 66 ++++++ .../internal/metrics/IcebergMetricNames.java | 22 ++ .../internal/rest/RESTCatalogAdapter.java | 136 +++++++++-- .../internal/rest/RESTCatalogServlet.java | 2 + .../ice/rest/catalog/RESTCatalogTestBase.java | 2 + .../config/CommitRetryConfigTest.java | 45 ++++ .../catalog/internal/etcd/EtcdCatalogIT.java | 41 ++++ .../rest/RESTCatalogAdapterCreateIT.java | 191 ++++++++++++++++ .../scenarios/basic-operations/run.sh.tmpl | 4 + .../scenarios/basic-operations/scenario.yaml | 1 + ice/README.md | 19 ++ .../main/java/com/altinity/ice/cli/Main.java | 14 ++ .../altinity/ice/cli/internal/cmd/Insert.java | 213 +++++++++++++++++- .../logback/ColorAwarePatternLayout.java | 4 + ice/src/main/resources/logback.xml | 2 +- 25 files changed, 1133 insertions(+), 35 deletions(-) create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java create mode 100644 
ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java create mode 100644 ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java create mode 100644 ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java diff --git a/examples/grafana/METRICS.md b/examples/grafana/METRICS.md index fab3e92b..4dc0287e 100644 --- a/examples/grafana/METRICS.md +++ b/examples/grafana/METRICS.md @@ -58,6 +58,10 @@ These metrics are reported by Iceberg clients when they perform operations on ta | `iceberg_commit_added_equality_deletes_total` | Counter | catalog, namespace, table, operation | Total number of equality deletes added in commits | | `iceberg_commit_total_files_size_bytes` | Counter | catalog, namespace, table, operation | Total size in bytes of files involved in commits | | `iceberg_commit_duration_seconds` | Histogram | catalog, namespace, table, operation | Duration of commit operations in seconds | +| `iceberg_commit_retries_total` | Counter | catalog, namespace, table | Server-side retries after a commit CAS conflict (`CommitFailedException`) in the REST catalog commit loop; tune `commitRetry` in `.ice-rest-catalog.yaml` if this grows under parallel writers | +| `iceberg_commit_lock_acquire_seconds` | Histogram | catalog | Time to acquire the etcd per-table commit lock (`commitLock` in `.ice-rest-catalog.yaml`; etcd backend only) | +| `iceberg_commit_lock_held_seconds` | Histogram | catalog | Time the etcd commit lock was held during a table commit | +| `iceberg_commit_lock_acquire_timeouts_total` | Counter | catalog | Acquire attempts that exceeded `commitLock.acquireTimeoutMs` (HTTP 503 to clients) | #### Reporter Metrics diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml 
index 2cd11bf3..ec18536a 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -1,16 +1,23 @@ -uri: jdbc:sqlite:file:data/ice-rest-catalog/db.sqlite?journal_mode=WAL&synchronous=OFF&journal_size_limit=500 +#uri: jdbc:sqlite:file:data/ice-rest-catalog/db.sqlite?journal_mode=WAL&synchronous=OFF&journal_size_limit=500 # To use etcd instead of sqlite, start etcd with `etcd --data-dir=data/etcd`, then uncomment the line below -#uri: etcd:http://localhost:2379 +uri: etcd:http://localhost:2379 #uri: etcd:http://127.0.0.1:12379,http://127.0.0.1:12479,http://127.0.0.1:12579 -warehouse: s3://bucket1 +warehouse: s3://altiound-op3z9pa3-iceberg/ +#warehouse: s3://bucket1 +commitRetry: + numRetries: 50 + totalTimeoutMs: 5000 + +commitLock: + enabled: true + leaseTtlSeconds: 30 + acquireTimeoutMs: 30000 s3: - endpoint: http://localhost:9000 + endpoint: https://s3.us-west-2.amazonaws.com pathStyleAccess: true - accessKeyID: miniouser - secretAccessKey: miniopassword - region: minio + region: us-west-2 bearerTokens: - value: foo diff --git a/ice-rest-catalog/README.md b/ice-rest-catalog/README.md index e0ee2d05..446249b5 100644 --- a/ice-rest-catalog/README.md +++ b/ice-rest-catalog/README.md @@ -11,6 +11,19 @@ That's it. Examples of `.ice-rest-catalog.yaml` (as well as Kubernetes deployment manifests) can be found [here](../examples/). +## Parallel writers (`commitLock`) + +Many concurrent commits to the **same table** can cause repeated `CommitFailedException` (optimistic concurrency). For the **etcd** metastore you can serialize commits per table using etcd’s lock API: + +```yaml +commitLock: + enabled: true + leaseTtlSeconds: 30 + acquireTimeoutMs: 30000 +``` + +If `enabled` is true but the catalog backend is not etcd, the lock is ignored (warning in logs). When lock acquisition exceeds `acquireTimeoutMs`, the server responds with HTTP **503** so clients can retry. 
+ ## Documentation - [Architecture](../docs/architecture.md) -- components, design principles, HA, backup/recovery diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index a901f999..4d94eff7 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -19,6 +19,7 @@ import com.altinity.ice.rest.catalog.internal.aws.CredentialsProvider; import com.altinity.ice.rest.catalog.internal.config.Config; import com.altinity.ice.rest.catalog.internal.config.MaintenanceConfig; +import com.altinity.ice.rest.catalog.internal.etcd.CommitLock; import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog; import com.altinity.ice.rest.catalog.internal.maintenance.DataCompaction; import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceJob; @@ -279,7 +280,8 @@ private static Server createBaseServer( if (requireAuth) { mux.insertHandler(createAuthorizationHandler(config.bearerTokens(), config)); - restCatalogAdapter = new RESTCatalogAdapter(catalog); + restCatalogAdapter = + new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config)); var globalConfig = config.toIcebergConfigDefaults(); if (!globalConfig.isEmpty()) { restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig); @@ -299,7 +301,8 @@ private static Server createBaseServer( new RESTCatalogMiddlewareCredentials(restCatalogAdapter, auth), auth); } } else { - restCatalogAdapter = new RESTCatalogAdapter(catalog); + restCatalogAdapter = + new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config)); var globalConfig = config.toIcebergConfigDefaults(); if (!globalConfig.isEmpty()) { restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig); @@ -319,6 +322,18 @@ private static Server createBaseServer( 
} } + logger.info( + "Commit retry config: numRetries={} minWaitMs={} maxWaitMs={} totalTimeoutMs={}", + config.commitRetry().numRetries(), + config.commitRetry().minWaitMs(), + config.commitRetry().maxWaitMs(), + config.commitRetry().totalTimeoutMs()); + logger.info( + "Commit lock config: enabled={} leaseTtlSeconds={} acquireTimeoutMs={}", + config.commitLock().enabled(), + config.commitLock().leaseTtlSeconds(), + config.commitLock().acquireTimeoutMs()); + var h = new ServletHolder(new RESTCatalogServlet(restCatalogAdapter)); mux.addServlet(h, "/*"); @@ -395,6 +410,22 @@ private static RESTCatalogAuthorizationHandler createAuthorizationHandler( return new RESTCatalogAuthorizationHandler(tokens, anonymousSession); } + /** + * Per-table etcd commit lock for {@link EtcdCatalog}; ignored when disabled or when not using + * etcd. + */ + static CommitLock maybeCommitLock(Catalog catalog, Config config) { + if (!config.commitLock().enabled()) { + return null; + } + if (!(catalog instanceof EtcdCatalog etcd)) { + logger.warn( + "commitLock.enabled is true but catalog is not EtcdCatalog; commit lock disabled"); + return null; + } + return new CommitLock(etcd.etcdClient(), catalog.name(), config.commitLock()); + } + private static void overrideJettyDefaults(Server s) { ServerConfig.setQuiet(s); s.setErrorHandler(new PlainErrorHandler()); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java new file mode 100644 index 00000000..6bbc80ba --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.config; + +import com.fasterxml.jackson.annotation.JsonPropertyDescription; + +/** + * Optional per-table etcd commit lock for {@code ice-rest-catalog} when using the etcd metastore. + * + *
<p>
Serializes commits to the same table so concurrent writers do not lose optimistic concurrency + * races indefinitely. + */ +public record CommitLockConfig( + @JsonPropertyDescription( + "Enable etcd mutual-exclusion lock around table commits (etcd backend only; default false)") + boolean enabled, + @JsonPropertyDescription( + "Lease TTL for the lock in seconds (must exceed slow commits; default 30)") + long leaseTtlSeconds, + @JsonPropertyDescription("Max time to wait to acquire the lock in milliseconds (default 30000)") + long acquireTimeoutMs) { + + public CommitLockConfig { + if (leaseTtlSeconds <= 0) { + leaseTtlSeconds = 30; + } + if (acquireTimeoutMs <= 0) { + acquireTimeoutMs = 30_000L; + } + } + + public static CommitLockConfig defaults() { + return new CommitLockConfig(false, 30, 30_000L); + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java new file mode 100644 index 00000000..a3c0a94a --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.config; + +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import org.apache.iceberg.TableProperties; + +/** + * Server-side tuning for the REST catalog commit retry loop (OCC compare-and-swap failures). + * + *
<p>
Defaults match Iceberg's {@link TableProperties} commit retry defaults. + */ +public record CommitRetryConfig( + @JsonPropertyDescription( + "Number of retries on CommitFailedException (default: Iceberg commit.retry.num-retries = 4)") + int numRetries, + @JsonPropertyDescription( + "Minimum backoff between retries in ms (default: Iceberg commit.retry.min-wait-ms)") + long minWaitMs, + @JsonPropertyDescription( + "Maximum backoff between retries in ms (default: Iceberg commit.retry.max-wait-ms)") + long maxWaitMs, + @JsonPropertyDescription( + "Total time budget for the retry loop in ms (default: Iceberg commit.retry.total-timeout-ms)") + long totalTimeoutMs) { + + public CommitRetryConfig { + if (numRetries <= 0) { + numRetries = TableProperties.COMMIT_NUM_RETRIES_DEFAULT; + } + if (minWaitMs <= 0) { + minWaitMs = TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; + } + if (maxWaitMs <= 0) { + maxWaitMs = TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; + } + if (totalTimeoutMs <= 0) { + totalTimeoutMs = TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; + } + } + + public static CommitRetryConfig defaults() { + return new CommitRetryConfig( + TableProperties.COMMIT_NUM_RETRIES_DEFAULT, + TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT); + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java index ab554ebb..cb98cded 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java @@ -56,6 +56,12 @@ public record Config( "Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". 
Empty schedule disables automatic maintenance (default)") String maintenanceSchedule, @JsonPropertyDescription("Maintenance config") MaintenanceConfig maintenance, + @JsonPropertyDescription( + "Server-side commit retry config; tune up for high-contention workloads (e.g., parallel `ice insert` to one table)") + CommitRetryConfig commitRetry, + @JsonPropertyDescription( + "Optional etcd per-table commit lock (etcd metastore only). Reduces CommitFailedException under concurrent writers.") + CommitLockConfig commitLock, @JsonPropertyDescription( "(experimental) Extra properties to include in loadTable REST response.") Map loadTableProperties, @@ -81,6 +87,8 @@ public Config( AnonymousAccess anonymousAccess, String maintenanceSchedule, MaintenanceConfig maintenance, + CommitRetryConfig commitRetry, + CommitLockConfig commitLock, Map loadTableProperties, @JsonProperty("iceberg") Map icebergProperties) { this.addr = Strings.orDefault(addr, DEFAULT_ADDR); @@ -98,6 +106,8 @@ public Config( this.maintenance = Objects.requireNonNullElseGet( maintenance, () -> new MaintenanceConfig(null, 0, 0, 0, 0, 0, 0, null, false)); + this.commitRetry = Objects.requireNonNullElse(commitRetry, CommitRetryConfig.defaults()); + this.commitLock = Objects.requireNonNullElse(commitLock, CommitLockConfig.defaults()); this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of()); this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of()); } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java new file mode 100644 index 00000000..84fb3966 --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.etcd; + +import com.altinity.ice.rest.catalog.internal.config.CommitLockConfig; +import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Lease; +import io.etcd.jetcd.Lock; +import io.etcd.jetcd.lock.LockResponse; +import io.etcd.jetcd.support.CloseableClient; +import io.grpc.stub.StreamObserver; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.iceberg.catalog.TableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mutual exclusion for Iceberg table commits using etcd leases + the lock API (same cluster as + * {@link EtcdCatalog}). + */ +public final class CommitLock { + + private static final Logger logger = LoggerFactory.getLogger(CommitLock.class); + + private static final StreamObserver NOOP_KEEPALIVE = + new StreamObserver<>() { + @Override + public void onNext(io.etcd.jetcd.lease.LeaseKeepAliveResponse value) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }; + + private final Lock lockApi; + private final Lease leaseApi; + private final String catalogName; + private final CommitLockConfig config; + + public CommitLock(Client etcdClient, String catalogName, CommitLockConfig config) { + this.lockApi = etcdClient.getLockClient(); + this.leaseApi = etcdClient.getLeaseClient(); + this.catalogName = catalogName; + this.config = config; + } + + /** + * Acquire the commit lock for {@code ident}. 
Caller must {@link Handle#close()} to release. + * + * @throws CommitLockTimeoutException if the lock is not acquired within {@link + * CommitLockConfig#acquireTimeoutMs()} + */ + public Handle acquire(TableIdentifier ident) { + String path = lockPath(catalogName, ident); + ByteSequence name = ByteSequence.from(path, StandardCharsets.UTF_8); + long waitStartNanos = System.nanoTime(); + + long leaseId; + CloseableClient keepAlive; + try { + leaseId = leaseApi.grant(config.leaseTtlSeconds()).get().getID(); + keepAlive = leaseApi.keepAlive(leaseId, NOOP_KEEPALIVE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + + try { + LockResponse lr = + lockApi.lock(name, leaseId).get(config.acquireTimeoutMs(), TimeUnit.MILLISECONDS); + long acquireEndNanos = System.nanoTime(); + CatalogMetrics.getInstance() + .recordCommitLockAcquireSeconds( + catalogName, (acquireEndNanos - waitStartNanos) / 1_000_000_000.0); + return new Handle(lockApi, leaseApi, lr.getKey(), leaseId, keepAlive, acquireEndNanos); + } catch (TimeoutException e) { + cleanupAfterFailedAcquire(keepAlive, leaseId); + CatalogMetrics.getInstance().recordCommitLockAcquireTimeout(catalogName); + throw new CommitLockTimeoutException( + "commit lock acquire timed out after " + config.acquireTimeoutMs() + " ms", e); + } catch (InterruptedException e) { + cleanupAfterFailedAcquire(keepAlive, leaseId); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + cleanupAfterFailedAcquire(keepAlive, leaseId); + throw new RuntimeException(e.getCause()); + } + } + + private void cleanupAfterFailedAcquire(CloseableClient keepAlive, long leaseId) { + try { + keepAlive.close(); + } catch (RuntimeException ex) { + logger.warn("keepAlive.close failed after lock acquire failure", ex); + } + try { + leaseApi.revoke(leaseId).get(); + } catch 
(Exception ex) { + logger.warn("lease revoke failed after lock acquire failure", ex); + } + } + + /** + * Acquire locks for every identifier in {@code sorted} order (caller must sort for deadlock-free + * ordering), run {@code action}, then release in reverse order. + */ + public void withLocks(List sorted, Runnable action) { + List handles = new ArrayList<>(sorted.size()); + try { + for (TableIdentifier id : sorted) { + handles.add(acquire(id)); + } + action.run(); + } finally { + for (int i = handles.size() - 1; i >= 0; i--) { + try { + handles.get(i).close(); + } catch (RuntimeException e) { + logger.warn("failed to release commit lock handle", e); + } + } + } + } + + static String lockPath(String catalogName, TableIdentifier ident) { + return "locks/v1/" + catalogName + "/" + ident; + } + + /** Lease-backed etcd lock; closes to unlock and revoke the lease. */ + public final class Handle implements AutoCloseable { + + private final Lock lockApi; + private final Lease leaseApi; + private final ByteSequence lockKey; + private final long leaseId; + private final CloseableClient keepAlive; + private final long acquiredAtNanos; + + private Handle( + Lock lockApi, + Lease leaseApi, + ByteSequence lockKey, + long leaseId, + CloseableClient keepAlive, + long acquiredAtNanos) { + this.lockApi = lockApi; + this.leaseApi = leaseApi; + this.lockKey = lockKey; + this.leaseId = leaseId; + this.keepAlive = keepAlive; + this.acquiredAtNanos = acquiredAtNanos; + } + + @Override + public void close() { + long heldNanos = System.nanoTime() - acquiredAtNanos; + try { + lockApi.unlock(lockKey).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + try { + keepAlive.close(); + } catch (RuntimeException e) { + logger.warn("keepAlive.close failed during unlock", e); + } + try { + leaseApi.revoke(leaseId).get(); + } catch (Exception 
e) { + logger.warn("lease revoke failed during unlock", e); + } + } + CatalogMetrics.getInstance() + .recordCommitLockHeldSeconds(catalogName, heldNanos / 1_000_000_000.0); + } + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java new file mode 100644 index 00000000..5f99b53b --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.etcd; + +/** + * Thrown when acquiring the etcd commit lock exceeds the configured timeout. Mapped to HTTP 503 so + * clients can retry. + */ +public final class CommitLockTimeoutException extends RuntimeException { + + public CommitLockTimeoutException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java index 6d76830e..11c49473 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java @@ -82,6 +82,11 @@ public EtcdCatalog(String name, String uri, String warehouseLocation, FileIO io) this.io = io; } + /** Shared jetcd client for auxiliary features (e.g. commit locks) without a second connection. 
*/ + public Client etcdClient() { + return client; + } + // Used by EtcdCatalogTest to test concurrent modifications. protected Txn kvtx() { return kv.txn(); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java index ce7a4d94..f0ba0cdc 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java @@ -16,10 +16,22 @@ import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_OPERATION_LABELS; import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_TABLES_HELP; import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_TABLES_NAME; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_SECONDS_HELP; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_SECONDS_NAME; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_HELD_SECONDS_HELP; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_HELD_SECONDS_NAME; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_LABELS; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRIES_TOTAL_HELP; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRIES_TOTAL_NAME; +import static 
com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRY_LABELS; +import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.DURATION_BUCKETS; import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.LABEL_CATALOG; import io.prometheus.metrics.core.metrics.Counter; import io.prometheus.metrics.core.metrics.Gauge; +import io.prometheus.metrics.core.metrics.Histogram; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +64,10 @@ private static class Holder { private final Gauge tablesTotal; private final Gauge namespacesTotal; private final Counter operationsTotal; + private final Counter commitRetriesTotal; + private final Histogram commitLockAcquireSeconds; + private final Histogram commitLockHeldSeconds; + private final Counter commitLockAcquireTimeoutsTotal; /** Returns the singleton instance of the catalog metrics. */ public static CatalogMetrics getInstance() { @@ -80,6 +96,36 @@ private CatalogMetrics() { .labelNames(CATALOG_OPERATION_LABELS) .register(); + this.commitRetriesTotal = + Counter.builder() + .name(COMMIT_RETRIES_TOTAL_NAME) + .help(COMMIT_RETRIES_TOTAL_HELP) + .labelNames(COMMIT_RETRY_LABELS) + .register(); + + this.commitLockAcquireSeconds = + Histogram.builder() + .name(COMMIT_LOCK_ACQUIRE_SECONDS_NAME) + .help(COMMIT_LOCK_ACQUIRE_SECONDS_HELP) + .labelNames(COMMIT_LOCK_LABELS) + .classicUpperBounds(DURATION_BUCKETS) + .register(); + + this.commitLockHeldSeconds = + Histogram.builder() + .name(COMMIT_LOCK_HELD_SECONDS_NAME) + .help(COMMIT_LOCK_HELD_SECONDS_HELP) + .labelNames(COMMIT_LOCK_LABELS) + .classicUpperBounds(DURATION_BUCKETS) + .register(); + + this.commitLockAcquireTimeoutsTotal = + Counter.builder() + .name(COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME) + .help(COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP) + .labelNames(COMMIT_LOCK_LABELS) + .register(); + logger.info("Catalog Prometheus metrics initialized"); } @@ -118,6 +164,26 @@ public void recordOperation(String 
catalog, String operation) { operationsTotal.labelValues(catalog, operation).inc(); } + /** Record one server-side commit retry after a commit CAS conflict (CommitFailedException). */ + public void recordCommitRetry(String catalog, String namespace, String table) { + commitRetriesTotal.labelValues(catalog, namespace, table).inc(); + } + + /** Record duration of etcd commit lock acquisition (wait time). */ + public void recordCommitLockAcquireSeconds(String catalog, double seconds) { + commitLockAcquireSeconds.labelValues(catalog).observe(seconds); + } + + /** Record duration the etcd commit lock was held during a commit. */ + public void recordCommitLockHeldSeconds(String catalog, double seconds) { + commitLockHeldSeconds.labelValues(catalog).observe(seconds); + } + + /** Record a commit lock acquire that exceeded {@code acquireTimeoutMs}. */ + public void recordCommitLockAcquireTimeout(String catalog) { + commitLockAcquireTimeoutsTotal.labelValues(catalog).inc(); + } + /** Record a table creation. 
*/ public void recordTableCreated(String catalog) { incrementTablesTotal(catalog); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java index c189eead..d27603c0 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java @@ -152,6 +152,28 @@ private IcebergMetricNames() {} public static final String COMMIT_DURATION_NAME = "iceberg_commit_duration_seconds"; public static final String COMMIT_DURATION_HELP = "Duration of commit operations in seconds"; + public static final String COMMIT_RETRIES_TOTAL_NAME = "iceberg_commit_retries_total"; + public static final String COMMIT_RETRIES_TOTAL_HELP = + "Total number of CommitFailedException retries triggered by the server-side commit retry loop"; + + public static final String[] COMMIT_RETRY_LABELS = {LABEL_CATALOG, LABEL_NAMESPACE, LABEL_TABLE}; + + public static final String COMMIT_LOCK_ACQUIRE_SECONDS_NAME = + "iceberg_commit_lock_acquire_seconds"; + public static final String COMMIT_LOCK_ACQUIRE_SECONDS_HELP = + "Time taken to acquire the etcd table commit lock (seconds)"; + + public static final String COMMIT_LOCK_HELD_SECONDS_NAME = "iceberg_commit_lock_held_seconds"; + public static final String COMMIT_LOCK_HELD_SECONDS_HELP = + "Time the etcd table commit lock was held during commit (seconds)"; + + public static final String COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME = + "iceberg_commit_lock_acquire_timeouts_total"; + public static final String COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP = + "Total number of etcd commit lock acquire attempts that exceeded acquireTimeoutMs"; + + public static final String[] COMMIT_LOCK_LABELS = {LABEL_CATALOG}; + // 
========================================================================== // Histogram Buckets // ========================================================================== diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java index 5c782a58..66b0b725 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java @@ -18,15 +18,13 @@ */ package com.altinity.ice.rest.catalog.internal.rest; -import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; -import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; - import com.altinity.ice.rest.catalog.internal.auth.Session; +import com.altinity.ice.rest.catalog.internal.config.CommitRetryConfig; +import com.altinity.ice.rest.catalog.internal.etcd.CommitLock; import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics; import com.altinity.ice.rest.catalog.internal.metrics.PrometheusMetricsReporter; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,12 +37,14 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; +import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.catalog.ViewCatalog; import 
org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.CatalogHandlers; import org.apache.iceberg.rest.Endpoint; @@ -60,21 +60,43 @@ import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RESTCatalogAdapter implements RESTCatalogHandler { + private static final Logger logger = LoggerFactory.getLogger(RESTCatalogAdapter.class); + private final Catalog catalog; private final SupportsNamespaces asNamespaceCatalog; private final ViewCatalog asViewCatalog; + private final CommitRetryConfig commitRetry; + private final CommitLock commitLock; public RESTCatalogAdapter(Catalog catalog) { + this(catalog, CommitRetryConfig.defaults(), null); + } + + public RESTCatalogAdapter(Catalog catalog, CommitRetryConfig commitRetry) { + this(catalog, commitRetry, null); + } + + public RESTCatalogAdapter(Catalog catalog, CommitRetryConfig commitRetry, CommitLock commitLock) { this.catalog = catalog; this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null; this.asViewCatalog = catalog instanceof ViewCatalog ? (ViewCatalog) catalog : null; + this.commitRetry = commitRetry != null ? commitRetry : CommitRetryConfig.defaults(); + this.commitLock = commitLock; + } + + private static String namespaceLabel(TableIdentifier ident) { + String[] levels = ident.namespace().levels(); + return levels.length == 0 ? 
"" : String.join(".", levels); } @Override @@ -235,7 +257,7 @@ public T handle( { TableIdentifier ident = tableIdentFromPathVars(vars); UpdateTableRequest request = castRequest(UpdateTableRequest.class, requestBody); - var response = CatalogHandlers.updateTable(catalog, ident, request); + var response = updateTable(catalog, ident, request); // Check if this update contains schema changes boolean hasSchemaUpdate = @@ -274,7 +296,7 @@ public T handle( { CommitTransactionRequest request = castRequest(CommitTransactionRequest.class, requestBody); - commitTransaction(catalog, request); + commitTransaction(request); return null; } @@ -384,15 +406,86 @@ private static OAuthTokenResponse handleOAuthRequest(Object body) { } } + private LoadTableResponse updateTable( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + if (commitLock != null) { + try (CommitLock.Handle ignored = commitLock.acquire(ident)) { + return loadAndCommitTable(catalog, ident, request); + } + } + return loadAndCommitTable(catalog, ident, request); + } + + /** + * Matches {@link CatalogHandlers#updateTable(Catalog, TableIdentifier, UpdateTableRequest)} so + * staged-create commits ({@link UpdateRequirement.AssertTableDoesNotExist}) work instead of + * failing with {@link org.apache.iceberg.exceptions.NoSuchTableException} from {@link + * Catalog#loadTable(TableIdentifier)}. 
+ */ + private static boolean isCreate(UpdateTableRequest request) { + boolean isCreate = + request.requirements().stream() + .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance); + + if (isCreate) { + List invalidRequirements = + request.requirements().stream() + .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist)) + .collect(Collectors.toList()); + Preconditions.checkArgument( + invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements); + } + + return isCreate; + } + + private LoadTableResponse loadAndCommitTable( + Catalog catalog, TableIdentifier ident, UpdateTableRequest request) { + if (isCreate(request)) { + logger.info("Committing staged table create for {}", ident); + return CatalogHandlers.updateTable(catalog, ident, request); + } + Table table = catalog.loadTable(ident); + if (!(table instanceof BaseTable)) { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata updated = commit(ops, request, ident); + return LoadTableResponse.builder().withTableMetadata(updated).build(); + } + /** * This is a very simplistic approach that only validates the requirements for each table and does * not do any other conflict detection. Therefore, it does not guarantee true transactional * atomicity, which is left to the implementation details of a REST server. 
*/ - private static void commitTransaction(Catalog catalog, CommitTransactionRequest request) { + private void commitTransaction(CommitTransactionRequest request) { + List sorted = + request.tableChanges().stream() + .map(UpdateTableRequest::identifier) + .distinct() + .sorted(Comparator.comparing(TableIdentifier::toString)) + .toList(); + + Runnable body = () -> commitTransactionBody(request); + if (commitLock != null) { + commitLock.withLocks(sorted, body); + } else { + body.run(); + } + } + + private void commitTransactionBody(CommitTransactionRequest request) { List transactions = Lists.newArrayList(); for (UpdateTableRequest tableChange : request.tableChanges()) { + if (isCreate(tableChange)) { + logger.info( + "Committing staged table create (multi-table txn) for {}", tableChange.identifier()); + CatalogHandlers.updateTable(catalog, tableChange.identifier(), tableChange); + continue; + } + Table table = catalog.loadTable(tableChange.identifier()); if (table instanceof BaseTable) { Transaction transaction = @@ -404,7 +497,7 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest (BaseTransaction.TransactionTable) transaction.table(); // this performs validations and makes temporary commits that are in-memory - commit(txTable.operations(), tableChange); + commit(txTable.operations(), tableChange, tableChange.identifier()); } else { throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); } @@ -414,18 +507,26 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest transactions.forEach(Transaction::commitTransaction); } - // Copied from CatalogHandlers.commit. - static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + // Copied from CatalogHandlers.commit; retry budget is configurable (see CommitRetryConfig). 
+ private TableMetadata commit( + TableOperations ops, UpdateTableRequest request, TableIdentifier ident) { AtomicBoolean isRetry = new AtomicBoolean(false); try { Tasks.foreach(ops) - .retry(COMMIT_NUM_RETRIES_DEFAULT) + .retry(commitRetry.numRetries()) .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + commitRetry.minWaitMs(), + commitRetry.maxWaitMs(), + commitRetry.totalTimeoutMs(), 2.0 /* exponential */) .onlyRetryOn(CommitFailedException.class) + .onFailure( + (task, ex) -> { + if (ex instanceof CommitFailedException) { + CatalogMetrics.getInstance() + .recordCommitRetry(catalog.name(), namespaceLabel(ident), ident.name()); + } + }) .run( taskOps -> { TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current(); @@ -446,6 +547,11 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { TableMetadata updated = metadataBuilder.build(); if (updated.changes().isEmpty()) { // do not commit if the metadata has not changed + logger.warn( + "commit no-op for table {}: empty metadata changes after refresh " + + "(isRetry={}); client requirements validated but updates produced no changes", + ident, + isRetry.get()); return; } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java index 4b2563ec..65926cca 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java @@ -19,6 +19,7 @@ package com.altinity.ice.rest.catalog.internal.rest; import com.altinity.ice.rest.catalog.internal.auth.Session; +import com.altinity.ice.rest.catalog.internal.etcd.CommitLockTimeoutException; import com.altinity.ice.rest.catalog.internal.metrics.HttpMetrics; 
import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; @@ -70,6 +71,7 @@ public class RESTCatalogServlet extends HttpServlet { .put(CommitFailedException.class, 409) .put(UnprocessableEntityException.class, 422) .put(CommitStateUnknownException.class, 500) + .put(CommitLockTimeoutException.class, HttpServletResponse.SC_SERVICE_UNAVAILABLE) .buildOrThrow(); private final RESTCatalogHandler restCatalogAdapter; diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java index bfdc07cd..df8fee2a 100644 --- a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java +++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java @@ -102,6 +102,8 @@ public void setUp() throws Exception { false, null)), // anonymousAccess - enable with read-write for testing null, // maintenanceSchedule null, // maintenance + null, // commitRetry + null, // commitLock null, // loadTableProperties null // icebergProperties ); diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java new file mode 100644 index 00000000..ea74edd8 --- /dev/null +++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.TableProperties; +import org.junit.Test; + +public class CommitRetryConfigTest { + + @Test + public void defaultsMatchIcebergTableProperties() { + CommitRetryConfig d = CommitRetryConfig.defaults(); + assertThat(d.numRetries()).isEqualTo(TableProperties.COMMIT_NUM_RETRIES_DEFAULT); + assertThat(d.minWaitMs()).isEqualTo(TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT); + assertThat(d.maxWaitMs()).isEqualTo(TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT); + assertThat(d.totalTimeoutMs()).isEqualTo(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT); + } + + @Test + public void zeroOrNegativeFieldsFallBackToDefaults() { + CommitRetryConfig c = new CommitRetryConfig(0, 0, 0, 0); + assertThat(c.numRetries()).isEqualTo(TableProperties.COMMIT_NUM_RETRIES_DEFAULT); + assertThat(c.minWaitMs()).isEqualTo(TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT); + assertThat(c.maxWaitMs()).isEqualTo(TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT); + assertThat(c.totalTimeoutMs()).isEqualTo(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT); + } + + @Test + public void explicitPositiveValuesPreserved() { + CommitRetryConfig c = new CommitRetryConfig(20, 50L, 10_000L, 500_000L); + assertThat(c.numRetries()).isEqualTo(20); + assertThat(c.minWaitMs()).isEqualTo(50L); + assertThat(c.maxWaitMs()).isEqualTo(10_000L); + assertThat(c.totalTimeoutMs()).isEqualTo(500_000L); + } +} diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java index 2b23b10d..d80a7880 100644 --- a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java +++ 
b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java @@ -12,12 +12,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.altinity.ice.rest.catalog.internal.config.CommitLockConfig; import io.etcd.jetcd.KV; import io.etcd.jetcd.Txn; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.iceberg.BaseMetastoreTableOperations; @@ -315,6 +322,40 @@ public void testTableRegister() { assertThat(catalog.dropNamespace(namespace)).isTrue(); } + @Test + public void commitLockSerializesConcurrentAcquires() throws Exception { + CommitLock lock = + new CommitLock( + catalog.etcdClient(), catalog.name(), new CommitLockConfig(true, 30, 60_000)); + TableIdentifier id = TableIdentifier.of(Namespace.of("ns"), "t"); + AtomicInteger concurrent = new AtomicInteger(0); + AtomicInteger maxConcurrent = new AtomicInteger(0); + int threads = 10; + ExecutorService pool = Executors.newFixedThreadPool(threads); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add( + pool.submit( + () -> { + start.await(); + try (CommitLock.Handle h = lock.acquire(id)) { + int c = concurrent.incrementAndGet(); + maxConcurrent.updateAndGet(m -> Math.max(m, c)); + Thread.sleep(5); + concurrent.decrementAndGet(); + } + return null; + })); + } + start.countDown(); + for (Future f : futures) { + f.get(); + } + pool.shutdown(); + assertThat(maxConcurrent.get()).isEqualTo(1); + } + private static String rand() { return 
UUID.randomUUID().toString(); } diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java new file mode 100644 index 00000000..ebde55a2 --- /dev/null +++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.rest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.altinity.ice.rest.catalog.internal.auth.Session; +import com.altinity.ice.rest.catalog.internal.config.Config; +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.Schema; +import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.rest.HTTPRequest; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import 
org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Covers staged table create (REST {@code stageCreate} + commit) which must use {@link + * org.apache.iceberg.rest.CatalogHandlers#updateTable} create path instead of {@link + * org.apache.iceberg.catalog.Catalog#loadTable(TableIdentifier)} first. Uses an in-memory {@code + * jdbc:sqlite} metastore (same pattern as {@link + * com.altinity.ice.rest.catalog.RESTCatalogTestBase}). + */ +public class RESTCatalogAdapterCreateIT { + + private static final Schema SCHEMA = + new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); + + private Path warehouseDir; + private Catalog catalog; + private RESTCatalogAdapter adapter; + private Session session; + + @BeforeClass + public void beforeClass() throws IOException { + warehouseDir = Files.createTempDirectory("restcat-create-it-"); + String warehouseUri = warehouseDir.toUri().toString(); + if (!warehouseUri.endsWith("/")) { + warehouseUri = warehouseUri + "/"; + } + Config config = + new Config( + "localhost:8080", + "localhost:8081", + null, + "it", + "jdbc:sqlite::memory:", + warehouseUri, + null, + null, + null, + new Config.AnonymousAccess(true, new Config.AccessConfig(false, null)), + null, + null, + null, + null, + null, + null); + catalog = CatalogUtil.buildIcebergCatalog("it", config.toIcebergConfig(), null); + adapter = new RESTCatalogAdapter(catalog); + session = new Session("it", false, null); + } + + @AfterClass + public void afterClass() throws Exception { + if (catalog instanceof Closeable c) { + c.close(); + } + if (warehouseDir != null) { + try (var walk = Files.walk(warehouseDir)) { + walk.sorted((a, b) -> -a.compareTo(b)) + .forEach( + p -> { + try { + Files.deleteIfExists(p); + } catch (IOException ignored) { + // best-effort cleanup + } + }); + } + } + } + + private 
static String shortId() { + return UUID.randomUUID().toString().replace("-", "").substring(0, 8); + } + + @Test + public void stagedCreateCommitRegistersTable() { + String nsName = "ns_" + shortId(); + Namespace ns = Namespace.of(nsName); + ((SupportsNamespaces) catalog).createNamespace(ns); + String tableName = "t_" + shortId(); + TableIdentifier ident = TableIdentifier.of(ns, tableName); + + CreateTableRequest stageReq = + CreateTableRequest.builder().withName(tableName).withSchema(SCHEMA).stageCreate().build(); + stageReq.validate(); + + Map createVars = Map.of("namespace", RESTUtil.encodeNamespace(ns)); + LoadTableResponse staged = + adapter.handle(session, Route.CREATE_TABLE, createVars, stageReq, LoadTableResponse.class); + assertThat(staged.tableMetadata()).isNotNull(); + + List updates = staged.tableMetadata().changes(); + UpdateTableRequest commitReq = + UpdateTableRequest.create(ident, UpdateRequirements.forCreateTable(updates), updates); + commitReq.validate(); + + Map updateVars = + Map.of( + "namespace", RESTUtil.encodeNamespace(ns), + "table", RESTUtil.encodeString(tableName)); + LoadTableResponse committed = + adapter.handle(session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class); + assertThat(committed.tableMetadata()).isNotNull(); + assertThat(catalog.tableExists(ident)).isTrue(); + assertThat(catalog.loadTable(ident).schema().sameSchema(SCHEMA)).isTrue(); + } + + @Test + public void stagedCreateCommitDuplicateFails() { + String nsName = "ns_" + shortId(); + Namespace ns = Namespace.of(nsName); + ((SupportsNamespaces) catalog).createNamespace(ns); + String tableName = "t_" + shortId(); + TableIdentifier ident = TableIdentifier.of(ns, tableName); + + CreateTableRequest stageReq = + CreateTableRequest.builder().withName(tableName).withSchema(SCHEMA).stageCreate().build(); + stageReq.validate(); + + Map createVars = Map.of("namespace", RESTUtil.encodeNamespace(ns)); + LoadTableResponse staged = + adapter.handle(session, 
Route.CREATE_TABLE, createVars, stageReq, LoadTableResponse.class); + + List updates = staged.tableMetadata().changes(); + UpdateTableRequest commitReq = + UpdateTableRequest.create(ident, UpdateRequirements.forCreateTable(updates), updates); + commitReq.validate(); + + Map updateVars = + Map.of( + "namespace", RESTUtil.encodeNamespace(ns), + "table", RESTUtil.encodeString(tableName)); + + adapter.handle(session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class); + assertThat(catalog.tableExists(ident)).isTrue(); + + assertThatThrownBy( + () -> + adapter.handle( + session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class)) + .isInstanceOfAny(AlreadyExistsException.class, CommitFailedException.class); + } + + @Test + public void routeFromMatchesStagedCreatePaths() { + assertThat(Route.from(HTTPRequest.HTTPMethod.POST, "v1/namespaces/ns1/tables").first()) + .isEqualTo(Route.CREATE_TABLE); + assertThat(Route.from(HTTPRequest.HTTPMethod.POST, "v1/namespaces/ns1/tables/t1").first()) + .isEqualTo(Route.UPDATE_TABLE); + } +} diff --git a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl index de9184a2..9232cddf 100644 --- a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl +++ b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl @@ -30,6 +30,10 @@ echo "OK list-namespaces listed ${NAMESPACE_NAME}" {{ICE_CLI}} --config {{CLI_CONFIG}} describe echo "OK Listed namespaces" +# Explicit create-table (staged create + commit); regression for RESTCatalogAdapter UPDATE_TABLE path +{{ICE_CLI}} --config {{CLI_CONFIG}} create-table ${TABLE_CREATE_VIA_CLI} --schema-from-parquet "file://${INPUT_PATH}" +echo "OK create-table ${TABLE_CREATE_VIA_CLI}" + # Insert from file (like README: ice insert flowers.iris -p file://...) 
{{ICE_CLI}} --config {{CLI_CONFIG}} insert ${TABLE_IRIS} -p "file://${INPUT_PATH}" echo "Inserted from file into ${TABLE_IRIS}" diff --git a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml index e1f9ca56..6d410aea 100644 --- a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml +++ b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml @@ -10,6 +10,7 @@ env: NAMESPACE_NAME: "test_ns" INPUT_FILE: "input.parquet" TABLE_IRIS: "test_ns.iris" + TABLE_CREATE_VIA_CLI: "test_ns.iris_create_cli" TABLE_PARTITIONED: "test_ns.taxis_p_by_day" TABLE_SORTED: "test_ns.taxis_s_by_day" TABLE_NO_COPY: "test_ns.iris_no_copy" diff --git a/ice/README.md b/ice/README.md index 69687120..c0410008 100644 --- a/ice/README.md +++ b/ice/README.md @@ -13,6 +13,7 @@ A CLI for loading data into Iceberg REST catalogs. - [Delete Partition](#delete-partition) - [Insert Without Copy](#insert-without-copy) - [Multiple Files](#multiple-files) + - [Parallel inserts and commit retries](#parallel-inserts-and-commit-retries) - [Namespace Management](#namespace-management) - [Inspect](#inspect) - [S3 with Public Data](#s3-with-public-data) @@ -135,6 +136,24 @@ cat filelist | ice insert flowers.iris -p - where `filelist` contains one file path per line. If any file fails, the entire transaction is rolled back. +### Parallel inserts and commit retries + +Several concurrent `ice insert` processes (or high catalog contention) can hit optimistic concurrency: one commit wins and others see `CommitFailedException` with a message like `Requirement failed: branch main has changed`. That means the table moved forward while this client was committing; it is not a data corruption signal. 
+ +By default, `ice insert` performs **outer** commit retries: it reloads table metadata, re-appends the staged data files, and tries again (default **10** rounds within **300000** ms / 5 minutes). Tune with: + +```shell +# disable outer retries (previous behavior) +ice insert ns.table file://a.parquet --commit-retries=0 + +# more headroom under heavy parallel load +ice insert ns.table file://a.parquet --commit-retries=20 --commit-retry-total-ms=600000 +``` + +Note: `--commit-retry-total-ms` only applies when `--commit-retries` is greater than zero. + +If all retries are exhausted, `ice` logs each **orphaned** data file path (uploaded but not registered in the table). You can delete those objects from object storage if you do not plan to retry. + ### Namespace Management ```shell diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index 7d8753fd..7f79483b 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -516,6 +516,18 @@ void insert( description = "Number of threads to use for inserting data", defaultValue = "-1") int threadCount, + @CommandLine.Option( + names = {"--commit-retries"}, + description = + "Outer retry rounds after CommitFailedException (reload metadata and re-append;" + + " set to 0 to disable)", + defaultValue = "10") + int commitRetries, + @CommandLine.Option( + names = {"--commit-retry-total-ms"}, + description = "Total wall-clock budget (ms) for outer commit retries", + defaultValue = "300000") + long commitRetryTotalMs, @CommandLine.Option( names = {"--compression"}, description = @@ -617,6 +629,8 @@ void insert( .sortOrderList(sortOrders) .threadCount( threadCount < 1 ? 
Runtime.getRuntime().availableProcessors() : threadCount) + .commitRetries(commitRetries) + .commitRetryTotalMs(commitRetryTotalMs) .compression(compression) .build(); diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 858cf042..af7ddc33 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -66,6 +67,8 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.CloseableIterable; @@ -125,6 +128,16 @@ public static Result run( } } + if (options.commitRetries() < 0 || options.commitRetryTotalMs() < 0) { + throw new IllegalArgumentException( + "--commit-retries and --commit-retry-total-ms must be non-negative"); + } + if (options.commitRetries() == 0 && options.commitRetryTotalMs() > 0) { + logger.warn( + "--commit-retry-total-ms has no effect when --commit-retries is 0; " + + "set --commit-retries > 0 to enable outer commit retries"); + } + Table table = catalog.loadTable(nsTable); // Create transaction and pass it to updatePartitionAndSortOrderMetadata @@ -183,6 +196,7 @@ public static Result run( // appendOp to use the same transaction. 
AppendFiles appendOp = txn.newAppend(); + List stagedFiles = new ArrayList<>(); try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy); RetryLog retryLog = @@ -241,6 +255,7 @@ public static Result run( for (DataFile df : dataFiles) { atLeastOneFileAppended = true; appendOp.appendFile(df); // Only main thread appends now + stagedFiles.add(df); } } catch (ExecutionException e) { failed++; @@ -257,17 +272,34 @@ public static Result run( if (!options.noCommit()) { // TODO: log if (atLeastOneFileAppended) { - appendOp.commit(); + try { + appendOp.commit(); + txn.commitTransaction(); + verifyCommitOrThrow(catalog, nsTable, stagedFiles); + } catch (CommitFailedException e) { + logger.error("CommitFailedException"); + commitWithRetryAfterInitialFailure(catalog, nsTable, stagedFiles, options, e); + } catch (CommitStateUnknownException e) { + logCommitOrphans(stagedFiles); + throw new IOException( + "DATA LOSS RISK: commit state unknown; " + + stagedFiles.size() + + " data file(s) uploaded; manual reconciliation required", + e); + } catch (RuntimeException e) { + logCommitOrphans(stagedFiles); + throw new IOException( + "DATA LOSS: unexpected error during commit; " + + stagedFiles.size() + + " data file(s) may NOT be registered", + e); + } + if (retryLog != null) { + retryLog.commit(); + } } else { logger.warn("Table commit skipped (no files to append)"); } - if (retryLog != null) { - retryLog.commit(); - } - if (atLeastOneFileAppended) { - // Commit transaction. 
- txn.commitTransaction(); - } } else { logger.warn("Table commit skipped (--no-commit)"); } @@ -840,6 +872,151 @@ public String get(PartitionSpec spec, StructLike partitionData, String file) { } } + private static void sleepCommitRetryBackoff(int round) throws InterruptedException { + int shift = Math.min(round - 1, 20); + long capMs = Math.min(100L << shift, 30_000L); + long sleepMs = ThreadLocalRandom.current().nextLong(0, capMs + 1); + Thread.sleep(sleepMs); + } + + private static void logCommitOrphans(List stagedFiles) { + for (DataFile df : stagedFiles) { + logger.error( + "DATA LOSS: orphaned data file after exhausted commit retries: {}", df.location()); + } + } + + /** + * Ensures every staged data file appears in the table after commit (detects server-side no-op + * commits where HTTP succeeds but metadata did not change). + */ + private static void verifyCommitOrThrow( + RESTCatalog catalog, TableIdentifier tableId, List stagedFiles) throws IOException { + if (stagedFiles.isEmpty()) { + return; + } + Set committed = new HashSet<>(); + Table fresh = catalog.loadTable(tableId); + try (var plan = fresh.newScan().planFiles()) { + for (var task : plan) { + committed.add(task.file().location()); + } + } + List missing = + stagedFiles.stream() + .filter(df -> !committed.contains(df.location())) + .collect(Collectors.toList()); + if (!missing.isEmpty()) { + for (DataFile df : missing) { + logger.error( + "DATA LOSS: post-commit verification failed; staged file not present in table: {}", + df.location()); + } + throw new IOException( + "DATA LOSS: " + + missing.size() + + " of " + + stagedFiles.size() + + " staged file(s) NOT found in table after commit; " + + "client believed commit succeeded but server may have early-returned (empty metadata changes)"); + } + } + + /** + * Retries transaction commit after {@link CommitFailedException} by reloading table metadata and + * re-appending staged {@link DataFile}s (fixes stale snapshot requirements under contention). 
+ */ + private static void commitWithRetryAfterInitialFailure( + RESTCatalog catalog, + TableIdentifier tableId, + List stagedFiles, + Options options, + CommitFailedException initialFailure) + throws IOException, InterruptedException { + + logger.warn( + "Outer commit retry engaged for {} after initial CommitFailedException " + + "(commit-retries={}, commit-retry-total-ms={}, stagedFiles={}): {}", + tableId, + options.commitRetries(), + options.commitRetryTotalMs(), + stagedFiles.size(), + initialFailure.getMessage()); + + if (options.commitRetries() == 0) { + logCommitOrphans(stagedFiles); + throw new IOException( + "DATA LOSS: commit failed (--commit-retries is 0); data file(s) may not be registered", + initialFailure); + } + + long deadlineNs = + System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(options.commitRetryTotalMs()); + CommitFailedException last = initialFailure; + + for (int round = 1; round <= options.commitRetries(); round++) { + if (System.nanoTime() > deadlineNs) { + logger.warn( + "Commit retry wall-clock budget ({} ms) exhausted before completing outer retry round " + + "{}/{}", + options.commitRetryTotalMs(), + round, + options.commitRetries()); + break; + } + + sleepCommitRetryBackoff(round); + + if (System.nanoTime() > deadlineNs) { + break; + } + + try { + Table fresh = catalog.loadTable(tableId); + Transaction freshTxn = fresh.newTransaction(); + AppendFiles freshAppend = freshTxn.newAppend(); + for (DataFile df : stagedFiles) { + freshAppend.appendFile(df); + } + freshAppend.commit(); + freshTxn.commitTransaction(); + verifyCommitOrThrow(catalog, tableId, stagedFiles); + return; + } catch (CommitFailedException retryEx) { + last = retryEx; + logger.warn( + "Commit retry {}/{} failed: {}", round, options.commitRetries(), retryEx.getMessage()); + } catch (IOException verifyEx) { + logCommitOrphans(stagedFiles); + throw new IOException( + String.format( + "DATA LOSS: post-commit verification failed during outer retry round %d/%d; " + + 
"halting retries to avoid duplicate appends. Manual reconciliation required.", + round, options.commitRetries()), + verifyEx); + } catch (CommitStateUnknownException e) { + logCommitOrphans(stagedFiles); + throw new IOException( + String.format( + "DATA LOSS RISK: commit state unknown during outer retry round %d/%d; manual reconciliation required", + round, options.commitRetries()), + e); + } catch (RuntimeException e) { + last = new CommitFailedException(e, "Commit retry failed unexpectedly"); + logger.warn("Commit retry {}/{} failed unexpectedly", round, options.commitRetries(), e); + } + } + + logCommitOrphans(stagedFiles); + throw new IOException( + "DATA LOSS: commit failed after " + + options.commitRetries() + + " outer commit retries. " + + stagedFiles.size() + + " data file(s) uploaded but NOT registered in the table.", + last); + } + public record Options( DataFileNamingStrategy.Name dataFileNamingStrategy, boolean skipDuplicates, @@ -857,7 +1034,9 @@ public record Options( @Nullable List partitionList, @Nullable List sortOrderList, int threadCount, - @Nullable String compression) { + @Nullable String compression, + int commitRetries, + long commitRetryTotalMs) { public static Builder builder() { return new Builder(); @@ -881,6 +1060,8 @@ public static final class Builder { private List sortOrderList = List.of(); private int threadCount = Runtime.getRuntime().availableProcessors(); private String compression; + private int commitRetries = 10; + private long commitRetryTotalMs = 300_000L; private Builder() {} @@ -969,6 +1150,16 @@ public Builder compression(String compression) { return this; } + public Builder commitRetries(int commitRetries) { + this.commitRetries = commitRetries; + return this; + } + + public Builder commitRetryTotalMs(long commitRetryTotalMs) { + this.commitRetryTotalMs = commitRetryTotalMs; + return this; + } + public Options build() { return new Options( dataFileNamingStrategy, @@ -987,7 +1178,9 @@ public Options build() { 
partitionList, sortOrderList, threadCount, - compression); + compression, + commitRetries, + commitRetryTotalMs); } } } diff --git a/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java b/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java index 4ca5b19f..5122ce1b 100644 --- a/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java +++ b/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java @@ -21,6 +21,10 @@ public class ColorAwarePatternLayout extends PatternLayout { static { + // Logback 1.5.x resolves patterns from DEFAULT_CONVERTER_SUPPLIER_MAP; %processId is not + // built into logback-classic 1.5.18, so we register our own. + DEFAULT_CONVERTER_SUPPLIER_MAP.put("processId", ProcessIdClassicConverter::new); + DEFAULT_CONVERTER_MAP.put("processId", ProcessIdClassicConverter.class.getName()); if (!CommandLine.Help.Ansi.AUTO.enabled()) { // Usage of Picocli heuristic DEFAULT_CONVERTER_MAP.put("black", NoColorConverter.class.getName()); DEFAULT_CONVERTER_MAP.put("red", NoColorConverter.class.getName()); diff --git a/ice/src/main/resources/logback.xml b/ice/src/main/resources/logback.xml index d731bd8f..27fc8463 100644 --- a/ice/src/main/resources/logback.xml +++ b/ice/src/main/resources/logback.xml @@ -33,7 +33,7 @@ - %gray(%d{yyyy-MM-dd HH:mm:ss} [%.11thread]) %highlight(%-4level) %gray(%logger{27} >) %X{msgContext}%msg%n%replace(%ex{full, + %gray(%d{yyyy-MM-dd HH:mm:ss} [%.11thread/%processId]) %highlight(%-4level) %gray(%logger{27} >) %X{msgContext}%msg%n%replace(%ex{full, org.eclipse.jetty, jakarta.servlet, software.amazon.awssdk.core.internal, From 02e749b1f588e2cbd57170cd16e669a2a3f24b58 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 8 May 2026 13:16:48 -0500 Subject: [PATCH 2/4] Added ProcessIdClassicConverter to display processId in the logs --- .../logback/ProcessIdClassicConverter.java | 28 +++++++++++++++++++ 1 file changed, 28 
insertions(+) create mode 100644 ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java diff --git a/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java b/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java new file mode 100644 index 00000000..81ce3027 --- /dev/null +++ b/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.internal.logback; + +import ch.qos.logback.classic.pattern.ClassicConverter; +import ch.qos.logback.classic.spi.ILoggingEvent; + +/** + * Emits the JVM process id for use in log patterns as {@code %processId}. Logback does not ship + * this conversion word in {@code logback-classic} 1.5.x; we register it from {@link + * ColorAwarePatternLayout}. 
+ */ +public class ProcessIdClassicConverter extends ClassicConverter { + + private static final String PID = String.valueOf(ProcessHandle.current().pid()); + + @Override + public String convert(ILoggingEvent event) { + return PID; + } +} From 309ffe08e1f8508d6d4136d6d9e73e10fd041873 Mon Sep 17 00:00:00 2001 From: Andrew Xie Date: Fri, 8 May 2026 15:45:54 -0400 Subject: [PATCH 3/4] Fix failing scenario test --- .../src/test/resources/scenarios/basic-operations/run.sh.tmpl | 1 + 1 file changed, 1 insertion(+) diff --git a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl index 9232cddf..d82a93e6 100644 --- a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl +++ b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl @@ -217,6 +217,7 @@ echo "OK list-tables listed tables in ${NAMESPACE_NAME}" {{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_IRIS} {{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_PARTITIONED} {{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_SORTED} +{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_CREATE_VIA_CLI} echo "OK Deleted tables" # Delete the namespace via CLI From b73d8db03769c25d45199935f76824b5b24c3109 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 8 May 2026 15:06:39 -0500 Subject: [PATCH 4/4] Rolled back changes to example ice-rest-catalog --- examples/scratch/.ice-rest-catalog.yaml | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index ec18536a..ea47bfe9 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -1,27 +1,20 @@ -#uri: jdbc:sqlite:file:data/ice-rest-catalog/db.sqlite?journal_mode=WAL&synchronous=OFF&journal_size_limit=500 +uri: 
jdbc:sqlite:file:data/ice-rest-catalog/db.sqlite?journal_mode=WAL&synchronous=OFF&journal_size_limit=500 # To use etcd instead of sqlite, start etcd with `etcd --data-dir=data/etcd`, then uncomment the line below -uri: etcd:http://localhost:2379 +#uri: etcd:http://localhost:2379 #uri: etcd:http://127.0.0.1:12379,http://127.0.0.1:12479,http://127.0.0.1:12579 -warehouse: s3://altiound-op3z9pa3-iceberg/ -#warehouse: warehouse: s3://bucket1 -commitRetry: - numRetries: 50 - totalTimeoutMs: 5000 - -commitLock: - enabled: true - leaseTtlSeconds: 30 - acquireTimeoutMs: 30000 +warehouse: s3://bucket1 s3: - endpoint: https://s3.us-west-2.amazonaws.com + endpoint: http://localhost:9000 pathStyleAccess: true - region: us-west-2 + accessKeyID: miniouser + secretAccessKey: miniopassword + region: minio bearerTokens: - value: foo anonymousAccess: enabled: true - accessConfig: {} + accessConfig: {} \ No newline at end of file