From 24b4e84073a85f07597532f167e57ac953acc666 Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Mon, 4 Nov 2024 19:38:03 +0100 Subject: [PATCH 1/7] feat: Add basic interval option --- .gitignore | 3 +- scdl/scdl.py | 249 ++++++++++++++++++++++++++++----------------------- 2 files changed, 141 insertions(+), 111 deletions(-) diff --git a/.gitignore b/.gitignore index 9e7a77ec..0fb46edb 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ __pycache__/ .env .coverage* .idea -.python-version \ No newline at end of file +.python-version +/main.py diff --git a/scdl/scdl.py b/scdl/scdl.py index 94a710bf..75fb571e 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -2,7 +2,7 @@ Usage: scdl (-l | -s | me) [-a | -f | -C | -t | -p | -r] - [-c | --force-metadata][-n ][-o ][--hidewarnings][--debug | --error] + [-c | --force-metadata][-n ][-I ][-o ][--hidewarnings][--debug | --error] [--path ][--addtofile][--addtimestamp][--onlymp3][--hide-progress][--min-size ] [--max-size ][--remove][--no-album-tag][--no-playlist-folder] [--download-archive ][--sync ][--extract-artist][--flac][--original-art] @@ -11,6 +11,7 @@ [--client-id ][--auth-token ][--overwrite][--no-playlist][--opus] [--add-description] + scdl -h | --help scdl --version @@ -22,6 +23,7 @@ -s [search_query] Search for a track/playlist/user and use the first result -n [maxtracks] Download the n last tracks of a playlist according to the creation date + -I [interval] Interval damn -a Download all tracks of user (including reposts) -t Download all uploads of a user (no reposts) -f Download all favorites (likes) of a user @@ -84,6 +86,7 @@ import mimetypes import os import pathlib +import re import shutil import subprocess import sys @@ -194,6 +197,9 @@ class SCDLArgs(TypedDict): sync: Optional[str] s: Optional[str] t: bool + I: Optional[str] + interval: NotRequired[str] + playlist_interval: NotRequired[str] class PlaylistInfo(TypedDict): @@ -361,15 +367,18 @@ def main() -> None: logger.error(f"Invalid auth_token in {config_file}") sys.exit(1) - if arguments["-o"] is not None: - try: - arguments["--offset"] = int(arguments["-o"]) - 1 - if arguments["--offset"] < 0: - raise ValueError - except Exception: - logger.error("Offset should be a positive integer...") - sys.exit(1) - logger.debug("offset: %d", arguments["--offset"]) + # if arguments["-o"] is not None: + # try: + # arguments["--offset"] = int(arguments["-o"]) - 1 + # if arguments["--offset"] < 0: + # raise ValueError + # except Exception: + # logger.error("Offset should be a positive integer...") + # sys.exit(1) + # logger.debug("offset: %d", arguments["--offset"]) + + if arguments.get("-I",None) is not None: + arguments["--interval"] = validate_interval(arguments["-I"]) if arguments["--min-size"] is not None: try: @@ -482,6 +491,22 @@ def validate_url(client: SoundCloud, url: str) -> str: logger.error("URL is not valid") sys.exit(1) +def validate_interval(interval: str) -> (int,int): + """ + """ + pattern = re.compile(r"\[([1-9][0-9]*)?,(\d*)\]") + search_result = pattern.search(interval) + + if search_result is not None: + start = int(search_result.group(1) or 1) + end = int(search_result.group(2) or sys.maxsize) + if start < end: + return start, end + + logger.error("Interval is not valid") + sys.exit(1) + + def search_soundcloud(client: SoundCloud, query: str) -> Optional[str]: """Search SoundCloud and return the URL of the first result.""" @@ -561,7 +586,8 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: url = kwargs["l"] item = client.resolve(url) logger.debug(item) - offset = kwargs.get("offset", 0) + # offset = kwargs.get("offset", 0) + interval = kwargs.get("interval",(1,1000)) if item is None: logger.error("URL is not valid") sys.exit(1) @@ -570,104 +596,105 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: download_track(client, item, kwargs) elif isinstance(item, AlbumPlaylist): logger.info("Found a playlist") - kwargs["playlist_offset"] = offset + # kwargs["playlist_offset"] = offset + kwargs["playlist_interval"] = interval download_playlist(client, item, kwargs) - elif isinstance(item, User): - user = item - logger.info("Found a user profile") - if kwargs.get("f"): - logger.info(f"Retrieving all likes of user {user.username}...") - likes = client.get_user_likes(user.id, limit=1000) - for i, like in itertools.islice(enumerate(likes, 1), offset, None): - logger.info(f"like n°{i} of {user.likes_count}") - if isinstance(like, TrackLike): - download_track( - client, - like.track, - kwargs, - exit_on_fail=kwargs["strict_playlist"], - ) - elif isinstance(like, PlaylistLike): - playlist = client.get_playlist(like.playlist.id) - assert playlist is not None - download_playlist(client, playlist, kwargs) - else: - logger.error(f"Unknown like type {like}") - if kwargs.get("strict_playlist"): - sys.exit(1) - logger.info(f"Downloaded all likes of user {user.username}!") - elif kwargs.get("C"): - logger.info(f"Retrieving all commented tracks of user {user.username}...") - comments = client.get_user_comments(user.id, limit=1000) - for i, comment in itertools.islice(enumerate(comments, 1), offset, None): - logger.info(f"comment n°{i} of {user.comments_count}") - track = client.get_track(comment.track.id) - assert track is not None - download_track( - client, - track, - kwargs, - exit_on_fail=kwargs["strict_playlist"], - ) - logger.info(f"Downloaded all commented tracks of user {user.username}!") - elif kwargs.get("t"): - logger.info(f"Retrieving all tracks of user {user.username}...") - tracks = client.get_user_tracks(user.id, limit=1000) - for i, track in itertools.islice(enumerate(tracks, 1), offset, None): - logger.info(f"track n°{i} of {user.track_count}") - download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) - logger.info(f"Downloaded all tracks of user {user.username}!") - elif kwargs.get("a"): - logger.info(f"Retrieving all tracks & reposts of user {user.username}...") - items = client.get_user_stream(user.id, limit=1000) - for i, stream_item in itertools.islice(enumerate(items, 1), offset, None): - logger.info( - f"item n°{i} of " - f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", - ) - if isinstance(stream_item, (TrackStreamItem, TrackStreamRepostItem)): - download_track( - client, - stream_item.track, - kwargs, - exit_on_fail=kwargs["strict_playlist"], - ) - elif isinstance(stream_item, (PlaylistStreamItem, PlaylistStreamRepostItem)): - download_playlist(client, stream_item.playlist, kwargs) - else: - logger.error(f"Unknown item type {stream_item.type}") - if kwargs.get("strict_playlist"): - sys.exit(1) - logger.info(f"Downloaded all tracks & reposts of user {user.username}!") - elif kwargs.get("p"): - logger.info(f"Retrieving all playlists of user {user.username}...") - playlists = client.get_user_playlists(user.id, limit=1000) - for i, playlist in itertools.islice(enumerate(playlists, 1), offset, None): - logger.info(f"playlist n°{i} of {user.playlist_count}") - download_playlist(client, playlist, kwargs) - logger.info(f"Downloaded all playlists of user {user.username}!") - elif kwargs.get("r"): - logger.info(f"Retrieving all reposts of user {user.username}...") - reposts = client.get_user_reposts(user.id, limit=1000) - for i, repost in itertools.islice(enumerate(reposts, 1), offset, None): - logger.info(f"item n°{i} of {user.reposts_count or '?'}") - if isinstance(repost, TrackStreamRepostItem): - download_track( - client, - repost.track, - kwargs, - exit_on_fail=kwargs["strict_playlist"], - ) - elif isinstance(repost, PlaylistStreamRepostItem): - download_playlist(client, repost.playlist, kwargs) - else: - logger.error(f"Unknown item type {repost.type}") - if kwargs.get("strict_playlist"): - sys.exit(1) - logger.info(f"Downloaded all reposts of user {user.username}!") - else: - logger.error("Please provide a download type...") - sys.exit(1) + # elif isinstance(item, User): + # user = item + # logger.info("Found a user profile") + # if kwargs.get("f"): + # logger.info(f"Retrieving all likes of user {user.username}...") + # likes = client.get_user_likes(user.id, limit=1000) + # for i, like in itertools.islice(enumerate(likes, 1), offset, None): + # logger.info(f"like n°{i} of {user.likes_count}") + # if isinstance(like, TrackLike): + # download_track( + # client, + # like.track, + # kwargs, + # exit_on_fail=kwargs["strict_playlist"], + # ) + # elif isinstance(like, PlaylistLike): + # playlist = client.get_playlist(like.playlist.id) + # assert playlist is not None + # download_playlist(client, playlist, kwargs) + # else: + # logger.error(f"Unknown like type {like}") + # if kwargs.get("strict_playlist"): + # sys.exit(1) + # logger.info(f"Downloaded all likes of user {user.username}!") + # elif kwargs.get("C"): + # logger.info(f"Retrieving all commented tracks of user {user.username}...") + # comments = client.get_user_comments(user.id, limit=1000) + # for i, comment in itertools.islice(enumerate(comments, 1), offset, None): + # logger.info(f"comment n°{i} of {user.comments_count}") + # track = client.get_track(comment.track.id) + # assert track is not None + # download_track( + # client, + # track, + # kwargs, + # exit_on_fail=kwargs["strict_playlist"], + # ) + # logger.info(f"Downloaded all commented tracks of user {user.username}!") + # elif kwargs.get("t"): + # logger.info(f"Retrieving all tracks of user {user.username}...") + # tracks = client.get_user_tracks(user.id, limit=1000) + # for i, track in itertools.islice(enumerate(tracks, 1), offset, None): + # logger.info(f"track n°{i} of {user.track_count}") + # download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) + # logger.info(f"Downloaded all tracks of user {user.username}!") + # elif kwargs.get("a"): + # logger.info(f"Retrieving all tracks & reposts of user {user.username}...") + # items = client.get_user_stream(user.id, limit=1000) + # for i, stream_item in itertools.islice(enumerate(items, 1), offset, None): + # logger.info( + # f"item n°{i} of " + # f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", + # ) + # if isinstance(stream_item, (TrackStreamItem, TrackStreamRepostItem)): + # download_track( + # client, + # stream_item.track, + # kwargs, + # exit_on_fail=kwargs["strict_playlist"], + # ) + # elif isinstance(stream_item, (PlaylistStreamItem, PlaylistStreamRepostItem)): + # download_playlist(client, stream_item.playlist, kwargs) + # else: + # logger.error(f"Unknown item type {stream_item.type}") + # if kwargs.get("strict_playlist"): + # sys.exit(1) + # logger.info(f"Downloaded all tracks & reposts of user {user.username}!") + # elif kwargs.get("p"): + # logger.info(f"Retrieving all playlists of user {user.username}...") + # playlists = client.get_user_playlists(user.id, limit=1000) + # for i, playlist in itertools.islice(enumerate(playlists, 1), offset, None): + # logger.info(f"playlist n°{i} of {user.playlist_count}") + # download_playlist(client, playlist, kwargs) + # logger.info(f"Downloaded all playlists of user {user.username}!") + # elif kwargs.get("r"): + # logger.info(f"Retrieving all reposts of user {user.username}...") + # reposts = client.get_user_reposts(user.id, limit=1000) + # for i, repost in itertools.islice(enumerate(reposts, 1), offset, None): + # logger.info(f"item n°{i} of {user.reposts_count or '?'}") + # if isinstance(repost, TrackStreamRepostItem): + # download_track( + # client, + # repost.track, + # kwargs, + # exit_on_fail=kwargs["strict_playlist"], + # ) + # elif isinstance(repost, PlaylistStreamRepostItem): + # download_playlist(client, repost.playlist, kwargs) + # else: + # logger.error(f"Unknown item type {repost.type}") + # if kwargs.get("strict_playlist"): + # sys.exit(1) + # logger.info(f"Downloaded all reposts of user {user.username}!") + # else: + # logger.error("Please provide a download type...") + # sys.exit(1) else: logger.error(f"Unknown item type {item.kind}") sys.exit(1) @@ -786,11 +813,13 @@ def download_playlist( logger.error(f'Invalid sync archive file {kwargs.get("sync")}') sys.exit(1) + start, stop = kwargs.get("playlist_interval") + tracknumber_digits = len(str(len(playlist.tracks))) for counter, track in itertools.islice( enumerate(playlist.tracks, 1), - kwargs.get("playlist_offset", 0), - None, + start-1, + stop ): logger.debug(track) logger.info(f"Track n°{counter}") From d632f9a7576b6f4dbf44dcb18cb5032215ee58fd Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Mon, 4 Nov 2024 20:56:06 +0100 Subject: [PATCH 2/7] feat: Add Interval optional for all download options --- scdl/scdl.py | 231 ++++++++++++++++++++++++++------------------------- 1 file changed, 117 insertions(+), 114 deletions(-) diff --git a/scdl/scdl.py b/scdl/scdl.py index 75fb571e..4f53c795 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -2,7 +2,7 @@ Usage: scdl (-l | -s | me) [-a | -f | -C | -t | -p | -r] - [-c | --force-metadata][-n ][-I ][-o ][--hidewarnings][--debug | --error] + [-c | --force-metadata][-n][-I ][--hidewarnings][--debug | --error] [--path ][--addtofile][--addtimestamp][--onlymp3][--hide-progress][--min-size ] [--max-size ][--remove][--no-album-tag][--no-playlist-folder] [--download-archive ][--sync ][--extract-artist][--flac][--original-art] @@ -21,9 +21,12 @@ --version Show version -l [url] URL can be track/playlist/user -s [search_query] Search for a track/playlist/user and use the first result - -n [maxtracks] Download the n last tracks of a playlist according to the - creation date - -I [interval] Interval damn + -n Sort the tracks of a playlist according to the + creation date in descending order + -I [interval] Download a subset of the query results + Format: [start index, end index] + Bounds are inclusive. + Indexing starts with 1. -a Download all tracks of user (including reposts) -t Download all uploads of a user (no reposts) -f Download all favorites (likes) of a user @@ -32,8 +35,6 @@ -r Download all reposts of user -c Continue if a downloaded file already exists --force-metadata This will set metadata on already downloaded track - -o [offset] Start downloading a playlist from the [offset]th track - Indexing starts with 1. --addtimestamp Add track creation timestamp to filename, which allows for chronological sorting (Deprecated. Use --name-format instead.) @@ -172,14 +173,12 @@ class SCDLArgs(TypedDict): max_size: Optional[int] me: bool min_size: Optional[int] - n: Optional[str] + n: bool name_format: str no_album_tag: bool no_original: bool no_playlist: bool no_playlist_folder: bool - o: Optional[int] - offset: NotRequired[int] only_original: bool onlymp3: bool opus: bool @@ -190,7 +189,6 @@ class SCDLArgs(TypedDict): p: bool path: Optional[str] playlist_name_format: str - playlist_offset: NotRequired[int] r: bool remove: bool strict_playlist: bool @@ -494,6 +492,7 @@ def validate_url(client: SoundCloud, url: str) -> str: def validate_interval(interval: str) -> (int,int): """ """ + #TODO pattern = re.compile(r"\[([1-9][0-9]*)?,(\d*)\]") search_result = pattern.search(interval) @@ -586,8 +585,9 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: url = kwargs["l"] item = client.resolve(url) logger.debug(item) - # offset = kwargs.get("offset", 0) interval = kwargs.get("interval",(1,1000)) + interval_start = interval[0] + interval_end = interval[1] if item is None: logger.error("URL is not valid") sys.exit(1) @@ -599,102 +599,102 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: # kwargs["playlist_offset"] = offset kwargs["playlist_interval"] = interval download_playlist(client, item, kwargs) - # elif isinstance(item, User): - # user = item - # logger.info("Found a user profile") - # if kwargs.get("f"): - # logger.info(f"Retrieving all likes of user {user.username}...") - # likes = client.get_user_likes(user.id, limit=1000) - # for i, like in itertools.islice(enumerate(likes, 1), offset, None): - # logger.info(f"like n°{i} of {user.likes_count}") - # if isinstance(like, TrackLike): - # download_track( - # client, - # like.track, - # kwargs, - # exit_on_fail=kwargs["strict_playlist"], - # ) - # elif isinstance(like, PlaylistLike): - # playlist = client.get_playlist(like.playlist.id) - # assert playlist is not None - # download_playlist(client, playlist, kwargs) - # else: - # logger.error(f"Unknown like type {like}") - # if kwargs.get("strict_playlist"): - # sys.exit(1) - # logger.info(f"Downloaded all likes of user {user.username}!") - # elif kwargs.get("C"): - # logger.info(f"Retrieving all commented tracks of user {user.username}...") - # comments = client.get_user_comments(user.id, limit=1000) - # for i, comment in itertools.islice(enumerate(comments, 1), offset, None): - # logger.info(f"comment n°{i} of {user.comments_count}") - # track = client.get_track(comment.track.id) - # assert track is not None - # download_track( - # client, - # track, - # kwargs, - # exit_on_fail=kwargs["strict_playlist"], - # ) - # logger.info(f"Downloaded all commented tracks of user {user.username}!") - # elif kwargs.get("t"): - # logger.info(f"Retrieving all tracks of user {user.username}...") - # tracks = client.get_user_tracks(user.id, limit=1000) - # for i, track in itertools.islice(enumerate(tracks, 1), offset, None): - # logger.info(f"track n°{i} of {user.track_count}") - # download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) - # logger.info(f"Downloaded all tracks of user {user.username}!") - # elif kwargs.get("a"): - # logger.info(f"Retrieving all tracks & reposts of user {user.username}...") - # items = client.get_user_stream(user.id, limit=1000) - # for i, stream_item in itertools.islice(enumerate(items, 1), offset, None): - # logger.info( - # f"item n°{i} of " - # f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", - # ) - # if isinstance(stream_item, (TrackStreamItem, TrackStreamRepostItem)): - # download_track( - # client, - # stream_item.track, - # kwargs, - # exit_on_fail=kwargs["strict_playlist"], - # ) - # elif isinstance(stream_item, (PlaylistStreamItem, PlaylistStreamRepostItem)): - # download_playlist(client, stream_item.playlist, kwargs) - # else: - # logger.error(f"Unknown item type {stream_item.type}") - # if kwargs.get("strict_playlist"): - # sys.exit(1) - # logger.info(f"Downloaded all tracks & reposts of user {user.username}!") - # elif kwargs.get("p"): - # logger.info(f"Retrieving all playlists of user {user.username}...") - # playlists = client.get_user_playlists(user.id, limit=1000) - # for i, playlist in itertools.islice(enumerate(playlists, 1), offset, None): - # logger.info(f"playlist n°{i} of {user.playlist_count}") - # download_playlist(client, playlist, kwargs) - # logger.info(f"Downloaded all playlists of user {user.username}!") - # elif kwargs.get("r"): - # logger.info(f"Retrieving all reposts of user {user.username}...") - # reposts = client.get_user_reposts(user.id, limit=1000) - # for i, repost in itertools.islice(enumerate(reposts, 1), offset, None): - # logger.info(f"item n°{i} of {user.reposts_count or '?'}") - # if isinstance(repost, TrackStreamRepostItem): - # download_track( - # client, - # repost.track, - # kwargs, - # exit_on_fail=kwargs["strict_playlist"], - # ) - # elif isinstance(repost, PlaylistStreamRepostItem): - # download_playlist(client, repost.playlist, kwargs) - # else: - # logger.error(f"Unknown item type {repost.type}") - # if kwargs.get("strict_playlist"): - # sys.exit(1) - # logger.info(f"Downloaded all reposts of user {user.username}!") - # else: - # logger.error("Please provide a download type...") - # sys.exit(1) + elif isinstance(item, User): + user = item + logger.info("Found a user profile") + if kwargs.get("f"): + logger.info(f"Retrieving all likes of user {user.username}...") + likes = client.get_user_likes(user.id, limit=1000) + for i, like in itertools.islice(enumerate(likes, 1), interval_start-1, interval_end): + logger.info(f"like n°{i} of {user.likes_count}") + if isinstance(like, TrackLike): + download_track( + client, + like.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(like, PlaylistLike): + playlist = client.get_playlist(like.playlist.id) + assert playlist is not None + download_playlist(client, playlist, kwargs) + else: + logger.error(f"Unknown like type {like}") + if kwargs.get("strict_playlist"): + sys.exit(1) + logger.info(f"Downloaded all likes of user {user.username}!") + elif kwargs.get("C"): + logger.info(f"Retrieving all commented tracks of user {user.username}...") + comments = client.get_user_comments(user.id, limit=1000) + for i, comment in itertools.islice(enumerate(comments, 1), interval_start-1, interval_end): + logger.info(f"comment n°{i} of {user.comments_count}") + track = client.get_track(comment.track.id) + assert track is not None + download_track( + client, + track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + logger.info(f"Downloaded all commented tracks of user {user.username}!") + elif kwargs.get("t"): + logger.info(f"Retrieving all tracks of user {user.username}...") + tracks = client.get_user_tracks(user.id, limit=1000) + for i, track in itertools.islice(enumerate(tracks, 1), interval_start-1, interval_end): + logger.info(f"track n°{i} of {user.track_count}") + download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) + logger.info(f"Downloaded all tracks of user {user.username}!") + elif kwargs.get("a"): + logger.info(f"Retrieving all tracks & reposts of user {user.username}...") + items = client.get_user_stream(user.id, limit=1000) + for i, stream_item in itertools.islice(enumerate(items, 1), interval_start-1, interval_end): + logger.info( + f"item n°{i} of " + f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", + ) + if isinstance(stream_item, (TrackStreamItem, TrackStreamRepostItem)): + download_track( + client, + stream_item.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(stream_item, (PlaylistStreamItem, PlaylistStreamRepostItem)): + download_playlist(client, stream_item.playlist, kwargs) + else: + logger.error(f"Unknown item type {stream_item.type}") + if kwargs.get("strict_playlist"): + sys.exit(1) + logger.info(f"Downloaded all tracks & reposts of user {user.username}!") + elif kwargs.get("p"): + logger.info(f"Retrieving all playlists of user {user.username}...") + playlists = client.get_user_playlists(user.id, limit=1000) + for i, playlist in itertools.islice(enumerate(playlists, 1), interval_start-1, interval_end): + logger.info(f"playlist n°{i} of {user.playlist_count}") + download_playlist(client, playlist, kwargs) + logger.info(f"Downloaded all playlists of user {user.username}!") + elif kwargs.get("r"): + logger.info(f"Retrieving all reposts of user {user.username}...") + reposts = client.get_user_reposts(user.id, limit=1000) + for i, repost in itertools.islice(enumerate(reposts, 1), interval_start-1, interval_end): + logger.info(f"item n°{i} of {user.reposts_count or '?'}") + if isinstance(repost, TrackStreamRepostItem): + download_track( + client, + repost.track, + kwargs, + exit_on_fail=kwargs["strict_playlist"], + ) + elif isinstance(repost, PlaylistStreamRepostItem): + download_playlist(client, repost.playlist, kwargs) + else: + logger.error(f"Unknown item type {repost.type}") + if kwargs.get("strict_playlist"): + sys.exit(1) + logger.info(f"Downloaded all reposts of user {user.username}!") + else: + logger.error("Please provide a download type...") + sys.exit(1) else: logger.error(f"Unknown item type {item.kind}") sys.exit(1) @@ -793,6 +793,11 @@ def download_playlist( "tracknumber_total": playlist.track_count, } + interval_start, interval_stop = kwargs.get("playlist_interval",(1,500)) + + if interval_start > playlist.track_count: + return + if not kwargs.get("no_playlist_folder"): if not os.path.exists(playlist_name): os.makedirs(playlist_name) @@ -800,12 +805,12 @@ def download_playlist( try: n = kwargs.get("n") - if n is not None: # Order by creation date and get the n lasts tracks + if n: # Order by creation date playlist.tracks = tuple( - sorted(playlist.tracks, key=lambda track: track.id, reverse=True)[: int(n)], + sorted(playlist.tracks, key=lambda track: track.id, reverse=True), ) - kwargs["playlist_offset"] = 0 s = kwargs.get("sync") + #TODO if s: if os.path.isfile(s): playlist.tracks = sync(client, playlist, playlist_info, kwargs) @@ -813,13 +818,11 @@ def download_playlist( logger.error(f'Invalid sync archive file {kwargs.get("sync")}') sys.exit(1) - start, stop = kwargs.get("playlist_interval") - tracknumber_digits = len(str(len(playlist.tracks))) for counter, track in itertools.islice( enumerate(playlist.tracks, 1), - start-1, - stop + interval_start-1, + interval_stop ): logger.debug(track) logger.info(f"Track n°{counter}") From 0fadd7eefd860d588076f5012e40327902c0c46d Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Tue, 5 Nov 2024 02:45:22 +0100 Subject: [PATCH 3/7] Fix: Interval compatible with sync --- scdl/scdl.py | 267 +++++++++++++++++++++++++-------------------------- 1 file changed, 131 insertions(+), 136 deletions(-) diff --git a/scdl/scdl.py b/scdl/scdl.py index 4f53c795..37bd52d0 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -26,6 +26,8 @@ -I [interval] Download a subset of the query results Format: [start index, end index] Bounds are inclusive. + Either bound can be omitted to represent a left/right + unbounded interval. Indexing starts with 1. -a Download all tracks of user (including reposts) -t Download all uploads of a user (no reposts) @@ -238,9 +240,9 @@ def __init__(self, return_code: int, errors: str): def handle_exception( - exc_type: Type[BaseException], - exc_value: BaseException, - exc_traceback: Optional[TracebackType], + exc_type: Type[BaseException], + exc_value: BaseException, + exc_traceback: Optional[TracebackType], ) -> NoReturn: if issubclass(exc_type, KeyboardInterrupt): logger.error("\nGoodbye!") @@ -251,7 +253,6 @@ def handle_exception( sys.excepthook = handle_exception - file_lock_dirs: List[pathlib.Path] = [] @@ -267,11 +268,11 @@ def clean_up_locks() -> None: class SafeLock: def __init__( - self, - lock_file: Union[str, os.PathLike], - timeout: float = -1, - mode: int = 0o644, - thread_local: bool = True, + self, + lock_file: Union[str, os.PathLike], + timeout: float = -1, + mode: int = 0o644, + thread_local: bool = True, ) -> None: self._lock = filelock.FileLock(lock_file, timeout, mode, thread_local) self._soft_lock = filelock.SoftFileLock(lock_file, timeout, mode, thread_local) @@ -288,10 +289,10 @@ def __enter__(self): return self._soft_lock def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], ) -> None: if self._using_soft_lock: self._soft_lock.release() @@ -365,17 +366,7 @@ def main() -> None: logger.error(f"Invalid auth_token in {config_file}") sys.exit(1) - # if arguments["-o"] is not None: - # try: - # arguments["--offset"] = int(arguments["-o"]) - 1 - # if arguments["--offset"] < 0: - # raise ValueError - # except Exception: - # logger.error("Offset should be a positive integer...") - # sys.exit(1) - # logger.debug("offset: %d", arguments["--offset"]) - - if arguments.get("-I",None) is not None: + if arguments["-I"] is not None: arguments["--interval"] = validate_interval(arguments["-I"]) if arguments["--min-size"] is not None: @@ -468,7 +459,7 @@ def validate_url(client: SoundCloud, url: str) -> str: if url.startswith(("https://m.soundcloud.com", "http://m.soundcloud.com", "m.soundcloud.com")): url = url.replace("m.", "", 1) if url.startswith( - ("https://www.soundcloud.com", "http://www.soundcloud.com", "www.soundcloud.com"), + ("https://www.soundcloud.com", "http://www.soundcloud.com", "www.soundcloud.com"), ): url = url.replace("www.", "", 1) if url.startswith("soundcloud.com"): @@ -489,11 +480,15 @@ def validate_url(client: SoundCloud, url: str) -> str: logger.error("URL is not valid") sys.exit(1) -def validate_interval(interval: str) -> (int,int): - """ + +def validate_interval(interval: str) -> (int, int): + """If the interval is valid return a tuple with both bounds. + If the interval is not valid, exit the program. + In the case of a left-unbounded interval, default lower bound to 1. + In the case of a right-unbounded interval, default upper bound to system max value. + Both cases are not mutually exclusive. """ - #TODO - pattern = re.compile(r"\[([1-9][0-9]*)?,(\d*)\]") + pattern = re.compile(r"^\[([1-9][0-9]*)?,(\d*)]$") search_result = pattern.search(interval) if search_result is not None: @@ -506,7 +501,6 @@ def validate_interval(interval: str) -> (int,int): sys.exit(1) - def search_soundcloud(client: SoundCloud, query: str) -> Optional[str]: """Search SoundCloud and return the URL of the first result.""" try: @@ -557,10 +551,10 @@ def truncate_str(s: str, length: int) -> str: def sanitize_str( - filename: str, - ext: str = "", - replacement_char: str = "�", - max_length: int = 255, + filename: str, + ext: str = "", + replacement_char: str = "�", + max_length: int = 255, ) -> str: """Sanitizes a string for use as a filename. Does not allow the file to be hidden""" if filename.startswith("."): @@ -585,7 +579,7 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: url = kwargs["l"] item = client.resolve(url) logger.debug(item) - interval = kwargs.get("interval",(1,1000)) + interval = kwargs.get("interval", (1, 1000)) interval_start = interval[0] interval_end = interval[1] if item is None: @@ -596,7 +590,6 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: download_track(client, item, kwargs) elif isinstance(item, AlbumPlaylist): logger.info("Found a playlist") - # kwargs["playlist_offset"] = offset kwargs["playlist_interval"] = interval download_playlist(client, item, kwargs) elif isinstance(item, User): @@ -605,7 +598,7 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: if kwargs.get("f"): logger.info(f"Retrieving all likes of user {user.username}...") likes = client.get_user_likes(user.id, limit=1000) - for i, like in itertools.islice(enumerate(likes, 1), interval_start-1, interval_end): + for i, like in itertools.islice(enumerate(likes, 1), interval_start - 1, interval_end): logger.info(f"like n°{i} of {user.likes_count}") if isinstance(like, TrackLike): download_track( @@ -626,7 +619,7 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: elif kwargs.get("C"): logger.info(f"Retrieving all commented tracks of user {user.username}...") comments = client.get_user_comments(user.id, limit=1000) - for i, comment in itertools.islice(enumerate(comments, 1), interval_start-1, interval_end): + for i, comment in itertools.islice(enumerate(comments, 1), interval_start - 1, interval_end): logger.info(f"comment n°{i} of {user.comments_count}") track = client.get_track(comment.track.id) assert track is not None @@ -640,14 +633,14 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: elif kwargs.get("t"): logger.info(f"Retrieving all tracks of user {user.username}...") tracks = client.get_user_tracks(user.id, limit=1000) - for i, track in itertools.islice(enumerate(tracks, 1), interval_start-1, interval_end): + for i, track in itertools.islice(enumerate(tracks, 1), interval_start - 1, interval_end): logger.info(f"track n°{i} of {user.track_count}") download_track(client, track, kwargs, exit_on_fail=kwargs["strict_playlist"]) logger.info(f"Downloaded all tracks of user {user.username}!") elif kwargs.get("a"): logger.info(f"Retrieving all tracks & reposts of user {user.username}...") items = client.get_user_stream(user.id, limit=1000) - for i, stream_item in itertools.islice(enumerate(items, 1), interval_start-1, interval_end): + for i, stream_item in itertools.islice(enumerate(items, 1), interval_start - 1, interval_end): logger.info( f"item n°{i} of " f"{user.track_count + user.reposts_count if user.reposts_count else '?'}", @@ -669,14 +662,14 @@ def download_url(client: SoundCloud, kwargs: SCDLArgs) -> None: elif kwargs.get("p"): logger.info(f"Retrieving all playlists of user {user.username}...") playlists = client.get_user_playlists(user.id, limit=1000) - for i, playlist in itertools.islice(enumerate(playlists, 1), interval_start-1, interval_end): + for i, playlist in itertools.islice(enumerate(playlists, 1), interval_start - 1, interval_end): logger.info(f"playlist n°{i} of {user.playlist_count}") download_playlist(client, playlist, kwargs) logger.info(f"Downloaded all playlists of user {user.username}!") elif kwargs.get("r"): logger.info(f"Retrieving all reposts of user {user.username}...") reposts = client.get_user_reposts(user.id, limit=1000) - for i, repost in itertools.islice(enumerate(reposts, 1), interval_start-1, interval_end): + for i, repost in itertools.islice(enumerate(reposts, 1), interval_start - 1, interval_end): logger.info(f"item n°{i} of {user.reposts_count or '?'}") if isinstance(repost, TrackStreamRepostItem): download_track( @@ -710,10 +703,10 @@ def remove_files() -> None: def sync( - client: SoundCloud, - playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], - playlist_info: PlaylistInfo, - kwargs: SCDLArgs, + client: SoundCloud, + playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], + playlist_info: PlaylistInfo, + kwargs: SCDLArgs, ) -> Tuple[Union[BasicTrack, MiniTrack], ...]: """Downloads/Removes tracks that have been changed on playlist since last archive file""" logger.info("Comparing tracks...") @@ -774,9 +767,9 @@ def sync( def download_playlist( - client: SoundCloud, - playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], - kwargs: SCDLArgs, + client: SoundCloud, + playlist: Union[AlbumPlaylist, BasicAlbumPlaylist], + kwargs: SCDLArgs, ) -> None: """Downloads a playlist""" if kwargs.get("no_playlist"): @@ -793,11 +786,15 @@ def download_playlist( "tracknumber_total": playlist.track_count, } - interval_start, interval_stop = kwargs.get("playlist_interval",(1,500)) + interval_start, interval_stop = kwargs.get("playlist_interval", (1, 500)) if interval_start > playlist.track_count: return + interval_stop = min(interval_stop, playlist.track_count) + playlist.tracks = playlist.tracks[interval_start - 1:interval_stop] + tracks_enumeration = enumerate(playlist.tracks, interval_start) + if not kwargs.get("no_playlist_folder"): if not os.path.exists(playlist_name): os.makedirs(playlist_name) @@ -810,20 +807,18 @@ def download_playlist( sorted(playlist.tracks, key=lambda track: track.id, reverse=True), ) s = kwargs.get("sync") - #TODO if s: if os.path.isfile(s): - playlist.tracks = sync(client, playlist, playlist_info, kwargs) + filtered_tracks = sync(client, playlist, playlist_info, kwargs) + tracks_enumeration = [track_tuple for track_tuple in tracks_enumeration + if track_tuple[1] in filtered_tracks] + else: logger.error(f'Invalid sync archive file {kwargs.get("sync")}') sys.exit(1) tracknumber_digits = len(str(len(playlist.tracks))) - for counter, track in itertools.islice( - enumerate(playlist.tracks, 1), - interval_start-1, - interval_stop - ): + for counter, track in tracks_enumeration: logger.debug(track) logger.info(f"Track n°{counter}") playlist_info["tracknumber_int"] = counter @@ -871,11 +866,11 @@ def get_stdout() -> Generator[IO, None, None]: def get_filename( - track: Union[BasicTrack, Track], - kwargs: SCDLArgs, - ext: Optional[str] = None, - original_filename: Optional[str] = None, - playlist_info: Optional[PlaylistInfo] = None, + track: Union[BasicTrack, Track], + kwargs: SCDLArgs, + ext: Optional[str] = None, + original_filename: Optional[str] = None, + playlist_info: Optional[PlaylistInfo] = None, ) -> str: # Force stdout name on tracks that are being downloaded to stdout if is_downloading_to_stdout(kwargs): @@ -909,11 +904,11 @@ def get_filename( def download_original_file( - client: SoundCloud, - track: Union[BasicTrack, Track], - title: str, - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo] = None, + client: SoundCloud, + track: Union[BasicTrack, Track], + title: str, + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo] = None, ) -> Tuple[Optional[str], bool]: logger.info("Downloading the original file.") to_stdout = is_downloading_to_stdout(kwargs) @@ -950,9 +945,9 @@ def download_original_file( # Find file extension ext = ( - ext - or mimetypes.guess_extension(r.headers["content-type"]) - or ("." + r.headers["x-amz-meta-file-type"]) + ext + or mimetypes.guess_extension(r.headers["content-type"]) + or ("." + r.headers["x-amz-meta-file-type"]) ) orig_filename += ext @@ -989,9 +984,9 @@ def download_original_file( def get_transcoding_m3u8( - client: SoundCloud, - transcoding: Transcoding, - kwargs: SCDLArgs, + client: SoundCloud, + transcoding: Transcoding, + kwargs: SCDLArgs, ) -> str: url = transcoding.url bitrate_KBps = 256 / 8 if "aac" in transcoding.preset else 128 / 8 # noqa: N806 @@ -1033,11 +1028,11 @@ def get_transcoding_m3u8( def download_hls( - client: SoundCloud, - track: Union[BasicTrack, Track], - title: str, - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo] = None, + client: SoundCloud, + track: Union[BasicTrack, Track], + title: str, + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo] = None, ) -> Tuple[str, bool]: if not track.media.transcodings: raise SoundCloudException(f"Track {track.permalink_url} has no transcodings available") @@ -1096,11 +1091,11 @@ def download_hls( def download_track( - client: SoundCloud, - track: Union[BasicTrack, Track], - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo] = None, - exit_on_fail: bool = True, + client: SoundCloud, + track: Union[BasicTrack, Track], + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo] = None, + exit_on_fail: bool = True, ) -> None: """Downloads a track""" try: @@ -1126,10 +1121,10 @@ def download_track( filename = None is_already_downloaded = False if ( - (track.downloadable or track.user_id == client_user_id) - and not kwargs["onlymp3"] - and not kwargs.get("no_original") - and client.auth_token + (track.downloadable or track.user_id == client_user_id) + and not kwargs["onlymp3"] + and not kwargs.get("no_original") + and client.auth_token ): try: with lock: @@ -1227,10 +1222,10 @@ def create_description_file(description: Optional[str], filename: str) -> None: def already_downloaded( - track: Union[BasicTrack, Track], - title: str, - filename: str, - kwargs: SCDLArgs, + track: Union[BasicTrack, Track], + title: str, + filename: str, + kwargs: SCDLArgs, ) -> bool: """Returns True if the file has already been downloaded""" already_downloaded = False @@ -1310,11 +1305,11 @@ def _try_get_artwork(url: str, size: str = "original") -> Optional[requests.Resp def build_ffmpeg_encoding_args( - input_file: str, - output_file: str, - out_codec: str, - kwargs: SCDLArgs, - *args: str, + input_file: str, + output_file: str, + out_codec: str, + kwargs: SCDLArgs, + *args: str, ) -> List[str]: supported = get_ffmpeg_supported_options() ffmpeg_args = [ @@ -1352,9 +1347,9 @@ def build_ffmpeg_encoding_args( def _write_streaming_response_to_pipe( - response: requests.Response, - pipe: Union[IO[bytes], io.BytesIO], - kwargs: SCDLArgs, + response: requests.Response, + pipe: Union[IO[bytes], io.BytesIO], + kwargs: SCDLArgs, ) -> None: total_length = int(response.headers["content-length"]) @@ -1370,11 +1365,11 @@ def _write_streaming_response_to_pipe( with memoryview(bytearray(chunk_size)) as buffer: for chunk in tqdm( - iter(lambda: response.raw.read(chunk_size), b""), - total=(total_length / chunk_size) + 1, - disable=bool(kwargs.get("hide_progress")), - unit="Kb", - unit_scale=chunk_size / 1024, + iter(lambda: response.raw.read(chunk_size), b""), + total=(total_length / chunk_size) + 1, + disable=bool(kwargs.get("hide_progress")), + unit="Kb", + unit_scale=chunk_size / 1024, ): if not chunk: break @@ -1396,10 +1391,10 @@ def _write_streaming_response_to_pipe( def _add_metadata_to_stream( - track: Union[BasicTrack, Track], - stream: io.BytesIO, - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo] = None, + track: Union[BasicTrack, Track], + stream: io.BytesIO, + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo] = None, ) -> None: logger.info("Applying metadata...") @@ -1469,14 +1464,14 @@ def _add_metadata_to_stream( def re_encode_to_out( - track: Union[BasicTrack, Track], - in_data: Union[requests.Response, str], - out_codec: str, - should_copy: bool, - filename: str, - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo], - skip_re_encoding: bool = False, + track: Union[BasicTrack, Track], + in_data: Union[requests.Response, str], + out_codec: str, + should_copy: bool, + filename: str, + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo], + skip_re_encoding: bool = False, ) -> None: to_stdout = is_downloading_to_stdout(kwargs) @@ -1510,11 +1505,11 @@ def _is_ffmpeg_progress_line(parameters: List[str]) -> bool: def _get_ffmpeg_pipe( - in_data: Union[requests.Response, str], # streaming response or url - out_codec: str, - should_copy: bool, - output_file: str, - kwargs: SCDLArgs, + in_data: Union[requests.Response, str], # streaming response or url + out_codec: str, + should_copy: bool, + output_file: str, + kwargs: SCDLArgs, ) -> subprocess.Popen: logger.info("Creating the ffmpeg pipe...") @@ -1548,12 +1543,12 @@ def _is_unsupported_codec_for_streaming(codec: str) -> bool: def _re_encode_ffmpeg( - in_data: Union[requests.Response, str], # streaming response or url - out_file_name: str, - out_codec: str, - track_duration_ms: int, - should_copy: bool, - kwargs: SCDLArgs, + in_data: Union[requests.Response, str], # streaming response or url + out_file_name: str, + out_codec: str, + track_duration_ms: int, + should_copy: bool, + kwargs: SCDLArgs, ) -> io.BytesIO: pipe = _get_ffmpeg_pipe(in_data, out_codec, should_copy, out_file_name, kwargs) @@ -1642,8 +1637,8 @@ def read_stdout() -> None: def _copy_stream( - in_data: requests.Response, # streaming response or url - kwargs: SCDLArgs, + in_data: requests.Response, # streaming response or url + kwargs: SCDLArgs, ) -> io.BytesIO: result = io.BytesIO() _write_streaming_response_to_pipe(in_data, result, kwargs) @@ -1652,13 +1647,13 @@ def _copy_stream( def re_encode_to_buffer( - track: Union[BasicTrack, Track], - in_data: Union[requests.Response, str], # streaming response or url - out_codec: str, - should_copy: bool, - kwargs: SCDLArgs, - playlist_info: Optional[PlaylistInfo] = None, - skip_re_encoding: bool = False, + track: Union[BasicTrack, Track], + in_data: Union[requests.Response, str], # streaming response or url + out_codec: str, + should_copy: bool, + kwargs: SCDLArgs, + playlist_info: Optional[PlaylistInfo] = None, + skip_re_encoding: bool = False, ) -> io.BytesIO: if skip_re_encoding and isinstance(in_data, requests.Response): encoded_data = _copy_stream(in_data, kwargs) From 4d82418e5cfd87cd8f530a9b4b497271a0683c4c Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Tue, 5 Nov 2024 03:20:40 +0100 Subject: [PATCH 4/7] Fix: Interval compatible with -n (recency sorting) --- scdl/scdl.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/scdl/scdl.py b/scdl/scdl.py index 37bd52d0..eae9bc18 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -11,7 +11,6 @@ [--client-id ][--auth-token ][--overwrite][--no-playlist][--opus] [--add-description] - scdl -h | --help scdl --version @@ -21,10 +20,10 @@ --version Show version -l [url] URL can be track/playlist/user -s [search_query] Search for a track/playlist/user and use the first result - -n Sort the tracks of a playlist according to the - creation date in descending order - -I [interval] Download a subset of the query results - Format: [start index, end index] + -n Sort the tracks of a playlist according to the creation date + in descending order + -I [interval] Download a subset of a playlist/user's uploads/user's favorites/... + Format: [start index,end index] Bounds are inclusive. Either bound can be omitted to represent a left/right unbounded interval. @@ -253,6 +252,7 @@ def handle_exception( sys.excepthook = handle_exception + file_lock_dirs: List[pathlib.Path] = [] @@ -494,7 +494,7 @@ def validate_interval(interval: str) -> (int, int): if search_result is not None: start = int(search_result.group(1) or 1) end = int(search_result.group(2) or sys.maxsize) - if start < end: + if start <= end: return start, end logger.error("Interval is not valid") @@ -786,6 +786,12 @@ def download_playlist( "tracknumber_total": playlist.track_count, } + n = kwargs.get("n") + if n: # Order by creation date + playlist.tracks = tuple( + sorted(playlist.tracks, key=lambda track: track.id, reverse=True), + ) + interval_start, interval_stop = kwargs.get("playlist_interval", (1, 500)) if interval_start > playlist.track_count: @@ -801,11 +807,6 @@ def download_playlist( os.chdir(playlist_name) try: - n = kwargs.get("n") - if n: # Order by creation date - playlist.tracks = tuple( - sorted(playlist.tracks, key=lambda track: track.id, reverse=True), - ) s = kwargs.get("sync") if s: if os.path.isfile(s): From 0a18ad273ed87fe3899023570cb65d3e1d8e96c3 Mon Sep 17 00:00:00 2001 From: Tjerbor <44727916+Tjerbor@users.noreply.github.com> Date: Tue, 5 Nov 2024 04:09:13 +0100 Subject: [PATCH 5/7] Fix: Remove gitignore entry used for testing purposes --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0fb46edb..862e4ff7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,3 @@ __pycache__/ .coverage* .idea .python-version -/main.py From eb161f4b0f91d24b5be0438e1b77de720f9fc9cf Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Wed, 9 Jul 2025 16:24:50 +0200 Subject: [PATCH 6/7] refactor: Change -I interval format to keyboard typing friendly format --- scdl/scdl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scdl/scdl.py b/scdl/scdl.py index eae9bc18..338272b2 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -488,7 +488,7 @@ def validate_interval(interval: str) -> (int, int): In the case of a right-unbounded interval, default upper bound to system max value. Both cases are not mutually exclusive. """ - pattern = re.compile(r"^\[([1-9][0-9]*)?,(\d*)]$") + pattern = re.compile(r"^([1-9][0-9]*)?-(\d*)$") search_result = pattern.search(interval) if search_result is not None: From 51eac8fe88c5a09ab29e87af6be1fe3ebb0dbb4a Mon Sep 17 00:00:00 2001 From: Thiemo Ottersbach Date: Wed, 9 Jul 2025 17:24:50 +0200 Subject: [PATCH 7/7] feat: -I Interval download single track at index if single number without hyphen is given. --- scdl/scdl.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/scdl/scdl.py b/scdl/scdl.py index c6fc24af..cb824d85 100644 --- a/scdl/scdl.py +++ b/scdl/scdl.py @@ -23,11 +23,14 @@ -n Sort the tracks of a playlist according to the creation date in descending order -I [interval] Download a subset of a playlist/user's uploads/user's favorites/... - Format: [start index,end index] + Format: Start index-end index Bounds are inclusive. Either bound can be omitted to represent a left/right unbounded interval. Indexing starts with 1. + If only one number is given without hyphen, + only the track at that index will be downloaded. + Examples: 10-27, 30-, -50, 58 -a Download all tracks of user (including reposts) -t Download all uploads of a user (no reposts) -f Download all favorites (likes) of a user @@ -254,7 +257,6 @@ def handle_exception( sys.excepthook = handle_exception - file_lock_dirs: List[pathlib.Path] = [] @@ -270,9 +272,9 @@ def clean_up_locks() -> None: class SafeLock: def __init__( - self, - lock_file: Union[str, os.PathLike], - timeout: float = -1, + self, + lock_file: Union[str, os.PathLike], + timeout: float = -1, ) -> None: self._lock = filelock.FileLock(lock_file, timeout=timeout) self._soft_lock = filelock.SoftFileLock(lock_file, timeout=timeout) @@ -488,14 +490,16 @@ def validate_interval(interval: str) -> (int, int): In the case of a right-unbounded interval, default upper bound to system max value. Both cases are not mutually exclusive. """ - pattern = re.compile(r"^([1-9][0-9]*)?-(\d*)$") + pattern = re.compile(r"^(?P[1-9][0-9]*)?-(?P\d*)$|^(?P[1-9][0-9]*)$") search_result = pattern.search(interval) if search_result is not None: - start = int(search_result.group(1) or 1) - end = int(search_result.group(2) or sys.maxsize) - if start <= end: - return start, end + if (search_result.group('single') is not None): + start = end = int(search_result.group('single')) + else: + start = int(search_result.group('start') or 1) + end = int(search_result.group('end') or sys.maxsize) + return min(start, end), max(start, end) logger.error("Interval is not valid") sys.exit(1)