Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,31 @@ DOMAIN_SUBDOMAIN_NAME=https://vod-cdn.{SUBDOMAIN}.{DOMAIN}.com
# FFmpeg uses this for demuxing, filtering, and muxing threads.
FFMPEG_THREADS=0

# x265 (HEVC/Dolby Vision) thread pool size. Set to your vCPU count for max utilization.
# This is the BIGGEST performance lever — pools=none previously disabled ALL threading.
# Example: 32-core machine → X265_POOL_SIZE=32
X265_POOL_SIZE=32

# x265 frame-level parallelism. How many frames encode simultaneously.
# 4 is optimal for most machines. Higher values use more RAM but increase throughput.
# Rule of thumb: 2-6 depending on available RAM (each frame buffer ~50-200MB for 4K).
X265_FRAME_THREADS=4

# x265 lookahead threads. 0 = auto-detect (recommended, uses half of frame-threads).
# Previously hardcoded to 1, which bottlenecked the rate-control analysis.
X265_LOOKAHEAD_THREADS=0

# x265 rate-control lookahead depth (frames). Higher = better bitrate distribution.
# Increased from 20 to 40 for improved VBR quality at the cost of slightly more RAM.
X265_RC_LOOKAHEAD=40

# x264 rate-control lookahead depth (frames). Higher = better VBR quality.
# Increased from 40 to 60 for improved bitrate distribution.
X264_RC_LOOKAHEAD=60

# FFmpeg 8.1: Input thread queue sizing. Prevents "Thread message queue blocking" stalls.
# Higher values help with high-bitrate 4K sources. Default 512 covers most cases.
THREAD_QUEUE_SIZE=512

# Maximum muxer queue size. FFmpeg 8.1 runs each muxer in its own thread.
# Higher values prevent the muxer thread from blocking the encoder.
MAX_MUXING_QUEUE_SIZE=4096

# Developer Override: Force the system to use ONLY one group of profiles.
# Values: 'avc_sdr', 'hvc_sdr', 'hvc_pq', 'dvh_pq', 'ALL'
TEST_VIDEO_PROFILE=ALL
Expand Down
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
pkg-config \
nasm \
yasm \
wget \
git \
cmake \
Expand Down Expand Up @@ -72,7 +71,7 @@ RUN git clone --depth 1 https://github.com/Netflix/vmaf.git /tmp/vmaf && \
# -----------------------------------------------------------------------------
# 3. Download FFmpeg Source Code
# -----------------------------------------------------------------------------
ARG FFMPEG_VERSION=7.1
ARG FFMPEG_VERSION=8.1
RUN wget -q https://ffmpeg.org/releases/ffmpeg-${FFMPEG_VERSION}.tar.bz2 && \
tar xjf ffmpeg-${FFMPEG_VERSION}.tar.bz2 && \
rm ffmpeg-${FFMPEG_VERSION}.tar.bz2
Expand All @@ -90,7 +89,9 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8
--enable-gpl \
--enable-nonfree \
--enable-version3 \
--enable-pthreads \
--enable-swresample \
--enable-swscale \
--enable-libsoxr \
--enable-libopus \
--enable-libx264 \
Expand All @@ -108,7 +109,6 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8
--disable-doc \
--disable-debug \
--disable-ffplay \
--disable-autodetect \
--disable-sdl2 \
--disable-libxcb \
--disable-libxcb-shm \
Expand All @@ -125,7 +125,7 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8
--disable-nvdec \
--disable-indevs \
--disable-outdevs \
--extra-cflags="-O2" \
--extra-cflags="-O3 -march=x86-64-v3 -pipe -fomit-frame-pointer" \
&& make -j$(nproc) \
&& make install \
&& strip /usr/local/bin/ffmpeg /usr/local/bin/ffprobe
Expand Down Expand Up @@ -160,7 +160,7 @@ RUN pnpm run build

