Skip to content

Commit 8e96096

Browse files
committed
FIx the full scan from straggler error
1 parent 00aea99 commit 8e96096

14 files changed

Lines changed: 216 additions & 66 deletions

publish-nuget.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ cd "$SCRIPT_DIR"
1919
# Override version if provided
2020
VERSION_FLAG=""
2121
if [ -n "$VERSION" ]; then
22-
VERSION_FLAG="/p:Version=$VERSION"
22+
VERSION_FLAG="-p:Version=$VERSION"
2323
echo "Publishing StreamDB v$VERSION"
2424
else
2525
echo "Publishing StreamDB (version from csproj)"
2626
fi
2727

2828
# Clean previous artifacts
29+
rm -rf ./artifacts/*.nupkg
2930
rm -rf src/StreamDB/bin/Release/*.nupkg
3031

3132
# Build and pack

src/StreamDB/StreamDB.cs

Lines changed: 88 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ private readonly (int indexSpacing, int batchSize, int indexMask)[] AdaptiveTuni
9393
private readonly ILogger<StreamDB>? _logger;
9494
private readonly Task _bgFlushRunner;
9595
private readonly ManualResetEventSlim _flushSignal = new(false);
96+
private readonly ManualResetEventSlim _flushIdle = new(true);
9697
private readonly Lock _maintenanceLock = new(); // Ensures only one maintenance operation at a time
9798
private readonly DeferrableRwLock _indexWriteLock = new();
9899

@@ -127,11 +128,12 @@ private readonly (int indexSpacing, int batchSize, int indexMask)[] AdaptiveTuni
127128

128129
#endregion
129130

130-
public StreamDB(string? baseDir = null, TimeSpan? retentionPeriod = null, TimeSpan? checkpointInterval = null, long jitterWindow = 0, ILogger<StreamDB>? logger = null)
131+
public StreamDB(string? baseDir = null, TimeSpan? retentionPeriod = null, TimeSpan? checkpointInterval = null, long jitterWindow = 0, ILogger<StreamDB>? logger = null, int initialAdaptiveIdx = 6)
131132
{
132133
_logger = logger;
133134
_baseDir = baseDir ?? "streams";
134135
_jitterWindow = jitterWindow;
136+
_adaptiveIdx = Math.Clamp(initialAdaptiveIdx, 0, AdaptiveTuning.Length - 1);
135137
Directory.CreateDirectory(_baseDir);
136138

137139
_retentionPeriod = retentionPeriod ?? TimeSpan.FromDays(60);
@@ -258,6 +260,7 @@ private void FlushWorker()
258260
// Process all pending batches until queue is empty
259261
while (!_pendingIndexInserts.IsEmpty && !_disposed)
260262
{
263+
_flushIdle.Reset();
261264
// Check tuning on every batch for responsiveness (before acquiring lock)
262265
AdaptivelyTuneParameters();
263266

@@ -346,6 +349,7 @@ private void FlushWorker()
346349
_indexWriteLock.ExitReadLock(actualLockAcquired);
347350
}
348351
}
352+
_flushIdle.Set();
349353
}
350354
}
351355

@@ -462,9 +466,9 @@ public void Append(long primaryIndex, int secondaryIndex, ushort version, ReadOn
462466
int currentCount = Volatile.Read(ref _pendingIndexCount);
463467
if (currentCount < QueueCapacity)
464468
{
469+
_flushIdle.Reset();
465470
Interlocked.Increment(ref _pendingIndexCount);
466471
_pendingIndexInserts.Enqueue((secondaryIndex, primaryIndex, address));
467-
// in aggressive scenarios as index frequency goes down we want to make sure the worker is signaled to process the larger batches in a timely manner
468472
_flushSignal.Set();
469473
}
470474
else
@@ -482,16 +486,13 @@ public void Append(long primaryIndex, int secondaryIndex, ushort version, ReadOn
482486
/// </summary>
483487
public void WaitForPendingWrites()
484488
{
485-
if (_pendingIndexInserts.IsEmpty)
489+
if (_pendingIndexInserts.IsEmpty && _flushIdle.IsSet)
486490
return;
487491

488492
_flushSignal.Set();
489493

490-
// Wait for worker to process all entries
491-
while (!_pendingIndexInserts.IsEmpty)
492-
{
493-
Thread.Yield();
494-
}
494+
// Wait for worker to drain the queue AND finish committing to SQLite
495+
_flushIdle.Wait();
495496
}
496497

497498
/// <summary>
@@ -624,13 +625,27 @@ public Dictionary<int, List<StreamEntry>> ReadRange(IEnumerable<int> secondaryIn
624625
string.Join(", ", secondaryIndexes), startPrimaryIndex, endPrimaryIndex, limit);
625626

626627
// Group secondary indexes by shard so we can scan each shard at most once.
628+
// Use forward lookup to skip devices whose data is entirely outside the query range.
627629
Dictionary<int, (HashSet<int> Indexes, long MinAddress)> shardGroups = new Dictionary<int, (HashSet<int> Indexes, long MinAddress)>();
628630

629631
foreach (int idx in secondaryIndexes)
630632
{
631-
int shardIndex = idx & ShardMask;
632633
long addr = LookupNearestAddress(idx, startPrimaryIndex);
633634

635+
if (addr == 0)
636+
{
637+
// No backward hit: use forward lookup to check if device has any indexed data.
638+
var forward = LookupFirstAddressAtOrAfter(idx, startPrimaryIndex);
639+
if (!forward.HasValue)
640+
{
641+
_logger?.LogDebug("ReadRange (multi-index): skipping secondaryIndex={SecondaryIndex} — no indexed data", idx);
642+
continue;
643+
}
644+
// Forward hit exists: device has data. Unindexed entries may exist before the
645+
// first index entry. Scan from shard beginning (addr stays 0).
646+
}
647+
648+
int shardIndex = idx & ShardMask;
634649
if (shardGroups.TryGetValue(shardIndex, out (HashSet<int> Indexes, long MinAddress) group))
635650
{
636651
group.Indexes.Add(idx);
@@ -727,27 +742,53 @@ public Dictionary<int, List<StreamEntry>> ReadRange(long startPrimaryIndex, long
727742
_logger?.LogDebug("GetEarliestPrimaryIndex: secondaryIndexes=[{Indexes}], from={FromPrimaryIndex}",
728743
string.Join(", ", secondaryIndexes), fromPrimaryIndex);
729744

745+
long? globalMin = null;
730746
Dictionary<int, (HashSet<int> Indexes, long MinAddress)> shardGroups = new Dictionary<int, (HashSet<int> Indexes, long MinAddress)>();
731747

732748
foreach (int idx in secondaryIndexes)
733749
{
734750
int shardIndex = idx & ShardMask;
735751
long addr = LookupNearestAddress(idx, fromPrimaryIndex);
736752

737-
if (shardGroups.TryGetValue(shardIndex, out (HashSet<int> Indexes, long MinAddress) group))
753+
if (addr > 0)
738754
{
739-
group.Indexes.Add(idx);
740-
if (addr < group.MinAddress)
741-
shardGroups[shardIndex] = (group.Indexes, addr);
755+
// Backward hit: scan from this address.
756+
if (shardGroups.TryGetValue(shardIndex, out (HashSet<int> Indexes, long MinAddress) group))
757+
{
758+
group.Indexes.Add(idx);
759+
if (addr < group.MinAddress)
760+
shardGroups[shardIndex] = (group.Indexes, addr);
761+
}
762+
else
763+
{
764+
shardGroups[shardIndex] = (new HashSet<int> { idx }, addr);
765+
}
742766
}
743767
else
744768
{
745-
shardGroups[shardIndex] = (new HashSet<int> { idx }, addr);
769+
// No backward hit: use forward lookup to check if device has any indexed data.
770+
var forward = LookupFirstAddressAtOrAfter(idx, fromPrimaryIndex);
771+
if (forward.HasValue)
772+
{
773+
// Forward hit: device has data but first index entry is after fromPrimaryIndex.
774+
// Unindexed entries may exist before it — scan from shard beginning.
775+
if (shardGroups.TryGetValue(shardIndex, out (HashSet<int> Indexes, long MinAddress) group))
776+
{
777+
group.Indexes.Add(idx);
778+
shardGroups[shardIndex] = (group.Indexes, Math.Min(group.MinAddress, 0));
779+
}
780+
else
781+
{
782+
shardGroups[shardIndex] = (new HashSet<int> { idx }, 0);
783+
}
784+
785+
if (!globalMin.HasValue || forward.Value.PrimaryIndex < globalMin.Value)
786+
globalMin = forward.Value.PrimaryIndex;
787+
}
788+
// else: no indexed data at all — skip this device
746789
}
747790
}
748791

749-
long? globalMin = null;
750-
751792
_logger?.LogDebug("GetEarliestPrimaryIndex: grouped into {ShardCount} shards", shardGroups.Count);
752793

753794
foreach ((int shardIndex, (HashSet<int>? indexes, long minAddress)) in shardGroups)
@@ -837,6 +878,37 @@ private long LookupNearestAddress(int secondaryIndex, long startPrimaryIndex)
837878
return addr;
838879
}
839880

881+
/// <summary>
882+
/// Find the first indexed entry at or after <paramref name="primaryIndex"/> for this secondary index.
883+
/// Returns the primary index and FasterLog address, or null if no such entry exists.
884+
/// Uses the B-tree primary key index for O(log N) lookup.
885+
/// </summary>
886+
private (long PrimaryIndex, long Address)? LookupFirstAddressAtOrAfter(int secondaryIndex, long primaryIndex)
887+
{
888+
using PooledConnection pooled = GetConnection();
889+
using SqliteCommand cmd = pooled.Connection.CreateCommand();
890+
cmd.CommandText = "SELECT primary_index, log_address FROM stream_index WHERE secondary_index = $sidx AND primary_index >= $pi ORDER BY primary_index ASC LIMIT 1";
891+
SqliteParameter pSidx = cmd.Parameters.Add("$sidx", SqliteType.Integer);
892+
SqliteParameter pPi = cmd.Parameters.Add("$pi", SqliteType.Integer);
893+
cmd.Prepare();
894+
895+
pSidx.Value = secondaryIndex;
896+
pPi.Value = primaryIndex;
897+
using SqliteDataReader reader = cmd.ExecuteReader();
898+
if (reader.Read())
899+
{
900+
long pi = reader.GetInt64(0);
901+
long addr = reader.GetInt64(1);
902+
_logger?.LogDebug("LookupFirstAddressAtOrAfter: secondaryIndex={SecondaryIndex}, from={PrimaryIndex} -> pi={Pi}, address={Address}",
903+
secondaryIndex, primaryIndex, pi, addr);
904+
return (pi, addr);
905+
}
906+
907+
_logger?.LogDebug("LookupFirstAddressAtOrAfter: secondaryIndex={SecondaryIndex}, from={PrimaryIndex} -> null",
908+
secondaryIndex, primaryIndex);
909+
return null;
910+
}
911+
840912
/// <summary>
841913
/// Returns all distinct secondary indexes that have at least one indexed entry in the stream.
842914
/// </summary>

tests/StreamDB.Tests/StreamDBBasicTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class StreamDBBasicTests
2727
public void SetUp()
2828
{
2929
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
30-
_db = new StreamDB(baseDir: _dataDir);
30+
_db = new StreamDB(baseDir: _dataDir, initialAdaptiveIdx: 0);
3131
}
3232

3333
[TearDown]

tests/StreamDB.Tests/StreamDBCheckpointTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class StreamDBCheckpointTests
3030
public void SetUp()
3131
{
3232
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
33-
_db = new StreamDB(baseDir: _dataDir, checkpointInterval: CheckpointInterval);
33+
_db = new StreamDB(baseDir: _dataDir, checkpointInterval: CheckpointInterval, initialAdaptiveIdx: 0);
3434
}
3535

3636
[TearDown]

tests/StreamDB.Tests/StreamDBConcurrencyTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class StreamDBConcurrencyTests
2727
public void SetUp()
2828
{
2929
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
30-
_db = new StreamDB(baseDir: _dataDir);
30+
_db = new StreamDB(baseDir: _dataDir, initialAdaptiveIdx: 0);
3131
}
3232

3333
[TearDown]

tests/StreamDB.Tests/StreamDBDisposeTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class StreamDBDisposeTests
2323
public void Append_AfterDispose_Throws()
2424
{
2525
string dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
26-
var db = new StreamDB(baseDir: dataDir);
26+
var db = new StreamDB(baseDir: dataDir, initialAdaptiveIdx: 0);
2727
db.Dispose();
2828

2929
try
@@ -42,7 +42,7 @@ public void Append_AfterDispose_Throws()
4242
public void ReadRange_AfterDispose_Throws()
4343
{
4444
string dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
45-
var db = new StreamDB(baseDir: dataDir);
45+
var db = new StreamDB(baseDir: dataDir, initialAdaptiveIdx: 0);
4646
db.Dispose();
4747

4848
try
@@ -60,7 +60,7 @@ public void ReadRange_AfterDispose_Throws()
6060
public void DoubleDispose_DoesNotThrow()
6161
{
6262
string dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
63-
var db = new StreamDB(baseDir: dataDir);
63+
var db = new StreamDB(baseDir: dataDir, initialAdaptiveIdx: 0);
6464
try
6565
{
6666
db.Dispose();

tests/StreamDB.Tests/StreamDBGetEarliestPrimaryIndexTests.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class StreamDBGetEarliestPrimaryIndexTests
2727
public void SetUp()
2828
{
2929
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
30-
_db = new StreamDB(baseDir: _dataDir);
30+
_db = new StreamDB(baseDir: _dataDir, initialAdaptiveIdx: 0);
3131
}
3232

3333
[TearDown]
@@ -48,9 +48,12 @@ private void AppendPayload(int secondaryIndex, long primaryIndex)
4848
[Test]
4949
public void GetEarliestPrimaryIndex_ReturnsEarliest()
5050
{
51-
AppendPayload(1, 200);
52-
AppendPayload(2, 100);
53-
AppendPayload(3, 300);
51+
for (int i = 0; i < 20; i++)
52+
{
53+
AppendPayload(1, 200 + i);
54+
AppendPayload(2, 100 + i);
55+
AppendPayload(3, 300 + i);
56+
}
5457
_db.WaitForPendingWrites();
5558

5659
long? earliest = _db.GetEarliestPrimaryIndex(new[] { 1, 2, 3 }, fromPrimaryIndex: 0);
@@ -60,13 +63,12 @@ public void GetEarliestPrimaryIndex_ReturnsEarliest()
6063
[Test]
6164
public void GetEarliestPrimaryIndex_RespectsFromPrimaryIndex()
6265
{
63-
AppendPayload(1, 100);
64-
AppendPayload(1, 200);
65-
AppendPayload(1, 300);
66+
for (int i = 0; i < 20; i++)
67+
AppendPayload(1, 100 + i * 10);
6668
_db.WaitForPendingWrites();
6769

6870
long? earliest = _db.GetEarliestPrimaryIndex(new[] { 1 }, fromPrimaryIndex: 150);
69-
Assert.That(earliest, Is.EqualTo(200));
71+
Assert.That(earliest, Is.EqualTo(150));
7072
}
7173

7274
[Test]

tests/StreamDB.Tests/StreamDBJitterWindowTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class StreamDBJitterWindowTests
2727
public void SetUp()
2828
{
2929
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
30-
_db = new StreamDB(baseDir: _dataDir, jitterWindow: 50);
30+
_db = new StreamDB(baseDir: _dataDir, jitterWindow: 50, initialAdaptiveIdx: 0);
3131
}
3232

3333
[TearDown]

tests/StreamDB.Tests/StreamDBLargePayloadTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class StreamDBLargePayloadTests
2626
public void SetUp()
2727
{
2828
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
29-
_db = new StreamDB(baseDir: _dataDir);
29+
_db = new StreamDB(baseDir: _dataDir, initialAdaptiveIdx: 0);
3030
}
3131

3232
[TearDown]

tests/StreamDB.Tests/StreamDBLateArrivalsTests.cs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class StreamDBLateArrivalsTests
2727
public void SetUp()
2828
{
2929
_dataDir = Path.Combine(Path.GetTempPath(), $"streamdb-test-{Guid.NewGuid():N}");
30-
_db = new StreamDB(baseDir: _dataDir);
30+
_db = new StreamDB(baseDir: _dataDir, initialAdaptiveIdx: 0);
3131
}
3232

3333
[TearDown]
@@ -154,23 +154,24 @@ public void LateArrival_PreservesPayloadData()
154154
[Test]
155155
public void LateArrival_MultiIndex_ReadRange()
156156
{
157-
AppendPayload(1, 100);
158-
AppendPayload(1, 300);
159-
AppendPayload(2, 100);
160-
AppendPayload(2, 300);
157+
for (int i = 0; i < 20; i++)
158+
{
159+
AppendPayload(1, 100 + i);
160+
AppendPayload(2, 100 + i);
161+
}
161162

162-
// Late arrivals for both indexes
163-
AppendPayload(1, 200);
164-
AppendPayload(2, 200);
163+
// Late arrivals for both indexes (below max seen)
164+
AppendPayload(1, 50);
165+
AppendPayload(2, 50);
165166
_db.WaitForPendingWrites();
166167

167-
var results = _db.ReadRange(secondaryIndexes: new[] { 1, 2 }, startPrimaryIndex: 100, endPrimaryIndex: 300);
168-
Assert.That(results[1], Has.Count.EqualTo(3));
169-
Assert.That(results[2], Has.Count.EqualTo(3));
168+
var results = _db.ReadRange(secondaryIndexes: new[] { 1, 2 }, startPrimaryIndex: 50, endPrimaryIndex: 119);
169+
Assert.That(results[1], Has.Count.EqualTo(21));
170+
Assert.That(results[2], Has.Count.EqualTo(21));
170171

171-
// Both should be in primary index order
172-
Assert.That(results[1][1].PrimaryIndex, Is.EqualTo(200));
173-
Assert.That(results[2][1].PrimaryIndex, Is.EqualTo(200));
172+
// Late arrivals should be first (pi=50)
173+
Assert.That(results[1][0].PrimaryIndex, Is.EqualTo(50));
174+
Assert.That(results[2][0].PrimaryIndex, Is.EqualTo(50));
174175
}
175176

176177
[Test]

0 commit comments

Comments
 (0)