fix(occ): RowDelta uses partition-scoped conflict check instead of AlwaysTrue #983
Conversation
laskoviymishka
left a comment
Thanks for picking this up — the diagnosis on #978 looks right, and partition-scoped validation is definitely the right direction.
I’d like to hold this version before merge though, mostly because the new validator may be a bit too narrow today. The old AlwaysTrue path was conservative, but safe. This version can miss conflicts under partition-spec evolution, and possibly for partition values like UUID, decimal, binary, or fixed.
The good news is I think this can be fixed without changing the overall direction. The codebase already has validateAddedDataFilesMatchingFilter, which handles per-spec projection, manifest pruning, and type-aware partition evaluation. Could we express the equality-delete partitions as an OR-of-equalities filter and route through that helper? That should avoid the string-key comparison and reuse the existing validation path.
A few things I’d love to see before merge:
- add the #978 reproducer as a regression test
- add same-partition reject / different-partition allow tests
- add one UUID or decimal partition test
- add a partition-spec evolution test
- update the `RowDelta.validate` comments, since they still describe the old conservative behavior
- split out the unrelated REST/S3/ancestry changes, so this PR stays focused
The empty-base ancestry fix looks useful too, just probably deserves its own PR.
Overall, I think this is the right line of work — I’d just rather make the new scoped validator as safe as the conservative one before we land it.
```go
sort.Ints(keys)
var buf []byte
for _, k := range keys {
	buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
```
fmt.Appendf("%d=%v;", k, p[k]) is pretty fragile as an equality oracle here. map[int]any partition values can land as several Go types depending on construction path — uuid.UUID (= [16]byte) vs raw [16]byte, iceberg.DecimalLiteral vs *big.Rat, time.Time with monotonic-clock suffix, etc., see convertAvroValueToIcebergType in manifest.go:1756. Same logical value, different %v, missed match. Also = and ; aren't escaped, so a string value "1=a;2=b" collides with a different two-field tuple.
A silent miss here is worse than the bug being fixed — we'd accept a commit Java would reject. I'd either build an Or(And(...)) partition expression and call the existing validateAddedDataFilesMatchingFilter (which uses iceberg.Literal comparison and handles all of these), or normalize each known type to a canonical hashable form before keying.
partitionTupleKey has been removed entirely. The new anyToLiteral helper uses a type switch over all iceberg.LiteralType values — bool, int32, int64, float32, float64, string, []byte, Date, Time, Timestamp, TimestampNano, Decimal, uuid.UUID — to produce typed iceberg.Literal values with stable equality semantics. No more fmt.Sprintf("%v") instability for UUID/decimal/binary. The =/; injection issue is gone — string-keyed maps are no longer used at all.
```go
if e.Status() != iceberg.EntryStatusADDED || e.SnapshotID() != snap.SnapshotID {
	continue
}
if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok {
```
The lookup compares only (field_id, value) and never consults mf.PartitionSpecID(). After a partition-spec evolution (renamed field, identity → bucket, day → hour), the same logical row gets written under a different tuple — eq-delete bound to spec A might be {1: "hot"} while concurrent data under spec B is {1: 3} (bucketed) or {2: "hot"} (renamed). Tuples never match, real conflict missed.
The sibling helper at lines 343-434 handles this via buildPartitionProjection(specID, ...) keyed per spec id. I'd at minimum key on (specID, tuple) and use mf.PartitionSpecID() plus the eq-delete file's SpecID(). Routing through validateAddedDataFilesMatchingFilter with an OR-of-equalities filter gets it for free.
Resolved automatically by routing through validateAddedDataFilesMatchingFilter. That helper calls buildPartitionProjection(specID, ...) keyed per each concurrent manifest's PartitionSpecID, so a concurrent file written under spec B is projected against spec B's fields — not spec A's. The eq-delete filter is expressed in row (source) space using Reference(sourceFieldName), so it projects correctly regardless of which spec the concurrent file was written under.
```go
// If any eq-delete file is unpartitioned (empty tuple), the delete
// could affect any row — fall back to the conservative AlwaysTrue check.
for _, p := range eqDeletePartitions {
	if len(p) == 0 {
```
I'd hoist this fallback to the caller. The function name says "InPartitions", but a single empty input element silently turns it into AlwaysTrue across the whole table — leaky. Also "empty per-file partition tuple" is a noisy proxy for "unpartitioned table": an eq-delete file with an unset partition map on a partitioned table (easy to do via NewDataFileBuilder without partition data) silently nullifies the optimization. The right signal is the spec itself.
Move the decision into RowDelta.validate: if the table's only spec is unpartitioned, call the AlwaysTrue version directly; otherwise the partition-scoped one. Both paths become explicit.
Moved exactly as suggested. validateNoConflictingDataFilesInPartitions no longer contains any len(p) == 0 fallback. Instead, RowDelta.validate checks currentSpec.NumFields() == 0 before calling anything: if the table is unpartitioned, it calls validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level) directly; otherwise it calls validateNoConflictingDataFilesInPartitions. Both paths are now explicit at the caller.
```go
	continue
}
if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok {
	return fmt.Errorf("%w: snapshot %d added data file %s in eq-delete partition",
```
The sibling on line 404 is "snapshot %d added data file %s matching filter %s" — includes the filter that triggered the match. This one says "in eq-delete partition" without saying which one. With multiple eq-delete files spanning multiple partitions, an operator triaging this can't tell which one conflicted without reading manifests by hand.
```go
return fmt.Errorf("%w: snapshot %d added data file %s in partition %v overlapping eq-delete",
	ErrConflictingDataFiles, snap.SnapshotID, e.DataFile().FilePath(), e.DataFile().Partition())
```
Moot — the custom error site no longer exists. By routing through validateAddedDataFilesMatchingFilter, the existing error message "snapshot %d added data file %s matching filter %s" is produced, which includes the full filter expression. No separate fix needed.
```go
// Empty partition tuple → unpartitioned eq-delete → must fall back to AlwaysTrue.
// With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op).
emptyPartition := map[int]any{}
require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx, []map[int]any{emptyPartition}, IsolationSerializable))
```
This test doesn't actually exercise the fallback. newConflictContext(meta, meta, ...) produces ctx.concurrent = [] because base and current point at the same head, so the validator short-circuits at len(ctx.concurrent) == 0 before reaching the empty-tuple loop. The comment in the test admits it: "With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op)" — it'd still pass if we deleted the fallback entirely.
I'd build a context with a real concurrent snapshot (mirror TestNewConflictContext_WriterHasNoBranchView — base = newConflictTestMetadata(t, nil), current = newConflictTestMetadata(t, &head)), then assert the fallback is reached, either by injecting a fake iceio.IO and observing the manifest fetch, or by an end-to-end fixture where AlwaysTrue would correctly flag a conflict the partition-scoped check would miss.
The old trivial-pass test is removed. It is replaced by TestRowDeltaValidate_UnpartitionedTableFallsBackToAlwaysTrue, which builds a real table with an empty partition spec, writes a concurrent snapshot with a real data file manifest, and asserts the commit is rejected with ErrConflictingDataFiles. This directly proves the AlwaysTrue path is active and wired — not just that it compiles.
```go
	// With no concurrent snapshots in ctx the AlwaysTrue path returns nil (no-op).
	emptyPartition := map[int]any{}
	require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx, []map[int]any{emptyPartition}, IsolationSerializable))
}
```
The #978 reproducer (TestBugRepro_RowDeltaFalseConflictDifferentPartition from the issue body) is the test that would fail on main and pass on this branch — it's the regression guard for the actual contract of this PR, and it isn't here.
A few I'd want at minimum:
- the #978 reproducer as-is ("Bug(table): RowDelta.validate uses AlwaysTrue filter — false conflicts for concurrent appends to different partitions") — different-partition allowed
- same-partition rejected — without this, a future change to `partitionTupleKey` could silently degrade the validator into a no-op with no signal
- UUID or decimal partition column, same-partition rejected — would fail today against `%v` keying, that's the point
- spec evolution — eq-delete bound to spec A, concurrent data under spec B (renamed field or transform change), will fail today
- a genuinely-unpartitioned table replacing the trivial-pass test above
The harness exists — conflict_validation_test.go builds metadata with concurrent snapshots and row_delta_test.go builds eq-delete files. They just need to be wired together.
All five requested cases are now present in table/partition_conflict_test.go:
- `TestRowDeltaValidate_DifferentPartitionAllowed` — the #978 reproducer ("Bug(table): RowDelta.validate uses AlwaysTrue filter — false conflicts for concurrent appends to different partitions"): `eu-west-1` concurrent + `us-east-1` eq-delete → no conflict
- `TestRowDeltaValidate_SamePartitionRejected` — `us-east-1` concurrent + `us-east-1` eq-delete → rejected
- `TestRowDeltaValidate_UUIDPartitionSameRejected` / `TestRowDeltaValidate_UUIDPartitionDifferentAllowed` — UUID partition type safety (these would fail against `%v` keying)
- `TestRowDeltaValidate_UnpartitionedTableFallsBackToAlwaysTrue` — replaces the trivial-pass test
- `TestRowDeltaValidate_SpecEvolutionConflictDetected` — eq-delete written under spec A (identity(region), partitionFieldID=1000), concurrent data written under spec B (renamed to `region_v2`, partitionFieldID=1001, same source field ID=2), both with value `"us-east-1"`. `eqDeletePartitionsToFilter` builds `Reference("region") == "us-east-1"` in row space; `validateAddedDataFilesMatchingFilter` projects it against spec B via `buildPartitionProjection` → conflict detected.
```go
// falls back to AlwaysTrue — the equality delete could affect any row.
//
// Under IsolationSnapshot this validator is a no-op.
func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, eqDeletePartitions []map[int]any, level IsolationLevel) error {
```
Bigger-picture: I'd consider folding this into validateAddedDataFilesMatchingFilter rather than adding a sibling. Derive a BooleanExpression from eqDeletePartitions like Or(And(field_a == v_a, field_b == v_b), ...) and pass it to validateNoConflictingDataFiles(ctx, filter, level). That helper already does per-spec projection (spec-evolution correct), manifest-summary pruning (no full-manifest scan when summaries can't match), and type-aware partition evaluation via iceberg.Literal (no UUID/decimal/binary issues).
Resolves the comments on partitionTupleKey and spec-id awareness at the same time, and matches the pattern documented in this file's preamble ("there is one code path that pruning semantics flow through") and Java's MergingSnapshotProducer.validateNoNewDataFiles.
Done — validateNoConflictingDataFilesInPartitions is not a sibling that walks manifests independently. It calls eqDeletePartitionsToFilter to build an OR(AND(EqualTo(...))) expression, then passes it to validateNoConflictingDataFiles(ctx, filter, level), which internally calls validateAddedDataFilesMatchingFilter. Per-spec projection, manifest-summary pruning, and type-aware partition evaluation all come for free from the existing path.
Force-pushed 2af6a47 to 8b617c6
…idate

