Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions Collections/BaseOrderedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ public abstract class BaseOrderedChannel<TSort, TValue, TCollection>
private Channel<(TSort sort, TValue value)> _channel;
private bool _isClosed;

/// <inheritdoc />
/// <summary>
/// Gets the number of items in the sorted collection.
/// </summary>
/// <remarks>
/// Note: This property only returns the count of items in the internal sorted collection.
/// It does not include items that are currently in the channel but have not yet been read into the collection.
/// The actual total number of items may be higher during concurrent operations.
/// </remarks>
public int Count
{
get
Expand Down Expand Up @@ -130,7 +137,9 @@ public void Close()
/// </summary>
/// <param name="sort">The sort with which to associate the new value.</param>
/// <param name="value">The value to add to the queue.</param>
protected void Enqueue(TSort sort, TValue value)
/// <param name="cancellationToken"><see cref="CancellationToken"/></param>
/// <returns><see cref="ValueTask"/></returns>
protected async ValueTask Enqueue(TSort sort, TValue value, CancellationToken cancellationToken = default)
{
if (value is null)
throw new ArgumentNullException(nameof(value));
Expand All @@ -150,12 +159,19 @@ protected void Enqueue(TSort sort, TValue value)
try
{
if (!channel.Writer.TryWrite((sort, value)))
AsyncHelper.Run(() => channel.Writer.WriteAsync((sort, value)));
await channel.Writer.WriteAsync((sort, value), cancellationToken);
}
catch (ChannelClosedException)
{
// Queue was closed concurrently; value is dropped by design.
}
catch
{
if (cancellationToken.IsCancellationRequested)
return;

throw;
}
}

/// <summary>
Expand Down
52 changes: 30 additions & 22 deletions Tests/Collections/BaseOrderedChannelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ public class BaseOrderedChannelTests : BaseTestClass
/// Test implementation of BaseOrderedChannel for testing purposes.
/// Uses PriorityQueue for sorting by integer priority.
/// </summary>
private class TestOrderedChannel(int maxSize) : BaseOrderedChannel<int, string, Ecng.Collections.PriorityQueue<int, string>>(new Ecng.Collections.PriorityQueue<int, string>((a, b) => Math.Abs(a - b), Comparer<int>.Default), maxSize)
private class TestOrderedChannel : BaseOrderedChannel<int, string, Ecng.Collections.PriorityQueue<int, string>>
{
public void Add(int priority, string value) => Enqueue(priority, value);
public TestOrderedChannel(int maxSize = -1)
: base(new Ecng.Collections.PriorityQueue<int, string>((a, b) => Math.Abs(a - b), Comparer<int>.Default), maxSize)
{
}

public async ValueTask Add(int priority, string value, CancellationToken cancellationToken = default)
=> await Enqueue(priority, value, cancellationToken);
}

[TestMethod]
Expand Down Expand Up @@ -64,13 +70,15 @@ public void MaxSize_SetValidValue_Succeeds()
}

[TestMethod]
public void Enqueue_WhenClosed_DropsValue()
public async Task Enqueue_WhenClosed_DropsValue()
{
var token = CancellationToken;

// Arrange
var queue = CreateQueue();

// Act - enqueue without opening
queue.Add(1, "test");
await queue.Add(1, "test", token);

// Assert - value should be dropped
queue.Count.AssertEqual(0);
Expand All @@ -86,7 +94,7 @@ public async Task EnqueueDequeue_SingleItem_WorksCorrectly()
queue.Open();

// Act
queue.Add(1, "first");
await queue.Add(1, "first", token);
var result = await queue.DequeueAsync(token);

// Assert
Expand All @@ -104,9 +112,9 @@ public async Task EnqueueDequeue_MultipleItems_MaintainsOrder()
queue.Open();

// Act - enqueue in non-sorted order
queue.Add(3, "third");
queue.Add(1, "first");
queue.Add(2, "second");
await queue.Add(3, "third", token);
await queue.Add(1, "first", token);
await queue.Add(2, "second", token);

await Task.Delay(100, token); // Allow time for sorting

Expand All @@ -129,9 +137,9 @@ public async Task ReadAllAsync_ReturnsAllItems()
var queue = CreateQueue();
queue.Open();

queue.Add(1, "first");
queue.Add(2, "second");
queue.Add(3, "third");
await queue.Add(1, "first", token);
await queue.Add(2, "second", token);
await queue.Add(3, "third", token);

await Task.Delay(100, token); // Allow time for items to be queued

Expand Down Expand Up @@ -167,9 +175,9 @@ public async Task Clear_RemovesAllItems()
var queue = CreateQueue();
queue.Open();

queue.Add(1, "first");
queue.Add(2, "second");
queue.Add(3, "third");
await queue.Add(1, "first", token);
await queue.Add(2, "second", token);
await queue.Add(3, "third", token);

await Task.Delay(100, token); // Allow time for items to be queued

Expand All @@ -188,12 +196,12 @@ public async Task Reopen_AfterClose_WorksCorrectly()
// Arrange
var queue = CreateQueue();
queue.Open();
queue.Add(1, "first");
await queue.Add(1, "first", token);
queue.Close();

// Act
queue.Open();
queue.Add(2, "second");
await queue.Add(2, "second", token);
var result = await queue.DequeueAsync(token);

// Assert
Expand All @@ -211,10 +219,10 @@ public async Task BoundedQueue_RespectsMaxSize()
queue.Open();

// Act - try to add 3 items to queue with max size 2
queue.Add(1, "first");
queue.Add(2, "second");
await queue.Add(1, "first", token);
await queue.Add(2, "second", token);

var writeTask = Task.Run(() => queue.Add(3, "third"), token); // This should block
var writeTask = Task.Run(async () => await queue.Add(3, "third", token), token); // This should block

// Wait a bit to see if it completes (it shouldn't)
var completed = await Task.WhenAny(writeTask, Task.Delay(500, token)) == writeTask;
Expand All @@ -241,11 +249,11 @@ public async Task Count_ReflectsQueueSize()
// Act & Assert
queue.Count.AssertEqual(0);

queue.Add(1, "first");
await queue.Add(1, "first", token);
await Task.Delay(50, token);

queue.Add(2, "second");
queue.Add(3, "third");
await queue.Add(2, "second", token);
await queue.Add(3, "third", token);
await Task.Delay(100, token);

// Dequeue all
Expand Down
Loading