Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/SchematicHQ.Client.Test/TestEventBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ public async Task Stop_BufferStopsSuccessfully()
await _buffer.Stop();

Assert.Throws<InvalidOperationException>(() => _buffer.Push(1));

var semaphore = GetPrivateFieldValue<SemaphoreSlim>(_buffer, "_semaphore");

var cts = GetPrivateFieldValue<CancellationTokenSource>(_buffer, "_cts");

Assert.That(IsSemaphoreSlimDisposed(semaphore), Is.True, "SemaphoreSlim was not disposed.");

Assert.That(IsCancellationTokenSourceDisposed(cts), Is.True, "CancellationTokenSource was not disposed.");
}

Expand Down
141 changes: 57 additions & 84 deletions src/SchematicHQ.Client/EventBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System.Collections.Concurrent;
using System.Threading.Channels;

#nullable enable

namespace SchematicHQ.Client;

public interface IEventBuffer<T>
public interface IEventBuffer<in T> where T: notnull
{
void Push(T item);
void Start();
Expand All @@ -13,37 +13,42 @@ public interface IEventBuffer<T>
int GetEventCount();
}

public class EventBuffer<T> : IEventBuffer<T>
public class EventBuffer<T> : IEventBuffer<T> where T : notnull
{
private const int DefaultMaxSize = 100;
private static readonly TimeSpan DefaultFlushPeriod = TimeSpan.FromMilliseconds(5000);
private const int MaxWaitForBuffer = 3; //seconds to wait for event buffer to flush on Stop
private const int MaxRetries = 3; // Maximum number of retry attempts
private const double InitialRetryDelaySeconds = 1.0; // Initial retry delay in seconds

private const int StateStopped = 0;
private const int StateRunning = 1;

private readonly int _maxSize;
private readonly TimeSpan _flushPeriod;
private readonly Func<List<T>, Task> _action;
private readonly ISchematicLogger _logger;
private readonly ConcurrentQueue<T> _queue;
private readonly Channel<T> _channel;
private readonly SemaphoreSlim _semaphore;
private CancellationTokenSource _cts;
private bool _isRunning;
private int _state;
private Task _periodicFlushTask = Task.CompletedTask;
private Task _processBufferTask = Task.CompletedTask;
private readonly object _taskLock = new object();
private readonly object _runningLock = new object();

public EventBuffer(Func<List<T>, Task> action, ISchematicLogger logger, int maxSize = DefaultMaxSize, TimeSpan? flushPeriod = null)
{
_maxSize = maxSize;
_flushPeriod = flushPeriod ?? DefaultFlushPeriod;
_action = action ?? throw new ArgumentNullException(nameof(action));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_queue = new ConcurrentQueue<T>();
_semaphore = new SemaphoreSlim(0);
_channel = Channel.CreateUnbounded<T>(new UnboundedChannelOptions
{
SingleReader = false,
SingleWriter = false
});
_cts = new CancellationTokenSource();
_isRunning = false;
_state = StateStopped;
_semaphore = new SemaphoreSlim(0);

_logger.Debug("EventBuffer initialized with maxSize: {0}, flushPeriod: {1}", _maxSize, _flushPeriod);

Expand All @@ -62,9 +67,8 @@ private async Task EmergencyFlush()
try
{
_logger.Debug("Emergency flush triggered by program termination");
// Don't check _isRunning here since we're in an emergency shutdown
var items = new List<T>();
while (_queue.TryDequeue(out var item))
var items = new List<T>(_channel.Reader.Count);
while (_channel.Reader.TryRead(out var item))
{
items.Add(item);
}
Expand All @@ -84,56 +88,47 @@ private async Task EmergencyFlush()

public void Push(T item)
{
lock (_runningLock)
{
if (!_isRunning) throw new InvalidOperationException("Buffer is not running.");
}
if (Volatile.Read(ref _state) != StateRunning)
throw new InvalidOperationException("Buffer is not running.");

_queue.Enqueue(item);
_logger.Debug("Item added to buffer. Current size: {0}", _queue.Count);
if (_queue.Count >= _maxSize)
if (!_channel.Writer.TryWrite(item))
throw new InvalidOperationException("Failed to write item to buffer channel.");

_logger.Debug("Item added to buffer. Current size: {0}", _channel.Reader.Count);

if (_channel.Reader.Count >= _maxSize)
{
_semaphore.Release();
}
}

public void Start()
{
lock (_runningLock)
{
if (_isRunning) return;

_isRunning = true;
_cts = new CancellationTokenSource();
}
if (Interlocked.CompareExchange(ref _state, StateRunning, StateStopped) != StateStopped)
return;

lock (_taskLock)
{
if (_periodicFlushTask == Task.CompletedTask || _periodicFlushTask.IsCompleted)
{
_periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token));
}
if (_processBufferTask == Task.CompletedTask || _processBufferTask.IsCompleted)
{
_processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token));
}
}
_cts = new CancellationTokenSource();
_periodicFlushTask = Task.Run(() => PeriodicFlushAsync(_cts.Token));
_processBufferTask = Task.Run(() => ProcessBufferAsync(_cts.Token));