Replace partitionTupleKey + validateNoConflictingDataFilesInPartitions (old)
with eqDeletePartitionsToFilter, which builds an OR(AND(EqualTo(...)))
BooleanExpression from the eq-delete DataFiles and routes it through
validateNoConflictingDataFiles -> validateAddedDataFilesMatchingFilter.

Benefits over the removed partitionTupleKey approach:
- Type-safe: anyToLiteral uses a type switch over all iceberg.LiteralType
  values (bool, int32/64, float32/64, string, []byte, Date, Time, Timestamp,
  TimestampNano, Decimal, uuid.UUID) -- no fmt.Sprintf instability for
  UUID/decimal/binary partitions.
- Spec-evolution correct: the filter is projected per each concurrent
  manifest's PartitionSpec via buildPartitionProjection, so renamed fields
  and transform changes do not produce false conflicts.
- No key-injection: avoids the =;-separator collision risk in string-keyed maps.
- Unpartitioned tables: RowDelta.validate checks PartitionSpec.NumFields() == 0
  and passes AlwaysTrue directly; eqDeletePartitionsToFilter is only called
  for partitioned tables.

Updates RowDelta.validate doc comment to describe the partition-scoped behavior.

Adds 9 regression and validation tests:
- anyToLiteral for all supported types and an unsupported type
- Short-circuit under SnapshotIsolation and empty inputs
- apache#978 reproducer: different-partition concurrent append is not rejected
- Same-partition concurrent append IS rejected
- UUID partition: same rejected, different allowed
- Unpartitioned table AlwaysTrue fallback detects any concurrent append

