diff --git a/.env.example b/.env.example index c8add50..fd67969 100644 --- a/.env.example +++ b/.env.example @@ -40,16 +40,31 @@ DOMAIN_SUBDOMAIN_NAME=https://vod-cdn.{SUBDOMAIN}.{DOMAIN}.com # FFmpeg uses this for demuxing, filtering, and muxing threads. FFMPEG_THREADS=0 -# x265 (HEVC/Dolby Vision) thread pool size. Set to your vCPU count for max utilization. -# This is the BIGGEST performance lever — pools=none previously disabled ALL threading. -# Example: 32-core machine → X265_POOL_SIZE=32 -X265_POOL_SIZE=32 - # x265 frame-level parallelism. How many frames encode simultaneously. # 4 is optimal for most machines. Higher values use more RAM but increase throughput. # Rule of thumb: 2-6 depending on available RAM (each frame buffer ~50-200MB for 4K). X265_FRAME_THREADS=4 +# x265 lookahead threads. 0 = auto-detect (recommended, uses half of frame-threads). +# Previously hardcoded to 1, which bottlenecked the rate-control analysis. +X265_LOOKAHEAD_THREADS=0 + +# x265 rate-control lookahead depth (frames). Higher = better bitrate distribution. +# Increased from 20 to 40 for improved VBR quality at the cost of slightly more RAM. +X265_RC_LOOKAHEAD=40 + +# x264 rate-control lookahead depth (frames). Higher = better VBR quality. +# Increased from 40 to 60 for improved bitrate distribution. +X264_RC_LOOKAHEAD=60 + +# FFmpeg 8.1: Input thread queue sizing. Prevents "Thread message queue blocking" stalls. +# Higher values help with high-bitrate 4K sources. Default 512 covers most cases. +THREAD_QUEUE_SIZE=512 + +# Maximum muxer queue size. FFmpeg 8.1 runs each muxer in its own thread. +# Higher values prevent the muxer thread from blocking the encoder. +MAX_MUXING_QUEUE_SIZE=4096 + # Developer Override: Force the system to use ONLY one group of profiles. # Values: 'avc_sdr', 'hvc_sdr', 'hvc_pq', 'dvh_pq', 'ALL' TEST_VIDEO_PROFILE=ALL diff --git a/Dockerfile b/Dockerfile index a824f53..7a7d965 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ pkg-config \ nasm \ - yasm \ wget \ git \ cmake \ @@ -72,7 +71,7 @@ RUN git clone --depth 1 https://github.com/Netflix/vmaf.git /tmp/vmaf && \ # ----------------------------------------------------------------------------- # 3. Download FFmpeg Source Code # ----------------------------------------------------------------------------- -ARG FFMPEG_VERSION=7.1 +ARG FFMPEG_VERSION=8.1 RUN wget -q https://ffmpeg.org/releases/ffmpeg-${FFMPEG_VERSION}.tar.bz2 && \ tar xjf ffmpeg-${FFMPEG_VERSION}.tar.bz2 && \ rm ffmpeg-${FFMPEG_VERSION}.tar.bz2 @@ -90,7 +89,9 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8 --enable-gpl \ --enable-nonfree \ --enable-version3 \ + --enable-pthreads \ --enable-swresample \ + --enable-swscale \ --enable-libsoxr \ --enable-libopus \ --enable-libx264 \ @@ -108,7 +109,6 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8 --disable-doc \ --disable-debug \ --disable-ffplay \ - --disable-autodetect \ --disable-sdl2 \ --disable-libxcb \ --disable-libxcb-shm \ @@ -125,7 +125,7 @@ RUN PKG_CONFIG_PATH=/usr/local/lib/aarch64-linux-gnu/pkgconfig:/usr/local/lib/x8 --disable-nvdec \ --disable-indevs \ --disable-outdevs \ - --extra-cflags="-O2" \ + --extra-cflags="-O3 -march=x86-64-v3 -pipe -fomit-frame-pointer" \ && make -j$(nproc) \ && make install \ && strip /usr/local/bin/ffmpeg /usr/local/bin/ffprobe @@ -160,7 +160,7 @@ RUN pnpm run build # ============================================================================= # Stage 3 — Production Runtime -# Final optimized image containing the compiled Node.js application, the custom +# Final image containing the compiled Node.js application, the custom # FFmpeg binary, and minimal shared runtime libraries. # ============================================================================= FROM ubuntu:24.04 @@ -174,7 +174,7 @@ ARG VERSION=0.3.2 ARG BUILD_DATE=unknown LABEL org.opencontainers.image.title="ffmpeg-queue-worker-node" \ - org.opencontainers.image.description="FFmpeg 7.1 & Node.js Video Job Worker" \ + org.opencontainers.image.description="FFmpeg 8.1 & Node.js Video Job Worker" \ org.opencontainers.image.version="${VERSION}" \ org.opencontainers.image.created="${BUILD_DATE}" \ org.opencontainers.image.authors="Maulik M. Kadeval" \ diff --git a/package.json b/package.json index c14659a..cc1a94b 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,8 @@ "build": "rm -rf dist && tsc && find src -name '*.json' | while read f; do mkdir -p dist/$(dirname ${f#src/}) && cp $f dist/${f#src/}; done", "dev:local:format": "prettier --write \"src/**/*.{ts,json}\"", "dev:local:build:docker": "docker build -t worker-local-dev .", + "dev:local:build:docker:arm64": "docker build --platform linux/arm64 -t worker-local-dev .", + "dev:local:build:docker:amd64": "docker build --platform linux/amd64 -t worker-local-dev .", "dev:local:test:job": "tsx --env-file=.env test/queue-job.test.local.ts", "dev:local:run:docker": "docker run --env-file .env worker-local-dev", "dev:local:e2e": "pnpm run dev:local:format && pnpm run dev:local:build:docker && pnpm run dev:local:test:job && pnpm run dev:local:run:docker" diff --git a/src/application/video.process.ts b/src/application/video.process.ts index 5dc5737..8870df4 100644 --- a/src/application/video.process.ts +++ b/src/application/video.process.ts @@ -16,7 +16,7 @@ import { pino } from 'pino'; const logger = pino({ name: 'ProcessVideo' }); /** - * Orchestrates the core video processing pipeline: Download -> Probe -> Transcode -> Upload. + * Orchestrates the domain lifecycle of a video ingest job: Network -> Probe -> Encode -> Storage. */ export class ProcessVideo implements ProcessVideoUseCase { constructor( @@ -25,6 +25,17 @@ export class ProcessVideo implements ProcessVideoUseCase { private readonly db: VideoRepository, ) {} + /** + * Executes the sequential pipeline required to convert an arbitrary media source to HLS. + * + * - Streams the raw source over HTTP directly to local NVMe via `node:stream/promises` to avoid RAM saturation. + * - Invokes `ffprobe` to determine target mapping bounds (`sourceWidth`, `sourceHeight`). + * - Updates the database (PostgreSQL) incrementally based on status transitions to prevent worker lock loss. + * + * @param job - Required DTO mapping `videoId` to an inbound `sourceUrl`. + * @param onProgress - BullMQ callback exposing fractional `Math.round()` percentage integers back to Redis. + * @throws {WorkerError} Forwards non-zero FFmpeg exits or network IO disconnects back to BullMQ for retry evaluation. + */ async execute(job: JobData, onProgress?: ProgressCallback): Promise { const { videoId, sourceUrl, webhookUrl } = job; logger.info({ videoId, sourceUrl, webhookUrl }, 'Starting video processing pipeline'); diff --git a/src/config/env.ts b/src/config/env.ts index 00f03a8..26ed7de 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -1,12 +1,11 @@ /** - * Runtime configuration validated via Zod schema. - * Designed to fail-fast: if any required variable is missing or malformed, the process crashes - * immediately before attempting to bind ports or connect to the database. + * Configuration loaded from environment variables using Zod. + * The application will crash on startup if any required values are missing or incorrect. */ import { z } from 'zod'; /** - * Strips literal quotes injected by `.env` loaders to prevent DSN/connection string parsing errors. + * Removes extra quotation marks from environment variables. */ const unquotedString = z.string().transform((s) => s.replace(/^["'](.*?)["']$/, '$1')); @@ -31,6 +30,7 @@ const envSchema = z.object({ AZURE_UPLOAD_RETRIES: z.coerce.number().default(3), AZURE_STORAGE_CONNECTION_STRING: unquotedString.optional(), AZURE_STORAGE_ACCOUNT_URL: unquotedString.optional(), + AZURE_MANAGED_IDENTITY_CLIENT_ID: unquotedString.optional(), AZURE_STORAGE_CONTAINER_NAME: unquotedString.pipe( z.string().min(1, 'AZURE_STORAGE_CONTAINER_NAME is required'), ), @@ -42,12 +42,18 @@ const envSchema = z.object({ DOMAIN_SUBDOMAIN_NAME: unquotedString.optional(), FFMPEG_THREADS: z.coerce.number().default(0), - X265_POOL_SIZE: z.coerce.number().default(32), - X265_FRAME_THREADS: z.coerce.number().default(4), + X265_FRAME_THREADS: z.coerce.number().default(2), + X265_LOOKAHEAD_THREADS: z.coerce.number().default(0), + X265_RC_LOOKAHEAD: z.coerce.number().default(40), + + THREAD_QUEUE_SIZE: z.coerce.number().default(512), + MAX_MUXING_QUEUE_SIZE: z.coerce.number().default(4096), + + X264_RC_LOOKAHEAD: z.coerce.number().default(60), }); /** - * Validated application configuration. Extracted directly from `process.env`. - * @throws {ZodError} If validation fails during module load. + * The validated application settings, used throughout the app. + * @throws {ZodError} If any environment variable rules are broken. */ export const config: z.infer = envSchema.parse(process.env); diff --git a/src/domain/errors.ts b/src/domain/errors.ts index a129695..2505e91 100644 --- a/src/domain/errors.ts +++ b/src/domain/errors.ts @@ -1,7 +1,7 @@ /** - * Base custom exception for all domain errors. Wraps the underlying `.cause` for traceability. - * Features a `retryable` flag that signals to the queue wrapper whether to re-queue the job - * (e.g., transient network issue) or fail it permanently (e.g., validation failure). + * Derived exception class enforcing `{ retryable: boolean }` properties. + * Designed explicitly so the BullMQ execution loop can discriminate between `EIO` / network transient faults + * versus deterministic decoder/syntax panics which will never succeed. */ export class WorkerError extends Error { public readonly retryable: boolean; @@ -13,24 +13,38 @@ export class WorkerError extends Error { } } +/** + * Maps to HTTP 404/403 closures when resolving `sourceUrl` blobs into `import { pipeline }`. + */ export class SourceNotFoundError extends WorkerError { constructor(url: string, cause?: Error) { super(`Source video not found: ${url}`, false, { cause }); } } +/** + * Thrown strictly when `ffprobe` JSON parsing misses `codec_type === 'video'` segments, + * preventing OOM exceptions across downstream multiplex mapping arrays. + */ export class ValidationError extends WorkerError { constructor(message: string) { super(message, false); } } +/** + * Thrown dynamically upon `code !== 0` closures from spawned `ffmpeg` PIDs. + * Allows `retryable=true` scaling to bypass transient OS thread allocation panics. + */ export class TranscodeError extends WorkerError { constructor(message: string, cause?: Error) { super(message, true, { cause }); } } +/** + * Bounds Azure/S3 multipart SDK connection strings resetting during `BlockBlobClient.upload()`. + */ export class UploadError extends WorkerError { constructor(message: string, cause?: Error) { super(message, true, { cause }); diff --git a/src/domain/job.interface.ts b/src/domain/job.interface.ts index 38733be..9800fe9 100644 --- a/src/domain/job.interface.ts +++ b/src/domain/job.interface.ts @@ -1,10 +1,17 @@ +/** + * Explicit schema interfaces required across the ingestion boundaries (worker -> pg -> storage). + */ export interface JobData { videoId: string; sourceUrl: string; userId: string; + /** Resolves a callback POST upon zero-exit codes enabling loose-coupled status meshes. */ webhookUrl?: string; } +/** + * Indexed mapping bound to libavformat streams array for track persistence. + */ export interface AudioStreamInfo { index: number; codec: string; @@ -13,6 +20,9 @@ export interface AudioStreamInfo { title: string; } +/** + * Maps raw `ffprobe` JSON outputs into explicitly-typed constraints. + */ export interface ProbeResult { duration: number; width: number; @@ -26,19 +36,30 @@ export interface ProbeResult { videoRange: string; } +/** + * Output artifact references mapped directly into CDN manifest namespaces. + */ export interface VideoRendition { resolution: string; width: number; height: number; bitrate: number; + /** Pre-built HTTP relative-blob URL mapping directly to EXT-X-STREAM-INF payloads. */ url: string; } +/** + * Final memory resolution passed back up to the master BullMQ job processor. + */ export interface TranscodeResult { + /** Virtualized tmpfs / NVMe root mapping the segmented output arrays. */ outputDir: string; renditions: VideoRendition[]; } +/** + * Tracks row-level states in PostgreSQL to handle pre-emption and idempotency locks. + */ export type JobStatus = | 'queued' | 'processing' @@ -47,6 +68,9 @@ export type JobStatus = | 'completed' | 'failed'; +/** + * Implements the atomic transactions expected off `pg` client handles. + */ export interface VideoRepository { updateStatus( videoId: string, @@ -59,21 +83,33 @@ export interface VideoRepository { } /** - * Dependency-inversion interface for blob storage providers (e.g., Azure Blob Storage, AWS S3). + * Maps the internal Node disk blocks outwards to object blob spaces via multipart SDK uploads. */ export interface StorageProvider { + /** + * Iterates the segment manifests onto Azure/AWS, injecting exact `video/mp4` and `application/vnd.apple.mpegurl` headers. + * @returns Fully qualified FQDN for the resulting master array. + */ uploadHLS(folderPath: string, videoId: string, onProgress?: ProgressCallback): Promise; } +/** + * Progress delegate invoked off chunk offsets per internal `ffprobe` loop bindings. + */ export type ProgressCallback = (data: { variant: string; percent: number }) => void; /** - * Domain boundary around the native FFmpeg system binary. - * Implementors must guarantee that `cleanup()` removes all trailing artifacts from the OS. + * Abstract boundary wrapping child_process execution of the native OS ffmpeg binary. */ export interface TranscodeProvider { + /** + * Synchronous `spawn` intercept for the inbound libavformat properties array. + */ probe(sourceUrl: string): Promise; + /** + * Pushes bounded stream definitions through the libx265 / libx264 cores. + */ transcodeHLS( sourceUrl: string, videoId: string, @@ -86,9 +122,15 @@ export interface TranscodeProvider { videoRange?: string, ): Promise; + /** + * Empties the `tmpfs` blocks post-execution, strictly avoiding disk bloat errors. + */ cleanup(videoId: string): Promise; } +/** + * Dependency-injected service orchestrating the state/execute lifecycle of worker units. + */ export interface ProcessVideoUseCase { execute(job: JobData, onProgress?: ProgressCallback): Promise; } diff --git a/src/infrastructure/db/db.ts b/src/infrastructure/db/db.ts index f6ff9f8..7d71067 100644 --- a/src/infrastructure/db/db.ts +++ b/src/infrastructure/db/db.ts @@ -10,12 +10,13 @@ import type { const logger = pino({ name: 'PostgresVideoRepository' }); /** - * PostgreSQL adapter for persisting video state and metadata. + * Provides database connection pooling for PostgreSQL using `pg`. * * @remarks - * - Manages its own connection pool. Must call `close()` during graceful shutdown to prevent connection leaks. - * - Upserts are used for metadata (`ON CONFLICT (video_id) DO UPDATE`) to safely support job retries (idempotency). - * - `updateStatus` serves as a sanity check: it intentionally throws if the video ID vanishes mid-process. + * - Enforces minimum timeout definitions (`idleTimeoutMillis`, `connectionTimeoutMillis`) to + * prevent zombie DB locks if pg/Azure networks lag. + * - Resolves race conditions across distributed workers by utilizing `ON CONFLICT ... DO UPDATE` upserts. + * - Verifies row existence before state progression transitions to prevent missing target errors. */ export class PostgresVideoRepository implements VideoRepository { private readonly pool: pg.Pool; diff --git a/src/infrastructure/ffmpeg/adapter.ts b/src/infrastructure/ffmpeg/adapter.ts index 667015e..734c289 100644 --- a/src/infrastructure/ffmpeg/adapter.ts +++ b/src/infrastructure/ffmpeg/adapter.ts @@ -34,12 +34,37 @@ const ISO_639_1_MAP: Record = { und: 'und', }; +/** + * Primary FFmpeg controller orchestrating pipeline sub-shells and local IO states. + */ export class FFmpegAdapter implements TranscodeProvider { constructor(private readonly workDir: string = DEFAULT_WORK_DIR) {} + + /** + * Triggers the libavformat container parser (probe.js core execution) for structural metadata. + */ async probe(sourceUrl: string): Promise { return probe(sourceUrl); } + /** + * Filters an inbound source topology against restricted `profiles.json` ladders + * and dispatches them sequentially through libx264/libx265 encoding cores. + * + * - Invokes a unified UUID UUIDv4 string (`tierId`) mapper across the audio/video chunks + * to prevent URL discovery brute force iterations. + * + * @param sourceUrl - Network accessible inbound blob stream. + * @param videoId - Used entirely for namespace tracking in logs and storage boundaries. + * @param sourceWidth - Display layer pixel constraint mapped from `ffprobe`. + * @param sourceHeight - Display layer scale mapped. + * @param sourceDuration - Length evaluation multiplier for `progress()` calculations. + * @param onProgress - Timecode scalar callback bridging percentage reports to the BullMQ wrapper. + * @param sourceFrameRate - Baseline framerate for computing expected drop-frame bounds (NTSC limits). + * @param audioStreams - FFprobe indexed track list mapped against requested Atmos definitions. + * @param videoRange - Enforces strict transfer characteristics arrays ('SDR', 'PQ', 'HLG'). + * @returns Resolution paths mapped to `blobPathFromUuid` for the final Azure ingest. + */ async transcodeHLS( sourceUrl: string, videoId: string, diff --git a/src/infrastructure/ffmpeg/constants.ts b/src/infrastructure/ffmpeg/constants.ts index 6ebf917..01d6e06 100644 --- a/src/infrastructure/ffmpeg/constants.ts +++ b/src/infrastructure/ffmpeg/constants.ts @@ -1,3 +1,6 @@ +/** + * Enforces standard RFC 8216 file mappings used uniformly across CDN deployments. + */ export const HLS_CONSTANTS = { MASTER_PLAYLIST_NAME: 'playlist.m3u8', diff --git a/src/infrastructure/ffmpeg/core/complexity.ts b/src/infrastructure/ffmpeg/core/complexity.ts index f97f723..b3e4437 100644 --- a/src/infrastructure/ffmpeg/core/complexity.ts +++ b/src/infrastructure/ffmpeg/core/complexity.ts @@ -28,13 +28,15 @@ async function runCommand(args: string[]): Promise { } /** - * Orchestrates a mathematically-driven Smart Per-Title Bitrate adaptation using Netflix's VMAF model. + * Checks how complex a video is to figure out the right bitrate multiplier. + * Runs a quick high-quality sample encode to target a good VMAF score. * - * @remarks - * - Phase 1: Generates a near-lossless reference encode (CRF 10, ultrafast) to establish a pristine baseline. - * - Phase 2: Encodes empirical test points (CRF 19, 23, 27) and scores them against the reference using the VMAF neural network. - * - Phase 3: Interpolates a Rate-Distortion curve to find the exact bits-per-second required to hit a perceptive VMAF score of 95.0. - * - Bounding: Applies strict OTT resolution safety floors to prevent bitrate explosion on noisy/grainy source files. + * @param sourceUrl - Path or URL to the original video file. + * @param duration - Length of the video in seconds, used to report progress. + * @param videoId - Unique ID for the video being processed. + * @param sourceWidth - Width of the source video in pixels. + * @param sourceHeight - Height of the source video in pixels. + * @returns A multiplier number that adjusts the final bitrate up or down. */ export async function probeComplexity( sourceUrl: string, diff --git a/src/infrastructure/ffmpeg/core/hash.ts b/src/infrastructure/ffmpeg/core/hash.ts index 84ad4c4..c4a9994 100644 --- a/src/infrastructure/ffmpeg/core/hash.ts +++ b/src/infrastructure/ffmpeg/core/hash.ts @@ -1,9 +1,18 @@ import { v7 as uuidv7 } from 'uuid'; +/** + * Invokes uuidv7 generator mapped to POSIX timestamps. + * Guarantees monotonic time-sorting across distributed blobs preventing hot-partitions + * on internal S3 and SSD indexing mechanisms. + */ export function generateTierUuid(): string { return uuidv7(); } +/** + * Implements a balanced shard-tree prefix `[0-9a-f]{2}/...` for object key spaces. + * Forces horizontal scaling against object storage namespace limitations (e.g. AWS 3500 PUTs/sec limit per prefix). + */ export function blobPathFromUuid(id: string): string { const cleanId = id.toLowerCase().replace(/-/g, ''); diff --git a/src/infrastructure/ffmpeg/core/probe.ts b/src/infrastructure/ffmpeg/core/probe.ts index 542d3b8..10835fc 100644 --- a/src/infrastructure/ffmpeg/core/probe.ts +++ b/src/infrastructure/ffmpeg/core/probe.ts @@ -7,6 +7,13 @@ const logger = pino({ name: 'ProbeCommand' }); const MAX_DURATION_SECONDS = 7200; const MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024 * 1024; +/** + * Evaluates bounding matrices against `ffprobe` outputs mapped per JSON. + * + * - Traverses all stream variants validating existence of at least one `codec_type === 'video'`. + * - Maps fractions `r_frame_rate` (e.g. `24000/1001`) into rounded numbers for internal math logic + * protecting drop-frame sync integrity. + */ export async function probe(sourceUrl: string): Promise { logger.info({ sourceUrl }, 'Probing source video'); diff --git a/src/infrastructure/ffmpeg/core/runner.ts b/src/infrastructure/ffmpeg/core/runner.ts index 1688108..dfda8f0 100644 --- a/src/infrastructure/ffmpeg/core/runner.ts +++ b/src/infrastructure/ffmpeg/core/runner.ts @@ -6,12 +6,19 @@ import type { RunOptions } from '../types.js'; const logger = pino({ name: 'FFmpegRunner' }); /** - * Robust child-process wrapper for invoking the FFmpeg binary safely in Node.js. + * Runs the ffmpeg tool in the background. * * @remarks - * - Parses FFmpeg's chaotic `stderr` payload to extract frame-accurate progression values. - * - Maintains a rolling buffer of `stderr` (hard-capped at 50kb) to prevent V8 memory leaks during hours-long encodes. - * - Maps raw exit codes and the tail of the stderr buffer into actionable `TranscodeError` domain exceptions. + * - Reads ffmpeg's stderr output to figure out how much has been processed. + * - Keeps a small log buffer of the output so we have details if it crashes. + * - Turns bad exit codes into regular errors we can catch. + * + * @param opts.args - The flags and inputs passed to the ffmpeg command. + * @param opts.label - A short name for this encoding step (for example, 'Audio_Conversion'). + * @param opts.videoId - The ID of the video being processed. + * @param opts.onProgress - A callback that fires when ffmpeg reports new progress. + * @param opts.duration - Handed over so we can calculate the percentage done. + * @returns Resolves when the ffmpeg command finishes successfully. */ export function runFFmpeg(opts: RunOptions): Promise { const { args, label, videoId, onProgress, duration } = opts; @@ -28,8 +35,8 @@ export function runFFmpeg(opts: RunOptions): Promise { const text = chunk.toString(); stderrBuffer += text; - if (stderrBuffer.length > 50000) { - stderrBuffer = stderrBuffer.slice(-25000); + if (stderrBuffer.length > 200_000) { + stderrBuffer = stderrBuffer.slice(-100_000); } if (onProgress || duration) { @@ -103,6 +110,14 @@ interface FfprobeOutput { format: FfprobeFormat; } +/** + * Gets details about a media file (like duration and streams) by running ffprobe. + * + * @param sourceUrl - Path or URL to the video file. + * @returns The parsed video and audio metadata. + * @throws {ValidationError} Thrown if ffprobe returns invalid JSON bounds. + * @throws {SourceNotFoundError} Thrown if the video cannot be downloaded. + */ export function runFFprobe(sourceUrl: string): Promise { const args = [ '-v', diff --git a/src/infrastructure/ffmpeg/encoding/flags.ts b/src/infrastructure/ffmpeg/encoding/flags.ts index 94e710c..b00fdb2 100644 --- a/src/infrastructure/ffmpeg/encoding/flags.ts +++ b/src/infrastructure/ffmpeg/encoding/flags.ts @@ -9,6 +9,10 @@ export interface FrameRateInfo { gopSize: number; } +/** + * Normalizes input framerate to strictly enforced NTSC drop-frame broadcast standards (24000/1001, 30000/1001). + * Fixes temporal jitter caused by rounding errors in 23.976 / 29.970 media files. + */ export function getBroadcastFrameRate(sourceFps?: number): FrameRateInfo | null { if (!sourceFps) return null; @@ -39,6 +43,13 @@ export function getBroadcastFrameRate(sourceFps?: number): FrameRateInfo | null }; } +/** + * Constructs the `-f hls` output parameter blocks for libavformat. + * + * - Forces fMP4 fragments instead of TS containers to reduce byte bloat and improve CDN caching. + * - Extracts `tfhd` offset tracking (`+omit_tfhd_offset`) to remain fully CMAF compliant. + * - Explicitly pushes a `+genpts` fflag so broken presentation timestamps do not crash the muxer queue. + */ export function hlsOutputFlags( hlsTime: number, outputDir: string, @@ -66,7 +77,7 @@ export function hlsOutputFlags( '-hls_fmp4_init_filename', initPattern, '-movflags', - '+frag_keyframe+empty_moov+default_base_moof+cmaf+omit_tfhd_offset', + '+frag_keyframe+empty_moov+default_base_moof+cmaf+omit_tfhd_offset+write_colr', '-f', 'hls', '-hls_time', @@ -91,6 +102,8 @@ export function hlsOutputFlags( '1', '-video_track_timescale', '90000', + '-max_muxing_queue_size', + String(config.MAX_MUXING_QUEUE_SIZE), ]; if (baseUrl) { @@ -100,6 +113,14 @@ export function hlsOutputFlags( return flags; } +/** + * Maps variant resolution and HDR configurations to x264/x265 encoder flags. + * + * - Enforces CFR (Constant Frame Rate) to prevent A/V sync drift on manifest boundaries. + * - Restricts VBV buffer size (`-bufsize`) tight to `-maxrate` to prevent playback buffer underflow on constrained memory devices (ref: Apple HLS Authoring Spec). + * - Forces IDR frames strictly at GOP bounds (`-sc_threshold 0`) to guarantee seamless ABR switching. + * - Injects static HDR10 metadata (MaxCLL/Master Display) for SMPTE ST 2084 compliance. + */ export function videoEncoderFlags(variant: VideoVariantMeta, sourceFrameRate?: number): string[] { const fpsInfo = getBroadcastFrameRate(sourceFrameRate); const gopSize = fpsInfo ? fpsInfo.gopSize : 48; @@ -175,14 +196,14 @@ export function videoEncoderFlags(variant: VideoVariantMeta, sourceFrameRate?: n baseFlags.push( '-x265-params', - `pools=${config.X265_POOL_SIZE}:frame-threads=${config.X265_FRAME_THREADS}:wpp=1:no-open-gop=1:scenecut=0:keyint=${gopSize}:min-keyint=${gopSize}:info=0:colorprim=${colorPrimaries}:transfer=${colorTransfer}:colormatrix=${colorMatrix}${extraHdrParams}${dvhParam}`, + `frame-threads=${config.X265_FRAME_THREADS}:wpp=1:rc-lookahead=${config.X265_RC_LOOKAHEAD}:lookahead-threads=${config.X265_LOOKAHEAD_THREADS}:no-open-gop=1:scenecut=0:keyint=${gopSize}:min-keyint=${gopSize}:info=0:no-sao=1:aq-mode=3:aq-strength=1.0:no-strong-intra-smoothing=1:deblock=-2,-2:rd=4:rdoq-level=2:tu-intra-depth=3:tu-inter-depth=3:b-adapt=2:colorprim=${colorPrimaries}:transfer=${colorTransfer}:colormatrix=${colorMatrix}${extraHdrParams}${dvhParam}`, '-flags', '+global_header', ); } else { baseFlags.push( '-x264-params', - `threads=${config.FFMPEG_THREADS === 0 ? 'auto' : config.FFMPEG_THREADS}`, + `threads=${config.FFMPEG_THREADS === 0 ? 'auto' : config.FFMPEG_THREADS}:rc-lookahead=${config.X264_RC_LOOKAHEAD}:lookahead-threads=auto:aq-mode=3:b-adapt=2:trellis=2:subme=9:me=umh:direct=auto:deblock=-1,-1`, '-flags', '+cgop+global_header', ); @@ -191,13 +212,26 @@ export function videoEncoderFlags(variant: VideoVariantMeta, sourceFrameRate?: n return baseFlags; } +/** + * Generates the scale and setsar (Sample Aspect Ratio) filter graph. + * + * - Forces `setsar=1/1` bounds to prevent player hardware layout issues with anamorphic sources. + * - Interpolates pixels via `spline+accurate_rnd+full_chroma_int` to preserve chroma sub-sampling accuracy during 10-bit YUV 4:2:0 downscales. + */ export function videoFilterChain(width: number, height: number): string { return [ - `scale=${width}:${height}:force_original_aspect_ratio=disable:flags=lanczos`, + `scale=${width}:${height}:force_original_aspect_ratio=disable:flags=spline+accurate_rnd+full_chroma_int`, 'setsar=1/1', ].join(','); } +/** + * Resolves FDK-AAC and AC3 command flags, bypassing the transcoder for Atmos streams. + * + * - Enforces ITU-R BS.1770-4 loudness normalization (Target I=-24, TP=-2.0) on all stereo variants to match broadcast television levels. + * - Handles 5.1 -> 2.0 fold-down using the standard ITU downmix coefficients (LFE=0.2, FC=0.707). + * - Implements Shibata dithering during SOXR resampling for precision truncation. + */ export function audioEncoderFlags(audio: AudioVariantMeta): string[] { if (audio.isAtmos) { return ['-c:a', 'copy']; @@ -256,4 +290,4 @@ export function audioEncoderFlags(audio: AudioVariantMeta): string[] { } return flags; -} \ No newline at end of file +} diff --git a/src/infrastructure/ffmpeg/encoding/profiles.ts b/src/infrastructure/ffmpeg/encoding/profiles.ts index cdbae90..b9d6f79 100644 --- a/src/infrastructure/ffmpeg/encoding/profiles.ts +++ b/src/infrastructure/ffmpeg/encoding/profiles.ts @@ -41,6 +41,15 @@ const VIDEO_PROFILES: VideoProfile[] = [ const AUDIO_PROFILES: AudioProfile[] = ABR_AUDIO; +/** + * Picks the video resolutions to generate based on the original video's size and color format. + * Drops quality levels that would stretch the video larger than its original size. + * + * @param sourceWidth - Width of the original video. + * @param sourceHeight - Height of the original video. + * @param videoRange - The video color range ('SDR', 'PQ', 'HLG'). Defaults to 'SDR'. + * @returns A list of video profiles that the encoder should generate. + */ export function filterActiveVideoProfiles( sourceWidth: number, sourceHeight: number, @@ -78,6 +87,16 @@ export function filterActiveVideoProfiles( return active; } +/** + * Calculates the exact pixel dimensions and bitrates for each video quality level. + * Ensures the video scales correctly without stretching or breaking the aspect ratio. + * + * @param profiles - The chosen video profiles to generate. + * @param sourceWidth - Width of the original video. + * @param sourceHeight - Height of the original video. + * @param complexityMultiplier - Bitrate adjustment factor based on how complex the video is. + * @returns A list of video settings ready for the ffmpeg encoder. + */ export function computeVideoMetadata( profiles: VideoProfile[], sourceWidth: number, @@ -151,6 +170,13 @@ export function computeVideoMetadata( }); } +/** + * Figures out which audio qualities to generate based on the original audio tracks. + * Handles keeping multiple languages and detecting high-quality surround sound (like Atmos). + * + * @param sourceAudioStreams - The audio streams found in the original video. + * @returns A list of audio settings ready for the ffmpeg encoder. + */ export function computeAudioMetadata( sourceAudioStreams: AudioStreamInfo[] = [], ): AudioVariantMeta[] { @@ -160,11 +186,8 @@ export function computeAudioMetadata( for (const profile of AUDIO_PROFILES) { if (profile.hardwareProfile && stream.channels < 2) continue; - const isAtmosSource = - stream.codec === 'eac3' || - stream.codec === 'dca' || - stream.codec === 'dts' || - stream.codec === 'truehd'; + const isAtmosCapableCodec = stream.codec === 'eac3' || stream.codec === 'truehd'; + const isAtmosSource = isAtmosCapableCodec && stream.channels >= 6; if (profile.isAtmos && !isAtmosSource) continue; diff --git a/src/infrastructure/ffmpeg/hls/pipeline.ts b/src/infrastructure/ffmpeg/hls/pipeline.ts index b1a20d2..4681944 100644 --- a/src/infrastructure/ffmpeg/hls/pipeline.ts +++ b/src/infrastructure/ffmpeg/hls/pipeline.ts @@ -16,9 +16,10 @@ import { HLS_CONSTANTS } from '../constants.js'; const logger = pino({ name: 'HlsPipeline' }); /** - * Post-process variant manifests to fix init segment URIs. - * FFmpeg's -hls_base_url only applies to .m4s segment URIs, NOT to #EXT-X-MAP:URI (init segments). - * This function prepends the CDN base URL to the init segment filename. + * Rewrites the EXT-X-MAP:URI attribute in variant playlists. + * This works around an FFmpeg limitation where `-hls_base_url` affects `.m4s` segments + * but fails to prepend to the `.mp4` initialization segment. + * Required for players (e.g. video.js) resolving relative URLs from absolute master manifests. */ async function fixInitSegmentUrls( outputDir: string, @@ -40,6 +41,25 @@ async function fixInitSegmentUrls( } } +/** + * Runs sequential Sub-Pipeline instances for A/V encoding per quality ladder arrays. + * + * - Extracts and muxes all audio profiles in a single FFmpeg pass (Pass 0) as CPU overhead is negligible. + * - Splits libx264, libx265 (SDR), and libx265 (HDR) arrays into synchronous execution steps. This + * prevents Out-Of-Memory (OOM) kills on high-resolution ladder runs by restricting thread counts + * within constraints set by `-threads` and `-x265-params frame-threads`. + * + * @param sourceUrl - HTTP or local OS URI for source stream. + * @param outputDir - Workspace root output directory path. + * @param videoId - Primary key identifying the execution namespace. + * @param videoVariants - Complete array of bounding resolutions and explicit bitrates. + * @param audioRenditions - Array of isolated stereo and surround AC-3 maps. + * @param hlsTime - Fragment timescale boundary (forces `#EXTINF` and IDR intervals). + * @param onProgress - Timecode scalar callback bridging percentage reports to external channels. + * @param sourceFrameRate - Native constant frame rate metadata extracted by libavformat. + * @param sourceDuration - Length evaluation multiplier for percent calculations. + * @param videoRange - Video transfer characteristics mapped to 'SDR', 'HLG' (ARIB B67), or 'PQ' (ST2084). + */ export async function processMasterPipeline( sourceUrl: string, outputDir: string, @@ -85,6 +105,14 @@ export async function processMasterPipeline( const getBaseInputArgs = () => { const args = []; + args.push('-y'); + args.push('-hide_banner'); + args.push('-loglevel', 'level+info'); + args.push('-stats'); + args.push('-nostdin'); + args.push('-threads', '0'); + args.push('-thread_queue_size', String(config.THREAD_QUEUE_SIZE)); + args.push('-drc_scale', '0'); if (config.TEST_DURATION_SECONDS) { @@ -162,6 +190,7 @@ export async function processMasterPipeline( filtergraph.push(`[split_${i}]${scaleFilter}[vout${i}]`); }); + args.push('-filter_complex_threads', '0'); args.push('-filter_complex', filtergraph.join('; ')); variants.forEach((variant, index) => { diff --git a/src/infrastructure/ffmpeg/hls/playlist.ts b/src/infrastructure/ffmpeg/hls/playlist.ts index 279cfea..12184c1 100644 --- a/src/infrastructure/ffmpeg/hls/playlist.ts +++ b/src/infrastructure/ffmpeg/hls/playlist.ts @@ -82,6 +82,14 @@ function getPairedAudioNames( return [getGroupId(stereoName), ...hardware]; } +/** + * Iterates over generated HLS `.m3u8` manifests to calculate true byte-range bitrates. + * + * - Required because ffmpeg `-b:v` and `-maxrate` flags only suggest target constraints. + * - Parses `#EXTINF` durations alongside discrete `.m4s` segment `stat.size` bytes to compute + * the exact `BANDWIDTH` and `AVERAGE-BANDWIDTH` required by Apple's HLS Authoring Spec (v11). + * - Exact bandwidth values prevent player stall events caused by incorrect CDN buffering predictions. + */ async function getBandwidthForDir(dirPath: string): Promise<{ peak: number; avg: number }> { try { const manifestPath = path.join(dirPath, HLS_CONSTANTS.VARIANT_PLAYLIST_NAME); @@ -127,6 +135,16 @@ async function getBandwidthForDir(dirPath: string): Promise<{ peak: number; avg: } } +/** + * Compiles the `master.m3u8` variant playlist index per RFC 8216. + * + * - Maps `CODECS` strings explicitly (e.g. `avc1.640028`, `hvc1.2.4.L153.B0`, `dvh1.08.01`) required for + * hardware decoder instantiation on iOS, Safari, and Android. + * - Enforces `#EXT-X-VERSION:7` to support `#EXT-X-INDEPENDENT-SEGMENTS` declarations, permitting + * AVPlayer to parse random-access points locally without inspecting fragmented MP4 headers. + * - Generates secondary `STABLE-RENDITION-ID` properties for deterministic CDN caching of specific + * language+codec groups. + */ export async function writeMasterPlaylist( outputDir: string, videoVariants: VideoVariantMeta[], @@ -210,7 +228,8 @@ export async function writeMasterPlaylist( const trueVideoPeak = videoBw.peak > 0 ? videoBw.peak : v.maxrate; const isDovi = v.videoCodecTag.startsWith('dv'); - const actualVideoRange = isDovi || v.profile === 'main10' ? 'PQ' : 'SDR'; + const isHdrContent = v.videoRange === 'PQ' || v.videoRange === 'HLG'; + const actualVideoRange = isDovi || isHdrContent ? 'PQ' : 'SDR'; const relativeUri = `../${v.relativeUrl}/${HLS_CONSTANTS.VARIANT_PLAYLIST_NAME}`; const fpsInfo = getBroadcastFrameRate(sourceFrameRate); const frameRateString = fpsInfo ? fpsInfo.aFormat : (v.frameRate ?? 30).toFixed(3); @@ -271,8 +290,7 @@ export async function writeMasterPlaylist( const maxDim = Math.max(v.actualWidth, v.actualHeight); const isHighDef = maxDim >= 1280; - const isUltraHighDef = - maxDim >= 2560 || v.profile === 'main10' || v.videoCodecTag.startsWith('dvh1'); + const isUltraHighDef = maxDim >= 2560 || isDovi || isHdrContent; let hdcpLevel = 'NONE'; if (isUltraHighDef) hdcpLevel = 'TYPE-1'; diff --git a/src/infrastructure/ffmpeg/index.ts b/src/infrastructure/ffmpeg/index.ts index a71d546..12ee144 100644 --- a/src/infrastructure/ffmpeg/index.ts +++ b/src/infrastructure/ffmpeg/index.ts @@ -1 +1,4 @@ +/** + * Exposes the main encapsulation layer protecting node modules from direct `ffmpeg` system calls. + */ export { FFmpegAdapter } from './adapter.js'; diff --git a/src/infrastructure/ffmpeg/types.ts b/src/infrastructure/ffmpeg/types.ts index 378047c..5ac6863 100644 --- a/src/infrastructure/ffmpeg/types.ts +++ b/src/infrastructure/ffmpeg/types.ts @@ -1,5 +1,9 @@ import type { ProgressCallback } from '../../domain/job.interface.js'; +/** + * Baseline constraints defining video multiplex arrays. + * Binds explicit maxrate and bufsize to avoid CDN/Player network buffer starvation. + */ export interface VideoProfile { tierNumber?: number; name: string; @@ -19,6 +23,9 @@ export interface VideoProfile { crf?: number; } +/** + * Encodes deterministic track constraints per ITU limits. + */ export interface AudioProfile { name: string; groupId?: string; @@ -32,12 +39,18 @@ export interface AudioProfile { isCinemaMaster?: boolean; } +/** + * Maps the abstract `VideoProfile` onto explicit bounding-box scale factors computed off `ffprobe` DAR constraints. + */ export type VideoVariantMeta = VideoProfile & { actualWidth: number; actualHeight: number; relativeUrl: string; }; +/** + * Merges surround-sound source maps from `AudioProfile` onto downmix definitions. + */ export type AudioVariantMeta = AudioProfile & { groupId?: string; sourceChannels: number; @@ -48,6 +61,9 @@ export type AudioVariantMeta = AudioProfile & { relativeUrl: string; }; +/** + * Forces spawn arrays onto sub-shell PIDs. + */ export interface RunOptions { args: string[]; label: string; diff --git a/src/infrastructure/queue/video.worker.ts b/src/infrastructure/queue/video.worker.ts index 6d89e18..16f9496 100644 --- a/src/infrastructure/queue/video.worker.ts +++ b/src/infrastructure/queue/video.worker.ts @@ -6,12 +6,12 @@ import type { JobData, ProcessVideoUseCase } from '../../domain/job.interface.js const logger = pino({ name: 'QueueWorker' }); /** - * BullMQ consumer that binds the Redis queue to the `ProcessVideo` domain logic. + * Subscribes to the Redis BullMQ stream for un-processed video conversions. * * @remarks - * - Maps domain progress callbacks directly to BullMQ `job.updateProgress`. - * - Relies on Redis lock durations (`config.JOB_LOCK_DURATION_MS`) to detect computationally locked/crashed workers. - * - Does not throw on failure; relies on BullMQ's internal retry mechanism and `.on('failed')` listeners. + * - Enforces a static `{ lockDuration: config.JOB_LOCK_DURATION_MS }` interval to detect zombie encoding + * pods and restore crashed jobs to `active` arrays automatically per BullMQ retry strategies. + * - Maps `ProcessVideo.execute()` percentage returns dynamically to `job.updateProgress()` avoiding blocking main event loop. */ export class VideoWorker { private readonly worker: Worker; diff --git a/src/infrastructure/storage/azure.service.ts b/src/infrastructure/storage/azure.service.ts index a6774f8..2d615d0 100644 --- a/src/infrastructure/storage/azure.service.ts +++ b/src/infrastructure/storage/azure.service.ts @@ -10,12 +10,13 @@ import { HLS_CONSTANTS } from '../ffmpeg/constants.js'; const logger = pino({ name: 'AzureStorage' }); /** - * Azure Blob Storage adapter for uploading generated HLS playlists and segments. + * Recursively publishes locally-encoded HLS structures to an Azure Blob Storage container. * * @remarks - * - Recursively scans the local output directory and mirrors the final structure to the Blob container. - * - Automatically infers and injects correct `Content-Type` headers (`application/vnd.apple.mpegurl` or `video/mp4`). - * - Security note: Uses `DefaultAzureCredential` in production (Managed Identity), falling back to connection strings locally. + * - Enforces specific `application/vnd.apple.mpegurl` headers for `*.m3u8` to prevent strict + * player clients (ex: hls.js) from failing MIME-type checks during playback. + * - Assumes Managed Identities via DefaultAzureCredential to obtain short-lived Entra ID tokens in + * AKS production deployments, preventing hardcoded connection string leaks. */ export class AzureStorageService { private readonly blobServiceClient: BlobServiceClient; @@ -31,7 +32,11 @@ export class AzureStorageService { } this.blobServiceClient = new BlobServiceClient( config.AZURE_STORAGE_ACCOUNT_URL, - new DefaultAzureCredential(), + new DefaultAzureCredential( + config.AZURE_MANAGED_IDENTITY_CLIENT_ID + ? { managedIdentityClientId: config.AZURE_MANAGED_IDENTITY_CLIENT_ID } + : undefined, + ), ); logger.info('Azure Storage authenticated via Managed Identity'); } else { diff --git a/src/server.ts b/src/server.ts index d5d4568..9fed868 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,11 +1,12 @@ /** - * Application entry point: Bootstraps the HTTP server and BullMQ worker. + * Main entry point for the Node.js Fastify application. + * Starts the HTTP server and instantiates the BullMQ background queue worker. * * @remarks - * - Wires the dependency injection graph for the video processing pipeline. - * - Exposes Kubernetes-compatible readiness (`/ready`) and liveness (`/health`) probes. - * - Enforces a strict 30s graceful shutdown timeout on SIGINT/SIGTERM to prevent - * orphaned pods if external dependencies (e.g., Azure Storage, DB) hang. + * - Wires dependency injection for the repository, storage client, and ffmpeg interfaces. + * - Provides basic `/health` and `/ready` routes for Kubernetes liveness/readiness probes. + * - Binds graceful shutdown handlers (SIGINT/SIGTERM) to allow active FFmpeg processes + * or lingering PostgreSQL inserts to flush before exiting. */ import Fastify, { FastifyInstance } from 'fastify'; import cors from '@fastify/cors'; diff --git a/test/ffmpeg.test.sh b/test/ffmpeg.test.sh index 85a54fc..8b35819 100644 --- a/test/ffmpeg.test.sh +++ b/test/ffmpeg.test.sh @@ -88,14 +88,14 @@ verify_ffprobe_version() { # Step 4: Verify supported codecs verify_codecs() { log_header "Step 4: Validating Supported Codecs" - log_info "Checking for required libraries: libx264 (H.264) and libfdk_aac (AAC)..." + log_info "Checking for required libraries: libx264 (H.264), libx265 (H.265) and libfdk_aac (AAC)..." local codecs - if codecs=$(docker run --rm "${IMAGE_NAME}" ffmpeg -codecs 2>/dev/null | grep -E "libx264|libfdk_aac"); then + if codecs=$(docker run --rm "${IMAGE_NAME}" ffmpeg -codecs 2>/dev/null | grep -E "libx264|libx265|libfdk_aac"); then echo "$codecs" log_success "Required codecs verified successfully." else - log_error "Critical Error: Required codecs (libx264, libfdk_aac) are missing from the image." + log_error "Critical Error: Required codecs (libx264, libx265, libfdk_aac) are missing from the image." exit 1 fi }