_logger.Info("EventBuffer started.");
}

public async Task Flush()
{
lock (_runningLock)
{
if (!_isRunning) throw new InvalidOperationException("Buffer is not running.");
}
if (Volatile.Read(ref _state) != StateRunning)
throw new InvalidOperationException("Buffer is not running.");

while (!_queue.IsEmpty)
await DrainAsync();
_logger.Info("Buffer flushed manually.");
}

private async Task DrainAsync()
{
while (_channel.Reader.Count > 0)
{
await FlushBufferAsync();
}
_logger.Info("Buffer flushed manually.");
}

private async Task ProcessBufferAsync(CancellationToken token)
Expand All @@ -143,6 +138,7 @@ private async Task ProcessBufferAsync(CancellationToken token)
try
{
await _semaphore.WaitAsync(token);

await FlushBufferAsync();
}
catch (OperationCanceledException)
Expand All @@ -160,7 +156,7 @@ private async Task PeriodicFlushAsync(CancellationToken token)
{
await Task.Delay(_flushPeriod, token);

if (_queue.Count > 0)
if (_channel.Reader.Count > 0)
{
await FlushBufferAsync();
}
Expand All @@ -178,8 +174,8 @@ private async Task PeriodicFlushAsync(CancellationToken token)

private async Task FlushBufferAsync()
{
var items = new List<T>();
while (items.Count < _maxSize && _queue.TryDequeue(out var item))
var items = new List<T>(_channel.Reader.Count);
while (items.Count < _maxSize && _channel.Reader.TryRead(out var item))
{
items.Add(item);
}
Expand All @@ -192,7 +188,6 @@ private async Task FlushBufferAsync()
int retryCount = 0;
bool success = false;
Exception? lastException = null;
Random random = new Random();

// Try with retries and exponential backoff
while (retryCount <= MaxRetries && !success)
Expand All @@ -219,7 +214,7 @@ private async Task FlushBufferAsync()
// Calculate backoff with jitter
double baseDelay = InitialRetryDelaySeconds;
double delay = baseDelay * Math.Pow(2, retryCount - 1);
double jitter = random.NextDouble() * 0.1 * delay; // 10% jitter
double jitter = Random.Shared.NextDouble() * 0.1 * delay; // 10% jitter
TimeSpan waitTime = TimeSpan.FromSeconds(delay + jitter);

_logger.Warn(
Expand Down Expand Up @@ -247,50 +242,28 @@ private async Task FlushBufferAsync()

public async Task Stop()
{
bool needsFlush = false;

lock (_runningLock)
{
if (!_isRunning) return;
needsFlush = true;
}
if (Interlocked.CompareExchange(ref _state, StateStopped, StateRunning) != StateRunning)
return;

try
{
if (needsFlush)
{
try
{
await Flush();
}
catch (InvalidOperationException)
{
// Another thread might have changed the state between our check and the flush
// Just continue with shutdown
}
}

lock (_runningLock)
await _cts.CancelAsync();
_channel.Writer.TryComplete();

try
{
if (!_isRunning) return;
_isRunning = false;
_cts.Cancel();
await DrainAsync();
}

if (_periodicFlushTask != Task.CompletedTask)
catch (Exception ex)
{
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer));
await Task.WhenAny(_periodicFlushTask, timeoutTask);
_logger.Warn("Error draining buffer on stop: {0}", ex.Message);
}

if (_processBufferTask != Task.CompletedTask)
{
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(MaxWaitForBuffer));
await Task.WhenAny(_processBufferTask, timeoutTask);
}
var timeout = TimeSpan.FromSeconds(MaxWaitForBuffer);
await Task.WhenAny(Task.WhenAll(_periodicFlushTask, _processBufferTask), Task.Delay(timeout));

_semaphore.Dispose();
Comment thread
Hawxy marked this conversation as resolved.
_cts.Dispose();
_semaphore.Dispose();
_logger.Info("EventBuffer shut down cleanly.");
}
catch (Exception ex)
Expand All @@ -302,6 +275,6 @@ public async Task Stop()

public int GetEventCount()
{
return _queue.Count;
return _channel.Reader.Count;
}
}