From 32afe0337b2d21974592439a507b17dc855360d0 Mon Sep 17 00:00:00 2001 From: hg-ms <53219833+hg-ms@users.noreply.github.com> Date: Mon, 4 May 2026 14:23:50 +0200 Subject: [PATCH] fixing #405 and #414 fixing #405 and #414 by sync on object registry only --- .../binary/types/BinaryStorer.java | 378 +++++++++--------- .../persistence/types/PersistenceManager.java | 6 + .../types/PersistenceObjectManager.java | 22 +- 3 files changed, 208 insertions(+), 198 deletions(-) diff --git a/persistence/binary/src/main/java/org/eclipse/serializer/persistence/binary/types/BinaryStorer.java b/persistence/binary/src/main/java/org/eclipse/serializer/persistence/binary/types/BinaryStorer.java index a247c355..8cf8c862 100644 --- a/persistence/binary/src/main/java/org/eclipse/serializer/persistence/binary/types/BinaryStorer.java +++ b/persistence/binary/src/main/java/org/eclipse/serializer/persistence/binary/types/BinaryStorer.java @@ -101,17 +101,22 @@ protected static int defaultSlotSize() /* * Concurrency / thread-safety concept: - * - head is the internal mutex instance since it hints to the mutable state but is final and immutable itself. - * - lock order/hierarchy must always be: - * 1.) if applicable: in ObjectManager instance lock on objectRegistry - * 2.) lock on this.head - * Should this order ever reverse anywhere, it will be a deadlock race condition! - * This is also the reason for the internal mutex instead of using this directly: - * Outside logic could lock the storer first, reversing the lock order. - * - A storer instance is never meant to be used in a mutating fashion by more than one thread - * at any given moment. The concurrency logic is only meant to synchronize reading accesses - * from other storer instances of other threads for doing the + * - The storer uses the parent ObjectManager's registry monitor (objectRegistryMonitor) + * as its single internal lock. Using the registry as the storer's own lock collapses the + * former two-level hierarchy (objectRegistry -> this.head) into one, which makes the lock + * order structural instead of convention-based: any storer state mutation is performed + * inside the registry monitor, so peer threads in synchCheckLocalRegistries (already + * holding the registry) cannot face a lock-order inversion when reading a foreign + * storer's state. Java synchronization is reentrant, so the recursion through + * ensureObjectId during typeHandler.store(...) does not self-deadlock. + * - The head Item below is no longer used as a monitor; it is just the sentinel start of + * the linked item chain. + * - A storer instance is never meant to be used in a mutating fashion by more than one + * thread at any given moment. The locking is only there to synchronize reading accesses + * from other storer instances of other threads. */ + final Object objectRegistryMonitor; + final Item head = new Item(null, 0L, null, null); private Item tail; private Item[] hashSlots; @@ -155,15 +160,16 @@ protected Default( ) { super(); - this.objectManager = notNull(objectManager) ; - this.objectRetriever = notNull(objectRetriever) ; - this.typeManager = notNull(typeManager) ; - this.target = notNull(target) ; - this.bufferSizeProvider = notNull(bufferSizeProvider); - this.chunksHashRange = channelCount - 1 ; - this.switchByteOrder = switchByteOrder ; - this.persister = mayNull(persister) ; - + this.objectManager = notNull(objectManager) ; + this.objectRegistryMonitor = objectManager.objectRegistryMonitor(); + this.objectRetriever = notNull(objectRetriever) ; + this.typeManager = notNull(typeManager) ; + this.target = notNull(target) ; + this.bufferSizeProvider = notNull(bufferSizeProvider) ; + this.chunksHashRange = channelCount - 1 ; + this.switchByteOrder = switchByteOrder ; + this.persister = mayNull(persister) ; + this.defaultInitialize(); } @@ -194,7 +200,7 @@ public final long maximumCapacity() @Override public final long currentCapacity() { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { return this.hashSlots.length; } @@ -203,7 +209,7 @@ public final long currentCapacity() @Override public final long size() { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { return this.itemCount; } @@ -282,17 +288,17 @@ private void defaultInitialize() protected void internalInitialize(final int hashLength) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { this.itemCount = 0; this.hashSlots = new Item[hashLength]; this.hashRange = hashLength - 1; - + // initializing/clearing item chain (this.tail = this.head).next = null; - + this.synchCreateStoringChunksBuffers(); - + // must be clear instead of just reset to avoid memory leaks this.commitListeners.clear(); this.persistenceObjectRegistrationListener.clear(); @@ -328,7 +334,7 @@ private void synchCreateStoringChunksBuffers() @Override public PersistenceStorer ensureCapacity(final long desiredCapacity) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { if(this.currentCapacity() >= desiredCapacity) { @@ -336,7 +342,7 @@ public PersistenceStorer ensureCapacity(final long desiredCapacity) } this.synchRebuildStoreItems(XHashing.padHashLength(desiredCapacity)); } - + return this; } @@ -502,11 +508,23 @@ protected final void storeItem(final Item item) LazyArg(() -> systemString(item.instance)), LazyArgInContext(STORER_CONTEXT, item.instance) ); - - synchronized(this.head) + + /* + * Look up the chunk under the storer monitor (== objectRegistry) because chunks[] + * is replaced atomically by internalInitialize(). The type handler call is done + * outside the monitor on purpose: typeHandler.store recurses through apply() -> + * register() -> objectManager.ensureObjectId, which re-acquires the same monitor. + * Holding it across the recursive walk would needlessly serialize the entire graph + * traversal of every storer; reentrant acquisitions are cheap, contended ones are not. + * ChunksBuffer is per-storer and only accessed by the owning thread, so no lock is + * needed for the actual write. + */ + final ChunksBuffer chunk; + synchronized(this.objectRegistryMonitor) { - item.typeHandler.store(this.synchLookupChunk(item.oid), item.instance, item.oid, this); + chunk = this.synchLookupChunk(item.oid); } + item.typeHandler.store(chunk, item.instance, item.oid, this); } @Override @@ -538,7 +556,7 @@ public void storeAll(final Iterable instances) @Override public void iterateMergeableEntries(final PersistenceAcceptor iterator) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { for(Item e = this.head; (e = e.next) != null;) { @@ -547,7 +565,7 @@ public void iterateMergeableEntries(final PersistenceAcceptor iterator) { continue; } - + // mergeable entry iterator.accept(e.oid, e.instance); } @@ -578,42 +596,45 @@ public Object commit() "Committing {} object(s)", LazyArg(this::size) // use lazy here, #size() locks ); - + // isEmpty locks internally if(!this.isEmpty()) { // must validate here, too, in case the WriteController disabled writing during the storer's existence. this.target.validateIsStoringEnabled(); - + final Binary writeData; - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { this.typeManager.checkForPendingRootInstances(); this.typeManager.checkForPendingRootsStoring(this); writeData = this.synchComplete(); } - + // very costly IO-operation does not need to occupy the lock this.target.write(writeData); - - synchronized(this.head) - { - this.typeManager.clearStorePendingRoots(); - this.objectManager.mergeEntries(this); - } + + /* + * mergeEntries acquires the object registry, which is the same monitor we use as + * the storer's own lock. iterateMergeableEntries (called inside) reacquires it + * recursively — fine, Java synchronization is reentrant. clearStorePendingRoots + * is a plain field write on the type manager, called only from this thread. + */ + this.typeManager.clearStorePendingRoots(); + this.objectManager.mergeEntries(this); } this.notifyCommitListeners(); this.clear(); - + logger.debug("Commit finished successfully"); - + // not used (yet?) return null; } public final long lookupOid(final Object object) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { for(Item e = this.hashSlots[identityHashCode(object) & this.hashRange]; e != null; e = e.link) { @@ -640,7 +661,7 @@ public final long lookupObjectId( final PersistenceTypeHandler optionalHandler ) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { for(Item e = this.hashSlots[identityHashCode(object) & this.hashRange]; e != null; e = e.link) { @@ -678,8 +699,8 @@ public final void registerGuaranteed( ); this.persistenceObjectRegistrationListener.forEach(c -> c.onObjectRegistration(objectId, instance)); - - synchronized(this.head) + + synchronized(this.objectRegistryMonitor) { // ensure handler (or fail if type is not persistable) before ensuring an OID. final PersistenceTypeHandler typeHandler = optionalHandler != null @@ -759,7 +780,7 @@ public final boolean skipNulled(final Object instance) final boolean internalSkip(final Object instance, final long objectId) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { // lookup returns -1 on failure, so 0 is a valid lookup result. Main reason for -1 vs. 0 distinction! if(Swizzling.isNotFoundId(this.lookupOid(instance))) @@ -768,7 +789,7 @@ final boolean internalSkip(final Object instance, final long objectId) this.synchRegisterObjectId(instance, null, objectId); return true; } - + // already locally present (found), do nothing. return false; } @@ -1008,51 +1029,16 @@ public final class Batching extends Default implements BatchStorer private volatile boolean closed ; /* - * Concurrency note — why this.head, and why holding it across ensureObjectId* is safe here. - * - * Background: Default documents the canonical lock order as - * 1) objectRegistry (acquired inside ObjectManager.ensureObjectId*) - * 2) this.head - * and warns at registerGuaranteed(Object) that "ensureObjectId may never be called under - * a storer lock or a deadlock might happen!" — i.e. Default itself never holds this.head - * while calling back into ensureObjectId*, precisely so two threads operating on different - * storers cannot form a head↔objectRegistry cycle. - * - * Batching deliberately inverts that: every public mutator wraps its work in - * synchronized(this.head) and the registration path (registerGuaranteed → ensureObjectId*) - * runs while this.head is already held. This is normally forbidden, but is safe in this - * subclass for the following combined reasons: - * - * - Single-thread-per-storer invariant (Default field comment, lines ~111-113): - * "A storer instance is never meant to be used in a mutating fashion by more than one - * thread at any given moment." Batching extends a single-writer storer; its lock is not - * meant to coordinate concurrent writers but to provide an atomic boundary for the - * close-check / register / processItems / optFlush sequence and for the background - * flush thread. + * commitLock serialises concurrent commit() calls (background flush thread vs. explicit + * user flush/commit) and the storeItem-into-chunks vs. commit's synchComplete/clear pair. + * Without it, a background flush could complete & clear chunks while a store thread is + * still writing items into them. * - * - Reentrancy: synchronized(this.head) is reentrant, so callbacks from - * ObjectManager.ensureObjectIdGuaranteedRegister(...) that re-enter this storer - * (registerGuaranteed(long, Object, ...), synchRegisterObjectId, ...) acquire the same - * monitor without contention. - * - * - No cross-storer head acquisition: the registry callbacks always target the calling - * storer (passed as objectIdRequestor = this); they never attempt to lock another - * storer's head. The classic A-holds-headA-wants-objectRegistry / B-holds-objectRegistry- - * wants-headA cycle therefore cannot form, because no thread ever wants a foreign storer's - * head while holding the registry. - * - * - this.head, not synchronized(this): exposing the storer instance as a monitor would let - * outside code lock it first and genuinely reverse the order vs. ObjectManager. Using the - * package-private head field keeps the monitor private to this class hierarchy. - * - * Why this matters for the override shape: each Batching mutator (internalStore, - * store(Object, long), forceRootStore, ...) keeps the closed-check, registration, - * processItems and optFlush inside one synchronized(this.head) block on purpose. Splitting - * the lock around the ensureObjectId* call (as a naive reading of Default's warning would - * suggest) would re-open windows where the background flush, close(), or another mutator - * could observe partially-applied state — and would not eliminate any real deadlock, - * because the conditions above already preclude one. + * Lock order: commitLock -> objectRegistryMonitor (the storer's only inner lock). + * Nothing acquires objectRegistryMonitor first and then waits on commitLock, so the order + * is one-way and cannot cycle. */ + private final Object commitLock = new Object(); Batching( final PersistenceObjectManager objectManager , @@ -1112,28 +1098,19 @@ protected boolean deduplicateChunkEntities() @Override public long store(final Object instance, final long objectId) { - synchronized(this.head) + logger.debug( + "Store request: {}({}) with ID {}", + LazyArg(() -> systemString(instance)), + LazyArgInContext(STORER_CONTEXT, instance), + objectId + ); + + synchronized(this.commitLock) { if(this.closed) { throw new IllegalStateException("BatchStorer is already closed."); } - - logger.debug( - "Store request: {}({}) with ID {}", - LazyArg(() -> systemString(instance)), - LazyArgInContext(STORER_CONTEXT, instance), - objectId - ); - - /* - * Force re-registration at the explicitly supplied objectId to capture - * the instance's current state. Default.internalStore(root, objectId) - * returns early when the instance is already locally registered, which - * would silently skip the re-serialization and violate the documented - * BatchStorer contract ("always re-serializes explicitly passed root - * instances"). Child graph traversal still uses lazy semantics via apply(). - */ this.registerGuaranteed(objectId, instance, null); if(!this.isProcessingItems) @@ -1141,81 +1118,48 @@ public long store(final Object instance, final long objectId) try { this.isProcessingItems = true; - this.processItems(); + super.processItems(); } finally { this.isProcessingItems = false; } } + } - // Default.internalStore(root, objectId) does not call optFlush(), - // so invoke it here to match the internalStore(Object) path and - // ensure size/time-based flush controllers trigger consistently. - this.optFlush(); + this.optFlush(); - return objectId; - } + return objectId; } @Override public void storeAll(final Iterable instances) { - synchronized(this.head) - { - super.storeAll(instances); - } + // No outer lock: each internalStore() call handles its own commitLock acquisition + // and ends with optFlush() outside the lock. + super.storeAll(instances); } @Override public void forceRootStore(final PersistenceRoots pendingStoreRoot) { - synchronized(this.head) + synchronized(this.commitLock) { if(this.closed) { throw new IllegalStateException("BatchStorer is already closed."); } - super.forceRootStore(pendingStoreRoot); - // Default.forceRootStore does not call optFlush(), so invoke - // it here to match the internalStore(Object) path and ensure - // forced root updates participate in flush policy consistently. - this.optFlush(); - } - } - - @Override - public PersistenceStorer reinitialize() - { - synchronized(this.head) - { - return super.reinitialize(); - } - } - - @Override - public PersistenceStorer reinitialize(final long initialCapacity) - { - synchronized(this.head) - { - return super.reinitialize(initialCapacity); - } - } - - @Override - public PersistenceStorer ensureCapacity(final long desiredCapacity) - { - synchronized(this.head) - { - return super.ensureCapacity(desiredCapacity); } + this.optFlush(); } @Override public void registerCommitListener(final PersistenceCommitListener listener) { - synchronized(this.head) + // super stores into a non-thread-safe BulkList; serialise the add against concurrent + // listener registrations and against commit's notifyCommitListeners iteration. + synchronized(this.objectRegistryMonitor) { super.registerCommitListener(listener); } @@ -1224,7 +1168,7 @@ public void registerCommitListener(final PersistenceCommitListener listener) @Override public void registerRegistrationListener(final PersistenceObjectRegistrationListener listener) { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { super.registerRegistrationListener(listener); } @@ -1233,29 +1177,39 @@ public void registerRegistrationListener(final PersistenceObjectRegistrationList @Override protected long internalStore(final Object root) { - synchronized(this.head) + logger.debug( + "Store request: {}({})", + LazyArg(() -> systemString(root)), + LazyArgInContext(STORER_CONTEXT, root) + ); + + /* + * commitLock serialises registration + item processing against commit/clear so that + * a background flush cannot drain & clear chunks while items are still being written + * into them. Inside, lookups and registrations briefly acquire objectRegistryMonitor + * (the storer's only inner lock); recursion through ensureObjectId reacquires it. + * + * Lock order: commitLock -> objectRegistryMonitor. + * optFlush() runs after the lock is released so internalFlush()'s commit can take + * commitLock without holding it across the optFlush decision. + */ + final long rootOid; + synchronized(this.commitLock) { if(this.closed) { throw new IllegalStateException("BatchStorer is already closed."); } - logger.debug( - "Store request: {}({})", - LazyArg(() -> systemString(root)), - LazyArgInContext(STORER_CONTEXT, root) - ); + final long localOid = this.lookupOid(root); /* - * Unlike the default lazy storer, a batch storer always re-registers - * explicitly passed root instances to capture their current state. - * Child graph traversal still uses lazy semantics (via apply()) — - * children are only stored if not yet in the global registry. + * Always re-register root to capture current state (BatchStorer contract). */ - long rootOid; - if(Swizzling.isFoundId(rootOid = this.lookupOid(root))) + if(Swizzling.isFoundId(localOid)) { - this.registerGuaranteed(rootOid, root, null); + this.registerGuaranteed(localOid, root, null); + rootOid = localOid; } else { @@ -1267,24 +1221,44 @@ protected long internalStore(final Object root) try { this.isProcessingItems = true; - this.processItems(); + super.processItems(); } finally { this.isProcessingItems = false; } } + } - this.optFlush(); + this.optFlush(); - return rootOid; + return rootOid; + } + + @Override + protected void processItems() + { + /* + * commitLock guarantees mutual exclusion between processItems (which writes to chunks + * via storeItem) and commit (which calls synchComplete/clear on the same chunks). + * Without this, the background flush thread could commit/clear while a store thread + * is still writing items, corrupting chunk state. + * + * The internalStore / store(Object,long) / forceRootStore overrides above already + * call this method while holding commitLock; the recursive acquisition here is harmless + * and keeps the invariant explicit for any other call site that may go through + * super.forceRootStore -> this.processItems. + */ + synchronized(this.commitLock) + { + super.processItems(); } } @Override public void clear() { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { super.clear(); this.pendingSinceNanos = 0L; @@ -1294,7 +1268,14 @@ public void clear() @Override public Object commit() { - synchronized(this.head) + /* + * commitLock serialises concurrent commits (background flush thread vs. explicit + * user call) and excludes any in-flight processItems on the same storer. super.commit() + * acquires the storer's inner monitor (objectRegistryMonitor) only for the brief + * synchComplete and the mergeEntries calls; the IO write happens between them with + * no lock held. + */ + synchronized(this.commitLock) { return super.commit(); } @@ -1303,16 +1284,13 @@ public Object commit() @Override public void flush() { - synchronized(this.head) - { - this.internalFlush(); - } + this.internalFlush(); } @Override public boolean hasPendingData() { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { return !this.isEmpty(); } @@ -1321,7 +1299,7 @@ public boolean hasPendingData() @Override public void close() { - synchronized(this.head) + synchronized(this.objectRegistryMonitor) { if(this.closed) { @@ -1348,12 +1326,10 @@ public void close() Thread.currentThread().interrupt(); } - synchronized(this.head) + // Final flush takes commitLock internally via commit(); must not be held here. + if(!this.isEmpty()) { - if(!this.isEmpty()) - { - this.internalFlush(); - } + this.internalFlush(); } } @@ -1371,7 +1347,14 @@ private void backgroundFlush() private void optFlush() { - synchronized(this.head) + /* + * Decide whether to flush under the storer's inner monitor (to read size and + * byte-count atomically), then perform the actual flush outside the lock. + * internalFlush() takes commitLock internally; not holding any lock here keeps + * the lock order single-direction (commitLock -> objectRegistryMonitor). + */ + final boolean doFlush; + synchronized(this.objectRegistryMonitor) { if(this.closed || this.isEmpty()) { @@ -1384,7 +1367,6 @@ private void optFlush() this.pendingSinceNanos = nowNanos; } - // Safety: force flush if any channel approaches the 2 GB storage limit final long maxBytes = this.maxChannelByteCount(); if(maxBytes >= MAX_CHANNEL_BYTES_BEFORE_FLUSH) { @@ -1393,26 +1375,28 @@ private void optFlush() maxBytes, MAX_CHANNEL_BYTES_BEFORE_FLUSH ); - this.internalFlush(); - return; + doFlush = true; } - - if(this.controller.shouldFlush( - this.size(), - TimeUnit.NANOSECONDS.toMillis(nowNanos - this.pendingSinceNanos) - )) + else { - this.internalFlush(); + doFlush = this.controller.shouldFlush( + this.size(), + TimeUnit.NANOSECONDS.toMillis(nowNanos - this.pendingSinceNanos) + ); } } + + if(doFlush) + { + this.internalFlush(); + } } private void internalFlush() { logger.debug("Flushing batch storer with size = {}", this.size()); - + // pendingSinceNanos is reset inside commit() -> Batching.clear(); no reset needed here. this.commit(); - this.pendingSinceNanos = 0L; } } diff --git a/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceManager.java b/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceManager.java index e908029d..aea2be3a 100644 --- a/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceManager.java +++ b/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceManager.java @@ -414,6 +414,12 @@ public final void mergeEntries(final PersistenceLocalObjectIdRegistry localRe this.objectManager.mergeEntries(localRegistry); } + @Override + public final Object objectRegistryMonitor() + { + return this.objectManager.objectRegistryMonitor(); + } + @Override public final Object get() { diff --git a/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceObjectManager.java b/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceObjectManager.java index 7cb8bdde..8cfc1c49 100644 --- a/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceObjectManager.java +++ b/persistence/persistence/src/main/java/org/eclipse/serializer/persistence/types/PersistenceObjectManager.java @@ -61,9 +61,23 @@ public default PersistenceObjectManager Clone() } public boolean registerLocalRegistry(PersistenceLocalObjectIdRegistry localRegistry); - + public void mergeEntries(PersistenceLocalObjectIdRegistry localRegistry); + /** + * Returns the monitor used by this object manager for synchronization on the object registry. + *

+ * Storer implementations may use this monitor as their own internal lock to make the canonical + * lock order ({@code objectRegistry -> storer-state}) structural rather than convention-based: + * any storer state mutation is always performed inside the registry monitor, so peer threads in + * {@code synchCheckLocalRegistries} (which already hold the registry) cannot face a lock-order + * inversion when reading a foreign storer's state. + *

+ * Java synchronization is reentrant, so nested calls into {@link #ensureObjectId(Object)} and + * related methods do not self-deadlock. + */ + public Object objectRegistryMonitor(); + @@ -139,6 +153,12 @@ public void consolidate() } } + @Override + public Object objectRegistryMonitor() + { + return this.objectRegistry; + } + @Override public long lookupObjectId(final Object object) {