Skip to content

Latest commit

 

History

History
334 lines (250 loc) · 13.1 KB

File metadata and controls

334 lines (250 loc) · 13.1 KB

NetEvolve.Pulse.Extensibility

NuGet Version NuGet Downloads License

NetEvolve.Pulse.Extensibility delivers the core contracts for building CQRS mediators: commands, queries, events, handlers, interceptors, and configurators that compose the Pulse pipeline.

Features

  • Minimal abstractions for commands, queries, events, and request/response flows
  • Strongly typed handler interfaces with single-handler guarantees for commands and queries
  • Interceptor interfaces for cross-cutting concerns (logging, validation, metrics, caching)
  • Fluent mediator configuration via IMediatorBuilder and extension methods
  • [PulseHandler] / [PulseHandler<T>] — handler registration attributes consumed by NetEvolve.Pulse.SourceGeneration to emit compile-time DI registrations
  • [PulseGenericHandler] — open-generic variant that instructs the source generator to emit a typeof()-based open-generic DI registration, enabling the container to resolve any closed variant at runtime
  • ICacheableQuery<TResponse> — opt-in caching contract that pairs with the AddQueryCaching() interceptor in NetEvolve.Pulse
  • IPayloadSerializer — serialization abstraction for outbox payloads, cache entries, and internal storage; default System.Text.Json implementation provided by NetEvolve.Pulse with override capability
  • Outbox pattern contracts including IEventOutbox, IOutboxRepository, and IMessageTransport
  • Designed for framework-agnostic use while pairing seamlessly with NetEvolve.Pulse
  • Test-friendly primitives including Void responses and TimeProvider awareness

Installation

NuGet Package Manager

Install-Package NetEvolve.Pulse.Extensibility

.NET CLI

dotnet add package NetEvolve.Pulse.Extensibility

PackageReference

<PackageReference Include="NetEvolve.Pulse.Extensibility" Version="x.x.x" />

Quick Start

using NetEvolve.Pulse.Extensibility;

public record CreateInvoiceCommand(string CustomerId, decimal Amount) : ICommand<InvoiceCreated>;
public record InvoiceCreated(Guid InvoiceId);

public sealed class CreateInvoiceHandler
    : ICommandHandler<CreateInvoiceCommand, InvoiceCreated>
{
    public Task<InvoiceCreated> HandleAsync(
        CreateInvoiceCommand command,
        CancellationToken cancellationToken) =>
        Task.FromResult(new InvoiceCreated(Guid.NewGuid()));
}

public record GetInvoiceQuery(Guid Id) : IQuery<Invoice>;
public record Invoice(Guid Id, string CustomerId, decimal Amount);

public sealed class GetInvoiceHandler : IQueryHandler<GetInvoiceQuery, Invoice>
{
    public Task<Invoice> HandleAsync(GetInvoiceQuery query, CancellationToken cancellationToken) =>
        Task.FromResult(new Invoice(query.Id, "CUST-123", 125.00m));
}

Usage

Basic Example

Pair the contracts with the Pulse mediator for DI and dispatching:

using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;

var services = new ServiceCollection();
services.AddPulse();
services.AddScoped<ICommandHandler<CreateInvoiceCommand, InvoiceCreated>, CreateInvoiceHandler>();
services.AddScoped<IQueryHandler<GetInvoiceQuery, Invoice>, GetInvoiceHandler>();

var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();

var created = await mediator.SendAsync<CreateInvoiceCommand, InvoiceCreated>(
    new CreateInvoiceCommand("CUST-123", 125.00m));
var invoice = await mediator.QueryAsync<GetInvoiceQuery, Invoice>(
    new GetInvoiceQuery(created.InvoiceId));

Advanced Example

Extend the configurator with your own interceptors and plug them into Pulse:

using NetEvolve.Pulse.Extensibility;

public static class MediatorBuilderExtensions
{
    public static IMediatorBuilder AddCustomValidation(
        this IMediatorBuilder builder)
    {
        // Register validation interceptors or pipelines here
        return configurator;
    }
}

// Register with Pulse
services.AddPulse(config =>
{
    config.AddActivityAndMetrics()
          .AddCustomValidation();
});

Open-Generic Handlers ([PulseGenericHandler])

Apply [PulseGenericHandler] to an open-generic handler class when you want a single implementation to service any closed variant of a message type. The source generator (NetEvolve.Pulse.SourceGeneration) detects the attribute and emits a typeof()-based open-generic DI registration instead of a closed-type one:

using NetEvolve.Pulse.Extensibility;
using NetEvolve.Pulse.Extensibility.Attributes;

// One class handles ICommandHandler<TCommand, TResult> for every TCommand : ICommand<TResult>
[PulseGenericHandler]
public class GenericCommandHandler<TCommand, TResult>
    : ICommandHandler<TCommand, TResult>
    where TCommand : ICommand<TResult>
{
    public Task<TResult> HandleAsync(
        TCommand command,
        CancellationToken cancellationToken) =>
        Task.FromResult(default(TResult)!);
}

// Generated registration (via NetEvolve.Pulse.SourceGeneration):
// services.TryAddScoped(
//     typeof(ICommandHandler<,>), typeof(GenericCommandHandler<,>));

// With an explicit lifetime
[PulseGenericHandler(Lifetime = PulseServiceLifetime.Singleton)]
public class GenericEventHandler<TEvent>
    : IEventHandler<TEvent>
    where TEvent : IEvent
{
    public Task HandleAsync(TEvent message, CancellationToken cancellationToken) =>
        Task.CompletedTask;
}

