From 584ccfd83817dd28b76b1597ccb54d2d5cf575a3 Mon Sep 17 00:00:00 2001 From: Walkoud <38588921+Walkoud@users.noreply.github.com> Date: Tue, 24 Mar 2026 13:51:42 +0100 Subject: [PATCH] fix: prevent multiple promise resolutions and add timeout handling Add a settled flag and resolveOnce/rejectOnce wrappers to ensure HTTP request promises are settled only once, preventing multiple resolve/reject calls. Implement a hard timeout mechanism that destroys the request after timeout, even if the request timeout event does not fire. Add configurable resolve and head timeouts with fallback to optimistic track creation on probe failures to improve streaming URL reliability. --- src/sources/http.ts | 113 ++++++++++++++++++++++++++++++++++++++++++-- src/utils.ts | 39 +++++++++++---- 2 files changed, 141 insertions(+), 11 deletions(-) diff --git a/src/sources/http.ts b/src/sources/http.ts index b9b98641..0018e6b3 100644 --- a/src/sources/http.ts +++ b/src/sources/http.ts @@ -253,10 +253,43 @@ export default class HttpSource { validAudioPrefixes.some((prefix) => contentType.startsWith(prefix)) || validApplicationTypes.includes(contentType) || contentType === '' + const rawHttpConfig = this.nodelink.options?.sources?.['http'] + const httpConfig = + rawHttpConfig && typeof rawHttpConfig === 'object' + ? (rawHttpConfig as Record) + : {} + const rawClusterConfig = this.nodelink.options?.cluster + const clusterConfig = + rawClusterConfig && typeof rawClusterConfig === 'object' + ? (rawClusterConfig as Record) + : {} + const configuredResolveTimeout = + Number(httpConfig['resolveTimeoutMs']) > 0 + ? Number(httpConfig['resolveTimeoutMs']) + : 7000 + const clusterTimeout = + Number(clusterConfig['commandTimeout']) > 0 + ? Number(clusterConfig['commandTimeout']) + : null + const resolveTimeout = clusterTimeout + ? Math.min(configuredResolveTimeout, Math.max(1000, clusterTimeout - 500)) + : configuredResolveTimeout + const configuredHeadTimeout = + Number(httpConfig['headTimeoutMs']) > 0 + ? Number(httpConfig['headTimeoutMs']) + : 2500 + const headTimeout = Math.min(configuredHeadTimeout, resolveTimeout) + const optimisticOnProbeFailure = + httpConfig['optimisticOnProbeFailure'] !== false + const resolveStartedAt = Date.now() + const getRemainingTimeout = (): number => + Math.max(250, resolveTimeout - (Date.now() - resolveStartedAt)) let data = await http1makeRequest(url, { method: 'HEAD', - headers: requestHeaders + headers: requestHeaders, + timeout: headTimeout, + maxRetries: 0 }) const headContentType = headerToString( @@ -268,10 +301,34 @@ export default class HttpSource { isValidMediaType(headContentType) if (!headOk) { + const remainingTimeout = getRemainingTimeout() + if (remainingTimeout <= 250) { + if (optimisticOnProbeFailure) { + return { + loadType: 'track', + data: this.buildTrack( + url, + (data.headers as HttpResponseHeaders | undefined) || {}, + true + ) + } + } + return { + exception: { + message: `Resolve timeout budget exceeded for ${url}`, + severity: 'common' + } + } + } const getData = await http1makeRequest(url, { method: 'GET', streamOnly: true, - headers: requestHeaders + headers: { + ...requestHeaders, + 'Icy-MetaData': '1' + }, + timeout: remainingTimeout, + maxRetries: 0 }) const previewStream = getData?.stream as Readable | undefined if (previewStream && typeof previewStream.destroy === 'function') { @@ -281,12 +338,38 @@ export default class HttpSource { } if (data.error) { + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Probe failed for ${url}, using optimistic track: ${String(data.error)}` + ) + return { + loadType: 'track', + data: this.buildTrack(url, {}, true) + } + } return { exception: { message: String(data.error), severity: 'common' } } } if ((data.statusCode || 0) >= 400) { + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Probe HTTP ${data.statusCode} for ${url}, using optimistic track` + ) + return { + loadType: 'track', + data: this.buildTrack( + url, + (data.headers as HttpResponseHeaders | undefined) || {}, + true + ) + } + } return { exception: { message: `HTTP error ${data.statusCode} while resolving`, @@ -300,6 +383,12 @@ export default class HttpSource { (headers as Record)?.['content-type'] ) if (!isValidMediaType(contentType)) { + if (optimisticOnProbeFailure && contentType === '') { + return { + loadType: 'track', + data: this.buildTrack(url, headers, true) + } + } return { exception: { message: `Unsupported content type: ${contentType}`, @@ -320,6 +409,24 @@ export default class HttpSource { } } catch (err) { const message = err instanceof Error ? err.message : String(err) + const rawHttpConfig = this.nodelink.options?.sources?.['http'] + const httpConfig = + rawHttpConfig && typeof rawHttpConfig === 'object' + ? (rawHttpConfig as Record) + : {} + const optimisticOnProbeFailure = + httpConfig['optimisticOnProbeFailure'] !== false + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Resolve exception for ${url}, using optimistic track: ${message}` + ) + return { + loadType: 'track', + data: this.buildTrack(url, {}, true) + } + } return { exception: { message: `Failed to resolve URL: ${message}`, @@ -499,4 +606,4 @@ export default class HttpSource { return { exception: { message, severity: 'common' } } } } -} \ No newline at end of file +} diff --git a/src/utils.ts b/src/utils.ts index 9906456b..d1074078 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1321,6 +1321,21 @@ async function _internalHttp1Request( } return new Promise((resolve, reject) => { + let settled = false + let hardTimeout: NodeJS.Timeout | null = null + const rejectOnce = (error: Error): void => { + if (settled) return + settled = true + if (hardTimeout) clearTimeout(hardTimeout) + reject(error) + } + const resolveOnce = (value: HttpRequestResult | Promise): void => { + if (settled) return + settled = true + if (hardTimeout) clearTimeout(hardTimeout) + resolve(value) + } + const req = lib.request(reqOptions, (res) => { const { statusCode, headers: respHeaders } = res const responseStatus = statusCode ?? 0 @@ -1366,7 +1381,7 @@ async function _internalHttp1Request( body: nextBody, headers: nextHeaders } - resolve(http1makeRequest(nextUrl, nextOptions)) + resolveOnce(http1makeRequest(nextUrl, nextOptions)) return } @@ -1386,18 +1401,18 @@ async function _internalHttp1Request( } res.on('error', (err) => - reject(new Error(`Response error for ${urlString}: ${err.message}`)) + rejectOnce(new Error(`Response error for ${urlString}: ${err.message}`)) ) if (finalStream !== res) { finalStream.on('error', (err) => - reject( + rejectOnce( new Error(`Decompression error for ${urlString}: ${err.message}`) ) ) } if (streamOnly) { - resolve({ statusCode, headers: respHeaders, stream: finalStream }) + resolveOnce({ statusCode, headers: respHeaders, stream: finalStream }) return } @@ -1424,7 +1439,7 @@ async function _internalHttp1Request( const responseBuffer = Buffer.concat(chunks) if (options.responseType === 'buffer') { - resolve({ statusCode, headers: respHeaders, body: responseBuffer }) + resolveOnce({ statusCode, headers: respHeaders, body: responseBuffer }) return } @@ -1437,9 +1452,9 @@ async function _internalHttp1Request( .toLowerCase() .startsWith('application/json') const responseBody = isJson && text ? JSON.parse(text) : text - resolve({ statusCode, headers: respHeaders, body: responseBody }) + resolveOnce({ statusCode, headers: respHeaders, body: responseBody }) } catch (err) { - reject( + rejectOnce( new Error( `Error processing response body for ${urlString}: ${err instanceof Error ? err.message : String(err)}` ) @@ -1448,12 +1463,20 @@ async function _internalHttp1Request( }) }) - req.on('error', (err) => reject(err)) + req.on('error', (err) => rejectOnce(err)) req.on('timeout', () => { req.destroy( new Error(`Request timed out after ${timeout}ms for ${urlString}`) ) }) + if (timeout > 0) { + hardTimeout = setTimeout(() => { + req.destroy( + new Error(`Request hard timeout after ${timeout}ms for ${urlString}`) + ) + }, timeout) + hardTimeout.unref?.() + } if (payloadBuffer) { req.end(payloadBuffer)