Pipeline
Sebastian Martinez edited this page Mar 29, 2026 · 1 revision
Zero-dependency, TypeScript-first functional data pipeline system for composing map, filter, and reduce transformations with full type inference across sync and async operations.
```typescript
import { pipe, map, filter, reduce } from "bytekit/pipeline";
// or from the root entry:
import { pipe, map, filter, reduce } from "bytekit";
```

The pipeline system lets you compose typed transformation operators into a reusable, lazy, immutable pipeline. Nothing executes until you call `.process(data)`. Each `.pipe()` call returns a new `Pipeline` instance; the original is never mutated.
Operator types:
- `map`: transform each element concurrently (`Promise.all`)
- `filter`: retain elements concurrently, preserve order
- `reduce`: accumulate sequentially (each step awaits the previous)
```typescript
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;
```

The atomic unit. A function that transforms a value, synchronously or asynchronously.
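Because an operator may return either a plain value or a promise, a caller can treat both shapes the same way by awaiting the result. The snippet below is a minimal sketch of that idea; `double`, `toLabel`, and `runBoth` are hypothetical names used only for illustration, and the type alias is copied from the definition above so the snippet stands alone.

```typescript
// Copied from the definition above so the snippet is self-contained.
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;

// A synchronous operator: returns a plain value.
const double: PipelineOp<number, number> = (n) => n * 2;

// An asynchronous operator: returns a promise.
const toLabel: PipelineOp<number, string> = async (n) => `value=${n}`;

// `await` passes plain values through, so both shapes are handled alike.
async function runBoth(n: number): Promise<string> {
  const doubled = await double(n);
  return toLabel(doubled);
}
```

Calling `runBoth(21)` resolves to `"value=42"`.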
A `Pipeline<TIn, TOut>` exposes two methods:

| Method | Signature | Description |
|---|---|---|
| `pipe(op)` | `(op: PipelineOp<TOut, TNext>) => Pipeline<TIn, TNext>` | Append an operator. Returns a new `Pipeline`. |
| `process(data)` | `(data: TIn) => Promise<TOut>` | Execute all operators sequentially. Always async. |
`pipe(...ops)` is a factory function that builds a `Pipeline` from a sequence of operators. It provides full type inference for up to 7 chained operators.

```typescript
function pipe<T, A>(op1: PipelineOp<T, A>): Pipeline<T, A>;
function pipe<T, A, B>(op1: PipelineOp<T, A>, op2: PipelineOp<A, B>): Pipeline<T, B>;
// ... up to 7 ops
// Escape hatch (no inference):
function pipe<T = unknown>(...ops: PipelineOp<unknown, unknown>[]): Pipeline<T, unknown>;
```

`map(fn)` transforms each element of an array:

```typescript
function map<T, U>(
  fn: (item: T, index: number) => U | Promise<U>
): PipelineOp<T[], U[]>
```

| Parameter | Description |
|---|---|
| `fn` | Called with `(item, index)`. Sync or async. |
| **Returns** | `PipelineOp<T[], U[]>`: all items are processed concurrently via `Promise.all`. |
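To make the concurrency claim concrete, here is a minimal sketch of how an operator with this behavior could be written. `mapSketch`, `sleep`, `finishOrder`, `slowSquare`, and `demoMap` are hypothetical names; this is an illustration built on `Promise.all`, not bytekit's actual source.

```typescript
// Hypothetical stand-in for `map`; NOT bytekit's actual implementation.
// The return type is left to inference: for a concrete U it is Promise<U[]>.
function mapSketch<T, U>(fn: (item: T, index: number) => U | Promise<U>) {
  return (items: T[]) =>
    // Start every call first, then wait for all of them together.
    Promise.all(items.map((item, index) => fn(item, index)));
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Records the order in which the callbacks finish.
const finishOrder: number[] = [];

// Earlier items sleep longer (30 ms, 20 ms, 10 ms), so they finish last.
const slowSquare = mapSketch(async (n: number, index: number) => {
  await sleep(30 - index * 10);
  finishOrder.push(n);
  return n * n;
});

async function demoMap(): Promise<number[]> {
  return slowSquare([1, 2, 3]); // resolves to [1, 4, 9]
}
```

After `demoMap()` settles, `finishOrder` is `[3, 2, 1]` while the result is still `[1, 4, 9]`: the calls overlap in time, but `Promise.all` returns results in input order.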
`filter(fn)` retains the elements for which the predicate returns `true`:

```typescript
function filter<T>(
  fn: (item: T, index: number) => boolean | Promise<boolean>
): PipelineOp<T[], T[]>
```

| Parameter | Description |
|---|---|
| `fn` | Predicate receiving `(item, index)`. Sync or async. |
| **Returns** | `PipelineOp<T[], T[]>`: all predicates run concurrently; retained items preserve their original order. |
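One way to get concurrent predicates while still preserving order is to resolve all verdicts first and then select by position. The following is a sketch under that assumption; `filterSketch`, `isEvenAsync`, and `demoFilter` are hypothetical names and the code is not taken from bytekit.

```typescript
// Hypothetical stand-in for `filter`; NOT bytekit's actual implementation.
function filterSketch<T>(
  fn: (item: T, index: number) => boolean | Promise<boolean>
) {
  return async (items: T[]): Promise<T[]> => {
    // Evaluate every predicate concurrently; keep[i] is the verdict for items[i].
    const keep = await Promise.all(items.map((item, index) => fn(item, index)));
    // Select by position, so the retained items stay in input order.
    return items.filter((_, index) => keep[index]);
  };
}

// An async predicate, as allowed by the signature above.
const isEvenAsync = async (n: number): Promise<boolean> => n % 2 === 0;

async function demoFilter(): Promise<number[]> {
  return filterSketch(isEvenAsync)([5, 2, 8, 3, 4]); // resolves to [2, 8, 4]
}
```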
`reduce(fn, initial)` folds an array into a single value:

```typescript
function reduce<T, U>(
  fn: (acc: U, item: T, index: number) => U | Promise<U>,
  initial: U
): PipelineOp<T[], U>
```

| Parameter | Description |
|---|---|
| `fn` | Reducer receiving `(accumulator, item, index)`. Sync or async. |
| `initial` | Starting accumulator value. |
| **Returns** | `PipelineOp<T[], U>`: runs sequentially. Returns `initial` for empty arrays. |
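The sequential behavior differs from `map` and `filter` because each step needs the previous accumulator. The sketch below illustrates this with a plain loop; it is fixed to numbers for brevity, and `NumberReducer`, `reduceSequentially`, `steps`, `slowAdd`, and `demoReduce` are hypothetical names rather than part of bytekit.

```typescript
// Hypothetical helper illustrating sequential reduction; NOT bytekit's code.
type NumberReducer = (
  acc: number,
  item: number,
  index: number
) => number | Promise<number>;

async function reduceSequentially(
  items: number[],
  fn: NumberReducer,
  initial: number
): Promise<number> {
  let acc = initial; // returned untouched when `items` is empty
  let index = 0;
  for (const item of items) {
    // Each step waits for the previous accumulator before it starts.
    acc = await fn(acc, item, index);
    index += 1;
  }
  return acc;
}

// Records when each step starts and ends.
const steps: string[] = [];

const slowAdd: NumberReducer = async (acc, item, index) => {
  steps.push(`start ${index}`);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  steps.push(`end ${index}`);
  return acc + item;
};

async function demoReduce(): Promise<number> {
  return reduceSequentially([1, 2, 3], slowAdd, 0); // resolves to 6
}
```

After `demoReduce()` settles, `steps` alternates strictly (`start 0`, `end 0`, `start 1`, ...), which shows that no step begins before the previous one has finished.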
- `pipe(...ops)` creates a `Pipeline` that stores the operators in an array. Nothing runs.
- `.pipe(op)` clones the operator array and returns a new `Pipeline`. The original is unchanged.
- `.process(data)` runs each operator in a `for…of` loop with `await`. The output of each operator becomes the input of the next.
- `map` and `filter` run each item's function concurrently with `Promise.all`; `reduce` runs each step sequentially.
- Errors thrown by any operator propagate unchanged out of `.process()`.
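The steps above can be sketched as a small class. This is a hypothetical re-implementation written from the description only; `PipelineSketch`, `calls`, `base`, `doubled`, and `summed` are illustrative names, and bytekit's real `Pipeline` may differ in naming and internals.

```typescript
type PipelineOp<TIn, TOut> = (input: TIn) => TOut | Promise<TOut>;

// Hypothetical sketch, NOT bytekit's actual Pipeline class.
class PipelineSketch<TIn, TOut> {
  // Operators are stored type-erased; the class generics carry the types.
  private readonly ops: ReadonlyArray<(input: any) => unknown>;

  constructor(ops: ReadonlyArray<(input: any) => unknown>) {
    this.ops = ops; // nothing is executed here
  }

  // Clone the operator list and append; `this` is left untouched.
  pipe<TNext>(op: PipelineOp<TOut, TNext>): PipelineSketch<TIn, TNext> {
    return new PipelineSketch<TIn, TNext>([...this.ops, op]);
  }

  // Run operators one after another, feeding each output into the next.
  async process(data: TIn): Promise<TOut> {
    let current: unknown = data;
    for (const op of this.ops) {
      // A throw or rejection here leaves process() unchanged, not wrapped.
      current = await op(current);
    }
    return current as TOut;
  }
}

// Records operator invocations to show that construction is lazy.
const calls: string[] = [];

const base = new PipelineSketch<number[], number[]>([
  (xs: number[]) => {
    calls.push("positive-only");
    return xs.filter((x) => x > 0);
  },
]);

// Two independent branches derived from the same base.
const doubled = base.pipe((xs) => xs.map((x) => x * 2));
const summed = base.pipe(async (xs) => xs.reduce((a, b) => a + b, 0));
```

At this point `calls` is still empty. `await doubled.process([-1, 2, 3])` resolves to `[4, 6]` and `await summed.process([-1, 2, 3])` resolves to `5`; each call runs the shared first operator once, and `base` itself still holds a single operator.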
```typescript
import { pipe, filter, map, reduce } from "bytekit/pipeline";

const totalRevenue = await pipe(
  filter<Product>((p) => p.inStock),
  map<Product, number>((p) => p.priceCents),
  reduce<number, number>((acc, price) => acc + price, 0)
).process(products);
```

```typescript
import { pipe, map } from "bytekit/pipeline";

const enriched = await pipe(
  map<Order, EnrichedOrder>(async (order) => ({
    ...order,
    userName: await fetchUser(order.userId),
  }))
).process(orders);
// All fetchUser() calls run concurrently; order is preserved
```

```typescript
import { pipe, filter, map } from "bytekit/pipeline";

const base = pipe(filter<Product>((p) => p.inStock));

// base is not mutated
const names = base.pipe(map<Product, string>((p) => p.name));
const prices = base.pipe(map<Product, number>((p) => p.priceCents / 100));

const [nameList, priceList] = await Promise.all([
  names.process(products),
  prices.process(products),
]);
```

```typescript
import { ApiClient, pipe, filter, map } from "bytekit";

const client = new ApiClient({ baseUrl: "https://api.example.com" });

const products = await client.get<RawProduct[]>("/products", {
  pipeline: pipe(
    filter<RawProduct>((p) => p.active),
    map<RawProduct, Product>((p) => ({
      id: p.id,
      name: p.name,
      price: p.price_cents / 100,
    }))
  ),
});
// Pipeline applied after response parsing/validation
```

```typescript
import { pipe, map } from "bytekit/pipeline";

try {
  await pipe(
    map<number, number>((n) => {
      if (n < 0) throw new RangeError("negative");
      return Math.sqrt(n);
    })
  ).process([4, 9, -1]);
} catch (err) {
  // err is RangeError("negative"): the original error, not wrapped
}
```