From aba22f124e21648fb0d595db57ddfb476c9ab962 Mon Sep 17 00:00:00 2001 From: JT Date: Sun, 26 Apr 2026 21:57:15 +0800 Subject: [PATCH 1/3] Add channel-based buffer --- .../TestEventBuffer.cs | 6 +- src/SchematicHQ.Client/EventBuffer.cs | 153 ++++++++---------- 2 files changed, 66 insertions(+), 93 deletions(-) diff --git a/src/SchematicHQ.Client.Test/TestEventBuffer.cs b/src/SchematicHQ.Client.Test/TestEventBuffer.cs index 310a1472..6bd2a20b 100644 --- a/src/SchematicHQ.Client.Test/TestEventBuffer.cs +++ b/src/SchematicHQ.Client.Test/TestEventBuffer.cs @@ -46,11 +46,9 @@ public async Task Stop_BufferStopsSuccessfully() await _buffer.Stop(); Assert.Throws(() => _buffer.Push(1)); - - var semaphore = GetPrivateFieldValue(_buffer, "_semaphore"); + var cts = GetPrivateFieldValue(_buffer, "_cts"); - - Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed."); + Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed."); } diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs index 95f17932..926051c0 100644 --- a/src/SchematicHQ.Client/EventBuffer.cs +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -1,10 +1,10 @@ -using System.Collections.Concurrent; +using System.Threading.Channels; #nullable enable namespace SchematicHQ.Client; -public interface IEventBuffer +public interface IEventBuffer where T: notnull { void Push(T item); void Start(); @@ -13,7 +13,7 @@ public interface IEventBuffer int GetEventCount(); } -public class EventBuffer : IEventBuffer +public class EventBuffer : IEventBuffer where T : notnull { private const int DefaultMaxSize = 100; private static readonly TimeSpan DefaultFlushPeriod = TimeSpan.FromMilliseconds(5000); @@ -21,18 +21,18 @@ public class EventBuffer : IEventBuffer private const int MaxRetries = 3; // Maximum number of retry attempts private const double InitialRetryDelaySeconds = 1.0; // Initial retry delay in seconds + private const int StateStopped = 0; + private const int StateRunning = 1; + private readonly int _maxSize; private readonly TimeSpan _flushPeriod; private readonly Func, Task> _action; private readonly ISchematicLogger _logger; - private readonly ConcurrentQueue _queue; - private readonly SemaphoreSlim _semaphore; + private readonly Channel _channel; private CancellationTokenSource _cts; - private bool _isRunning; + private int _state; private Task _periodicFlushTask = Task.CompletedTask; private Task _processBufferTask = Task.CompletedTask; - private readonly object _taskLock = new object(); - private readonly object _runningLock = new object(); public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxSize = DefaultMaxSize, TimeSpan? flushPeriod = null) { @@ -40,10 +40,13 @@ public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxS _flushPeriod = flushPeriod ?? DefaultFlushPeriod; _action = action ?? throw new ArgumentNullException(nameof(action)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _queue = new ConcurrentQueue(); - _semaphore = new SemaphoreSlim(0); + _channel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = false, + SingleWriter = false + }); _cts = new CancellationTokenSource(); - _isRunning = false; + _state = StateStopped; _logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod); @@ -62,9 +65,8 @@ private async Task EmergencyFlush() try { _logger.Debug("Emergency flush triggered by program termination"); - // Don't check _isRunning here since we're in an emergency shutdown - var items = new List(); - while (_queue.TryDequeue(out var item)) + var items = new List(_channel.Reader.Count); + while (_channel.Reader.TryRead(out var item)) { items.Add(item); } @@ -84,56 +86,42 @@ private async Task EmergencyFlush() public void Push(T item) { - lock (_runningLock) - { - if (!_isRunning) throw new InvalidOperationException("Buffer is not running."); - } + if (Volatile.Read(ref _state) != StateRunning) + throw new InvalidOperationException("Buffer is not running."); - _queue.Enqueue(item); - _logger.Debug("Item added to buffer. Current size: {0}", _queue.Count); - if (_queue.Count >= _maxSize) - { - _semaphore.Release(); - } + if (!_channel.Writer.TryWrite(item)) + throw new InvalidOperationException("Failed to write item to buffer channel."); + + _logger.Debug("Item added to buffer. Current size: {0}", _channel.Reader.Count); } public void Start() { - lock (_runningLock) - { - if (_isRunning) return; + if (Interlocked.CompareExchange(ref _state, StateRunning, StateStopped) != StateStopped) + return; - _isRunning = true; - _cts = new CancellationTokenSource(); - } - - lock (_taskLock) - { - if (_periodicFlushTask == Task.CompletedTask || _periodicFlushTask.IsCompleted) - { - _periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token)); - } - if (_processBufferTask == Task.CompletedTask || _processBufferTask.IsCompleted) - { - _processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token)); - } - } + _cts = new CancellationTokenSource(); + _periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token)); + _processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token)); _logger.Info("EventBuffer started."); } public async Task Flush() { - lock (_runningLock) - { - if (!_isRunning) throw new InvalidOperationException("Buffer is not running."); - } + if (Volatile.Read(ref _state) != StateRunning) + throw new InvalidOperationException("Buffer is not running."); + + await DrainAsync(); + _logger.Info("Buffer flushed manually."); + } - while (!_queue.IsEmpty) + private async Task DrainAsync() + { + while (_channel.Reader.Count > 0) { await FlushBufferAsync(); } - _logger.Info("Buffer flushed manually."); } private async Task ProcessBufferAsync(CancellationToken token) @@ -142,13 +130,24 @@ private async Task ProcessBufferAsync(CancellationToken token) { try { - await _semaphore.WaitAsync(token); - await FlushBufferAsync(); + // Wait for at least one item to be available + await _channel.Reader.WaitToReadAsync(token); + + // Check if we've accumulated enough items for a batch flush + if (_channel.Reader.Count >= _maxSize) + { + await FlushBufferAsync(); + } } catch (OperationCanceledException) { _logger.Warn("Process buffer task was canceled."); } + catch (ChannelClosedException) + { + _logger.Debug("Channel closed, process buffer task exiting."); + break; + } } } @@ -160,7 +159,7 @@ private async Task PeriodicFlushAsync(CancellationToken token) { await Task.Delay(_flushPeriod, token); - if (_queue.Count > 0) + if (_channel.Reader.Count > 0) { await FlushBufferAsync(); } @@ -178,8 +177,8 @@ private async Task PeriodicFlushAsync(CancellationToken token) private async Task FlushBufferAsync() { - var items = new List(); - while (items.Count < _maxSize && _queue.TryDequeue(out var item)) + var items = new List(_channel.Reader.Count); + while (items.Count < _maxSize && _channel.Reader.TryRead(out var item)) { items.Add(item); } @@ -192,7 +191,6 @@ private async Task FlushBufferAsync() int retryCount = 0; bool success = false; Exception? lastException = null; - Random random = new Random(); // Try with retries and exponential backoff while (retryCount <= MaxRetries && !success) @@ -219,7 +217,7 @@ private async Task FlushBufferAsync() // Calculate backoff with jitter double baseDelay = InitialRetryDelaySeconds; double delay = baseDelay * Math.Pow(2, retryCount - 1); - double jitter = random.NextDouble() * 0.1 * delay; // 10% jitter + double jitter = Random.Shared.NextDouble() * 0.1 * delay; // 10% jitter TimeSpan waitTime = TimeSpan.FromSeconds(delay + jitter); _logger.Warn( @@ -247,49 +245,26 @@ private async Task FlushBufferAsync() public async Task Stop() { - bool needsFlush = false; - - lock (_runningLock) - { - if (!_isRunning) return; - needsFlush = true; - } + if (Interlocked.CompareExchange(ref _state, StateStopped, StateRunning) != StateRunning) + return; try { - if (needsFlush) + try { - try - { - await Flush(); - } - catch (InvalidOperationException) - { - // Another thread might have changed the state between our check and the flush - // Just continue with shutdown - } + await DrainAsync(); } - - lock (_runningLock) + catch (Exception ex) { - if (!_isRunning) return; - _isRunning = false; - _cts.Cancel(); + _logger.Warn("Error draining buffer on stop: {0}", ex.Message); } - if (_periodicFlushTask != Task.CompletedTask) - { - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); - await Task.WhenAny(_periodicFlushTask, timeoutTask); - } + await _cts.CancelAsync(); + _channel.Writer.TryComplete(); - if (_processBufferTask != Task.CompletedTask) - { - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); - await Task.WhenAny(_processBufferTask, timeoutTask); - } + var timeout = TimeSpan.FromSeconds(MaxWaitForBuffer); + await Task.WhenAny(Task.WhenAll(_periodicFlushTask, _processBufferTask), Task.Delay(timeout)); - _semaphore.Dispose(); _cts.Dispose(); _logger.Info("EventBuffer shut down cleanly."); } @@ -302,6 +277,6 @@ public async Task Stop() public int GetEventCount() { - return _queue.Count; + return _channel.Reader.Count; } } From 00bc33f0c4d757c23e625c3d4f98aaa4720a5002 Mon Sep 17 00:00:00 2001 From: JT Date: Wed, 29 Apr 2026 23:08:10 +0800 Subject: [PATCH 2/3] Re-add semaphore signal for size limit --- src/SchematicHQ.Client/EventBuffer.cs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs index 926051c0..9750edd1 100644 --- a/src/SchematicHQ.Client/EventBuffer.cs +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -29,6 +29,7 @@ public class EventBuffer : IEventBuffer where T : notnull private readonly Func, Task> _action; private readonly ISchematicLogger _logger; private readonly Channel _channel; + private readonly SemaphoreSlim _semaphore; private CancellationTokenSource _cts; private int _state; private Task _periodicFlushTask = Task.CompletedTask; @@ -47,6 +48,7 @@ public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxS }); _cts = new CancellationTokenSource(); _state = StateStopped; + _semaphore = new SemaphoreSlim(0); _logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod); @@ -93,6 +95,11 @@ public void Push(T item) throw new InvalidOperationException("Failed to write item to buffer channel."); _logger.Debug("Item added to buffer. Current size: {0}", _channel.Reader.Count); + + if (_channel.Reader.Count >= _maxSize) + { + _semaphore.Release(); + } } public void Start() @@ -130,24 +137,14 @@ private async Task ProcessBufferAsync(CancellationToken token) { try { - // Wait for at least one item to be available - await _channel.Reader.WaitToReadAsync(token); - - // Check if we've accumulated enough items for a batch flush - if (_channel.Reader.Count >= _maxSize) - { - await FlushBufferAsync(); - } + await _semaphore.WaitAsync(token); + + await FlushBufferAsync(); } catch (OperationCanceledException) { _logger.Warn("Process buffer task was canceled."); } - catch (ChannelClosedException) - { - _logger.Debug("Channel closed, process buffer task exiting."); - break; - } } } From 45cd1ba4e0166b5ef50847b299996bdda0ea3d31 Mon Sep 17 00:00:00 2001 From: JT Date: Thu, 30 Apr 2026 11:44:41 +0800 Subject: [PATCH 3/3] Ensure semaphore is disposed and correct stop ordering --- src/SchematicHQ.Client/EventBuffer.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs index 9750edd1..e3c79b02 100644 --- a/src/SchematicHQ.Client/EventBuffer.cs +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -247,6 +247,9 @@ public async Task Stop() try { + await _cts.CancelAsync(); + _channel.Writer.TryComplete(); + try { await DrainAsync(); @@ -256,13 +259,11 @@ public async Task Stop() _logger.Warn("Error draining buffer on stop: {0}", ex.Message); } - await _cts.CancelAsync(); - _channel.Writer.TryComplete(); - var timeout = TimeSpan.FromSeconds(MaxWaitForBuffer); await Task.WhenAny(Task.WhenAll(_periodicFlushTask, _processBufferTask), Task.Delay(timeout)); _cts.Dispose(); + _semaphore.Dispose(); _logger.Info("EventBuffer shut down cleanly."); } catch (Exception ex)