MrFr3di/SmartPipe-Core

SmartPipe.Core

Universal streaming pipeline engine for .NET 10

Built on System.Threading.Channels, SmartPipe.Core provides a production-ready pipeline engine for ETL, real-time stream processing, API aggregation, and AI agent integration, all with zero allocations on the hot path.

📖 Complete Feature Reference →

What is SmartPipe?

SmartPipe is not just another ETL library. It's a universal streaming pipeline engine that handles:

  • ETL/ELT — extract from DB/API, transform, load to anywhere
  • Real-time stream processing — process events as they arrive
  • API aggregation — fan-out requests, aggregate responses
  • Data validation pipelines — validate, enrich, route
  • AI agent tools — integrate with Semantic Kernel, AutoGen
  • Log/sensor processing — process IoT telemetry, application logs
  • Error recovery & dead letter — capture failures for later replay
  • Stream merging — combine multiple data sources into one pipeline

All in a few lines of code:

using SmartPipe.Core;
using SmartPipe.Extensions;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(x => x.Enrich())       // Middleware for simple ops
    .Transform(new JsonTransform<MyDto, MyEntity>())  // ITransformer for complex
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

Getting Started | Installation

# Core engine 
dotnet add package SmartPipe.Core

# Extensions (Http, EF Core, Dapper, JSON, CSV, Mapster, Polly)
dotnet add package SmartPipe.Extensions

Examples by Scenario

Middleware Pattern (5 lines)

var pipeline = PipelineBuilder
    .From(new HttpSelector<int>("https://api.example.com/numbers"))
    .Transform(x => x * 2)          // Middleware!
    .Transform(x => x + 1)          // Middleware!
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<int>(logger));

ETL Pipeline (Database → Transform → API)

var pipeline = PipelineBuilder
    .From(new EfCoreSelector<Order>(dbContext).WithQuery(q => q.Where(o => o.Status == "Pending")))
    .Transform(new MapsterTransform<Order, OrderDto>())
    .Transform(new PollyResilienceTransform<OrderDto>(resiliencePipeline))
    .WithOptions(o => o.MaxDegreeOfParallelism = 8);
await pipeline.To(new HttpSink<OrderDto>(httpClient, "https://api.destination.com/orders"));

Real-time Stream Processing (API → Filter → Log)

var pipeline = PipelineBuilder
    .From(new HttpSelector<SensorData>("https://iot.example.com/telemetry"))
    .Transform(new JsonTransform<SensorData, SensorData>())
    .Transform(new MapsterTransform<SensorData, Alert>())
    .WithOptions(o => { o.MaxDegreeOfParallelism = 2; o.ContinueOnError = true; });
await pipeline.To(new LoggerSink<Alert>(logger));

Single Item Processing

// The channel's output type matches the transformer's output type (PromptDto).
var pipeline = new SmartPipeChannel<string, PromptDto>();
pipeline.AddTransformer(new JsonTransform<string, PromptDto>());
var result = await pipeline.ProcessSingleAsync(new ProcessingContext<string>("Long text to summarize..."));

API Aggregation (Fan-out → Aggregate)

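// Thread-safe collector for the aggregated results (needs System.Collections.Concurrent).
var enrichedUsers = new ConcurrentBag<EnrichedUser>();

var pipeline = PipelineBuilder
    .From(new HttpSelector<User>("https://users.api.com"))
    .Transform(new MapsterTransform<User, EnrichedUser>())
    .Transform(new PollyResilienceTransform<EnrichedUser>(resiliencePipeline));
await pipeline.To(new Sink<EnrichedUser>(user => enrichedUsers.Add(user)));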

Error Persistence with DeadLetterSink

var pipeline = PipelineBuilder
    .From(new HttpSelector<Order>("https://api.example.com/orders"))
    .Transform(new OrderValidator())
    .WithOptions(o => o.ContinueOnError = true);
await pipeline.To(new DeadLetterSink<Order>("failed_orders.json"));

First Pipeline (5 lines)

using SmartPipe.Core;
using SmartPipe.Extensions.Selectors;
using SmartPipe.Extensions.Transforms;
using SmartPipe.Extensions.Sinks;

var pipeline = PipelineBuilder
    .From(new HttpSelector<MyDto>("https://api.example.com/data"))
    .Transform(new JsonTransform<MyDto, MyEntity>())
    .WithOptions(o => o.MaxDegreeOfParallelism = 4);
await pipeline.To(new LoggerSink<MyEntity>(logger));

ASP.NET Core BackgroundService

public class PipelineWorker : BackgroundService
{
    // _dbContext and _httpClient are fields supplied through the constructor (omitted here).
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var pipeline = PipelineBuilder
            .From(new EfCoreSelector<Order>(_dbContext))
            .Transform(new MapsterTransform<Order, OrderDto>())
            .WithOptions(o => o.MaxDegreeOfParallelism = 8);
        await pipeline.To(new HttpSink<OrderDto>(_httpClient, "https://api.dest.com"));
    }
}

SmartPipe Architecture

Overview

SmartPipe is a streaming pipeline engine built on System.Threading.Channels. It consists of 34 integrated components organized in a resilience pipeline order.

Pipeline Flow

ISource<T> (or RunInBackground)
    ▼
Bounded Channel (or Rendezvous Channel)
    ▼
BackpressureStrategy (P-controller: continuous throttling)
    ▼
DeduplicationFilter (Bloom, O(1)) + HyperLogLogEstimator
    ▼
AdaptiveParallelism (P-controller with dead zone + anti-windup)
    ▼
CircuitBreaker (Lock-free, Closed→Open→HalfOpen + Isolated)
    │
    ▼
MiddlewareTransformer (Func<T,T>) + ITransformer (ValueTask)
    ▼
RetryQueue (Jitter + Exponential Backoff)
    ▼
Bounded Channel
    ▼
ISink<T> (Logger, DeadLetter, HealthChecks)
    ▼
AsChannelReader() → SignalR/gRPC
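
The AdaptiveParallelism stage in the flow above is described as a P-controller with a dead zone. The following is a minimal sketch of that idea, not SmartPipe's implementation: the class name, the parameters, and the policy of adding workers when latency is below target are all assumptions made for illustration. A pure proportional controller has no integral term, so the output clamp stands in for anti-windup here.

```csharp
using System;

// Hypothetical sketch; none of these names come from SmartPipe's API.
public static class ParallelismController
{
    // Returns the next degree of parallelism from the observed and target latency.
    public static int Next(
        int current, double observedMs, double targetMs,
        double gain = 4.0, double deadZone = 0.10, int min = 1, int max = 32)
    {
        // Relative error: positive when the stage is faster than the target.
        double error = (targetMs - observedMs) / targetMs;

        // Dead zone: ignore small errors so the worker count does not flap.
        if (Math.Abs(error) <= deadZone) return current;

        // Proportional step: the correction scales with the size of the error.
        int step = (int)Math.Round(gain * error);

        // Clamp to the allowed range so the output saturates instead of running away.
        return Math.Clamp(current + step, min, max);
    }
}
```

Because the step is proportional to the error, a small latency drift changes the worker count by one or two, while a large spike produces a larger correction in a single update.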

Resilience Pipeline Order

  1. TotalRequestTimeout — maximum time for entire pipeline
  2. CircuitBreaker — stops processing on high failure rate
  3. RetryQueue — delays and retries transient errors
  4. AttemptTimeout — per-transformer timeout
  5. DeadLetterSink — captures exhausted retries for later replay
  6. LivenessCheck — detects stalled pipeline
  7. ReadinessCheck — detects overloaded pipeline
  8. DefaultRetryPolicy — per-pipeline default retry configuration
  9. RetryBudget — per-item retry budget control
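
The RetryQueue step is described as combining jitter with exponential backoff. As a rough illustration, the delay calculation could look like the sketch below. It uses the "full jitter" variant, which is one common choice and not necessarily the one SmartPipe uses; the `Backoff` class and its parameters are hypothetical.

```csharp
using System;

// Hypothetical sketch; not part of SmartPipe's API.
public static class Backoff
{
    // Delay for a 0-based attempt, drawn uniformly from [0, min(maxDelay, baseDelay * 2^attempt)).
    public static TimeSpan NextDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random rng)
    {
        // Exponential growth, capped so late attempts do not wait unboundedly.
        double ceilingMs = Math.Min(
            maxDelay.TotalMilliseconds,
            baseDelay.TotalMilliseconds * Math.Pow(2, attempt));

        // Jitter spreads retries out so failed items do not all retry at the same instant.
        return TimeSpan.FromMilliseconds(rng.NextDouble() * ceilingMs);
    }
}
```

A caller would typically stop retrying once a per-item budget is exhausted and hand the item to a dead-letter destination, which matches steps 5 and 9 in the list above.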

Component Overview

| Component | Type | Memory | Performance |
|---|---|---|---|
| DeduplicationFilter | Bloom filter | O(1) | 20.65 ns |
| ObjectPool | Lock-free | O(n) | 15.55 ns |
| CircuitBreaker | Lock-free (Interlocked) | O(n) | 29.30 ns |
| RetryQueue | Lock-free (Channel) | O(n) | 69.16 ns |
| ExponentialHistogram | Percentiles | O(log² n) | < 100 ns |
| JumpHash | Sharding | O(1) | < 10 ns |
| CuckooFilter | Dedup + delete | O(1) | < 50 ns |
| ReservoirSampler | Sampling | O(k) | < 10 ns |
| HyperLogLogEstimator | Count-Distinct | O(1) | < 50 ns |
| DeadLetterSink | Error persistence | O(n) | |
| ChannelMerge | Stream merging | O(n) | |
| AdaptiveMetrics (Update) | Double EMA | O(1) | 20.25 ns |
| AdaptiveMetrics (Predict) | Double EMA | O(1) | 0.16 ns |
| IClock | Time abstraction | O(1) | < 5 ns |
| AtomicHelper | Lock-free double ops | O(1) | < 10 ns |
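
To make the JumpHash row concrete, here is a sketch of the published jump consistent hash algorithm (Lamping and Veach), which needs no lookup table and therefore O(1) memory. Whether SmartPipe's JumpHash follows this exact form is an assumption; the class and method names are hypothetical.

```csharp
using System;

// Hypothetical sketch of jump consistent hashing; not SmartPipe's API.
public static class JumpHashSketch
{
    // Maps a 64-bit key to a shard index in [0, buckets).
    public static int Bucket(ulong key, int buckets)
    {
        if (buckets <= 0) throw new ArgumentOutOfRangeException(nameof(buckets));

        long b = -1, j = 0;
        while (j < buckets)
        {
            b = j;
            // Linear congruential step; overflow wraps around by design.
            key = unchecked(key * 2862933555777941757UL + 1);
            // Jump forward to the next bucket count at which this key would move.
            j = (long)((b + 1) * ((double)(1L << 31) / ((key >> 33) + 1)));
        }
        return (int)b;
    }
}
```

The useful property for sharding is that growing from n to n + 1 buckets only moves keys into the new bucket; every other key keeps its previous shard.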

Extension Architecture

Extensions follow the Selection Pattern — a single package with categorized components:

  • Selectors — data sources (Http, EF Core, Dapper, CSV, JSON, DeadLetter)
  • Transforms — data transformers (JSON, CSV, Mapster, Compression, Polly, Filter, Validation, Conditional, Composite)
  • Sinks — data destinations (Logger, DeadLetter, Http, Db, CSV, JSON)
  • Health — Kubernetes probes (Liveness, Readiness)
  • Streaming — ChannelMerge, RunInBackground, AsChannelReader

Instead of 12 separate NuGet packages, SmartPipe uses a single SmartPipe.Extensions package with the Selection Pattern:

SmartPipe.Extensions/
├── Selectors/          ← Data sources
│   ├── HttpSelector      ← REST API client
│   ├── EfCoreSelector    ← Entity Framework streaming
│   ├── DapperSelector    ← High-performance SQL
│   ├── CsvFileSource     ← CSV file reader
│   ├── JsonFileSource    ← JSON array & NDJSON reader
│   └── DeadLetterSource  ← Replay failed items
├── Transforms/         ← Data transformers
│   ├── JsonTransform          ← JSON serialization
│   ├── CsvTransform           ← CSV parsing
│   ├── MapsterTransform       ← Object mapping
│   ├── CompressionTransform   ← Brotli/GZip
│   ├── PollyResilienceTransform ← Retry/CB/Hedging
│   ├── FilterTransform        ← Predicate filtering
│   ├── ValidationTransform    ← DataAnnotations validation
│   ├── ConditionalTransform   ← Conditional execution
│   ├── CompositeTransform     ← Chain transforms
│   └── FilterValidationExtensions ← ToFilter() conversion
├── Sinks/              ← Data destinations
│   ├── LoggerSink       ← Structured logging
│   ├── DeadLetterSink   ← Failed items persistence
│   ├── HttpSink         ← REST API client
│   ├── DbSink           ← Database insert
│   ├── CsvFileSink      ← CSV file writer
│   └── JsonFileSink     ← JSON file writer
├── Hosting/            ← ASP.NET Core integration
│   ├── SmartPipeHostedService       ← BackgroundService
│   ├── SmartPipeServiceCollectionExtensions ← AddSmartPipe DI
│   └── SmartPipeResilienceExtensions ← Polly registration
├── Health/             ← Kubernetes probes
│   ├── SmartPipeLivenessCheck
│   └── SmartPipeReadinessCheck
└── Streaming/          ← Stream utilities
    └── ChannelMerge    ← Merge two channels

One package. All integrations.
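
The Streaming folder lists ChannelMerge as merging two channels. A minimal sketch of that technique on plain System.Threading.Channels might look like the following. It is an illustration under assumptions, not SmartPipe's ChannelMerge: `MergeSketch`, `Merge`, and the default capacity are hypothetical.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch; not SmartPipe's ChannelMerge.
public static class MergeSketch
{
    public static ChannelReader<T> Merge<T>(
        ChannelReader<T> first, ChannelReader<T> second, int capacity = 64)
    {
        // Bounded output so a slow consumer applies backpressure to both sources.
        var output = Channel.CreateBounded<T>(capacity);

        async Task PumpAsync(ChannelReader<T> source)
        {
            await foreach (var item in source.ReadAllAsync())
                await output.Writer.WriteAsync(item);
        }

        _ = Task.Run(async () =>
        {
            try
            {
                // Complete the output only after both sources are drained.
                await Task.WhenAll(PumpAsync(first), PumpAsync(second));
                output.Writer.Complete();
            }
            catch (Exception ex)
            {
                // Surface a source failure to the consumer of the merged reader.
                output.Writer.Complete(ex);
            }
        });

        return output.Reader;
    }
}
```

Items from each source keep their relative order, but the interleaving between the two sources is not deterministic.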

Requirements

  • .NET 10.0+
  • SmartPipe.Core: 0 dependencies
  • SmartPipe.Extensions: Polly, EF Core, Dapper, Mapster, CsvHelper
  • 598 tests, 89.2% code coverage

What's New in v1.0.6

  • Thread safety — CuckooFilter, DeduplicationFilter, ReservoirSampler now fully thread-safe
  • ObjectPool max capacity — prevents unbounded pool growth under sustained load
  • DeduplicationFilter TTL — automatic entry expiration for long-running pipelines
  • JsonFileSink periodic flushing — NDJSON batch writes, prevents OOM on large datasets
  • RetryQueue polling optimization — single CancellationTokenSource per call, reduced allocations
  • DrainAsync + WithTimeoutAsync — CancellationToken support
  • PipelineDashboard — readonly record struct, PipelineDashboard.Empty
  • TransformWithTimeoutAsync — catch-all exception handling prevents consumer crashes
  • SecretScanner — disabled by default, explicit opt-in via EnableFeature("SecretScanner")
  • ExponentialHistogram — Volatile.Read for percentile reads, P50/P95/P99 caching
  • AdaptiveMetrics — Stopwatch.GetTimestamp() instead of TickCount64
  • DbSink — async ExecuteAsync, no thread pool blocking
  • DapperSelector — try/finally reader disposal
  • ChannelMerge — optional BoundedChannelOptions

What's New in v1.0.5

  • 598 tests, 95.8% code coverage
  • DefaultRetryPolicy — per-pipeline retry configuration in SmartPipeChannelOptions
  • RetryBudget — per-item retry budget in RetryQueue, auto-routes exhausted items to DeadLetterSink
  • DisposeAsync(CancellationToken) — graceful cancellation during pipeline disposal
  • AddSmartPipe DI — service collection extensions for ASP.NET Core integration
  • IClock integration — time abstraction for testability, replaces DateTime.UtcNow
  • AtomicHelper — lock-free CompareExchange loop utility
  • SecretScanner evasion detection — Base64/URL decoding, MaxRecursionDepth=3, 164 tests
  • DeadLetterSink retry — IOException recovery with exponential backoff
  • AdaptiveParallelism adaptive alpha — faster response to latency changes
  • CircuitBreaker CleanupWindow — thread-safe via TryDequeue+check
  • ObjectPool ABA protection — version stamps prevent race conditions
  • CuckooFilter Merge — combine multiple filters
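
The AtomicHelper entry above refers to a lock-free CompareExchange loop. As a sketch of that pattern, assuming the goal is an atomic add on a `double` (which `Interlocked.Add` does not support), the loop could be written as follows. `AtomicDouble` is a hypothetical name, not the library's type.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a CompareExchange retry loop; not SmartPipe's AtomicHelper.
public static class AtomicDouble
{
    // Atomically adds value to target and returns the new total.
    public static double Add(ref double target, double value)
    {
        double current = Volatile.Read(ref target);
        while (true)
        {
            double updated = current + value;
            // Swap only if no other thread changed target since it was read.
            double seen = Interlocked.CompareExchange(ref target, updated, current);
            // Compare bit patterns so a NaN value cannot cause an endless loop.
            if (BitConverter.DoubleToInt64Bits(seen) == BitConverter.DoubleToInt64Bits(current))
                return updated;
            current = seen; // Lost the race: retry against the value that won.
        }
    }
}
```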

What's New in v1.0.4

  • 22 new features (243 tests, 96.4% coverage)
  • P-Controller Parallelism — smooth thread scaling, no binary jumps
  • Double EMA + Prediction — velocity tracking + one-step latency forecast
  • Hybrid CircuitBreaker — EWMA early warning + Sliding window decisions
  • P-Controller Backpressure — continuous throttling, no oscillation
  • PipelineState + Cancel() — lifecycle management with events
  • Progress reporting — OnProgress with ETA calculation
  • Auto DeadLetter routing — exhausted retries → DeadLetterSink
  • 12 new Extensions — CsvFileSource/Sink, JsonFileSource/Sink, FilterTransform, ValidationTransform, DbSink, HttpSink, ConditionalTransform, DeadLetterSource, CompositeTransform
  • Metrics.Export() — JSON + Prometheus format
  • 4 new OWASP patterns in SecretScanner
  • 12% faster ValueTask_Transform (69.12 ns)
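
The "Double EMA + Prediction" entry above describes velocity tracking with a one-step forecast. One standard way to get both is Holt's linear exponential smoothing, which keeps a smoothed level and a smoothed trend. The sketch below assumes that formulation; SmartPipe's AdaptiveMetrics may differ, and the class name and default smoothing factors are hypothetical.

```csharp
// Hypothetical sketch using Holt's linear smoothing; not SmartPipe's AdaptiveMetrics.
// Not thread-safe: a single writer is assumed.
public sealed class DoubleEma
{
    private readonly double _alpha, _beta;
    private double _level, _trend;
    private bool _initialized;

    public DoubleEma(double alpha = 0.5, double beta = 0.3)
    {
        _alpha = alpha;
        _beta = beta;
    }

    public void Update(double sample)
    {
        if (!_initialized)
        {
            // Seed with the first sample and no trend.
            _level = sample;
            _trend = 0;
            _initialized = true;
            return;
        }

        double previousLevel = _level;
        // First EMA: smooth the value, starting from where the trend said it would be.
        _level = _alpha * sample + (1 - _alpha) * (previousLevel + _trend);
        // Second EMA: smooth the change in level (the "velocity").
        _trend = _beta * (_level - previousLevel) + (1 - _beta) * _trend;
    }

    // One-step forecast: current level plus one step of trend.
    public double Predict() => _level + _trend;
}
```

Both operations are constant time and constant memory, and the forecast is a single addition.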

What's New in v1.0.3

  • 13 new features (215 tests, 96.3% coverage)
  • Middleware Transformer — Func<T,T> as lightweight ITransformer
  • Rendezvous Channel — (BoundedCapacity=0)
  • HyperLogLogEstimator — Count-Distinct with O(1) memory
  • Dual-threshold Watermark — Pause/Resume prevents oscillation
  • Liveness/Readiness Health Checks — Kubernetes-native
  • DeadLetterSink — failed items persistence
  • Data Lineage — provenance tracking in Metadata
  • ChannelMerge — merge two streams
  • RunInBackground() — streaming pipeline consumption
  • Hybrid Queue — FullMode option (Wait/DropOldest)
  • AsChannelReader() — SignalR/gRPC integration
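
The Hybrid Queue entry above names two full modes, Wait and DropOldest. These correspond to values of `BoundedChannelFullMode` in System.Threading.Channels. The sketch below shows how the two modes differ on a plain bounded channel; `FullModeSketch` is a hypothetical helper and does not use SmartPipe's own option types.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical demo on a raw bounded channel; not SmartPipe's API.
public static class FullModeSketch
{
    // Writes 1, 2, 3 into a capacity-2 channel, then returns what can be read back.
    public static int[] WriteThenDrain(BoundedChannelFullMode mode)
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(2) { FullMode = mode });

        // TryWrite never blocks: in Wait mode it returns false when the channel is full,
        // in DropOldest mode it evicts the oldest buffered item to make room.
        for (int i = 1; i <= 3; i++) channel.Writer.TryWrite(i);
        channel.Writer.Complete();

        var items = new List<int>();
        while (channel.Reader.TryRead(out int item)) items.Add(item);
        return items.ToArray();
    }
}
```

With Wait the third write is rejected and the channel keeps 1 and 2; with DropOldest the channel keeps 2 and 3. Wait preserves every item at the cost of slowing producers, while DropOldest favors fresh data, which suits telemetry-style streams.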

What's New in v1.0.2

  • Lock-free RetryQueue
  • Lock-free CircuitBreaker
  • SmartPipeEventSource — monitor via dotnet-counters
  • SmartPipeHostedService — native ASP.NET Core integration
  • SmartPipeHealthCheck — pipeline health for YARP/Kubernetes
  • Adaptive EMA — dynamic α for spike detection
  • Dynamic Watermark — throughput-based backpressure
  • 96.3% code coverage (up from 86.5%)
  • 47 new tests, 0 regressions in benchmarks
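
The lock-free CircuitBreaker listed above can be pictured as a small state machine driven by `Interlocked.CompareExchange`. The sketch below is a simplified illustration with a consecutive-failure threshold and no timers; the real component (sliding window, EWMA warning, Isolated state) is more involved, and `BreakerState` is a hypothetical name.

```csharp
using System.Threading;

// Hypothetical, simplified sketch; not SmartPipe's CircuitBreaker.
public sealed class BreakerState
{
    private const int Closed = 0, Open = 1, HalfOpen = 2;
    private readonly int _threshold;
    private int _state = Closed;
    private int _failures;

    public BreakerState(int threshold) => _threshold = threshold;

    public bool IsOpen => Volatile.Read(ref _state) == Open;

    public void RecordFailure()
    {
        // A failed probe in HalfOpen reopens the circuit immediately.
        if (Interlocked.CompareExchange(ref _state, Open, HalfOpen) == HalfOpen) return;

        // In Closed, trip once the consecutive-failure count reaches the threshold.
        if (Interlocked.Increment(ref _failures) >= _threshold)
            Interlocked.CompareExchange(ref _state, Open, Closed);
    }

    public void RecordSuccess()
    {
        Interlocked.Exchange(ref _failures, 0);
        // A successful probe closes the circuit; in other states this is a no-op.
        Interlocked.CompareExchange(ref _state, Closed, HalfOpen);
    }

    // Only one caller wins the Open -> HalfOpen transition and may send a probe.
    public bool TryHalfOpen() =>
        Interlocked.CompareExchange(ref _state, HalfOpen, Open) == Open;
}
```

Each transition is a single compare-and-swap, so concurrent callers never block each other and exactly one of them observes a given state change.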

Documentation

Acknowledgements

SmartPipe is built on ideas and research from:

  • Polly — resilience patterns for .NET (github.com/App-vNext/Polly)
  • System.Threading.Channels — thread-safe producer/consumer infrastructure by Microsoft
  • OpenTelemetry — observability framework for cloud-native software
  • Little's Law — queueing theory applied to adaptive parallelism (ACM Queue, 2025)
  • Bloom & Cuckoo Filters — probabilistic data structures for deduplication
  • ReTraced — three-level retry model inspiration
  • TheCodeMan — production Channel pipeline patterns
  • Microsoft.Extensions.Resilience — resilience pipeline integration
  • OWASP — security patterns for secret detection
  • BenchmarkDotNet — performance measurement framework
  • Control Theory (P-controllers) — applied to AdaptiveParallelism and BackpressureStrategy
  • HyperLogLog (Flajolet et al.) — cardinality estimation algorithm

License

MIT License. See LICENSE for details.
