Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions src/sources/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,43 @@ export default class HttpSource {
validAudioPrefixes.some((prefix) => contentType.startsWith(prefix)) ||
validApplicationTypes.includes(contentType) ||
contentType === ''
const rawHttpConfig = this.nodelink.options?.sources?.['http']
const httpConfig =
rawHttpConfig && typeof rawHttpConfig === 'object'
? (rawHttpConfig as Record<string, unknown>)
: {}
const rawClusterConfig = this.nodelink.options?.cluster
const clusterConfig =
rawClusterConfig && typeof rawClusterConfig === 'object'
? (rawClusterConfig as Record<string, unknown>)
: {}
const configuredResolveTimeout =
Number(httpConfig['resolveTimeoutMs']) > 0
? Number(httpConfig['resolveTimeoutMs'])
: 7000
const clusterTimeout =
Number(clusterConfig['commandTimeout']) > 0
? Number(clusterConfig['commandTimeout'])
: null
const resolveTimeout = clusterTimeout
? Math.min(configuredResolveTimeout, Math.max(1000, clusterTimeout - 500))
: configuredResolveTimeout
const configuredHeadTimeout =
Number(httpConfig['headTimeoutMs']) > 0
? Number(httpConfig['headTimeoutMs'])
: 2500
const headTimeout = Math.min(configuredHeadTimeout, resolveTimeout)
const optimisticOnProbeFailure =
httpConfig['optimisticOnProbeFailure'] !== false
const resolveStartedAt = Date.now()
const getRemainingTimeout = (): number =>
Math.max(250, resolveTimeout - (Date.now() - resolveStartedAt))

let data = await http1makeRequest(url, {
method: 'HEAD',
headers: requestHeaders
headers: requestHeaders,
timeout: headTimeout,
maxRetries: 0
})

const headContentType = headerToString(
Expand All @@ -268,10 +301,34 @@ export default class HttpSource {
isValidMediaType(headContentType)

if (!headOk) {
const remainingTimeout = getRemainingTimeout()
if (remainingTimeout <= 250) {
if (optimisticOnProbeFailure) {
return {
loadType: 'track',
data: this.buildTrack(
url,
(data.headers as HttpResponseHeaders | undefined) || {},
true
)
}
}
return {
exception: {
message: `Resolve timeout budget exceeded for ${url}`,
severity: 'common'
}
}
}
const getData = await http1makeRequest(url, {
method: 'GET',
streamOnly: true,
headers: requestHeaders
headers: {
...requestHeaders,
'Icy-MetaData': '1'
},
timeout: remainingTimeout,
maxRetries: 0
})
const previewStream = getData?.stream as Readable | undefined
if (previewStream && typeof previewStream.destroy === 'function') {
Expand All @@ -281,12 +338,38 @@ export default class HttpSource {
}

if (data.error) {
if (optimisticOnProbeFailure) {
logger(
'warn',
'HTTP Source',
`Probe failed for ${url}, using optimistic track: ${String(data.error)}`
)
return {
loadType: 'track',
data: this.buildTrack(url, {}, true)
}
}
return {
exception: { message: String(data.error), severity: 'common' }
}
}

if ((data.statusCode || 0) >= 400) {
if (optimisticOnProbeFailure) {
logger(
'warn',
'HTTP Source',
`Probe HTTP ${data.statusCode} for ${url}, using optimistic track`
)
return {
loadType: 'track',
data: this.buildTrack(
url,
(data.headers as HttpResponseHeaders | undefined) || {},
true
)
}
}
return {
exception: {
message: `HTTP error ${data.statusCode} while resolving`,
Expand All @@ -300,6 +383,12 @@ export default class HttpSource {
(headers as Record<string, unknown>)?.['content-type']
)
if (!isValidMediaType(contentType)) {
if (optimisticOnProbeFailure && contentType === '') {
return {
loadType: 'track',
data: this.buildTrack(url, headers, true)
}
}
return {
exception: {
message: `Unsupported content type: ${contentType}`,
Expand All @@ -320,6 +409,24 @@ export default class HttpSource {
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
const rawHttpConfig = this.nodelink.options?.sources?.['http']
const httpConfig =
rawHttpConfig && typeof rawHttpConfig === 'object'
? (rawHttpConfig as Record<string, unknown>)
: {}
const optimisticOnProbeFailure =
httpConfig['optimisticOnProbeFailure'] !== false
if (optimisticOnProbeFailure) {
logger(
'warn',
'HTTP Source',
`Resolve exception for ${url}, using optimistic track: ${message}`
)
return {
loadType: 'track',
data: this.buildTrack(url, {}, true)
}
}
return {
exception: {
message: `Failed to resolve URL: ${message}`,
Expand Down Expand Up @@ -499,4 +606,4 @@ export default class HttpSource {
return { exception: { message, severity: 'common' } }
}
}
}
}
39 changes: 31 additions & 8 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,21 @@ async function _internalHttp1Request(
}

return new Promise((resolve, reject) => {
let settled = false
let hardTimeout: NodeJS.Timeout | null = null
const rejectOnce = (error: Error): void => {
if (settled) return
settled = true
if (hardTimeout) clearTimeout(hardTimeout)
reject(error)
}
const resolveOnce = (value: HttpRequestResult | Promise<HttpRequestResult>): void => {
if (settled) return
settled = true
if (hardTimeout) clearTimeout(hardTimeout)
resolve(value)
}

const req = lib.request(reqOptions, (res) => {
const { statusCode, headers: respHeaders } = res
const responseStatus = statusCode ?? 0
Expand Down Expand Up @@ -1366,7 +1381,7 @@ async function _internalHttp1Request(
body: nextBody,
headers: nextHeaders
}
resolve(http1makeRequest(nextUrl, nextOptions))
resolveOnce(http1makeRequest(nextUrl, nextOptions))
return
}

Expand All @@ -1386,18 +1401,18 @@ async function _internalHttp1Request(
}

res.on('error', (err) =>
reject(new Error(`Response error for ${urlString}: ${err.message}`))
rejectOnce(new Error(`Response error for ${urlString}: ${err.message}`))
)
if (finalStream !== res) {
finalStream.on('error', (err) =>
reject(
rejectOnce(
new Error(`Decompression error for ${urlString}: ${err.message}`)
)
)
}

if (streamOnly) {
resolve({ statusCode, headers: respHeaders, stream: finalStream })
resolveOnce({ statusCode, headers: respHeaders, stream: finalStream })
return
}

Expand All @@ -1424,7 +1439,7 @@ async function _internalHttp1Request(
const responseBuffer = Buffer.concat(chunks)

if (options.responseType === 'buffer') {
resolve({ statusCode, headers: respHeaders, body: responseBuffer })
resolveOnce({ statusCode, headers: respHeaders, body: responseBuffer })
return
}

Expand All @@ -1437,9 +1452,9 @@ async function _internalHttp1Request(
.toLowerCase()
.startsWith('application/json')
const responseBody = isJson && text ? JSON.parse(text) : text
resolve({ statusCode, headers: respHeaders, body: responseBody })
resolveOnce({ statusCode, headers: respHeaders, body: responseBody })
} catch (err) {
reject(
rejectOnce(
new Error(
`Error processing response body for ${urlString}: ${err instanceof Error ? err.message : String(err)}`
)
Expand All @@ -1448,12 +1463,20 @@ async function _internalHttp1Request(
})
})

req.on('error', (err) => reject(err))
req.on('error', (err) => rejectOnce(err))
req.on('timeout', () => {
req.destroy(
new Error(`Request timed out after ${timeout}ms for ${urlString}`)
)
})
if (timeout > 0) {
hardTimeout = setTimeout(() => {
req.destroy(
new Error(`Request hard timeout after ${timeout}ms for ${urlString}`)
)
}, timeout)
hardTimeout.unref?.()
}

if (payloadBuffer) {
req.end(payloadBuffer)
Expand Down