# =============================================================================
# Stage 3 — Production Runtime
# Final optimized image containing the compiled Node.js application, the custom
# Final image containing the compiled Node.js application, the custom
# FFmpeg binary, and minimal shared runtime libraries.
# =============================================================================
FROM ubuntu:24.04
Expand All @@ -174,7 +174,7 @@ ARG VERSION=0.3.2
ARG BUILD_DATE=unknown

LABEL org.opencontainers.image.title="ffmpeg-queue-worker-node" \
org.opencontainers.image.description="FFmpeg 7.1 & Node.js Video Job Worker" \
org.opencontainers.image.description="FFmpeg 8.1 & Node.js Video Job Worker" \
org.opencontainers.image.version="${VERSION}" \
org.opencontainers.image.created="${BUILD_DATE}" \
org.opencontainers.image.authors="Maulik M. Kadeval" \
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"build": "rm -rf dist && tsc && find src -name '*.json' | while read f; do mkdir -p dist/$(dirname ${f#src/}) && cp $f dist/${f#src/}; done",
"dev:local:format": "prettier --write \"src/**/*.{ts,json}\"",
"dev:local:build:docker": "docker build -t worker-local-dev .",
"dev:local:build:docker:arm64": "docker build --platform linux/arm64 -t worker-local-dev .",
"dev:local:build:docker:amd64": "docker build --platform linux/amd64 -t worker-local-dev .",
"dev:local:test:job": "tsx --env-file=.env test/queue-job.test.local.ts",
"dev:local:run:docker": "docker run --env-file .env worker-local-dev",
"dev:local:e2e": "pnpm run dev:local:format && pnpm run dev:local:build:docker && pnpm run dev:local:test:job && pnpm run dev:local:run:docker"
Expand Down
13 changes: 12 additions & 1 deletion src/application/video.process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { pino } from 'pino';
const logger = pino({ name: 'ProcessVideo' });