// Generated registration:
// services.TryAddSingleton(
//     typeof(IEventHandler<>), typeof(GenericEventHandler<>));
Attribute Use case Generator output
[PulseHandler] Concrete handler class services.TryAddScoped<ICommandHandler<MyCmd, MyResult>, MyHandler>()
[PulseHandler<TMessage>] Close an open-generic handler for one specific message type services.TryAddScoped<ICommandHandler<MyCmd, MyResult>, MyHandler<MyCmd, MyResult>>()
[PulseGenericHandler] Register an open-generic handler for all closed variants services.TryAddScoped(typeof(ICommandHandler<,>), typeof(MyHandler<,>))

Cacheable Queries (ICacheableQuery<TResponse>)

Opt specific queries into transparent IDistributedCache caching by implementing ICacheableQuery<TResponse> instead of IQuery<TResponse>. The interface adds two properties that control the cache entry:

using NetEvolve.Pulse.Extensibility;

// Implement ICacheableQuery<TResponse> on the queries you want cached
public record GetCustomerByIdQuery(Guid CustomerId)
    : ICacheableQuery<CustomerDetailsDto>
{
    public string? CorrelationId { get; set; }

    // Unique cache key — include all parameters that distinguish results
    public string CacheKey => $"customer:{CustomerId}";

    // null = rely on cache default; TimeSpan = expiry duration
    public TimeSpan? Expiry => TimeSpan.FromMinutes(10);
}

public record CustomerDetailsDto(Guid Id, string Name, string Email);

To activate caching, register an IDistributedCache implementation and call AddQueryCaching() during Pulse setup (requires NetEvolve.Pulse). Use QueryCachingOptions to configure serialization and expiration behavior:

services.AddDistributedMemoryCache();
services.AddPulse(config => config.AddQueryCaching(options =>
{
    // Absolute (default) or Sliding expiration
    options.ExpirationMode = CacheExpirationMode.Sliding;

    // Fallback expiry for queries that return null from ICacheableQuery<TResponse>.Expiry
    options.DefaultExpiry = TimeSpan.FromMinutes(10);

    // Custom JSON serializer options
    options.JsonSerializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    };
}));

QueryCachingOptions properties:

Property Type Default Description
JsonSerializerOptions JsonSerializerOptions JsonSerializerOptions.Default Options used for cache serialization and deserialization
ExpirationMode CacheExpirationMode Absolute Whether Expiry is treated as absolute or sliding expiration
DefaultExpiry TimeSpan? null Fallback expiry used when ICacheableQuery<TResponse>.Expiry returns null

Queries that do not implement ICacheableQuery<TResponse> always reach the handler unchanged. When IDistributedCache is not registered the interceptor falls through silently.

Payload Serialization (IPayloadSerializer)

IPayloadSerializer defines the serialization contract for all payload operations within Pulse. It decouples the framework from any specific serialization library, allowing you to use System.Text.Json (default), Newtonsoft.Json, MessagePack, or any other serializer.

The interface is consumed by:

  • Outbox pattern — serializing event payloads before storage
  • Distributed cache — serializing query results for cache entries
  • Dead letter stores — persisting failed command payloads for diagnostics
  • Audit trails — capturing request/response snapshots

Interface Definition

public interface IPayloadSerializer
{
    string Serialize<T>(T value);
    string Serialize(object value, Type type);
    byte[] SerializeToBytes<T>(T value);
    T? Deserialize<T>(string payload);
    T? Deserialize<T>(byte[] payload);
}

Default Implementation

NetEvolve.Pulse automatically registers SystemTextJsonPayloadSerializer as the default implementation. No explicit registration is required:

services.AddPulse();
// SystemTextJsonPayloadSerializer automatically registered

Configure JSON options using the standard .NET options pattern:

services.Configure<JsonSerializerOptions>(options =>
{
    options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
});
services.AddPulse();

Custom Implementation

Replace the default serializer by registering your own implementation before calling AddPulse():

// Register custom serializer
services.AddSingleton<IPayloadSerializer, MyCustomSerializer>();
services.AddPulse();

public sealed class MyCustomSerializer : IPayloadSerializer
{
    public string Serialize<T>(T value) => /* your implementation */;
    public string Serialize(object value, Type type) => /* your implementation */;
    public byte[] SerializeToBytes<T>(T value) => /* your implementation */;
    public T? Deserialize<T>(string payload) => /* your implementation */;
    public T? Deserialize<T>(byte[] payload) => /* your implementation */;
}

Implementation Guidelines

  • Thread-safety: Implementations MUST be thread-safe — the same instance may be called concurrently
  • Non-null returns: Serialize methods MUST NOT return null; use empty string or empty array instead
  • Exception handling: Propagate serialization exceptions rather than swallowing them
  • Consistency: Use the same serialization format across all methods for consistent payload handling

Configuration

// Configure mediator features during startup
services.AddPulse(config =>
{
    // Built-in observability interceptors
    config.AddActivityAndMetrics();

    // Custom extension methods for validation, caching, retries, etc.
    // config.AddCustomValidation();
});

Requirements

  • .NET 8.0, .NET 9.0, or .NET 10.0
  • Suitable for ASP.NET Core, console, worker, and library projects
  • OpenTelemetry packages required when using AddActivityAndMetrics() through Pulse

Related Packages

Documentation

For complete documentation, please visit the official documentation.

Contributing

Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.

Support

License

This project is licensed under the MIT License - see the LICENSE file for details.


Note

Made with ❤️ by the NetEvolve Team