Fixes apache#978
Force-pushed 8b617c6 to ed11891
All points addressed. Summary of what changed: (image attached)

The zizmor failure looks pre-existing. A quick look at the run history: PR #983 was passing fine earlier today. PR #984 (completely unrelated, different author) started failing around the same time at run #217. Looks like something changed in the zizmor action around that point, not anything in this diff. (image attached)
laskoviymishka
left a comment
Great job! My previous concerns look cleanly addressed.
Manifest pruning is back through the existing helper, and the test coverage is much stronger. Direction looks good.
I caught three silent failure modes while tracing the transform/literal paths:
- Non-identity transforms like `bucket`, `day`, `hour`, `truncate`: the row-space filter seems to double-transform the literal during projection. I confirmed an eq-delete in `bucket=1` can match `bucket=4` after `BucketTransform.Project`. The current spec-evolution test only uses identity, so it misses this.
- Decimal partitions: `anyToLiteral` matches `Decimal`, but manifest reads return the named type `DecimalLiteral`, so reusing a manifest-read `DataFile` through `AddDeletes` fails for decimal-partitioned tables.
- Timestamp nanos: the `TimestampNano` branch looks unreachable because manifest reading has no nanos case, so raw `int64` matches first.
The transform issue is the one I’d fix before merge, since bucket(...) and day(ts) are very common Iceberg partition transforms and the failure is silent. The decimal and timestamp cases feel smaller and could be follow-ups.
Once the transform path is fixed, ideally with a bucket[N] or day(ts) regression test, I’m happy to take another pass and approve.
```go
	return nil, fmt.Errorf("partition field %q: %w", sourceField.Name, err)
}

conjuncts = append(conjuncts, iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), lit))
```
I think there's a subtle issue here for non-identity transforms — wanted to flag it because the test suite only exercises IdentityTransform so it might not surface naturally.
The literal we pass in (lit) is the post-transform partition value (a bucket index from BucketTransform, days-from-epoch from DayTransform, etc.), but the predicate is anchored against the source column via Reference(sourceField.Name). Downstream, validateAddedDataFilesMatchingFilter calls BucketTransform.Project (transforms.go:366), which does transformLiteral(transformer, p.Literal()) — re-bucketing what's already a bucket index.
Quick repro on BucketTransform{NumBuckets: 16} over int64:
user_id=12345 → bucket=1 (what DataFile.Partition() stores)
user_id=1 → bucket=4 (what BucketTransform.Project produces)
Eq-delete is in bucket 1, projected predicate matches bucket 4 — a concurrent file in bucket 1 silently passes. Same shape for day(ts), hour(ts), truncate[K], year/month. IdentityTransform happens to work because identity(identity(x)) == x, which is why the existing spec-evolution test is green.
A couple of ways to handle, wdyt?
- branch on `pf.Transform`: identity → keep this row-space predicate, non-identity → build a partition-space predicate (`Reference(pf.Name) == lit` keyed by `(specID, pf.Name)`) evaluated only against same-spec concurrent manifests, with cross-spec as a conservative fallback for that file — close to Java's `PartitionSet`.
- narrower scope: explicitly reject non-identity transforms here with a clear error and document this as identity-only for now.
A couple of regression tests under bucket[N] and day(ts) would lock the contract either way.
Fixed. eqDeletePartitionsToFilter now detects non-identity transforms per partition field before building any row-space predicate. If any field in a file's partition spec uses a non-identity transform (bucket, day, hour, truncate, year, month), the function falls back to AlwaysTrue{} for that file — treating the eq-delete as table-wide, which is conservative but always correct. IdentityTransform continues to produce the scoped row-space predicate as before.
The full PartitionSet-style fix (partition-space predicate keyed by (specID, partitionFieldName) evaluated only against same-spec concurrent manifests) is deferred to a follow-up PR.
Two regression tests lock the contract:
- `TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue`: `bucket[16](user_id)`-partitioned table — concurrent data in bucket 1, eq-delete in bucket 1 → rejected.
- `TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue`: `day(event_ts)`-partitioned table — concurrent data in day 100, eq-delete in day 200 → still rejected (conservative; AlwaysTrue cannot distinguish days).
```go
	return iceberg.NewLiteral(val), nil
case iceberg.TimestampNano:
	return iceberg.NewLiteral(val), nil
case iceberg.Decimal:
```
Tiny type-asymmetry I noticed while tracing the round-trip:
convertAvroValueToIcebergType at manifest.go:1800 returns iceberg.DecimalLiteral{Scale, Val}, which is type DecimalLiteral Decimal — a named type derived from Decimal, not Decimal itself. Go type switches match exact dynamic type, so this arm doesn't fire on a round-tripped value — it falls through to default: with "unsupported partition value type DecimalLiteral".
Since AddDeletes takes iceberg.DataFile, any flow that re-uses a manifest-read file as input would hit this on decimal-partitioned tables. Two options I can think of:
- add `case iceberg.DecimalLiteral:` here
- normalize `convertAvroValueToIcebergType` to return `Decimal` consistently with the writer side
Fwiw TestAnyToLiteral_SupportedTypes currently skips Decimal — adding a round-trip case once this is settled would catch any future drift.
Fixed. Added case iceberg.DecimalLiteral: to anyToLiteral alongside the existing case iceberg.Decimal:. The new arm casts to iceberg.Decimal before wrapping in a Literal, bridging the named-type gap: iceberg.Decimal(val) is a valid conversion because type DecimalLiteral Decimal.
The TestAnyToLiteral_SupportedTypes table now includes a DecimalLiteral subtest (iceberg.DecimalLiteral{Scale: 2}) that would have failed before this fix — it passes now.
Normalizing convertAvroValueToIcebergType to return Decimal consistently is deferred; it touches callers beyond anyToLiteral and deserves its own PR.
```go
	return iceberg.NewLiteral(val), nil
case iceberg.Timestamp:
	return iceberg.NewLiteral(val), nil
case iceberg.TimestampNano:
```
Related to the Decimal one above — convertAvroValueToIcebergType (manifest.go:1756-1817) has cases for TimestampMillis and TimestampMicros but none for nanos, so a nanosecond-timestamp partition value arrives here as a raw int64 and matches case int64: first. This case iceberg.TimestampNano: arm ends up unreachable from the read path.
Probably worth either adding a TimestampNanos case to convertAvroValueToIcebergType so reads return iceberg.TimestampNano and this arm fires, or dropping this arm and noting that TimestampNano partitions aren't supported here yet. Same as Decimal, the unit table also skips this case so it's not caught today.
Fixed. Added case atype.TimestampNanos: to convertAvroValueToIcebergType in manifest.go, following the same pattern as the existing TimestampMillis and TimestampMicros cases. A timestamp-nanos logical type field now returns iceberg.TimestampNano from the manifest reader, making the anyToLiteral arm reachable from the read path.
Added a TimestampNano subtest to TestAnyToLiteral_SupportedTypes to catch future drift. The test passes.
…DeletePartitionsToFilter

