diff --git a/src/SchematicHQ.Client.Test/TestEventBuffer.cs b/src/SchematicHQ.Client.Test/TestEventBuffer.cs index 310a147..6bd2a20 100644 --- a/src/SchematicHQ.Client.Test/TestEventBuffer.cs +++ b/src/SchematicHQ.Client.Test/TestEventBuffer.cs @@ -46,11 +46,9 @@ public async Task Stop_BufferStopsSuccessfully() await _buffer.Stop(); Assert.Throws(() => _buffer.Push(1)); - - var semaphore = GetPrivateFieldValue(_buffer, "_semaphore"); + var cts = GetPrivateFieldValue(_buffer, "_cts"); - - Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed."); + Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed."); } diff --git a/src/SchematicHQ.Client/EventBuffer.cs b/src/SchematicHQ.Client/EventBuffer.cs index 95f1793..e3c79b0 100644 --- a/src/SchematicHQ.Client/EventBuffer.cs +++ b/src/SchematicHQ.Client/EventBuffer.cs @@ -1,10 +1,10 @@ -using System.Collections.Concurrent; +using System.Threading.Channels; #nullable enable namespace SchematicHQ.Client; -public interface IEventBuffer +public interface IEventBuffer where T: notnull { void Push(T item); void Start(); @@ -13,7 +13,7 @@ public interface IEventBuffer int GetEventCount(); } -public class EventBuffer : IEventBuffer +public class EventBuffer : IEventBuffer where T : notnull { private const int DefaultMaxSize = 100; private static readonly TimeSpan DefaultFlushPeriod = TimeSpan.FromMilliseconds(5000); @@ -21,18 +21,19 @@ public class EventBuffer : IEventBuffer private const int MaxRetries = 3; // Maximum number of retry attempts private const double InitialRetryDelaySeconds = 1.0; // Initial retry delay in seconds + private const int StateStopped = 0; + private const int StateRunning = 1; + private readonly int _maxSize; private readonly TimeSpan _flushPeriod; private readonly Func, Task> _action; private readonly ISchematicLogger _logger; - private readonly ConcurrentQueue _queue; + private readonly Channel _channel; private readonly SemaphoreSlim _semaphore; private CancellationTokenSource _cts; - private bool _isRunning; + private int _state; private Task _periodicFlushTask = Task.CompletedTask; private Task _processBufferTask = Task.CompletedTask; - private readonly object _taskLock = new object(); - private readonly object _runningLock = new object(); public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxSize = DefaultMaxSize, TimeSpan? flushPeriod = null) { @@ -40,10 +41,14 @@ public EventBuffer(Func, Task> action, ISchematicLogger logger, int maxS _flushPeriod = flushPeriod ?? DefaultFlushPeriod; _action = action ?? throw new ArgumentNullException(nameof(action)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _queue = new ConcurrentQueue(); - _semaphore = new SemaphoreSlim(0); + _channel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = false, + SingleWriter = false + }); _cts = new CancellationTokenSource(); - _isRunning = false; + _state = StateStopped; + _semaphore = new SemaphoreSlim(0); _logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod); @@ -62,9 +67,8 @@ private async Task EmergencyFlush() try { _logger.Debug("Emergency flush triggered by program termination"); - // Don't check _isRunning here since we're in an emergency shutdown - var items = new List(); - while (_queue.TryDequeue(out var item)) + var items = new List(_channel.Reader.Count); + while (_channel.Reader.TryRead(out var item)) { items.Add(item); } @@ -84,14 +88,15 @@ private async Task EmergencyFlush() public void Push(T item) { - lock (_runningLock) - { - if (!_isRunning) throw new InvalidOperationException("Buffer is not running."); - } + if (Volatile.Read(ref _state) != StateRunning) + throw new InvalidOperationException("Buffer is not running."); - _queue.Enqueue(item); - _logger.Debug("Item added to buffer. Current size: {0}", _queue.Count); - if (_queue.Count >= _maxSize) + if (!_channel.Writer.TryWrite(item)) + throw new InvalidOperationException("Failed to write item to buffer channel."); + + _logger.Debug("Item added to buffer. Current size: {0}", _channel.Reader.Count); + + if (_channel.Reader.Count >= _maxSize) { _semaphore.Release(); } @@ -99,41 +104,31 @@ public void Push(T item) public void Start() { - lock (_runningLock) - { - if (_isRunning) return; - - _isRunning = true; - _cts = new CancellationTokenSource(); - } + if (Interlocked.CompareExchange(ref _state, StateRunning, StateStopped) != StateStopped) + return; - lock (_taskLock) - { - if (_periodicFlushTask == Task.CompletedTask || _periodicFlushTask.IsCompleted) - { - _periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token)); - } - if (_processBufferTask == Task.CompletedTask || _processBufferTask.IsCompleted) - { - _processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token)); - } - } + _cts = new CancellationTokenSource(); + _periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token)); + _processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token)); _logger.Info("EventBuffer started."); } public async Task Flush() { - lock (_runningLock) - { - if (!_isRunning) throw new InvalidOperationException("Buffer is not running."); - } + if (Volatile.Read(ref _state) != StateRunning) + throw new InvalidOperationException("Buffer is not running."); - while (!_queue.IsEmpty) + await DrainAsync(); + _logger.Info("Buffer flushed manually."); + } + + private async Task DrainAsync() + { + while (_channel.Reader.Count > 0) { await FlushBufferAsync(); } - _logger.Info("Buffer flushed manually."); } private async Task ProcessBufferAsync(CancellationToken token) @@ -143,6 +138,7 @@ private async Task ProcessBufferAsync(CancellationToken token) try { await _semaphore.WaitAsync(token); + await FlushBufferAsync(); } catch (OperationCanceledException) @@ -160,7 +156,7 @@ private async Task PeriodicFlushAsync(CancellationToken token) { await Task.Delay(_flushPeriod, token); - if (_queue.Count > 0) + if (_channel.Reader.Count > 0) { await FlushBufferAsync(); } @@ -178,8 +174,8 @@ private async Task PeriodicFlushAsync(CancellationToken token) private async Task FlushBufferAsync() { - var items = new List(); - while (items.Count < _maxSize && _queue.TryDequeue(out var item)) + var items = new List(_channel.Reader.Count); + while (items.Count < _maxSize && _channel.Reader.TryRead(out var item)) { items.Add(item); } @@ -192,7 +188,6 @@ private async Task FlushBufferAsync() int retryCount = 0; bool success = false; Exception? lastException = null; - Random random = new Random(); // Try with retries and exponential backoff while (retryCount <= MaxRetries && !success) @@ -219,7 +214,7 @@ private async Task FlushBufferAsync() // Calculate backoff with jitter double baseDelay = InitialRetryDelaySeconds; double delay = baseDelay * Math.Pow(2, retryCount - 1); - double jitter = random.NextDouble() * 0.1 * delay; // 10% jitter + double jitter = Random.Shared.NextDouble() * 0.1 * delay; // 10% jitter TimeSpan waitTime = TimeSpan.FromSeconds(delay + jitter); _logger.Warn( @@ -247,50 +242,28 @@ private async Task FlushBufferAsync() public async Task Stop() { - bool needsFlush = false; - - lock (_runningLock) - { - if (!_isRunning) return; - needsFlush = true; - } + if (Interlocked.CompareExchange(ref _state, StateStopped, StateRunning) != StateRunning) + return; try { - if (needsFlush) - { - try - { - await Flush(); - } - catch (InvalidOperationException) - { - // Another thread might have changed the state between our check and the flush - // Just continue with shutdown - } - } - - lock (_runningLock) + await _cts.CancelAsync(); + _channel.Writer.TryComplete(); + + try { - if (!_isRunning) return; - _isRunning = false; - _cts.Cancel(); + await DrainAsync(); } - - if (_periodicFlushTask != Task.CompletedTask) + catch (Exception ex) { - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); - await Task.WhenAny(_periodicFlushTask, timeoutTask); + _logger.Warn("Error draining buffer on stop: {0}", ex.Message); } - if (_processBufferTask != Task.CompletedTask) - { - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer)); - await Task.WhenAny(_processBufferTask, timeoutTask); - } + var timeout = TimeSpan.FromSeconds(MaxWaitForBuffer); + await Task.WhenAny(Task.WhenAll(_periodicFlushTask, _processBufferTask), Task.Delay(timeout)); - _semaphore.Dispose(); _cts.Dispose(); + _semaphore.Dispose(); _logger.Info("EventBuffer shut down cleanly."); } catch (Exception ex) @@ -302,6 +275,6 @@ public async Task Stop() public int GetEventCount() { - return _queue.Count; + return _channel.Reader.Count; } }