Replace PriorityQueue + Dictionary with IndexedPriorityQueue in Sorte…#1684
Replace PriorityQueue + Dictionary with IndexedPriorityQueue in Sorte…#1684hamdaankhalid wants to merge 14 commits intomicrosoft:devfrom
Conversation
…dSetObject Replace the separate expirationTimes Dictionary and expirationQueue PriorityQueue with a single IndexedPriorityQueue that supports O(log N) in-place priority updates. This eliminates stale entry accumulation in the expiration queue when member TTLs are frequently refreshed. Benefits: - Memory: O(N) always vs O(N + total updates) with stale entries - Dequeue: Always O(log N) vs O(S * log N) when draining stale entries - Remove: O(log N) by key vs not supported - No stale entry validation needed during DeleteExpiredItems Bound DeleteExpiredItems on mutation hot paths (bound=16) to cap worst-case latency per operation. Serialization path (DoSerialize) remains unbounded to drain all expired entries before checkpoint. Correctness is preserved because every read path independently filters expired members via IsExpired(). Add IndexedPriorityQueue<TElement, TPriority> to Garnet.common with: - IEqualityComparer<TElement> support for byte[] content matching - EnqueueOrUpdate, Dequeue, TryPeek, TryGetPriority, TryRemove, ChangePriority, Exists - Automatic grow/shrink of backing array Add 25 unit tests covering basic ops, in-place updates, custom comparers, removal, and stress/ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR replaces the SortedSet member-expiration tracking from a Dictionary<byte[], long> + PriorityQueue<byte[], long> pair to a single IndexedPriorityQueue<byte[], long> (supporting O(log N) in-place priority updates), and adds bounded expiration cleanup on mutation hot paths to cap worst-case latency.
Changes:
- Add
IndexedPriorityQueue<TElement, TPriority>toGarnet.commonplus unit tests covering key operations and custom equality comparers. - Update
SortedSetObjectto use the new indexed queue for TTL tracking and adjust heap memory accounting constants. - Bound
DeleteExpiredItemscalls on SortedSet mutation paths (e.g., ZADD/ZREM/ZINCRBY/ZPOPMIN/MAX, etc.) while keeping serialization unbounded.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| test/Garnet.test/IndexedPriorityQueueTests.cs | Adds unit tests validating core queue semantics, updates, removal, and custom byte[] comparer behavior. |
| libs/common/Collections/IndexedPriorityQueue.cs | Introduces the new indexed min-heap + index dictionary implementation used for expirable-member tracking. |
| libs/storage/Tsavorite/cs/src/core/Utilities/MemoryUtils.cs | Adds new memory overhead constants for the indexed priority queue for size accounting. |
| libs/server/Objects/SortedSet/SortedSetObject.cs | Switches SortedSet expiration tracking to IndexedPriorityQueue, updates expiration cleanup, size accounting, and TTL lookups. |
| libs/server/Objects/SortedSet/SortedSetObjectImpl.cs | Updates SortedSet command implementations to call bounded expiration deletion and adjusts expiration-aware iteration checks. |
Comments suppressed due to low confidence (9)
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:771
PopMinOrMaxnow boundsDeleteExpiredItemsto 16, but then popssortedSet.Min/Maxwithout checkingIsExpired. If more than 16 expired members exist, this method can return an expired member to the client. To preserve correctness, ensure the popped element is non-expired (e.g., loop removing expired min/max elements before returning, or run an unbounded cleanup for pop operations).
public (double Score, byte[] Element) PopMinOrMax(bool popMaxScoreElement = false)
{
DeleteExpiredItems(bound: 16);
if (sortedSet.Count == 0)
return default;
var element = popMaxScoreElement ? sortedSet.Max : sortedSet.Min;
sortedSet.Remove(element);
sortedSetDict.Remove(element.Element);
TryRemoveExpiration(element.Element);
this.UpdateSize(element.Element, false);
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:826
SortedSetPopMinOrMaxCountboundsDeleteExpiredItemsto 16 and then repeatedly removessortedSet.Min/Maxwithout filtering expired members. With many expired entries still present, this can return expired members and/or computecountbased onsortedSet.Count(which includes expired). Consider skipping/removing expired elements in the pop loop until a live element is found (and adjusting the response count accordingly), or do an unbounded cleanup for pop commands.
private void SortedSetPopMinOrMaxCount(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion, SortedSetOperation op)
{
DeleteExpiredItems(bound: 16);
var count = input.arg1;
var countDone = 0;
var withHeader = true;
if (count == -1)
{
withHeader = false;
count = 1;
}
if (sortedSet.Count < count)
count = sortedSet.Count;
if (input.arg2 > 0)
respProtocolVersion = (byte)input.arg2;
// When the output will be read later by ProcessRespArrayOutputAsPairs we force RESP version to 2.
using var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
if (count == 0)
{
writer.WriteEmptyArray();
output.result1 = 0;
return;
}
if (withHeader)
{
if (respProtocolVersion >= 3)
writer.WriteArrayLength(count);
else
writer.WriteArrayLength(count * 2);
}
while (count > 0)
{
var max = op == SortedSetOperation.ZPOPMAX ? sortedSet.Max : sortedSet.Min;
sortedSet.Remove(max);
sortedSetDict.Remove(max.Element);
TryRemoveExpiration(max.Element);
UpdateSize(max.Element, false);
libs/server/Objects/SortedSet/SortedSetObject.cs:583
Count()now iterates allsortedSetDictentries to compute expired count when expirations exist. Previously this scanned only the expirable items collection, so this is an O(N) regression even when only a small subset of members have TTLs. Consider iterating only expirable members (e.g., expose an enumeration of elements fromIndexedPriorityQueue/its index, or track expirable keys separately) to keep the cost proportional to TTL-bearing members.
public int Count()
{
if (!HasExpirableItems())
return sortedSetDict.Count;
var expiredKeysCount = 0;
foreach (var item in sortedSetDict)
{
if (IsExpired(item.Key))
expiredKeysCount++;
}
return sortedSetDict.Count - expiredKeysCount;
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:606
SortedSetRemoveRangeByRankrelies onsortedSet.Skip(start).Take(...)and usessortedSetDict.Countfor bounds, but it does not skip expired members during rank calculation/removal. WithDeleteExpiredItems(bound: 16), expired members can remain and will shift ranks, causing incorrect removals vs Redis semantics (expired members should be treated as nonexistent). Either ensure this path ignores expired members (compute rank over non-expired entries) or run an unbounded expiration cleanup for rank-based operations.
private void SortedSetRemoveRangeByRank(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
DeleteExpiredItems(bound: 16);
using var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
// ZREMRANGEBYRANK key start stop
if (!input.parseState.TryGetInt(0, out var start) ||
!input.parseState.TryGetInt(1, out var stop))
{
writer.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER);
return;
}
if (start > sortedSetDict.Count - 1)
return;
// Shift from the end of the set
start = start < 0 ? sortedSetDict.Count + start : start;
stop = stop < 0
? sortedSetDict.Count + stop
: stop >= sortedSetDict.Count ? sortedSetDict.Count - 1 : stop;
// Calculate number of elements
var elementCount = stop - start + 1;
// Using to list to avoid modified enumerator exception
foreach (var item in sortedSet.Skip(start).Take(elementCount).ToList())
{
if (sortedSetDict.Remove(item.Element, out var key))
{
sortedSet.Remove((key, item.Element));
UpdateSize(item.Element, false);
}
TryRemoveExpiration(item.Element);
}
// Write the number of elements
writer.WriteInt32(elementCount);
}
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:141
SortedSetAddnow only deletes up to 16 expired members up front. The rest of the method assumes expired members are already physically removed (it treatssortedSetDict.TryGetValueas “member exists”), but it never checksIsExpired(member)on the update path. If an expired member remains past the bound, ZADD can behave as if the member still exists (wrong CH/NX/XX behavior, wrong return counts, and potential “revival” without removing old state). Consider treating an expired hit as missing by removing it (from sortedSet/sortedSetDict/expirationQueue) before proceeding, or keep unbounded cleanup for ZADD.
private void SortedSetAdd(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
DeleteExpiredItems(bound: 16);
var addedOrChanged = 0;
double incrResult = 0;
var options = SortedSetAddOption.None;
var currTokenIdx = 0;
var parsedOptions = false;
var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
try
{
while (currTokenIdx < input.parseState.Count)
{
// Try to parse a Score field
if (!input.parseState.TryGetDouble(currTokenIdx, out var score))
{
// Try to get and validate options before the Score field, if any
if (!parsedOptions)
{
parsedOptions = true;
if (!GetOptions(ref input, ref currTokenIdx, out options, ref writer))
return;
continue; // retry after parsing options
}
else
{
// Invalid Score encountered
writer.WriteError(CmdStrings.RESP_ERR_NOT_VALID_FLOAT);
return;
}
}
parsedOptions = true;
currTokenIdx++;
// Member
var memberSpan = input.parseState.GetArgSliceByRef(currTokenIdx++).ReadOnlySpan;
var member = memberSpan.ToArray();
// Add new member
if (!sortedSetDict.TryGetValue(member, out var scoreStored))
{
// Don't add new member if XX flag is set
if ((options & SortedSetAddOption.XX) == SortedSetAddOption.XX) continue;
incrResult = score;
sortedSetDict.Add(member, score);
if (sortedSet.Add((score, member)))
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:228
SortedSetRemovenow bounds expiration cleanup to 16, but then removes keys directly fromsortedSetDictwithout checkingIsExpired. If a target member is expired but not yet deleted due to the bound, ZREM will count it as removed (and return 1) even though it should be treated as non-existent. Consider checkingIsExpired(valueArray)and treating expired members as missing (optionally cleaning them up) before incrementingresult1.
private void SortedSetRemove(ref ObjectInput input, ref ObjectOutput output)
{
DeleteExpiredItems(bound: 16);
for (var i = 0; i < input.parseState.Count; i++)
{
var value = input.parseState.GetArgSliceByRef(i).ReadOnlySpan;
var valueArray = value.ToArray();
if (!sortedSetDict.Remove(valueArray, out var key))
continue;
output.result1++;
sortedSet.Remove((key, valueArray));
_ = TryRemoveExpiration(valueArray);
this.UpdateSize(value, false);
}
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:352
SortedSetIncrementbounds expiration cleanup to 16 but then usessortedSetDict.TryGetValue(member, out score)to decide whether the member exists, without checkingIsExpired(member). If an expired member remains, ZINCRBY will increment/update it instead of treating it as a new insertion, which changes semantics and return values. Consider treating an expired hit as missing by removing it first or by doing an unbounded expiration cleanup for ZINCRBY.
private void SortedSetIncrement(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
DeleteExpiredItems(bound: 16);
// It's useful to fix RESP2 in the internal API as that just reads back the output.
if (input.arg2 > 0)
respProtocolVersion = (byte)input.arg2;
// ZINCRBY key increment member
using var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
// Try to read increment value
if (!input.parseState.TryGetDouble(0, out var incrValue))
{
writer.WriteError(CmdStrings.RESP_ERR_NOT_VALID_FLOAT);
return;
}
// Read member
var member = input.parseState.GetArgSliceByRef(1).ToArray();
if (sortedSetDict.TryGetValue(member, out var score))
{
var result = score + incrValue;
if (double.IsNaN(result))
{
writer.WriteError(CmdStrings.RESP_ERR_GENERIC_SCORE_NAN);
return;
}
sortedSetDict[member] = result;
sortedSet.Remove((score, member));
sortedSet.Add((result, member));
}
else
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:857
SortedSetPersistbounds expiration cleanup to 16, butPersist()(called per member) checks onlysortedSetDict.ContainsKeyand can succeed even if the member is already logically expired but not yet deleted due to the bound. This can “revive” expired members by removing their expiration. Consider checkingIsExpired(member)insidePersist()/SortedSetPersistand treating expired as not found (and optionally cleaning it up) before persisting.
private void SortedSetPersist(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
DeleteExpiredItems(bound: 16);
var numFields = input.parseState.Count;
using var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
writer.WriteArrayLength(numFields);
foreach (var item in input.parseState.Parameters)
{
var result = Persist(item.ToArray());
writer.WriteInt32(result);
output.result1++;
}
}
libs/server/Objects/SortedSet/SortedSetObjectImpl.cs:919
SortedSetExpirebounds expiration cleanup to 16, butSetExpiration()only checkssortedSetDict.ContainsKeyand can update TTL for a member that is already logically expired but not yet deleted due to the bound. This can incorrectly allow extending TTL / changing expire options on expired members. Consider checkingIsExpired(member)before callingSetExpiration(and treating it as KeyNotFound / removing it), or keep unbounded cleanup for expire-related commands.
private void SortedSetExpire(ref ObjectInput input, ref ObjectOutput output, byte respProtocolVersion)
{
DeleteExpiredItems(bound: 16);
var expirationWithOption = new ExpirationWithOption(input.arg1, input.arg2);
using var writer = new RespMemoryWriter(respProtocolVersion, ref output.SpanByteAndMemory);
writer.WriteArrayLength(input.parseState.Count);
foreach (var item in input.parseState.Parameters)
{
var result = SetExpiration(item.ToArray(), expirationWithOption.ExpirationTimeInTicks, expirationWithOption.ExpireOption);
writer.WriteInt32(result);
output.result1++;
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
c524cb0 to
b42c233
Compare
- Clear vacated heap slots to default in DequeueFromHeap and TryRemove to prevent retaining references to removed elements - Ensure TryRemove always runs shrink check even when removing last element - Add XML doc comment to Exists method Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
b42c233 to
b1b7ed0
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
761450a to
81ac050
Compare
- Count(): fast-path when no expirations are due yet (peek min), iterate RawHeap instead of sortedSetDict for expired count - Add RawHeap property to IndexedPriorityQueue for fast iteration - Fix Debug.Assert to use correct minimum (SortedSetOverhead + DictionaryOverhead) - Fix SetExpiration: remove from expirationQueue when expiration is in the past Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
8babdfc to
8ccbd17
Compare
Collect keys to remove into a List first, then remove after iteration. Modifying a Dictionary during foreach throws InvalidOperationException. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
IsExpired(key.Key) was checked against this object on both sides of && and ||. The second check should be other.IsExpired(key.Key) to correctly compare expiration state between two SortedSetObjects. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Demonstrates that Clone() shares mutable collections by reference, causing InvalidOperationException when one thread serializes (iterates sortedSetDict in DoSerialize) while another mutates the shared dict (via Add on the clone). This is a pre-existing issue affecting all GarnetObject types, not introduced by the IndexedPriorityQueue change. Three tests: - Race condition: concurrent serialize + mutate throws InvalidOperationException - Deterministic: clone's Add is visible through original's Dictionary - HashObject: Clone also shares mutable state Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Unit-level tests (ShallowCloneRaceConditionTests): - Race test: concurrent serialize + mutate proves InvalidOperationException - Deterministic: clone.Add visible through original.Dictionary (shared ref) - HashObject: Clone also shares mutable state Server-level test (ShallowCloneServerRaceTests): - Custom object with pausing SerializeObject (ManualResetEventSlim) - Serialization triggered by hybrid log page flush (tail growth), not checkpoint - Target object serialization pauses, then 197 mutations applied via RESP - Result: Tsavorite's record-level locking prevents the race at server level Mutations are queued behind the record lock and only execute after flush completes. This confirms the concurrency control layer provides protection. The shallow Clone() shared-mutable-state issue is real at the object level but does not manifest through the Garnet server due to Tsavorite's locking. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
00887a6 to
14a1b71
Compare
Confirmed that Tsavorite's CacheSerializedObjectData prevents the shared-mutable-state race at the server level. The old object is always transitioned out of REST state before CopyUpdate completes, so DoSerialize is never called on a cloned object during flush. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
ZCOLLECT is an explicit cleanup operation — it should drain all expired entries, not just 16. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
PopMinOrMax and SortedSetPopMinOrMaxCount access sortedSet.Min/Max directly after cleanup. With bounded deletion, expired entries can remain in the sorted set and be returned to clients. Use unbounded deletion for Pop since correctness matters more than latency here. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
c28f482 to
0b7bc25
Compare
Replace the separate expirationTimes Dictionary and expirationQueue PriorityQueue with a single IndexedPriorityQueue that supports O(log N) in-place priority updates. This eliminates stale entry accumulation in the expiration queue when member TTLs are frequently refreshed.
Benefits:
Bound DeleteExpiredItems on mutation hot paths (bound=16) to cap worst-case latency per operation. Serialization path (DoSerialize) remains unbounded to drain all expired entries before checkpoint. Correctness is preserved because every read path independently filters expired members via IsExpired().
Add IndexedPriorityQueue<TElement, TPriority> to Garnet.common with:
Add 25 unit tests covering basic ops, in-place updates, custom comparers, removal, and stress/ordering.