For non-identity transforms (bucket, day, hour, truncate, year, month),
DataFile.Partition() stores post-transform values (e.g. bucket indices,
days-since-epoch). Building a row-space predicate from those values and
passing it to validateAddedDataFilesMatchingFilter causes the transform to
be re-applied downstream, producing wrong matches (double-transformation).

For now, detect non-identity transforms and fall back to AlwaysTrue (treat
the eq-delete as table-wide), which is conservative but correct. A full
PartitionSet-style approach — building a partition-space predicate keyed by
(specID, partitionFieldName) evaluated only against same-spec manifests —
is deferred to a follow-up PR.

Add two regression tests:
- TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue
- TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue
convertAvroValueToIcebergType returns iceberg.DecimalLiteral{} (a named
type — type DecimalLiteral Decimal), not iceberg.Decimal. Go type switches
match exact dynamic type, so the existing 'case iceberg.Decimal:' arm did
not fire on manifest-read partition values, causing a fallthrough to the
default error branch for decimal-partitioned tables.
Add a 'case iceberg.DecimalLiteral:' arm that casts to Decimal before
wrapping in a Literal. Also add a DecimalLiteral subtest to
TestAnyToLiteral_SupportedTypes to catch future drift.
…eToIcebergType

The manifest reader already handled timestamp-millis and timestamp-micros
but had no case for timestamp-nanos (atype.TimestampNanos). A partition
field with that logical type arrived in anyToLiteral as a raw int64,
matching 'case int64:' first and making the 'case iceberg.TimestampNano:'
arm unreachable from the read path.