/**
* Orchestrates the core video processing pipeline: Download -> Probe -> Transcode -> Upload.
* Orchestrates the domain lifecycle of a video ingest job: Network -> Probe -> Encode -> Storage.
*/
export class ProcessVideo implements ProcessVideoUseCase {
constructor(
Expand All @@ -25,6 +25,17 @@ export class ProcessVideo implements ProcessVideoUseCase {
private readonly db: VideoRepository,
) {}

/**
* Executes the sequential pipeline required to convert an arbitrary media source to HLS.
*
* - Streams the raw source over HTTP directly to local NVMe via `node:stream/promises` to avoid RAM saturation.
* - Invokes `ffprobe` to determine target mapping bounds (`sourceWidth`, `sourceHeight`).
* - Updates the database (PostgreSQL) incrementally based on status transitions to prevent worker lock loss.
*
* @param job - Required DTO mapping `videoId` to an inbound `sourceUrl`.
* @param onProgress - BullMQ callback exposing fractional `Math.round()` percentage integers back to Redis.
* @throws {WorkerError} Forwards non-zero FFmpeg exits or network IO disconnects back to BullMQ for retry evaluation.
*/
async execute(job: JobData, onProgress?: ProgressCallback): Promise<void> {
const { videoId, sourceUrl, webhookUrl } = job;
logger.info({ videoId, sourceUrl, webhookUrl }, 'Starting video processing pipeline');
Expand Down
22 changes: 14 additions & 8 deletions src/config/env.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
/**
* Runtime configuration validated via Zod schema.
* Designed to fail-fast: if any required variable is missing or malformed, the process crashes
* immediately before attempting to bind ports or connect to the database.
* Configuration loaded from environment variables using Zod.
* The application will crash on startup if any required values are missing or incorrect.
*/
import { z } from 'zod';

/**
* Strips literal quotes injected by `.env` loaders to prevent DSN/connection string parsing errors.
* Removes extra quotation marks from environment variables.
*/
const unquotedString = z.string().transform((s) => s.replace(/^["'](.*?)["']$/, '$1'));

Expand All @@ -31,6 +30,7 @@ const envSchema = z.object({
AZURE_UPLOAD_RETRIES: z.coerce.number().default(3),
AZURE_STORAGE_CONNECTION_STRING: unquotedString.optional(),
AZURE_STORAGE_ACCOUNT_URL: unquotedString.optional(),
AZURE_MANAGED_IDENTITY_CLIENT_ID: unquotedString.optional(),
AZURE_STORAGE_CONTAINER_NAME: unquotedString.pipe(
z.string().min(1, 'AZURE_STORAGE_CONTAINER_NAME is required'),
),
Expand All @@ -42,12 +42,18 @@ const envSchema = z.object({
DOMAIN_SUBDOMAIN_NAME: unquotedString.optional(),

FFMPEG_THREADS: z.coerce.number().default(0),
X265_POOL_SIZE: z.coerce.number().default(32),
X265_FRAME_THREADS: z.coerce.number().default(4),
X265_FRAME_THREADS: z.coerce.number().default(2),
X265_LOOKAHEAD_THREADS: z.coerce.number().default(0),
X265_RC_LOOKAHEAD: z.coerce.number().default(40),

THREAD_QUEUE_SIZE: z.coerce.number().default(512),
MAX_MUXING_QUEUE_SIZE: z.coerce.number().default(4096),

X264_RC_LOOKAHEAD: z.coerce.number().default(60),
});

/**
* Validated application configuration. Extracted directly from `process.env`.
* @throws {ZodError} If validation fails during module load.
* The validated application settings, used throughout the app.
* @throws {ZodError} If any environment variable rules are broken.
*/
export const config: z.infer<typeof envSchema> = envSchema.parse(process.env);
20 changes: 17 additions & 3 deletions src/domain/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Base custom exception for all domain errors. Wraps the underlying `.cause` for traceability.
* Features a `retryable` flag that signals to the queue wrapper whether to re-queue the job
* (e.g., transient network issue) or fail it permanently (e.g., validation failure).
* Derived exception class enforcing `{ retryable: boolean }` properties.
* Designed explicitly so the BullMQ execution loop can discriminate between `EIO` / network transient faults
* versus deterministic decoder/syntax panics which will never succeed.
*/
export class WorkerError extends Error {
public readonly retryable: boolean;
Expand All @@ -13,24 +13,38 @@ export class WorkerError extends Error {
}
}

/**
* Maps to HTTP 404/403 closures when resolving `sourceUrl` blobs into `import { pipeline }`.
*/
export class SourceNotFoundError extends WorkerError {
constructor(url: string, cause?: Error) {
super(`Source video not found: ${url}`, false, { cause });
}
}

/**
* Thrown strictly when `ffprobe` JSON parsing misses `codec_type === 'video'` segments,
* preventing OOM exceptions across downstream multiplex mapping arrays.
*/
export class ValidationError extends WorkerError {
constructor(message: string) {
super(message, false);
}
}

/**
* Thrown dynamically upon `code !== 0` closures from spawned `ffmpeg` PIDs.
* Allows `retryable=true` scaling to bypass transient OS thread allocation panics.
*/
export class TranscodeError extends WorkerError {
constructor(message: string, cause?: Error) {
super(message, true, { cause });
}
}

/**
* Bounds Azure/S3 multipart SDK connection strings resetting during `BlockBlobClient.upload()`.
*/
export class UploadError extends WorkerError {
constructor(message: string, cause?: Error) {
super(message, true, { cause });
Expand Down
48 changes: 45 additions & 3 deletions src/domain/job.interface.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
/**
* Explicit schema interfaces required across the ingestion boundaries (worker -> pg -> storage).
*/
export interface JobData {
videoId: string;
sourceUrl: string;
userId: string;
/** Resolves a callback POST upon zero-exit codes enabling loose-coupled status meshes. */
webhookUrl?: string;
}

/**
* Indexed mapping bound to libavformat streams array for track persistence.
*/
export interface AudioStreamInfo {
index: number;
codec: string;
Expand All @@ -13,6 +20,9 @@ export interface AudioStreamInfo {
title: string;
}

/**
* Maps raw `ffprobe` JSON outputs into explicitly-typed constraints.
*/
export interface ProbeResult {
duration: number;
width: number;
Expand All @@ -26,19 +36,30 @@ export interface ProbeResult {
videoRange: string;
}

/**
* Output artifact references mapped directly into CDN manifest namespaces.
*/
export interface VideoRendition {
resolution: string;
width: number;
height: number;
bitrate: number;
/** Pre-built HTTP relative-blob URL mapping directly to EXT-X-STREAM-INF payloads. */
url: string;
}

/**
* Final memory resolution passed back up to the master BullMQ job processor.
*/
export interface TranscodeResult {
/** Virtualized tmpfs / NVMe root mapping the segmented output arrays. */
outputDir: string;
renditions: VideoRendition[];
}

/**
* Tracks row-level states in PostgreSQL to handle pre-emption and idempotency locks.
*/
export type JobStatus =
| 'queued'
| 'processing'
Expand All @@ -47,6 +68,9 @@ export type JobStatus =
| 'completed'
| 'failed';

/**
* Implements the atomic transactions expected off `pg` client handles.
*/
export interface VideoRepository {
updateStatus(
videoId: string,
Expand All @@ -59,21 +83,33 @@ export interface VideoRepository {
}

/**
* Dependency-inversion interface for blob storage providers (e.g., Azure Blob Storage, AWS S3).
* Maps the internal Node disk blocks outwards to object blob spaces via multipart SDK uploads.
*/
export interface StorageProvider {
/**
* Iterates the segment manifests onto Azure/AWS, injecting exact `video/mp4` and `application/vnd.apple.mpegurl` headers.
* @returns Fully qualified FQDN for the resulting master array.
*/
uploadHLS(folderPath: string, videoId: string, onProgress?: ProgressCallback): Promise<string>;
}

/**
* Progress delegate invoked off chunk offsets per internal `ffprobe` loop bindings.
*/
export type ProgressCallback = (data: { variant: string; percent: number }) => void;

/**
* Domain boundary around the native FFmpeg system binary.
* Implementors must guarantee that `cleanup()` removes all trailing artifacts from the OS.
* Abstract boundary wrapping child_process execution of the native OS ffmpeg binary.
*/
export interface TranscodeProvider {
/**
* Synchronous `spawn` intercept for the inbound libavformat properties array.
*/
probe(sourceUrl: string): Promise<ProbeResult>;

/**
* Pushes bounded stream definitions through the libx265 / libx264 cores.
*/
transcodeHLS(
sourceUrl: string,
videoId: string,
Expand All @@ -86,9 +122,15 @@ export interface TranscodeProvider {
videoRange?: string,
): Promise<TranscodeResult>;

/**
* Empties the `tmpfs` blocks post-execution, strictly avoiding disk bloat errors.
*/
cleanup(videoId: string): Promise<void>;
}

/**
* Dependency-injected service orchestrating the state/execute lifecycle of worker units.
*/
export interface ProcessVideoUseCase {
execute(job: JobData, onProgress?: ProgressCallback): Promise<void>;
}
9 changes: 5 additions & 4 deletions src/infrastructure/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import type {
const logger = pino({ name: 'PostgresVideoRepository' });

/**
* PostgreSQL adapter for persisting video state and metadata.
* Provides database connection pooling for PostgreSQL using `pg`.
*
* @remarks
* - Manages its own connection pool. Must call `close()` during graceful shutdown to prevent connection leaks.
* - Upserts are used for metadata (`ON CONFLICT (video_id) DO UPDATE`) to safely support job retries (idempotency).
* - `updateStatus` serves as a sanity check: it intentionally throws if the video ID vanishes mid-process.
* - Enforces minimum timeout definitions (`idleTimeoutMillis`, `connectionTimeoutMillis`) to
* prevent zombie DB locks if pg/Azure networks lag.
* - Resolves race conditions across distributed workers by utilizing `ON CONFLICT ... DO UPDATE` upserts.
* - Verifies row existence before state progression transitions to prevent missing target errors.
*/
export class PostgresVideoRepository implements VideoRepository {
private readonly pool: pg.Pool;
Expand Down
Loading
Loading