diff --git a/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt b/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt index fb37cce8..dff3743b 100644 --- a/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt +++ b/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt @@ -243,7 +243,7 @@ private object DisconnectedApi : KetchApi { override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { throw IllegalStateException( "No instance connected. Add a remote server first.", diff --git a/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt b/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt index 50ab3b23..365e9c77 100644 --- a/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt +++ b/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt @@ -41,7 +41,7 @@ class FakeKetchApi( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { throw UnsupportedOperationException( "FakeKetchApi does not support resolve" diff --git a/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt b/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt index 547aef13..8ef10073 100644 --- a/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt +++ b/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt @@ -27,11 +27,13 @@ interface KetchApi { * during [download]. * * @param url the URL to resolve - * @param headers optional HTTP headers to include in the probe + * @param properties source-specific key-value pairs. For HTTP + * sources this contains HTTP headers; other sources may + * interpret them differently or ignore them. */ suspend fun resolve( url: String, - headers: Map = emptyMap(), + properties: Map = emptyMap(), ): ResolvedSource /** diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt index 803c23ac..f0c8eb4a 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt @@ -181,11 +181,11 @@ class Ketch( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { log.i { "Resolving URL: $url" } val source = sourceResolver.resolve(url) - return source.resolve(url, headers) + return source.resolve(url, properties) } override suspend fun start() { diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt index 6cb306d3..5527ab55 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt @@ -21,18 +21,19 @@ import kotlinx.coroutines.flow.MutableStateFlow * call this with the number of bytes before writing each chunk. * This replaces direct [SpeedLimiter] access to avoid cross-module * visibility issues with internal types. - * @property headers HTTP headers or source-specific metadata headers + * @property headers request headers from [DownloadRequest.headers]. + * Used by HTTP sources for custom headers; other sources may ignore. * @property preResolved pre-resolved URL metadata, allowing the * download source to skip its own probe/HEAD request * @property maxConnections observable override for the number of * concurrent segment connections. When positive, takes precedence * over [DownloadRequest.connections]. Emitting a new value triggers - * live resegmentation in [HttpDownloadSource]. Reduced automatically - * on HTTP 429 (Too Many Requests) responses. + * live resegmentation in sources that support it. Reduced + * automatically on HTTP 429 (Too Many Requests) responses. * @property pendingResegment target connection count for a pending * resegmentation. Set by the connection-change watcher before - * canceling the download batch scope. Read by [HttpDownloadSource] - * to distinguish resegment-cancel from external cancel. + * canceling the download batch scope. Read by sources to + * distinguish resegment-cancel from external cancel. */ class DownloadContext( val taskId: String, diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt index c65a8186..153d741b 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt @@ -140,9 +140,7 @@ internal class DownloadExecution( totalBytes = total val fileName = resolvedUrl.suggestedFileName - ?: fileNameResolver.resolve( - request, toServerInfo(resolvedUrl), - ) + ?: fileNameResolver.resolve(request, resolvedUrl) val outputPath = resolveDestPath( destination = request.destination, defaultDir = config.defaultDirectory ?: "downloads", @@ -162,12 +160,10 @@ internal class DownloadExecution( outputPath = outputPath, state = TaskState.DOWNLOADING, totalBytes = total, - acceptRanges = resolvedUrl.supportsResume, - etag = resolvedUrl.metadata[HttpDownloadSource.META_ETAG], - lastModified = resolvedUrl.metadata[ - HttpDownloadSource.META_LAST_MODIFIED, - ], sourceType = source.type, + sourceResumeState = source.buildResumeState( + resolvedUrl, total, + ), updatedAt = now, ) } @@ -175,9 +171,7 @@ internal class DownloadExecution( taskLimiter.delegate = createLimiter(request.speedLimit) val preResolved = if (resolved != null) resolvedUrl else null - runDownload( - outputPath, total, source.managesOwnFileIo, preResolved, - ) { ctx -> + runDownload(outputPath, total, source, preResolved) { ctx -> source.download(ctx) } } @@ -185,7 +179,12 @@ internal class DownloadExecution( private suspend fun executeResume(info: ResumeInfo) { val taskRecord = info.record - val sourceType = taskRecord.sourceType ?: HttpDownloadSource.TYPE + val sourceType = taskRecord.sourceType + ?: throw KetchError.Unknown( + IllegalStateException( + "No sourceType for taskId=${taskRecord.taskId}", + ), + ) val source = sourceResolver.resolveByType(sourceType) log.i { "Resuming download for taskId=$taskId via " + @@ -202,16 +201,11 @@ internal class DownloadExecution( taskLimiter.delegate = createLimiter(taskRecord.request.speedLimit) val resumeState = taskRecord.sourceResumeState - ?: HttpDownloadSource.buildResumeState( - etag = taskRecord.etag, - lastModified = taskRecord.lastModified, - totalBytes = taskRecord.totalBytes, + ?: throw KetchError.CorruptResumeState( + "No resume state for taskId=${taskRecord.taskId}", ) - runDownload( - outputPath, taskRecord.totalBytes, source.managesOwnFileIo, - ) { ctx -> - context = ctx + runDownload(outputPath, taskRecord.totalBytes, source) { ctx -> source.resume(ctx, resumeState) } } @@ -221,16 +215,18 @@ internal class DownloadExecution( * builds the [DownloadContext], runs [downloadBlock] with retry, * flushes, persists completion, and cleans up. * - * When [selfManagedIo] is `true`, the source handles its own file - * I/O so we use [NoOpFileAccessor] and skip flush/cleanup. + * When [DownloadSource.managesOwnFileIo] is `true`, the source + * handles its own file I/O so we use [NoOpFileAccessor] and skip + * flush/cleanup. */ private suspend fun runDownload( outputPath: String, total: Long, - selfManagedIo: Boolean = false, + source: DownloadSource, preResolved: ResolvedSource? = null, downloadBlock: suspend (DownloadContext) -> Unit, ) { + val selfManagedIo = source.managesOwnFileIo val fa = if (selfManagedIo) { NoOpFileAccessor } else { @@ -248,10 +244,12 @@ internal class DownloadExecution( while (true) { delay(config.saveIntervalMs) val snapshot = handle.mutableSegments.value - val downloaded = snapshot.sumOf { it.downloadedBytes } + val updatedResume = source.updateResumeState(ctx) handle.record.update { it.copy( segments = snapshot, + sourceResumeState = updatedResume + ?: it.sourceResumeState, updatedAt = Clock.System.now(), ) } @@ -278,11 +276,6 @@ internal class DownloadExecution( it.copy( state = TaskState.COMPLETED, segments = null, - sourceResumeState = HttpDownloadSource.buildResumeState( - etag = it.etag, - lastModified = it.lastModified, - totalBytes = it.totalBytes, - ), updatedAt = Clock.System.now(), ) } @@ -479,17 +472,6 @@ internal class DownloadExecution( } } - private fun toServerInfo(resolved: ResolvedSource): ServerInfo { - return ServerInfo( - contentLength = resolved.totalBytes, - acceptRanges = resolved.supportsResume, - etag = resolved.metadata[HttpDownloadSource.META_ETAG], - lastModified = resolved.metadata[ - HttpDownloadSource.META_LAST_MODIFIED, - ], - ) - } - private fun resolveDestPath( destination: com.linroid.ketch.api.Destination?, defaultDir: String, diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt index 17d33eb3..ee950001 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt @@ -7,8 +7,7 @@ import com.linroid.ketch.api.ResolvedSource * * Each source handles a specific protocol or download mechanism. * The default implementation is [HttpDownloadSource] for HTTP/HTTPS - * downloads. Future implementations may include torrent, media - * extraction, or other protocols. + * downloads. Other implementations include FTP, BitTorrent, etc. * * Sources are registered with [SourceResolver] which routes * download requests to the appropriate source based on URL matching. @@ -32,10 +31,15 @@ interface DownloadSource { * Resolves source metadata for the given URL without downloading. * This is analogous to an HTTP HEAD request but generalized for * any source type. + * + * @param url the URL to resolve + * @param properties source-specific key-value pairs. For HTTP + * sources this contains HTTP headers; other sources may + * interpret them differently or ignore them. */ suspend fun resolve( url: String, - headers: Map = emptyMap(), + properties: Map = emptyMap(), ): ResolvedSource /** @@ -56,4 +60,36 @@ interface DownloadSource { * [com.linroid.ketch.api.KetchError.Unsupported]. */ suspend fun resume(context: DownloadContext, resumeState: SourceResumeState) + + /** + * Builds an opaque [SourceResumeState] from resolved metadata. + * + * Called after [resolve] completes to persist source-specific + * state needed for resume validation (e.g., HTTP ETag/Last-Modified, + * FTP MDTM, torrent info hash). The returned state is stored in + * the task record and passed back to [resume] on restart. + * + * @param resolved the metadata returned by [resolve] + * @param totalBytes total download size in bytes + */ + fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState + + /** + * Called periodically during download to let the source update + * its resume state. The returned state replaces the current + * resume state in the task record. + * + * Default implementation returns `null` (no update needed). + * Sources like BitTorrent override this to persist bitfield + * progress incrementally. + * + * @param context the active download context + * @return updated resume state, or `null` to keep the current one + */ + suspend fun updateResumeState( + context: DownloadContext, + ): SourceResumeState? = null } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt index 1fb96d4f..4c20cf05 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt @@ -3,26 +3,15 @@ package com.linroid.ketch.core.engine import com.linroid.ketch.api.KetchError import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.api.Segment -import com.linroid.ketch.api.DownloadRequest import com.linroid.ketch.core.file.DefaultFileNameResolver import com.linroid.ketch.api.log.KetchLogger import com.linroid.ketch.core.segment.SegmentCalculator import com.linroid.ketch.core.segment.SegmentDownloader +import com.linroid.ketch.core.segment.SegmentedDownloadHelper import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json -import kotlin.time.Clock -import kotlin.time.Duration.Companion.milliseconds /** * HTTP/HTTPS download source using the existing [HttpEngine] pipeline. @@ -47,10 +36,10 @@ internal class HttpDownloadSource( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val detector = RangeSupportDetector(httpEngine) - val serverInfo = detector.detect(url, headers) + val serverInfo = detector.detect(url, properties) val fileName = serverInfo.contentDisposition?.let { extractDispositionFileName(it) } ?: DefaultFileNameResolver.fromUrl(url) @@ -65,6 +54,9 @@ internal class HttpDownloadSource( serverInfo.etag?.let { put(META_ETAG, it) } serverInfo.lastModified?.let { put(META_LAST_MODIFIED, it) } if (serverInfo.acceptRanges) put(META_ACCEPT_RANGES, "true") + serverInfo.contentDisposition?.let { + put(META_CONTENT_DISPOSITION, it) + } serverInfo.rateLimitRemaining?.let { put(META_RATE_LIMIT_REMAINING, it.toString()) } @@ -231,173 +223,36 @@ internal class HttpDownloadSource( return segments } + private val segmentHelper = SegmentedDownloadHelper( + progressIntervalMs = progressIntervalMs, + tag = "HttpSource", + ) + /** - * Downloads segments with dynamic resegmentation support. - * - * Uses a while loop that repeatedly calls [downloadBatch] for the - * current set of incomplete segments. When the connection count - * changes (via [DownloadContext.maxConnections]), the batch is - * canceled, progress is snapshotted, segments are merged/split - * via [SegmentCalculator.resegment], and a new batch starts. + * Downloads segments via HTTP Range requests with dynamic + * resegmentation support. Delegates the concurrent batch loop + * to [SegmentedDownloadHelper]. */ private suspend fun downloadSegments( context: DownloadContext, segments: List, totalBytes: Long, ) { - var currentSegments = segments - - while (true) { - val incomplete = currentSegments.filter { !it.isComplete } - if (incomplete.isEmpty()) break - - val batchCompleted = downloadBatch( - context, currentSegments, incomplete, totalBytes - ) - - if (batchCompleted) break - - // Resegment with the new connection count - val newCount = context.pendingResegment - context.pendingResegment = 0 - currentSegments = SegmentCalculator.resegment( - context.segments.value, newCount - ) - context.segments.value = currentSegments - log.i { - "Resegmented to $newCount connections for " + - "taskId=${context.taskId}" - } - } - - context.segments.value = currentSegments - context.onProgress(totalBytes, totalBytes) - } - - /** - * Downloads one batch of incomplete segments concurrently. - * - * A watcher coroutine monitors [DownloadContext.maxConnections] - * for changes. When the connection count changes, it sets - * [DownloadContext.pendingResegment] and cancels the scope, - * causing all segment coroutines to stop. Progress is - * snapshotted before returning. - * - * @return `true` if all segments completed, `false` if - * interrupted for resegmentation - */ - private suspend fun downloadBatch( - context: DownloadContext, - allSegments: List, - incompleteSegments: List, - totalBytes: Long, - ): Boolean { - val segmentProgress = - allSegments.map { it.downloadedBytes }.toMutableList() - val segmentMutex = Mutex() - val updatedSegments = allSegments.toMutableList() - - var lastProgressUpdate = Clock.System.now() - val progressMutex = Mutex() - - suspend fun currentSegments(): List { - return segmentMutex.withLock { - updatedSegments.mapIndexed { i, seg -> - seg.copy(downloadedBytes = segmentProgress[i]) - } - } - } - - suspend fun updateProgress() { - val now = Clock.System.now() - progressMutex.withLock { - if (now - lastProgressUpdate >= - progressIntervalMs.milliseconds - ) { - val snapshot = currentSegments() - val downloaded = snapshot.sumOf { it.downloadedBytes } - context.onProgress(downloaded, totalBytes) - context.segments.value = snapshot - lastProgressUpdate = now - } - } - } - - val downloadedBytes = allSegments.sumOf { it.downloadedBytes } - context.onProgress(downloadedBytes, totalBytes) - context.segments.value = allSegments - - return try { - coroutineScope { - // Watcher: detect connection count changes and trigger - // resegmentation by canceling the scope. Compares against the - // last-seen flow value (not segment count) to avoid an infinite - // loop when fewer segments can be created than requested. - val watcherJob = launch { - val lastSeen = context.maxConnections.value - context.maxConnections.first { count -> - count > 0 && count != lastSeen - } - context.pendingResegment = - context.maxConnections.value - log.i { - "Connection change detected for " + - "taskId=${context.taskId}: " + - "$lastSeen -> ${context.pendingResegment}" - } - throw CancellationException("Resegmenting") - } - - try { - val results = incompleteSegments.map { segment -> - async { - val throttleLimiter = object : SpeedLimiter { - override suspend fun acquire(bytes: Int) { - context.throttle(bytes) - } - } - val downloader = SegmentDownloader( - httpEngine, context.fileAccessor, - throttleLimiter, SpeedLimiter.Unlimited - ) - val completed = downloader.download( - context.url, segment, context.headers - ) { bytesDownloaded -> - segmentMutex.withLock { - segmentProgress[segment.index] = - bytesDownloaded - } - updateProgress() - } - segmentMutex.withLock { - updatedSegments[completed.index] = completed - } - context.segments.value = currentSegments() - log.d { - "Segment ${completed.index} completed for " + - "taskId=${context.taskId}" - } - completed - } - } - results.awaitAll() - } finally { - watcherJob.cancel() + segmentHelper.downloadAll( + context, segments, totalBytes, + ) { segment, onProgress -> + val throttleLimiter = object : SpeedLimiter { + override suspend fun acquire(bytes: Int) { + context.throttle(bytes) } - - context.segments.value = currentSegments() - true // All segments completed - } - } catch (e: CancellationException) { - if (context.pendingResegment > 0) { - // Snapshot progress before resegmentation - withContext(NonCancellable) { - context.segments.value = currentSegments() - } - false // Signal outer loop to resegment - } else { - throw e // External cancellation — propagate } + val downloader = SegmentDownloader( + httpEngine, context.fileAccessor, + throttleLimiter, SpeedLimiter.Unlimited, + ) + downloader.download( + context.url, segment, context.headers, onProgress, + ) } } @@ -454,6 +309,17 @@ internal class HttpDownloadSource( return connections } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + return buildResumeState( + etag = resolved.metadata[META_ETAG], + lastModified = resolved.metadata[META_LAST_MODIFIED], + totalBytes = totalBytes, + ) + } + private fun extractDispositionFileName( contentDisposition: String, ): String? { @@ -466,6 +332,7 @@ internal class HttpDownloadSource( internal const val META_ETAG = "etag" internal const val META_LAST_MODIFIED = "lastModified" internal const val META_ACCEPT_RANGES = "acceptRanges" + internal const val META_CONTENT_DISPOSITION = "contentDisposition" internal const val META_RATE_LIMIT_REMAINING = "rateLimitRemaining" internal const val META_RATE_LIMIT_RESET = "rateLimitReset" diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt index 54c58e43..b45db748 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt @@ -1,13 +1,13 @@ package com.linroid.ketch.core.file import com.linroid.ketch.api.DownloadRequest +import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.api.log.KetchLogger -import com.linroid.ketch.core.engine.ServerInfo /** * Default strategy for resolving file names: * 1. Content-Disposition header (`filename*=UTF-8''...`, `filename="..."`, - * or `filename=...`) + * or `filename=...`) — extracted from `resolved.metadata["contentDisposition"]` * 2. Last non-empty URL path segment (percent-decoded, query/fragment stripped) * 3. Fallback: `"download"` * @@ -20,9 +20,10 @@ internal class DefaultFileNameResolver : FileNameResolver { override fun resolve( request: DownloadRequest, - serverInfo: ServerInfo, + resolved: ResolvedSource, ): String { - val name = fromContentDisposition(serverInfo.contentDisposition) + val contentDisposition = resolved.metadata[META_CONTENT_DISPOSITION] + val name = fromContentDisposition(contentDisposition) ?: fromUrl(request.url) ?: FALLBACK log.d { "Resolved filename: \"$name\" for url: ${request.url}" } @@ -31,6 +32,7 @@ internal class DefaultFileNameResolver : FileNameResolver { companion object { internal const val FALLBACK = "download" + internal const val META_CONTENT_DISPOSITION = "contentDisposition" internal fun fromContentDisposition(header: String?): String? { if (header.isNullOrBlank()) return null diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt index cba30134..4415a519 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt @@ -1,22 +1,22 @@ package com.linroid.ketch.core.file import com.linroid.ketch.api.DownloadRequest -import com.linroid.ketch.core.engine.ServerInfo +import com.linroid.ketch.api.ResolvedSource /** - * Resolves a file name for a download from server metadata. + * Resolves a file name for a download from source metadata. * * Explicit names set via [DownloadRequest.destination] are handled by * the coordinator before this resolver is called. Implementations only - * need to derive a name from the server response. + * need to derive a name from the source response. */ fun interface FileNameResolver { /** - * Resolves a file name from the given [request] and [serverInfo]. + * Resolves a file name from the given [request] and [resolved] metadata. * * @param request the download request - * @param serverInfo server metadata from the HEAD response + * @param resolved source metadata from the resolve/probe step * @return a non-blank file name to save the download as */ - fun resolve(request: DownloadRequest, serverInfo: ServerInfo): String + fun resolve(request: DownloadRequest, resolved: ResolvedSource): String } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt new file mode 100644 index 00000000..31bad94a --- /dev/null +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt @@ -0,0 +1,209 @@ +package com.linroid.ketch.core.segment + +import com.linroid.ketch.api.Segment +import com.linroid.ketch.api.log.KetchLogger +import com.linroid.ketch.core.engine.DownloadContext +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import kotlin.time.Clock +import kotlin.time.Duration.Companion.milliseconds + +/** + * Shared loop logic for segmented parallel downloads with dynamic + * resegmentation support. + * + * Both [com.linroid.ketch.core.engine.HttpDownloadSource] and + * FTP/other segmented sources can delegate to this helper instead + * of reimplementing the download loop, progress aggregation, and + * connection-change watcher logic independently. + * + * @param progressIntervalMs minimum interval between progress + * reports in milliseconds + * @param tag log tag for this helper instance + */ +class SegmentedDownloadHelper( + private val progressIntervalMs: Long = 200, + tag: String = "SegmentHelper", +) { + private val log = KetchLogger(tag) + + /** + * Downloads all incomplete segments concurrently with dynamic + * resegmentation support. + * + * Manages a while loop that calls [downloadBatch] for incomplete + * segments. When [DownloadContext.maxConnections] changes, the + * batch is canceled, progress is snapshotted, segments are + * merged/split via [SegmentCalculator.resegment], and a new + * batch starts. + * + * @param context the download context with progress callbacks + * @param segments initial list of segments (some may already + * have progress) + * @param totalBytes total download size + * @param downloadSegment protocol-specific download function for + * a single segment. Receives the segment and a progress + * callback (bytesDownloaded so far for this segment). Must + * return the completed segment with final downloadedBytes. + */ + suspend fun downloadAll( + context: DownloadContext, + segments: List, + totalBytes: Long, + downloadSegment: suspend ( + segment: Segment, + onProgress: suspend (bytesDownloaded: Long) -> Unit, + ) -> Segment, + ) { + var currentSegments = segments + + while (true) { + val incomplete = currentSegments.filter { !it.isComplete } + if (incomplete.isEmpty()) break + + val batchCompleted = downloadBatch( + context, currentSegments, incomplete, totalBytes, + downloadSegment, + ) + + if (batchCompleted) break + + val newCount = context.pendingResegment + context.pendingResegment = 0 + currentSegments = SegmentCalculator.resegment( + context.segments.value, newCount, + ) + context.segments.value = currentSegments + log.i { + "Resegmented to $newCount connections for " + + "taskId=${context.taskId}" + } + } + + context.segments.value = currentSegments + context.onProgress(totalBytes, totalBytes) + } + + /** + * Downloads one batch of incomplete segments concurrently. + * + * A watcher coroutine monitors [DownloadContext.maxConnections] + * for changes. When the connection count changes, it sets + * [DownloadContext.pendingResegment] and cancels the scope. + * + * @return `true` if all segments completed, `false` if + * interrupted for resegmentation + */ + private suspend fun downloadBatch( + context: DownloadContext, + allSegments: List, + incompleteSegments: List, + totalBytes: Long, + downloadSegment: suspend ( + segment: Segment, + onProgress: suspend (bytesDownloaded: Long) -> Unit, + ) -> Segment, + ): Boolean { + val segmentProgress = + allSegments.map { it.downloadedBytes }.toMutableList() + val segmentMutex = Mutex() + val updatedSegments = allSegments.toMutableList() + + var lastProgressUpdate = Clock.System.now() + val progressMutex = Mutex() + + suspend fun currentSegments(): List { + return segmentMutex.withLock { + updatedSegments.mapIndexed { i, seg -> + seg.copy(downloadedBytes = segmentProgress[i]) + } + } + } + + suspend fun updateProgress() { + val now = Clock.System.now() + progressMutex.withLock { + if (now - lastProgressUpdate >= + progressIntervalMs.milliseconds + ) { + val snapshot = currentSegments() + val downloaded = snapshot.sumOf { it.downloadedBytes } + context.onProgress(downloaded, totalBytes) + context.segments.value = snapshot + lastProgressUpdate = now + } + } + } + + val downloadedBytes = allSegments.sumOf { it.downloadedBytes } + context.onProgress(downloadedBytes, totalBytes) + context.segments.value = allSegments + + return try { + coroutineScope { + val watcherJob = launch { + val lastSeen = context.maxConnections.value + context.maxConnections.first { count -> + count > 0 && count != lastSeen + } + context.pendingResegment = + context.maxConnections.value + log.i { + "Connection change detected for " + + "taskId=${context.taskId}: " + + "$lastSeen -> ${context.pendingResegment}" + } + throw CancellationException("Resegmenting") + } + + try { + val results = incompleteSegments.map { segment -> + async { + val completed = downloadSegment( + segment, + ) { bytesDownloaded -> + segmentMutex.withLock { + segmentProgress[segment.index] = + bytesDownloaded + } + updateProgress() + } + segmentMutex.withLock { + updatedSegments[completed.index] = completed + } + context.segments.value = currentSegments() + log.d { + "Segment ${completed.index} completed for " + + "taskId=${context.taskId}" + } + completed + } + } + results.awaitAll() + } finally { + watcherJob.cancel() + } + + context.segments.value = currentSegments() + true + } + } catch (e: CancellationException) { + if (context.pendingResegment > 0) { + withContext(NonCancellable) { + context.segments.value = currentSegments() + } + false + } else { + throw e + } + } + } +} diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt index a8e32b2e..65940f31 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt @@ -22,9 +22,6 @@ data class TaskRecord( val state: TaskState = TaskState.QUEUED, val totalBytes: Long = -1, val error: KetchError? = null, - val acceptRanges: Boolean? = null, - val etag: String? = null, - val lastModified: String? = null, val segments: List? = null, val sourceType: String? = null, val sourceResumeState: SourceResumeState? = null, diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt index 56f0f276..4bfa680e 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt @@ -265,7 +265,7 @@ class ResolveUrlTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -279,6 +279,10 @@ class ResolveUrlTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } val resolver = SourceResolver(listOf(fakeSource)) assertFailsWith { @@ -296,7 +300,7 @@ class ResolveUrlTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -310,6 +314,10 @@ class ResolveUrlTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } val engine = FakeHttpEngine() diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt index 41a921e5..840166c6 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt @@ -25,7 +25,7 @@ class SourceResolverTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -39,6 +39,10 @@ class SourceResolverTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } @Test diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt index 72358886..7a89900d 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt @@ -1,7 +1,7 @@ package com.linroid.ketch.file import com.linroid.ketch.api.DownloadRequest -import com.linroid.ketch.core.engine.ServerInfo +import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.core.file.DefaultFileNameResolver import kotlin.test.Test import kotlin.test.assertEquals @@ -11,12 +11,20 @@ class DefaultFileNameResolverTest { private val resolver = DefaultFileNameResolver() private val dir = "/tmp" - private fun serverInfo(contentDisposition: String? = null) = ServerInfo( - contentLength = 1000, - acceptRanges = true, - etag = null, - lastModified = null, - contentDisposition = contentDisposition, + private fun resolved( + contentDisposition: String? = null, + ) = ResolvedSource( + url = "https://example.com", + sourceType = "http", + totalBytes = 1000, + supportsResume = true, + suggestedFileName = null, + maxSegments = 4, + metadata = buildMap { + contentDisposition?.let { + put(DefaultFileNameResolver.META_CONTENT_DISPOSITION, it) + } + }, ) private fun request(url: String) = DownloadRequest( @@ -28,7 +36,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameStarUtf8() { - val info = serverInfo("attachment; filename*=UTF-8''my%20file.zip") + val info = resolved("attachment; filename*=UTF-8''my%20file.zip") assertEquals( "my file.zip", resolver.resolve(request("https://example.com"), info) @@ -37,7 +45,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameStarUtf8_caseInsensitive() { - val info = serverInfo("attachment; Filename*=utf-8''report.pdf") + val info = resolved("attachment; Filename*=utf-8''report.pdf") assertEquals( "report.pdf", resolver.resolve(request("https://example.com"), info) @@ -48,7 +56,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameQuoted() { - val info = serverInfo("attachment; filename=\"document.pdf\"") + val info = resolved("attachment; filename=\"document.pdf\"") assertEquals( "document.pdf", resolver.resolve(request("https://example.com"), info) @@ -57,7 +65,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameQuoted_withSpaces() { - val info = serverInfo("attachment; filename=\"my document.pdf\"") + val info = resolved("attachment; filename=\"my document.pdf\"") assertEquals( "my document.pdf", resolver.resolve(request("https://example.com"), info) @@ -68,7 +76,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameUnquoted() { - val info = serverInfo("attachment; filename=report.csv") + val info = resolved("attachment; filename=report.csv") assertEquals( "report.csv", resolver.resolve(request("https://example.com"), info) @@ -79,7 +87,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath() { - val info = serverInfo() + val info = resolved() assertEquals( "archive.tar.gz", resolver.resolve( @@ -91,7 +99,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_withQuery() { - val info = serverInfo() + val info = resolved() assertEquals( "file.zip", resolver.resolve( @@ -103,7 +111,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_withFragment() { - val info = serverInfo() + val info = resolved() assertEquals( "file.zip", resolver.resolve( @@ -115,7 +123,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_percentEncoded() { - val info = serverInfo() + val info = resolved() assertEquals( "my file.zip", resolver.resolve( @@ -127,7 +135,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_trailingSlash() { - val info = serverInfo() + val info = resolved() assertEquals( "dir", resolver.resolve(request("https://example.com/dir/"), info) @@ -138,7 +146,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fallback_noPathNoDisposition() { - val info = serverInfo() + val info = resolved() assertEquals( "download", resolver.resolve(request("https://example.com/"), info) @@ -147,7 +155,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fallback_rootUrl() { - val info = serverInfo() + val info = resolved() assertEquals( "download", resolver.resolve(request("https://example.com"), info) @@ -161,7 +169,7 @@ class DefaultFileNameResolverTest { // Explicit file names via Destination are now handled by the // coordinator, not the resolver. The resolver always returns // the server-derived name. - val info = serverInfo("attachment; filename=\"server-name.zip\"") + val info = resolved("attachment; filename=\"server-name.zip\"") val req = DownloadRequest( url = "https://example.com/url-name.zip", destination = com.linroid.ketch.api.Destination("explicit.zip"), @@ -171,7 +179,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_dispositionTakesPriorityOverUrl() { - val info = serverInfo("attachment; filename=\"server-name.zip\"") + val info = resolved("attachment; filename=\"server-name.zip\"") assertEquals( "server-name.zip", resolver.resolve( diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt index bbe891ea..929a40ba 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt @@ -107,31 +107,4 @@ class TaskRecordTest { assertNull(record.sourceResumeState) } - @Test - fun deserialization_withoutServerInfoFields_defaultsToNull() { - val epoch = Instant.fromEpochMilliseconds(0) - val jsonStr = """ - { - "taskId": "t1", - "request": { - "url": "https://example.com/f", - "destination": "/tmp/", - "connections": 4, - "headers": {}, - "properties": {} - }, - "outputPath": "/tmp/f", - "state": "QUEUED", - "totalBytes": 100, - "downloadedBytes": 0, - "createdAt": "$epoch", - "updatedAt": "$epoch" - } - """.trimIndent() - val record = json.decodeFromString(jsonStr) - assertNull(record.acceptRanges) - assertNull(record.etag) - assertNull(record.lastModified) - } - } diff --git a/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt b/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt index 6ed623fa..e0a19890 100644 --- a/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt +++ b/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt @@ -6,10 +6,10 @@ import kotlinx.serialization.Serializable * Request body for resolving a URL without downloading. * * @property url the URL to resolve - * @property headers optional HTTP headers to include in the probe + * @property properties source-specific key-value pairs (e.g., HTTP headers) */ @Serializable data class ResolveUrlRequest( val url: String, - val headers: Map = emptyMap(), + val properties: Map = emptyMap(), ) diff --git a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt index 8397520f..86816b08 100644 --- a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt +++ b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt @@ -8,20 +8,11 @@ import com.linroid.ketch.core.engine.DownloadContext import com.linroid.ketch.core.engine.DownloadSource import com.linroid.ketch.core.engine.SourceResumeState import com.linroid.ketch.core.segment.SegmentCalculator +import com.linroid.ketch.core.segment.SegmentedDownloadHelper import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json -import kotlin.time.Clock -import kotlin.time.Duration.Companion.milliseconds /** * FTP/FTPS download source using the [FtpClient] protocol layer. @@ -54,9 +45,19 @@ class FtpDownloadSource( return lower.startsWith("ftp://") || lower.startsWith("ftps://") } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + return buildResumeState( + totalBytes = totalBytes, + mdtm = resolved.metadata[META_MDTM], + ) + } + override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val ftpUrl = try { FtpUrl.parse(url) @@ -242,152 +243,25 @@ class FtpDownloadSource( return segments } + private val segmentHelper = SegmentedDownloadHelper( + progressIntervalMs = progressIntervalMs, + tag = "FtpSource", + ) + /** - * Downloads segments with dynamic resegmentation support. + * Downloads segments via FTP connections with dynamic + * resegmentation support. Delegates the concurrent batch loop + * to [SegmentedDownloadHelper]. */ private suspend fun downloadSegments( context: DownloadContext, segments: List, totalBytes: Long, ) { - var currentSegments = segments - - while (true) { - val incomplete = currentSegments.filter { !it.isComplete } - if (incomplete.isEmpty()) break - - val batchCompleted = downloadBatch( - context, currentSegments, incomplete, totalBytes, - ) - - if (batchCompleted) break - - val newCount = context.pendingResegment - context.pendingResegment = 0 - currentSegments = SegmentCalculator.resegment( - context.segments.value, newCount, - ) - context.segments.value = currentSegments - log.i { - "Resegmented to $newCount connections for " + - "taskId=${context.taskId}" - } - } - - context.segments.value = currentSegments - context.onProgress(totalBytes, totalBytes) - } - - /** - * Downloads one batch of incomplete segments concurrently. - * - * Each segment gets its own FTP connection. The watcher coroutine - * monitors connection count changes for live resegmentation. - * - * @return true if all segments completed, false if interrupted - * for resegmentation - */ - private suspend fun downloadBatch( - context: DownloadContext, - allSegments: List, - incompleteSegments: List, - totalBytes: Long, - ): Boolean { - val segmentProgress = - allSegments.map { it.downloadedBytes }.toMutableList() - val segmentMutex = Mutex() - val updatedSegments = allSegments.toMutableList() - - var lastProgressUpdate = Clock.System.now() - val progressMutex = Mutex() - - suspend fun currentSegments(): List { - return segmentMutex.withLock { - updatedSegments.mapIndexed { i, seg -> - seg.copy(downloadedBytes = segmentProgress[i]) - } - } - } - - suspend fun updateProgress() { - val now = Clock.System.now() - progressMutex.withLock { - if (now - lastProgressUpdate >= - progressIntervalMs.milliseconds - ) { - val snapshot = currentSegments() - val downloaded = snapshot.sumOf { it.downloadedBytes } - context.onProgress(downloaded, totalBytes) - context.segments.value = snapshot - lastProgressUpdate = now - } - } - } - - val downloadedBytes = allSegments.sumOf { it.downloadedBytes } - context.onProgress(downloadedBytes, totalBytes) - context.segments.value = allSegments - - return try { - coroutineScope { - // Watcher for connection count changes - val watcherJob = launch { - val lastSeen = context.maxConnections.value - context.maxConnections.first { count -> - count > 0 && count != lastSeen - } - context.pendingResegment = - context.maxConnections.value - log.i { - "Connection change detected for " + - "taskId=${context.taskId}: " + - "$lastSeen -> ${context.pendingResegment}" - } - throw CancellationException("Resegmenting") - } - - try { - val results = incompleteSegments.map { segment -> - async { - downloadSegment( - context, segment, - ) { bytesDownloaded -> - segmentMutex.withLock { - segmentProgress[segment.index] = - bytesDownloaded - } - updateProgress() - } - } - } - - for ((i, deferred) in results.withIndex()) { - val completed = deferred.await() - segmentMutex.withLock { - updatedSegments[completed.index] = completed - } - context.segments.value = currentSegments() - log.d { - "Segment ${completed.index} completed for " + - "taskId=${context.taskId}" - } - } - } finally { - watcherJob.cancel() - } - - context.segments.value = currentSegments() - true - } - } catch (e: CancellationException) { - if (context.pendingResegment > 0) { - withContext(NonCancellable) { - context.segments.value = currentSegments() - } - false - } else { - throw e - } + segmentHelper.downloadAll( + context, segments, totalBytes, + ) { segment, onProgress -> + downloadSegment(context, segment, onProgress) } } diff --git a/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt b/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt index 04b4449f..273442fc 100644 --- a/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt +++ b/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt @@ -133,11 +133,11 @@ class RemoteKetch( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val response = httpClient.post(Api.Resolve()) { contentType(ContentType.Application.Json) - setBody(ResolveUrlRequest(url, headers)) + setBody(ResolveUrlRequest(url, properties)) } checkSuccess(response) return response.body() diff --git a/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt b/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt index dd605e4a..b3ab3796 100644 --- a/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt +++ b/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt @@ -34,7 +34,7 @@ internal fun Route.serverRoutes(ketch: KetchApi) { post { val body = call.receive() log.i { "POST /api/resolve url=${body.url}" } - val resolved = ketch.resolve(body.url, body.headers) + val resolved = ketch.resolve(body.url, body.properties) call.respond(resolved) } } diff --git a/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt b/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt index 8a3aa384..34f5e772 100644 --- a/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt +++ b/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt @@ -5,6 +5,7 @@ import com.linroid.ketch.api.DownloadRequest import com.linroid.ketch.api.KetchError import com.linroid.ketch.api.Segment import com.linroid.ketch.api.log.KetchLogger +import com.linroid.ketch.core.engine.SourceResumeState import com.linroid.ketch.core.task.TaskRecord import com.linroid.ketch.core.task.TaskState import com.linroid.ketch.core.task.TaskStore @@ -28,6 +29,7 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { private val json = Json { ignoreUnknownKeys = true } private val errorSerializer = KetchError.serializer() private val segmentListSerializer = ListSerializer(Segment.serializer()) + private val resumeStateSerializer = SourceResumeState.serializer() /** * Saves a [TaskRecord] to the SQLite database. If a record with the same @@ -38,13 +40,15 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { val requestJson = json.encodeToString( DownloadRequest.serializer(), record.request ) - val acceptRanges = record.acceptRanges?.let { if (it) 1L else 0L } val segmentsJson = record.segments?.let { json.encodeToString(segmentListSerializer, it) } val errorJson = record.error?.let { json.encodeToString(errorSerializer, it) } + val resumeStateJson = record.sourceResumeState?.let { + json.encodeToString(resumeStateSerializer, it) + } queries.transaction { queries.insertOrIgnore( task_id = record.taskId, @@ -52,9 +56,8 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { output_path = record.outputPath, state = record.state.name, total_bytes = record.totalBytes, - accept_ranges = acceptRanges, - etag = record.etag, - last_modified = record.lastModified, + source_type = record.sourceType, + source_resume_state_json = resumeStateJson, segments_json = segmentsJson, error_json = errorJson, ) @@ -64,9 +67,8 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { output_path = record.outputPath, state = record.state.name, total_bytes = record.totalBytes, - accept_ranges = acceptRanges, - etag = record.etag, - last_modified = record.lastModified, + source_type = record.sourceType, + source_resume_state_json = resumeStateJson, segments_json = segmentsJson, error_json = errorJson, ) @@ -116,9 +118,6 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { null } }, - acceptRanges = accept_ranges?.let { it != 0L }, - etag = etag, - lastModified = last_modified, segments = segments_json?.let { try { json.decodeFromString(segmentListSerializer, it) @@ -127,6 +126,17 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { null } }, + sourceType = source_type, + sourceResumeState = source_resume_state_json?.let { + try { + json.decodeFromString(resumeStateSerializer, it) + } catch (e: Exception) { + log.w(e) { + "Failed to deserialize resume state for task: $task_id" + } + null + } + }, createdAt = Instant.fromEpochMilliseconds(created_at), updatedAt = Instant.fromEpochMilliseconds(updated_at), ) diff --git a/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq b/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq index c27bfed5..fcdf19e9 100644 --- a/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq +++ b/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq @@ -4,9 +4,8 @@ CREATE TABLE IF NOT EXISTS task_records ( output_path TEXT, state TEXT NOT NULL DEFAULT 'PENDING', total_bytes INTEGER NOT NULL DEFAULT -1, - accept_ranges INTEGER, - etag TEXT, - last_modified TEXT, + source_type TEXT, + source_resume_state_json TEXT, segments_json TEXT, created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), @@ -16,9 +15,9 @@ CREATE TABLE IF NOT EXISTS task_records ( insertOrIgnore: INSERT OR IGNORE INTO task_records( task_id, request_json, output_path, state, - total_bytes, accept_ranges, etag, last_modified, + total_bytes, source_type, source_resume_state_json, segments_json, error_json -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); update: UPDATE task_records SET @@ -26,9 +25,8 @@ UPDATE task_records SET output_path = ?, state = ?, total_bytes = ?, - accept_ranges = ?, - etag = ?, - last_modified = ?, + source_type = ?, + source_resume_state_json = ?, segments_json = ?, error_json = ?, updated_at = CAST(strftime('%s', 'now') AS INTEGER) * 1000 diff --git a/library/sqlite/src/commonMain/sqldelight/databases/2.db b/library/sqlite/src/commonMain/sqldelight/databases/2.db new file mode 100644 index 00000000..97536389 Binary files /dev/null and b/library/sqlite/src/commonMain/sqldelight/databases/2.db differ diff --git a/library/sqlite/src/commonMain/sqldelight/databases/3.db b/library/sqlite/src/commonMain/sqldelight/databases/3.db new file mode 100644 index 00000000..bb338e25 Binary files /dev/null and b/library/sqlite/src/commonMain/sqldelight/databases/3.db differ diff --git a/library/sqlite/src/commonMain/sqldelight/migrations/2.sqm b/library/sqlite/src/commonMain/sqldelight/migrations/2.sqm new file mode 100644 index 00000000..b86b42fd --- /dev/null +++ b/library/sqlite/src/commonMain/sqldelight/migrations/2.sqm @@ -0,0 +1,26 @@ +CREATE TABLE task_records_new ( + task_id TEXT NOT NULL PRIMARY KEY, + request_json TEXT NOT NULL, + output_path TEXT, + state TEXT NOT NULL DEFAULT 'PENDING', + total_bytes INTEGER NOT NULL DEFAULT -1, + source_type TEXT, + source_resume_state_json TEXT, + segments_json TEXT, + created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), + updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), + error_json TEXT +); + +INSERT INTO task_records_new ( + task_id, request_json, output_path, state, + total_bytes, segments_json, created_at, updated_at, error_json +) +SELECT + task_id, request_json, output_path, state, + total_bytes, segments_json, created_at, updated_at, error_json +FROM task_records; + +DROP TABLE task_records; + +ALTER TABLE task_records_new RENAME TO task_records; diff --git a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt index 75f2c6e5..9369e58a 100644 --- a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt +++ b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt @@ -44,6 +44,7 @@ class TorrentDownloadSource( override val managesOwnFileIo: Boolean = true private var engine: TorrentEngine? = null + private var activeSession: TorrentSession? = null private suspend fun getEngine(): TorrentEngine { val existing = engine @@ -61,9 +62,49 @@ class TorrentDownloadSource( lower.contains(".torrent?") } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + val infoHash = resolved.metadata[META_INFO_HASH] ?: "" + val state = TorrentResumeState( + infoHash = infoHash, + totalBytes = totalBytes, + resumeData = "", + selectedFileIds = resolved.files.map { it.id }.toSet(), + savePath = "", + ) + return SourceResumeState( + sourceType = TYPE, + data = Json.encodeToString(state), + ) + } + + override suspend fun updateResumeState( + context: DownloadContext, + ): SourceResumeState? { + val session = activeSession ?: return null + val resumeData = session.saveResumeData() ?: return null + val selectedIds = context.request.selectedFileIds.ifEmpty { + context.segments.value.map { it.index.toString() }.toSet() + } + return SourceResumeState( + sourceType = TYPE, + data = Json.encodeToString( + TorrentResumeState( + infoHash = session.infoHash, + totalBytes = context.segments.value.sumOf { it.totalBytes }, + resumeData = encodeBase64(resumeData), + selectedFileIds = selectedIds, + savePath = extractSavePath(context), + ), + ), + ) + } + override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val metadata = try { resolveMetadata(url) @@ -186,6 +227,7 @@ class TorrentDownloadSource( session.setDownloadRateLimit(speedLimit.bytesPerSecond) } + activeSession = session try { monitorProgress(context, session, segments, totalBytes) } catch (e: CancellationException) { @@ -194,6 +236,8 @@ class TorrentDownloadSource( } catch (e: Exception) { if (e is KetchError) throw e throw KetchError.SourceError(TYPE, e) + } finally { + activeSession = null } } @@ -253,6 +297,7 @@ class TorrentDownloadSource( val segments = context.segments.value val totalBytes = state.totalBytes + activeSession = session try { monitorProgress(context, session, segments, totalBytes) } catch (e: CancellationException) { @@ -261,6 +306,8 @@ class TorrentDownloadSource( } catch (e: Exception) { if (e is KetchError) throw e throw KetchError.SourceError(TYPE, e) + } finally { + activeSession = null } } @@ -300,25 +347,6 @@ class TorrentDownloadSource( // Final progress update updateSegmentProgress(context, segments, totalBytes, totalBytes) context.onProgress(totalBytes, totalBytes) - - // Save resume data for potential re-seeding - val resumeData = session.saveResumeData() - if (resumeData != null) { - val savePath = extractSavePath(context) - val selectedIds = context.request.selectedFileIds.ifEmpty { - segments.map { it.index.toString() }.toSet() - } - val sourceState = TorrentResumeState( - infoHash = session.infoHash, - totalBytes = totalBytes, - resumeData = encodeBase64(resumeData), - selectedFileIds = selectedIds, - savePath = savePath, - ) - // Store resume state by updating the context's segments - // The DownloadExecution will persist this through TaskRecord - log.d { "Saved torrent resume data for ${session.infoHash}" } - } } /**