Add the missing case following the same pattern as the existing timestamp
cases. Also add a TimestampNano subtest to TestAnyToLiteral_SupportedTypes
to cover the round-trip.
During my integration tests against a real S3-backed Iceberg catalog, I found two pre-existing bugs in

```go
fixedSize := internal.DecimalRequiredBytes(len(dec.String()))
```

The resulting slice length doesn't match the Avro schema's. Repro: write a manifest with a

```go
func (d DecimalLiteral) Type() Type { return DecimalTypeOf(9, d.Scale) }
```

Both bugs exist in
@mzzz-zzm --> for those new bugs, can you file separate issues? Thanks for this! It's very useful!
laskoviymishka
left a comment
LGTM!
Clean iteration: all three flagged cases addressed with regression tests. The conservative AlwaysTrue fallback for non-identity transforms is the right interim call; worth a follow-up issue to track the full PartitionSet-style partition-space check so bucket/day-partitioned tables eventually get the same scoping benefit.
Fixes #978
Problem
`RowDelta.validate` passes `iceberg.AlwaysTrue{}` to `validateNoConflictingDataFiles` whenever equality-delete files are present. This means any concurrent append to any partition is treated as a conflict, even when it lands in a completely different partition from the equality-deletes. Under serializable isolation this causes spurious `ErrConflictingDataFiles` errors for workloads that write to multiple independent partitions concurrently.

Fix
`RowDelta` now collects the partition tuples of all equality-delete files it adds (`eqDeletePartitions`). A new validator, `validateNoConflictingDataFilesInPartitions`, checks concurrent data files only in those specific partition tuples; empty (unpartitioned) tuples fall back to the conservative `AlwaysTrue` check, preserving existing safety.
- `table/row_delta.go`: collect `eqDeletePartitions` instead of `hasEqDeletes`
- `table/conflict_validation.go`: `validateNoConflictingDataFilesInPartitions` + `partitionTupleKey`
- `table/partition_conflict_test.go`: unit tests for both new functions