diff --git a/src/sources/http.ts b/src/sources/http.ts index b9b98641..0018e6b3 100644 --- a/src/sources/http.ts +++ b/src/sources/http.ts @@ -253,10 +253,43 @@ export default class HttpSource { validAudioPrefixes.some((prefix) => contentType.startsWith(prefix)) || validApplicationTypes.includes(contentType) || contentType === '' + const rawHttpConfig = this.nodelink.options?.sources?.['http'] + const httpConfig = + rawHttpConfig && typeof rawHttpConfig === 'object' + ? (rawHttpConfig as Record) + : {} + const rawClusterConfig = this.nodelink.options?.cluster + const clusterConfig = + rawClusterConfig && typeof rawClusterConfig === 'object' + ? (rawClusterConfig as Record) + : {} + const configuredResolveTimeout = + Number(httpConfig['resolveTimeoutMs']) > 0 + ? Number(httpConfig['resolveTimeoutMs']) + : 7000 + const clusterTimeout = + Number(clusterConfig['commandTimeout']) > 0 + ? Number(clusterConfig['commandTimeout']) + : null + const resolveTimeout = clusterTimeout + ? Math.min(configuredResolveTimeout, Math.max(1000, clusterTimeout - 500)) + : configuredResolveTimeout + const configuredHeadTimeout = + Number(httpConfig['headTimeoutMs']) > 0 + ? Number(httpConfig['headTimeoutMs']) + : 2500 + const headTimeout = Math.min(configuredHeadTimeout, resolveTimeout) + const optimisticOnProbeFailure = + httpConfig['optimisticOnProbeFailure'] !== false + const resolveStartedAt = Date.now() + const getRemainingTimeout = (): number => + Math.max(250, resolveTimeout - (Date.now() - resolveStartedAt)) let data = await http1makeRequest(url, { method: 'HEAD', - headers: requestHeaders + headers: requestHeaders, + timeout: headTimeout, + maxRetries: 0 }) const headContentType = headerToString( @@ -268,10 +301,34 @@ export default class HttpSource { isValidMediaType(headContentType) if (!headOk) { + const remainingTimeout = getRemainingTimeout() + if (remainingTimeout <= 250) { + if (optimisticOnProbeFailure) { + return { + loadType: 'track', + data: this.buildTrack( + url, + (data.headers as HttpResponseHeaders | undefined) || {}, + true + ) + } + } + return { + exception: { + message: `Resolve timeout budget exceeded for ${url}`, + severity: 'common' + } + } + } const getData = await http1makeRequest(url, { method: 'GET', streamOnly: true, - headers: requestHeaders + headers: { + ...requestHeaders, + 'Icy-MetaData': '1' + }, + timeout: remainingTimeout, + maxRetries: 0 }) const previewStream = getData?.stream as Readable | undefined if (previewStream && typeof previewStream.destroy === 'function') { @@ -281,12 +338,38 @@ export default class HttpSource { } if (data.error) { + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Probe failed for ${url}, using optimistic track: ${String(data.error)}` + ) + return { + loadType: 'track', + data: this.buildTrack(url, {}, true) + } + } return { exception: { message: String(data.error), severity: 'common' } } } if ((data.statusCode || 0) >= 400) { + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Probe HTTP ${data.statusCode} for ${url}, using optimistic track` + ) + return { + loadType: 'track', + data: this.buildTrack( + url, + (data.headers as HttpResponseHeaders | undefined) || {}, + true + ) + } + } return { exception: { message: `HTTP error ${data.statusCode} while resolving`, @@ -300,6 +383,12 @@ export default class HttpSource { (headers as Record)?.['content-type'] ) if (!isValidMediaType(contentType)) { + if (optimisticOnProbeFailure && contentType === '') { + return { + loadType: 'track', + data: this.buildTrack(url, headers, true) + } + } return { exception: { message: `Unsupported content type: ${contentType}`, @@ -320,6 +409,24 @@ export default class HttpSource { } } catch (err) { const message = err instanceof Error ? err.message : String(err) + const rawHttpConfig = this.nodelink.options?.sources?.['http'] + const httpConfig = + rawHttpConfig && typeof rawHttpConfig === 'object' + ? (rawHttpConfig as Record) + : {} + const optimisticOnProbeFailure = + httpConfig['optimisticOnProbeFailure'] !== false + if (optimisticOnProbeFailure) { + logger( + 'warn', + 'HTTP Source', + `Resolve exception for ${url}, using optimistic track: ${message}` + ) + return { + loadType: 'track', + data: this.buildTrack(url, {}, true) + } + } return { exception: { message: `Failed to resolve URL: ${message}`, @@ -499,4 +606,4 @@ export default class HttpSource { return { exception: { message, severity: 'common' } } } } -} \ No newline at end of file +} diff --git a/src/utils.ts b/src/utils.ts index 9906456b..d1074078 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1321,6 +1321,21 @@ async function _internalHttp1Request( } return new Promise((resolve, reject) => { + let settled = false + let hardTimeout: NodeJS.Timeout | null = null + const rejectOnce = (error: Error): void => { + if (settled) return + settled = true + if (hardTimeout) clearTimeout(hardTimeout) + reject(error) + } + const resolveOnce = (value: HttpRequestResult | Promise): void => { + if (settled) return + settled = true + if (hardTimeout) clearTimeout(hardTimeout) + resolve(value) + } + const req = lib.request(reqOptions, (res) => { const { statusCode, headers: respHeaders } = res const responseStatus = statusCode ?? 0 @@ -1366,7 +1381,7 @@ async function _internalHttp1Request( body: nextBody, headers: nextHeaders } - resolve(http1makeRequest(nextUrl, nextOptions)) + resolveOnce(http1makeRequest(nextUrl, nextOptions)) return } @@ -1386,18 +1401,18 @@ async function _internalHttp1Request( } res.on('error', (err) => - reject(new Error(`Response error for ${urlString}: ${err.message}`)) + rejectOnce(new Error(`Response error for ${urlString}: ${err.message}`)) ) if (finalStream !== res) { finalStream.on('error', (err) => - reject( + rejectOnce( new Error(`Decompression error for ${urlString}: ${err.message}`) ) ) } if (streamOnly) { - resolve({ statusCode, headers: respHeaders, stream: finalStream }) + resolveOnce({ statusCode, headers: respHeaders, stream: finalStream }) return } @@ -1424,7 +1439,7 @@ async function _internalHttp1Request( const responseBuffer = Buffer.concat(chunks) if (options.responseType === 'buffer') { - resolve({ statusCode, headers: respHeaders, body: responseBuffer }) + resolveOnce({ statusCode, headers: respHeaders, body: responseBuffer }) return } @@ -1437,9 +1452,9 @@ async function _internalHttp1Request( .toLowerCase() .startsWith('application/json') const responseBody = isJson && text ? JSON.parse(text) : text - resolve({ statusCode, headers: respHeaders, body: responseBody }) + resolveOnce({ statusCode, headers: respHeaders, body: responseBody }) } catch (err) { - reject( + rejectOnce( new Error( `Error processing response body for ${urlString}: ${err instanceof Error ? err.message : String(err)}` ) @@ -1448,12 +1463,20 @@ async function _internalHttp1Request( }) }) - req.on('error', (err) => reject(err)) + req.on('error', (err) => rejectOnce(err)) req.on('timeout', () => { req.destroy( new Error(`Request timed out after ${timeout}ms for ${urlString}`) ) }) + if (timeout > 0) { + hardTimeout = setTimeout(() => { + req.destroy( + new Error(`Request hard timeout after ${timeout}ms for ${urlString}`) + ) + }, timeout) + hardTimeout.unref?.() + } if (payloadBuffer) { req.end(payloadBuffer)